Coverage for ivatar/tools/views.py: 66%
143 statements
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 23:07 +0000
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 23:07 +0000
1"""
2View classes for ivatar/tools/
3"""
5import hashlib
6import random
8from django.views.generic.edit import FormView
9from django.urls import reverse_lazy as reverse
10from django.shortcuts import render
12import dns.resolver
13import dns.exception
15from libravatar import libravatar_url, parse_user_identity
16from libravatar import SECURE_BASE_URL as LIBRAVATAR_SECURE_BASE_URL
17from libravatar import BASE_URL as LIBRAVATAR_BASE_URL
19from ivatar.settings import SECURE_BASE_URL, BASE_URL, SITE_NAME, DEBUG
20from .forms import (
21 CheckDomainForm,
22 CheckForm,
23) # pylint: disable=relative-beyond-top-level
class CheckDomainView(FormView):
    """
    View class for checking a domain
    """

    template_name = "check_domain.html"
    form_class = CheckDomainForm
    success_url = reverse("tools_check_domain")

    def form_valid(self, form):
        """
        Look up the avatar servers advertised for the submitted domain.

        For both the plain (http) and the secure (https) service the SRV
        record is resolved via lookup_avatar_server(); when a server is
        found, its IPv4 and IPv6 addresses are resolved as well.  The
        form is then re-rendered together with the collected ``result``
        dictionary.
        """
        super().form_valid(form)
        domain = form.cleaned_data["domain"]
        result = {}
        for https, key in (
            (False, "avatar_server_http"),
            (True, "avatar_server_https"),
        ):
            server = lookup_avatar_server(domain, https)
            result[key] = server
            if not server:
                continue
            # lookup_avatar_server() returns "target:port" when the SRV
            # record uses a non-default port.  Only the host part is a
            # valid DNS name, so drop the port before the A/AAAA lookups.
            hostname = server.rsplit(":", 1)[0]
            result[f"{key}_ipv4"] = lookup_ip_address(hostname, False)
            result[f"{key}_ipv6"] = lookup_ip_address(hostname, True)
        return render(
            self.request,
            self.template_name,
            {
                "form": form,
                "result": result,
            },
        )
class CheckView(FormView):
    """
    View class for checking an e-mail or openid address
    """

    template_name = "check.html"
    form_class = CheckForm
    success_url = reverse("tools_check")

    def form_valid(self, form):
        """
        Build the avatar URLs and hashes for the submitted identity.

        Depending on which of ``mail`` / ``openid`` was submitted, the
        http and https avatar URLs are generated with libravatar_url(),
        the libravatar base URL is swapped for this site's base URL, and
        the identity hashes are computed.  The form is then re-rendered
        with all of these values in the template context.
        """
        mailurl = None
        openidurl = None
        mailurl_secure = None
        mailurl_secure_256 = None
        openidurl_secure = None
        mail_hash = None
        mail_hash256 = None
        openid_hash = None
        super().form_valid(form)

        # An explicit default URL wins over the selected default option;
        # the option "none" means "no default at all".
        if form.cleaned_data["default_url"]:
            default_url = form.cleaned_data["default_url"]
        elif (
            form.cleaned_data["default_opt"]
            and form.cleaned_data["default_opt"] != "none"
        ):
            default_url = form.cleaned_data["default_opt"]
        else:
            default_url = None

        size = form.cleaned_data["size"] if "size" in form.cleaned_data else 80

        if form.cleaned_data["mail"]:
            mail = form.cleaned_data["mail"]
            mailurl = libravatar_url(email=mail, size=size, default=default_url)
            mailurl = mailurl.replace(LIBRAVATAR_BASE_URL, BASE_URL)
            mailurl_secure = libravatar_url(
                email=mail,
                size=size,
                https=True,
                default=default_url,
            )
            mailurl_secure = mailurl_secure.replace(
                LIBRAVATAR_SECURE_BASE_URL, SECURE_BASE_URL
            )
            mail_hash = parse_user_identity(email=mail, openid=None)[0]
            # The Libravatar API hashes the trimmed, lower-cased address.
            # Normalise the same way here so the SHA256 digest refers to
            # the same identity as the MD5 one for mixed-case input.
            mail_hash256 = hashlib.sha256(
                mail.strip().lower().encode("utf-8")
            ).hexdigest()
            mailurl_secure_256 = mailurl_secure.replace(mail_hash, mail_hash256)

        if form.cleaned_data["openid"]:
            # Default to http:// when no scheme was given
            if not form.cleaned_data["openid"].startswith(("http://", "https://")):
                form.cleaned_data["openid"] = f'http://{form.cleaned_data["openid"]}'
            openid = form.cleaned_data["openid"]
            openidurl = libravatar_url(openid=openid, size=size, default=default_url)
            openidurl = openidurl.replace(LIBRAVATAR_BASE_URL, BASE_URL)
            openidurl_secure = libravatar_url(
                openid=openid,
                size=size,
                https=True,
                default=default_url,
            )
            openidurl_secure = openidurl_secure.replace(
                LIBRAVATAR_SECURE_BASE_URL, SECURE_BASE_URL
            )
            openid_hash = parse_user_identity(openid=openid, email=None)[0]

        if "DEVELOPMENT" in SITE_NAME and DEBUG:

            def to_local_host(url):
                """Point a production URL at the host serving this request."""
                if not url:
                    return url
                return url.replace(
                    "https://avatars.linux-kernel.at",
                    f"http://{self.request.get_host()}",
                )

            mailurl = to_local_host(mailurl)
            mailurl_secure = to_local_host(mailurl_secure)
            mailurl_secure_256 = to_local_host(mailurl_secure_256)
            openidurl = to_local_host(openidurl)
            openidurl_secure = to_local_host(openidurl_secure)
            # Debug aid: only emitted on development instances
            print(
                mailurl, openidurl, mailurl_secure, mailurl_secure_256, openidurl_secure
            )

        return render(
            self.request,
            self.template_name,
            {
                "form": form,
                "mailurl": mailurl,
                "openidurl": openidurl,
                "mailurl_secure": mailurl_secure,
                "mailurl_secure_256": mailurl_secure_256,
                "openidurl_secure": openidurl_secure,
                "mail_hash": mail_hash,
                "mail_hash256": mail_hash256,
                "openid_hash": openid_hash,
                "size": size,
            },
        )
