Coverage for ivatar/tools/views.py: 66%

143 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 23:07 +0000

1""" 

2View classes for ivatar/tools/ 

3""" 

4 

5import hashlib 

6import random 

7 

8from django.views.generic.edit import FormView 

9from django.urls import reverse_lazy as reverse 

10from django.shortcuts import render 

11 

12import dns.resolver 

13import dns.exception 

14 

15from libravatar import libravatar_url, parse_user_identity 

16from libravatar import SECURE_BASE_URL as LIBRAVATAR_SECURE_BASE_URL 

17from libravatar import BASE_URL as LIBRAVATAR_BASE_URL 

18 

19from ivatar.settings import SECURE_BASE_URL, BASE_URL, SITE_NAME, DEBUG 

20from .forms import ( 

21 CheckDomainForm, 

22 CheckForm, 

23) # pylint: disable=relative-beyond-top-level 

24 

25 

class CheckDomainView(FormView):
    """
    Form view that reports the avatar servers announced for a domain
    """

    template_name = "check_domain.html"
    form_class = CheckDomainForm
    success_url = reverse("tools_check_domain")

    def form_valid(self, form):
        """
        Look up the plain and secure avatar servers for the submitted
        domain and re-render the form template with the findings.

        The ``result`` dict handed to the template always holds
        ``avatar_server_http`` and ``avatar_server_https``; the matching
        ``*_ipv4`` / ``*_ipv6`` entries are only added when the
        corresponding server lookup returned something.
        """
        # The parent's redirect response is intentionally discarded; the
        # page is rendered in place instead.
        super().form_valid(form)
        domain = form.cleaned_data["domain"]

        http_server = lookup_avatar_server(domain, False)
        result = {"avatar_server_http": http_server}
        if http_server:
            result["avatar_server_http_ipv4"] = lookup_ip_address(http_server, False)
            result["avatar_server_http_ipv6"] = lookup_ip_address(http_server, True)

        https_server = lookup_avatar_server(domain, True)
        result["avatar_server_https"] = https_server
        if https_server:
            result["avatar_server_https_ipv4"] = lookup_ip_address(
                https_server, False
            )
            result["avatar_server_https_ipv6"] = lookup_ip_address(https_server, True)

        context = {
            "form": form,
            "result": result,
        }
        return render(self.request, self.template_name, context)

62 

63 

class CheckView(FormView):
    """
    Form view that shows the avatar URLs and hashes for a mail and/or
    OpenID address
    """

    template_name = "check.html"
    form_class = CheckForm
    success_url = reverse("tools_check")

    def form_valid(self, form):
        """
        Build the plain and secure avatar URLs plus the identity hashes
        for the submitted mail address and/or OpenID, then re-render the
        form template with them.

        Every URL/hash entry in the template context stays ``None`` when
        the corresponding field was not filled in.
        """
        # The parent's redirect response is intentionally discarded; the
        # page is rendered in place instead.
        super().form_valid(form)
        data = form.cleaned_data

        # An explicit default URL wins over the default option; the
        # option value "none" means "no default at all".
        default_url = data["default_url"]
        if not default_url:
            option = data["default_opt"]
            default_url = option if option and option != "none" else None

        size = data.get("size", 80)

        mailurl = mailurl_secure = mailurl_secure_256 = None
        mail_hash = mail_hash256 = None
        mail = data["mail"]
        if mail:
            mailurl = libravatar_url(email=mail, size=size, default=default_url)
            mailurl = mailurl.replace(LIBRAVATAR_BASE_URL, BASE_URL)
            mailurl_secure = libravatar_url(
                email=mail, size=size, https=True, default=default_url
            )
            mailurl_secure = mailurl_secure.replace(
                LIBRAVATAR_SECURE_BASE_URL, SECURE_BASE_URL
            )
            mail_hash = parse_user_identity(email=mail, openid=None)[0]
            # NOTE(review): the SHA-256 is taken over the address exactly
            # as the form cleaned it - confirm this matches whatever
            # normalisation parse_user_identity applies.
            mail_hash256 = hashlib.sha256(mail.encode("utf-8")).hexdigest()
            # The SHA-256 URL is the secure URL with only the hash swapped.
            mailurl_secure_256 = mailurl_secure.replace(mail_hash, mail_hash256)

        openidurl = openidurl_secure = openid_hash = None
        openid = data["openid"]
        if openid:
            if not openid.startswith(("http://", "https://")):
                # Scheme-less OpenIDs are treated as plain http and the
                # completed value is written back to the cleaned data.
                openid = f"http://{openid}"
                data["openid"] = openid
            openidurl = libravatar_url(openid=openid, size=size, default=default_url)
            openidurl = openidurl.replace(LIBRAVATAR_BASE_URL, BASE_URL)
            openidurl_secure = libravatar_url(
                openid=openid, size=size, https=True, default=default_url
            )
            openidurl_secure = openidurl_secure.replace(
                LIBRAVATAR_SECURE_BASE_URL, SECURE_BASE_URL
            )
            openid_hash = parse_user_identity(openid=openid, email=None)[0]

        if "DEVELOPMENT" in SITE_NAME and DEBUG:

            def to_local_host(url):
                """Point a production avatar URL at the host serving this request."""
                if not url:
                    return url
                return url.replace(
                    "https://avatars.linux-kernel.at",
                    f"http://{self.request.get_host()}",
                )

            mailurl = to_local_host(mailurl)
            mailurl_secure = to_local_host(mailurl_secure)
            mailurl_secure_256 = to_local_host(mailurl_secure_256)
            openidurl = to_local_host(openidurl)
            openidurl_secure = to_local_host(openidurl_secure)
            print(mailurl, openidurl, mailurl_secure, mailurl_secure_256, openidurl_secure)

        context = {
            "form": form,
            "mailurl": mailurl,
            "openidurl": openidurl,
            "mailurl_secure": mailurl_secure,
            "mailurl_secure_256": mailurl_secure_256,
            "openidurl_secure": openidurl_secure,
            "mail_hash": mail_hash,
            "mail_hash256": mail_hash256,
            "openid_hash": openid_hash,
            "size": size,
        }
        return render(self.request, self.template_name, context)