def lookup_avatar_server(domain, https):
    """
    Find the avatar server of a domain by querying its SRV record

    The records being queried are expected to look like this:

    _avatars._tcp.example.com.     IN SRV 0 0 80  avatars.example.com
    _avatars-sec._tcp.example.com. IN SRV 0 0 443 avatars.example.com

    Returns the target host name, with ":<port>" appended when the port
    differs from the default one for the requested scheme (443 for
    https, 80 otherwise), or None when nothing could be resolved.
    """
    # Over-long domains are cut down to their first 60 characters
    if domain and len(domain) > 60:
        domain = domain[:60]

    service = "_avatars-sec" if https else "_avatars"
    service_name = f"{service}._tcp.{domain}"

    try:
        answers = dns.resolver.resolve(service_name, "SRV")
    except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
        return None
    except dns.exception.DNSException as message:
        print(f"DNS Error: {message} ({domain})")
        return None

    records = [
        {
            "priority": rdata.priority,
            "weight": rdata.weight,
            "port": rdata.port,
            "target": rdata.target.to_text(omit_final_dot=True),
        }
        for rdata in answers
    ]

    target, port = srv_hostname(records)

    default_port = 443 if https else 80
    if target and port != default_port:
        return f"{target}:{port}"

    return target
def srv_hostname(records):
    """
    Choose the (target, port) pair to use from a list of SRV records.

    Only records sharing the lowest priority number are considered;
    among those, one is drawn at random according to the cumulative
    weights.  Returns (None, None) for an empty list.
    """
    if not records:
        return (None, None)

    if len(records) == 1:
        only = records[0]
        return (only["target"], only["port"])

    # Collect the candidates of the best priority (lowest number wins)
    best_priority = records[0]["priority"]
    running_weight = 0
    candidates = []

    for entry in records:
        priority = entry["priority"]

        if priority > best_priority:
            # Worse than what we already have: skip it
            continue

        if priority < best_priority:
            # Better than everything seen so far: start over
            best_priority = priority
            running_weight = 0
            candidates = []

        weight = entry["weight"]
        running_weight += weight

        if weight > 0:
            candidates.append((running_weight, entry))
        else:
            # Entries without a positive weight go to the front
            candidates.insert(0, (0, entry))

    if len(candidates) == 1:
        chosen = candidates[0][1]
        return (chosen["target"], chosen["port"])

    # Weighted selection as described in RFC2782 (page 3)
    threshold = random.randint(0, running_weight)

    for cumulative, entry in candidates:
        if cumulative >= threshold:
            return (entry["target"], entry["port"])

    print("There is something wrong with our SRV weight ordering algorithm")
    return (None, None)
def lookup_ip_address(hostname, ipv6):
    """
    Resolve the given hostname to its first IPv6 (AAAA) or IPv4 (A) address

    Returns None when there is no such record or the lookup fails.
    """
    record_type = "A"
    if ipv6:
        record_type = "AAAA"

    try:
        first = next(iter(dns.resolver.resolve(hostname, record_type)), None)
        if first is not None:
            return first.address
    except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
        pass
    except dns.exception.DNSException as message:
        print(f"DNS Error: {message} ({hostname})")
    return None