183 

184 

def lookup_avatar_server(domain, https):
    """
    Find the avatar server a domain announces through its DNS SRV records

    The records that are queried look like this:

       _avatars._tcp.example.com.     IN SRV 0 0 80  avatars.example.com
       _avatars-sec._tcp.example.com. IN SRV 0 0 443 avatars.example.com

    ``https`` selects the ``_avatars-sec`` service instead of ``_avatars``.

    Returns the chosen target host name, with ``:port`` appended when the
    port differs from the scheme's default (443 for https, 80 otherwise),
    or ``None`` when no record could be resolved.
    """

    if domain:
        # Only the first 60 characters of the domain are used for the query
        domain = domain[:60]

    service_name = f"_avatars-sec._tcp.{domain}" if https else f"_avatars._tcp.{domain}"

    try:
        answers = dns.resolver.resolve(service_name, "SRV")
    except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
        # The domain simply does not announce an avatar server
        return None
    except dns.exception.DNSException as message:
        print(f"DNS Error: {message} ({domain})")
        return None

    records = [
        {
            "priority": rdata.priority,
            "weight": rdata.weight,
            "port": rdata.port,
            "target": rdata.target.to_text(omit_final_dot=True),
        }
        for rdata in answers
    ]

    target, port = srv_hostname(records)

    default_port = 443 if https else 80
    if target and port != default_port:
        return f"{target}:{port}"

    return target

229 

230 

def srv_hostname(records):
    """
    Choose the (target, port) pair to use from a list of SRV records.

    Each record is a dict with the keys "priority", "weight", "port" and
    "target".  Only the records sharing the lowest priority number are
    considered; among those one is drawn at random, biased by weight.
    Returns (None, None) for an empty list.
    """

    if not records:
        return (None, None)

    if len(records) == 1:
        only = records[0]
        return (only["target"], only["port"])

    # Collect the records of the best priority (lowest number) together
    # with the running sum of their weights
    best_priority = records[0]["priority"]
    candidates = []
    weight_sum = 0

    for entry in records:
        priority = entry["priority"]

        if priority > best_priority:
            # worse than what we already have
            continue

        if priority < best_priority:
            # better than everything seen so far: start over
            best_priority = priority
            weight_sum = 0
            candidates = []

        weight = entry["weight"]
        weight_sum += weight

        if weight > 0:
            candidates.append((weight_sum, entry))
        else:
            # zero-weight elements must come first
            candidates.insert(0, (0, entry))

    if len(candidates) == 1:
        chosen = candidates[0][1]
        return (chosen["target"], chosen["port"])

    # Select first record according to RFC2782 weight ordering algorithm (page 3)
    threshold = random.randint(0, weight_sum)

    for running_weight, entry in candidates:
        if running_weight >= threshold:
            return (entry["target"], entry["port"])

    print("There is something wrong with our SRV weight ordering algorithm")
    return (None, None)

284 

285 

def lookup_ip_address(hostname, ipv6):
    """
    Try to get an IPv4 or IPv6 address for the given hostname

    ``hostname`` may carry a trailing ``:port`` suffix, which is the form
    lookup_avatar_server() returns for servers on a non-default port.  The
    suffix is dropped before the DNS query, because the address records
    belong to the bare host name.

    ``ipv6`` selects an AAAA query when true and an A query otherwise.

    Returns the address of the first answer, or ``None`` when the
    hostname is empty, nothing was found or the lookup failed.
    """

    if not hostname:
        return None

    # Only strip a purely numeric suffix so that anything else is passed
    # to the resolver untouched.
    name, separator, port = hostname.rpartition(":")
    query_name = name if name and separator and port.isdigit() else hostname

    qtype = "AAAA" if ipv6 else "A"
    try:
        answers = dns.resolver.resolve(query_name, qtype)
    except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
        return None
    except dns.exception.DNSException as message:
        print(f"DNS Error: {message} ({hostname})")
        return None

    # Only the first answer is reported
    first = next(iter(answers), None)
    return first.address if first is not None else None