Coverage for ivatar/views.py: 59%

495 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 23:07 +0000

1""" 

2views under / 

3""" 

4 

import contextlib
import datetime
import hashlib
import logging
import threading
from io import BytesIO
from os import path
from ssl import SSLError
from urllib.error import HTTPError, URLError
from urllib.parse import quote

from django.conf import settings
from django.contrib.auth.models import User
from django.core.cache import cache, caches
from django.core.exceptions import ObjectDoesNotExist
from django.db.models import Q
from django.http import HttpResponse, HttpResponseRedirect
from django.http import HttpResponseNotFound, JsonResponse
from django.urls import reverse_lazy
from django.utils.translation import gettext_lazy as _
from django.views.generic.base import TemplateView, View

from PIL import Image

from monsterid.id import build_monster as BuildMonster
import Identicon
from pydenticon5 import Pydenticon5

from ivatar.access_stats import stats_manager
from ivatar.settings import AVATAR_MAX_SIZE, JPEG_QUALITY, DEFAULT_AVATAR_SIZE
from ivatar.utils import urlopen, Bluesky
from .ivataraccount.models import ConfirmedEmail, ConfirmedOpenId
from .ivataraccount.models import UnconfirmedEmail, UnconfirmedOpenId
from .ivataraccount.models import Photo
from .ivataraccount.models import pil_format, file_format
from .pagan_optimized import create_optimized_pagan
from .robohash import create_robohash
from .utils import is_trusted_url, mm_ng, resize_animated_gif

# Import OpenTelemetry with graceful degradation
from .telemetry_utils import trace_avatar_operation, get_telemetry_metrics

43 

# Module-wide telemetry recorder; the views below use it to record cache
# hits/misses, generated avatars and external (Gravatar/Bluesky) requests
avatar_metrics = get_telemetry_metrics()

# Initialize loggers
# General application log
logger = logging.getLogger("ivatar")
# Separate channel, used below for rejected (untrusted) default URLs
security_logger = logging.getLogger("ivatar.security")

49 

50 

def get_size(request, size=DEFAULT_AVATAR_SIZE):
    """
    Work out the avatar size requested via the URL arguments

    ``size`` wins over ``s`` when both are present. Empty, "0",
    non-numeric and non-positive values leave the passed-in ``size``
    untouched. The result is always capped at AVATAR_MAX_SIZE.
    """
    query = request.GET
    requested = query["s"] if "s" in query else None
    if "size" in query:
        requested = query["size"]

    # An empty string is falsy, so only "0" needs an explicit check
    if requested and requested != "0":
        with contextlib.suppress(ValueError):
            parsed = int(requested)
            if parsed > 0:
                size = parsed

    return min(size, int(AVATAR_MAX_SIZE))

67 

68 

class CachingHttpResponse(HttpResponse):
    """
    HttpResponse that remembers itself

    When settings.CACHE_RESPONSE is enabled, the constructor arguments are
    stored in the "filesystem" cache under ``uri`` before the response
    itself is built.
    """

    def __init__(  # pylint: disable=too-many-arguments
        self,
        uri,
        content=b"",
        content_type=None,
        status=200,
        reason=None,
        charset=None,
    ):
        if settings.CACHE_RESPONSE:
            cache_entry = dict(
                content=content,
                content_type=content_type,
                status=status,
                reason=reason,
                charset=charset,
            )
            caches["filesystem"].set(uri, cache_entry)
        super().__init__(content, content_type, status, reason, charset)

95 

96 

class AvatarImageView(TemplateView):
    """
    View to return (binary) image, based on OpenID/Email (both by digest)
    """

    # TODO: Do cache resize images!! Memcached?

    def options(self, request, *args, **kwargs):
        """
        Answer OPTIONS requests with an empty plain-text body; the ``Allow``
        header carries the keywords accepted for the ``default`` parameter
        """
        response = HttpResponse("", content_type="text/plain")
        response["Allow"] = "404 mm mp retro pagan wavatar monsterid robohash identicon"
        return response

    @trace_avatar_operation("avatar_request")
    def get(
        self, request, *args, **kwargs
    ):  # pylint: disable=too-many-return-statements
        """
        Override get from parent class

        Tried in this order: cached response, redirect to the home page
        (no digest given), redirect to the Bluesky proxy, a default image
        (redirect or generated), and finally the uploaded photo scaled to
        the requested size.
        """
        size = get_size(request)
        uri = request.build_absolute_uri()

        # Check the cache first
        if settings.CACHE_RESPONSE:
            if centry := caches["filesystem"].get(uri):
                # The real format is unknown at this point, "png" is reported
                avatar_metrics.record_cache_hit(size=str(size), format_type="png")
                return HttpResponse(
                    centry["content"],
                    content_type=centry["content_type"],
                    status=centry["status"],
                    reason=centry["reason"],
                    charset=centry["charset"],
                )
            avatar_metrics.record_cache_miss(size=str(size), format_type="png")

        # In case no digest at all is provided, return to home page
        if "digest" not in kwargs:
            return HttpResponseRedirect(reverse_lazy("home"))
        digest = kwargs["digest"]

        default = self._requested_default(request)
        params = request.GET
        forcedefault = (
            bool(settings.FORCEDEFAULT)
            or params.get("f") == "y"
            or params.get("forcedefault") == "y"
        )
        gravatarredirect = (
            bool(settings.DEFAULT_GRAVATARREDIRECT)
            or params.get("gravatarredirect") == "y"
        )
        gravatarproxy = (
            bool(settings.DEFAULT_GRAVATARPROXY)
            and params.get("gravatarproxy") != "n"
        )

        obj = self._find_identity(digest)

        # Handle the special case of Bluesky
        if obj and obj.bluesky_handle:
            return HttpResponseRedirect(reverse_lazy("blueskyproxy", args=[digest]))

        # If that mail/openid doesn't exist, or has no photo linked to it
        if not obj or not obj.photo or forcedefault:
            return self._default_response(
                request,
                digest,
                default,
                size,
                uri,
                forcedefault=forcedefault,
                gravatarredirect=gravatarredirect,
                gravatarproxy=gravatarproxy,
            )

        return self._photo_response(obj, size, uri)

    @staticmethod
    def _requested_default(request):
        """
        Return the validated ``default``/``d`` query parameter or None

        ``default`` wins over ``d``. Anything that looks like an absolute
        or protocol-relative URL must pass the trusted URL check.
        """
        default = request.GET.get("default", request.GET.get("d"))
        if default is None:
            return None

        if settings.TRUSTED_DEFAULT_URLS is None:
            logger.warning("Query parameter `default` is disabled.")
            return None

        # "//host/..." (and the backslash spellings browsers accept) leaves
        # the site just like "scheme://host/..." does, so it has to be
        # checked as well - otherwise this is an open redirect.
        looks_external = "://" in default or default.replace("\\", "/").startswith(
            "//"
        )
        if looks_external and not is_trusted_url(
            default, settings.TRUSTED_DEFAULT_URLS
        ):
            security_logger.warning(
                f"Default URL is not in trusted URLs: '{default}'; Kicking it!"
            )
            return None
        return default

    @staticmethod
    def _find_identity(digest):
        """
        Look up the confirmed email (MD5, then SHA256 digest) or, failing
        that, the confirmed OpenID matching ``digest``; None if not found
        """
        for lookup in ({"digest": digest}, {"digest_sha256": digest}):
            try:
                return ConfirmedEmail.objects.get(**lookup)
            except ObjectDoesNotExist:
                continue

        with contextlib.suppress(Exception):
            # OpenID is tricky. http vs. https, versus trailing slash or not
            # However, some users eventually have added their variations already
            # and therefore we need to use filter() and first()
            return ConfirmedOpenId.objects.filter(
                Q(digest=digest)
                | Q(alt_digest1=digest)
                | Q(alt_digest2=digest)
                | Q(alt_digest3=digest)
            ).first()
        return None

    @staticmethod
    def _record_generated(size, source):
        """Record the metric for a generated (non-uploaded) PNG avatar"""
        avatar_metrics.record_avatar_generated(
            size=str(size), format_type="png", source=source
        )

    def _default_response(  # pylint: disable=too-many-arguments,too-many-return-statements
        self,
        request,
        digest,
        default,
        size,
        uri,
        *,
        forcedefault,
        gravatarredirect,
        gravatarproxy,
    ):
        """
        Build the response used when there is no photo to serve (or the
        default has been forced): Gravatar redirect/proxy, a generated
        image, a static image, a redirect to the default URL, or a 404
        """
        # If we have redirection to Gravatar enabled, this overrides all
        # default= settings, except forcedefault!
        if gravatarredirect and not forcedefault:
            return HttpResponseRedirect(
                f"https://secure.gravatar.com/avatar/{digest}?s={size}"
            )

        # Request to proxy Gravatar image - only if not forcedefault
        if gravatarproxy and not forcedefault:
            url = f"{reverse_lazy('gravatarproxy', args=[digest])}?s={size}"
            # Ensure we do not convert None to string 'None'
            if default:
                # Encode, so a default URL carrying its own query string
                # survives as a single parameter value
                url += f"&default={quote(default, safe='')}"
            return HttpResponseRedirect(url)

        if not default:
            return self._redirect_static_w_size("nobody", size)

        # Proxy to gravatar to generate wavatar - lazy me
        if default == "wavatar":
            url = (
                f"{reverse_lazy('gravatarproxy', args=[digest])}"
                f"?s={size}&default={default}&f=y"
            )
            return HttpResponseRedirect(url)

        if default == "404":
            return HttpResponseNotFound(_("<h1>Image not found</h1>"))

        if default == "monsterid":
            monsterdata = BuildMonster(seed=digest, size=(size, size))
            self._record_generated(size, "monsterid")
            return self._return_cached_png(monsterdata, BytesIO(), uri)

        if default == "robohash":
            roboset = request.GET.get("robohash") or "any"
            data = create_robohash(digest, size, roboset)
            self._record_generated(size, "robohash")
            return self._return_cached_response(data, uri)

        if default == "retro":
            img = Image.open(BytesIO(Identicon.render(digest)))
            img = img.resize((size, size), Image.LANCZOS)
            self._record_generated(size, "retro")
            return self._return_cached_png(img, BytesIO(), uri)

        if default == "pagan":
            data = create_optimized_pagan(digest, size)
            self._record_generated(size, "pagan")
            return self._return_cached_response(data, uri)

        if default == "identicon":
            # In order to make use of the whole 32 bytes digest, we need to redigest them.
            newdigest = hashlib.md5(bytes(digest, "utf-8")).hexdigest()
            img = Pydenticon5().draw(newdigest, size, 0)
            self._record_generated(size, "identicon")
            return self._return_cached_png(img, BytesIO(), uri)

        if default == "mmng":
            mmngimg = mm_ng(idhash=digest, size=size)
            self._record_generated(size, "mmng")
            return self._return_cached_png(mmngimg, BytesIO(), uri)

        if default in {"mm", "mp"}:
            return self._redirect_static_w_size("mm", size)

        # Anything else is a (trusted) URL or a site-relative path
        return HttpResponseRedirect(default)

    def _photo_response(self, obj, size, uri):
        """
        Return the photo linked to ``obj`` scaled to ``size``, update the
        access counters and cache the response
        """
        imgformat = obj.photo.format
        photodata = Image.open(BytesIO(obj.photo.data))

        # Animated GIFs need additional handling
        if imgformat == "gif" and photodata.is_animated:
            data = resize_animated_gif(photodata, (size, size))
        else:
            data = BytesIO()
            # If the image is smaller than what was requested, we need
            # to use the function resize
            if photodata.size[0] < size or photodata.size[1] < size:
                photodata = photodata.resize((size, size), Image.LANCZOS)
            else:
                photodata.thumbnail((size, size), Image.LANCZOS)
            photodata.save(data, pil_format(imgformat), quality=JPEG_QUALITY)

        data.seek(0)
        stats_manager.update_access_count(obj.photo)
        stats_manager.update_access_count(obj)
        if imgformat == "jpg":
            imgformat = "jpeg"

        # Record avatar generation metrics
        avatar_metrics.record_avatar_generated(
            size=str(size),
            format_type=imgformat,
            source="uploaded",
        )

        response = CachingHttpResponse(uri, data, content_type=f"image/{imgformat}")
        # Remove Vary header for images since language doesn't matter
        response["Vary"] = ""
        return response

    def _redirect_static_w_size(self, arg0, size):
        """
        Helper method to redirect to static image with size i/a
        """
        # If mm is explicitly given, we need to catch that
        static_img = path.join("static", "img", arg0, f"{size}.png")
        if not path.isfile(static_img):
            # We trust this exists!!!
            static_img = path.join("static", "img", arg0, "512.png")
        # We trust static/ is mapped to /static/
        return HttpResponseRedirect(f"/{static_img}")

    def _return_cached_response(self, data, uri):
        """
        Rewind ``data`` and return it as a cached image/png response
        """
        data.seek(0)
        response = CachingHttpResponse(uri, data, content_type="image/png")
        # Remove Vary header for images since language doesn't matter
        response["Vary"] = ""
        return response

    @trace_avatar_operation("generate_png")
    def _return_cached_png(self, arg0, data, uri):
        """
        Save image ``arg0`` as PNG into ``data`` and return it as a cached
        response
        """
        arg0.save(data, "PNG", quality=JPEG_QUALITY)
        return self._return_cached_response(data, uri)

357 

358 

class GravatarProxyView(View):
    """
    Proxy request to Gravatar and return the image from there
    """

    # TODO: Do cache images!! Memcached?

    @trace_avatar_operation("gravatar_proxy")
    def get(
        self, request, *args, **kwargs
    ):  # pylint: disable=too-many-branches,too-many-statements,too-many-locals,no-self-use,unused-argument,too-many-return-statements
        """
        Override get from parent class

        Returns the image Gravatar has for the digest. Whenever Gravatar
        only has its own default image, or the fetch fails in any way, the
        client is redirected back to the avatar view with the default
        forced.
        """
        digest = kwargs["digest"]
        size = get_size(request)

        def redir_default(default=None):
            url = (
                f"{reverse_lazy('avatar_view', args=[digest])}"
                f"?s={size}&forcedefault=y"
            )
            if default is not None:
                # Encode, so a default URL with its own query string is
                # passed on as a single parameter value
                url += f"&default={quote(str(default), safe='')}"
            return HttpResponseRedirect(url)

        default = request.GET.get("default")
        # A literal 'None' is treated like a missing parameter
        if default == "None":
            default = None

        if str(default) != "wavatar":
            # This part is special/hackish
            # Check if the image returned by Gravatar is their default image, if so,
            # redirect to our default instead.
            gravatar_test_url = (
                f"https://secure.gravatar.com/avatar/{digest}?s=50&d=404"
            )
            if cache.get(gravatar_test_url) == "default":
                return redir_default(default)
            try:
                urlopen(gravatar_test_url)
            except HTTPError as exc:
                if exc.code == 404:
                    cache.set(gravatar_test_url, "default", 60)
                else:
                    logger.warning(f"Gravatar test url fetch failed: {exc}")
                return redir_default(default)
            except (URLError, SSLError) as exc:
                # Network trouble must not end in a server error
                logger.warning(f"Gravatar test url fetch failed: {exc}")
                return redir_default(default)

        gravatar_url = f"https://secure.gravatar.com/avatar/{digest}?s={size}"
        if default:
            gravatar_url += f"&d={quote(default, safe='')}"

        try:
            if cache.get(gravatar_url) == "err":
                logger.warning(
                    f"Cached Gravatar fetch failed with URL error: {gravatar_url}"
                )
                avatar_metrics.record_external_request("gravatar", 0)  # Cached error
                return redir_default(default)

            gravatarimagedata = urlopen(gravatar_url)
            avatar_metrics.record_external_request("gravatar", 200)
        except HTTPError as exc:
            if exc.code not in [404, 503]:
                logger.warning(
                    f"Gravatar fetch failed with an unexpected {exc.code} HTTP error: {gravatar_url}"
                )
            avatar_metrics.record_external_request("gravatar", exc.code)
            cache.set(gravatar_url, "err", 30)
            return redir_default(default)
        except URLError as exc:
            logger.warning(f"Gravatar fetch failed with URL error: {exc.reason}")
            avatar_metrics.record_external_request("gravatar", 0)  # Network error
            cache.set(gravatar_url, "err", 30)
            return redir_default(default)
        except SSLError as exc:
            logger.warning(f"Gravatar fetch failed with SSL error: {exc.reason}")
            avatar_metrics.record_external_request("gravatar", 0)  # SSL error
            cache.set(gravatar_url, "err", 30)
            return redir_default(default)

        try:
            data = BytesIO(gravatarimagedata.read())
            img = Image.open(data)
            data.seek(0)
            response = HttpResponse(
                data.read(), content_type=f"image/{file_format(img.format)}"
            )
        except (ValueError, OSError) as exc:
            # OSError covers failed reads as well as data Pillow cannot
            # identify as an image
            logger.error(f"Unable to process Gravatar image: {exc}")
            return redir_default(default)

        # Remove Vary header for images since language doesn't matter
        response["Vary"] = ""
        return response

464 

465 

class BlueskyProxyView(View):
    """
    Proxy request to Bluesky and return the image from there
    """

    @trace_avatar_operation("bluesky_proxy")
    def get(
        self, request, *args, **kwargs
    ):  # pylint: disable=too-many-branches,too-many-statements,too-many-locals,no-self-use,unused-argument,too-many-return-statements
        """
        Override get from parent class

        Looks up the identity for the digest, resolves the avatar URL of
        its Bluesky handle and returns that image, scaled down to the
        requested size if it is larger. Every failure redirects back to
        the avatar view with the default forced.
        """
        digest = kwargs["digest"]
        size = get_size(request)
        logger.debug(f"Bluesky avatar size requested: {size}")

        def redir_default(default=None):
            url = (
                f"{reverse_lazy('avatar_view', args=[digest])}"
                f"?s={size}&forcedefault=y"
            )
            if default is not None:
                # Encode, so a default URL with its own query string is
                # passed on as a single parameter value
                url += f"&default={quote(str(default), safe='')}"
            return HttpResponseRedirect(url)

        default = request.GET.get("default")
        # A literal 'None' is treated like a missing parameter
        if default == "None":
            default = None

        identity = None

        # First check for email, as this is the most common
        try:
            identity = ConfirmedEmail.objects.filter(
                Q(digest=digest) | Q(digest_sha256=digest)
            ).first()
        except Exception as exc:  # pylint: disable=broad-except
            logger.warning(f"Exception: {exc}")

        # If no identity is found in the email table, try the openid table
        if not identity:
            try:
                identity = ConfirmedOpenId.objects.filter(
                    Q(digest=digest)
                    | Q(alt_digest1=digest)
                    | Q(alt_digest2=digest)
                    | Q(alt_digest3=digest)
                ).first()
            except Exception as exc:  # pylint: disable=broad-except
                logger.warning(f"Exception: {exc}")

        # If still no identity is found, redirect to the default
        if not identity:
            return redir_default(default)

        bs = Bluesky()
        bluesky_url = None
        # Try with the cache first
        with contextlib.suppress(Exception):
            bluesky_url = cache.get(identity.bluesky_handle)
        if not bluesky_url:
            try:
                bluesky_url = bs.get_avatar(identity.bluesky_handle)
                cache.set(identity.bluesky_handle, bluesky_url)
            except Exception:  # pylint: disable=broad-except
                return redir_default(default)

        try:
            if cache.get(bluesky_url) == "err":
                logger.warning(
                    f"Cached Bluesky fetch failed with URL error: {bluesky_url}"
                )
                avatar_metrics.record_external_request("bluesky", 0)  # Cached error
                return redir_default(default)

            blueskyimagedata = urlopen(bluesky_url)
            avatar_metrics.record_external_request("bluesky", 200)
        except HTTPError as exc:
            if exc.code not in [404, 503]:
                logger.warning(
                    f"Bluesky fetch failed with an unexpected {exc.code} HTTP error: {bluesky_url}"
                )
            avatar_metrics.record_external_request("bluesky", exc.code)
            cache.set(bluesky_url, "err", 30)
            return redir_default(default)
        except URLError as exc:
            logger.warning(f"Bluesky fetch failed with URL error: {exc.reason}")
            avatar_metrics.record_external_request("bluesky", 0)  # Network error
            cache.set(bluesky_url, "err", 30)
            return redir_default(default)
        except SSLError as exc:
            logger.warning(f"Bluesky fetch failed with SSL error: {exc.reason}")
            avatar_metrics.record_external_request("bluesky", 0)  # SSL error
            cache.set(bluesky_url, "err", 30)
            return redir_default(default)

        try:
            data = BytesIO(blueskyimagedata.read())
            img = Image.open(data)
            img_format = img.format
            # Only ever scale down, keeping the aspect ratio
            if max(img.size) > size:
                aspect = img.size[0] / float(img.size[1])
                if aspect > 1:
                    new_size = (size, int(size / aspect))
                else:
                    new_size = (int(size * aspect), size)
                img = img.resize(new_size)
                data = BytesIO()
                img.save(data, format=img_format)

            data.seek(0)
            # img_format, not the builtin format(), names the image type
            response = HttpResponse(
                data.read(), content_type=f"image/{file_format(img_format)}"
            )
        except (ValueError, OSError) as exc:
            # OSError covers failed reads as well as data Pillow cannot
            # identify as an image
            logger.error(f"Unable to process Bluesky image: {exc}")
            return redir_default(default)

        # Remove Vary header for images since language doesn't matter
        response["Vary"] = ""
        return response

591 

592 

class StatsView(TemplateView, JsonResponse):
    """
    Return stats
    """

    def get(
        self, request, *args, **kwargs
    ):  # pylint: disable=too-many-branches,too-many-statements,too-many-locals,no-self-use,unused-argument,too-many-return-statements
        """
        Gather the usage statistics and return them as one JSON document
        """
        from django.db import connection
        from django.db.models import Count

        def avatar_entry(access_count, digest):
            # One row of any of the "top 10" lists
            return {
                "access_count": access_count,
                "avatar_url": f"https://libravatar.org/avatar/{digest}",
            }

        retval = {
            "users": User.objects.count(),
            "mails": ConfirmedEmail.objects.count(),
            "openids": ConfirmedOpenId.objects.count(),  # pylint: disable=no-member
            "unconfirmed_mails": UnconfirmedEmail.objects.count(),  # pylint: disable=no-member
            "unconfirmed_openids": UnconfirmedOpenId.objects.count(),  # pylint: disable=no-member
            "avatars": Photo.objects.count(),  # pylint: disable=no-member
        }

        # Top 10 viewed avatars, each represented by its most accessed
        # email or OpenID (the email wins a tie)
        most_viewed = []
        for photo in Photo.objects.order_by("-access_count")[:10]:
            emails = photo.emails.all().order_by("-access_count")
            openids = photo.openids.all().order_by("-access_count")

            if emails and openids:
                best_email, best_openid = emails[0], openids[0]
                if best_email.access_count >= best_openid.access_count:
                    winner = best_email
                else:
                    winner = best_openid
            elif emails:
                winner = emails[0]
            elif openids:
                winner = openids[0]
            else:
                # Photo without any identity: nothing to link to
                continue

            if not winner:
                continue

            # Only a ConfirmedEmail has an "email" attribute; it is linked
            # by its SHA256 digest, a ConfirmedOpenId by its plain digest
            if hasattr(winner, "email"):
                most_viewed.append(
                    avatar_entry(winner.access_count, winner.digest_sha256)
                )
            else:
                most_viewed.append(avatar_entry(winner.access_count, winner.digest))
        retval["top_viewed_avatars"] = most_viewed

        # Top 10 queried email addresses
        retval["top_queried_emails"] = [
            avatar_entry(email.access_count, email.digest_sha256)
            for email in ConfirmedEmail.objects.order_by("-access_count")[:10]
        ]

        # Top 10 queried OpenIDs
        retval["top_queried_openids"] = [
            avatar_entry(openid.access_count, openid.digest)
            for openid in ConfirmedOpenId.objects.order_by("-access_count")[:10]
        ]

        # Photo format distribution
        per_format = Photo.objects.values("format").annotate(count=Count("format"))
        retval["photo_format_distribution"] = list(per_format.order_by("-count"))

        # User activity statistics
        multi_photo_users = User.objects.annotate(photo_count=Count("photo")).filter(
            photo_count__gt=1
        )
        users_with_multiple_photos = multi_photo_users.count()
        users_with_both_email_and_openid = (
            User.objects.filter(
                confirmedemail__isnull=False, confirmedopenid__isnull=False
            )
            .distinct()
            .count()
        )

        # Calculate average photos per user
        total_photos = Photo.objects.count()
        total_users = User.objects.count()
        if total_users > 0:
            avg_photos_per_user = total_photos / total_users
        else:
            avg_photos_per_user = 0

        retval["user_activity"] = {
            "users_with_multiple_photos": users_with_multiple_photos,
            "users_with_both_email_and_openid": users_with_both_email_and_openid,
            "average_photos_per_user": round(avg_photos_per_user, 2),
        }

        # Bluesky handles statistics
        has_handle = {"bluesky_handle__isnull": False}
        bluesky_emails = ConfirmedEmail.objects.filter(**has_handle).count()
        bluesky_openids = ConfirmedOpenId.objects.filter(**has_handle).count()
        retval["bluesky_handles"] = {
            "total_bluesky_handles": bluesky_emails + bluesky_openids,
            "bluesky_emails": bluesky_emails,
            "bluesky_openids": bluesky_openids,
        }

        # Average photo size statistics using raw SQL
        with connection.cursor() as cursor:
            # SQL to calculate average photo size
            cursor.execute("""
                SELECT
                    COUNT(*) as photo_count,
                    AVG(LENGTH(data)) as avg_size_bytes
                FROM ivataraccount_photo
                WHERE data IS NOT NULL
            """)
            row = cursor.fetchone()

        size_stats = {
            "average_size_bytes": 0,
            "average_size_kb": 0,
            "average_size_mb": 0,
            "total_photos_analyzed": 0,
        }
        if row and row[0] > 0:
            photo_count, raw_average = row
            size_stats["total_photos_analyzed"] = photo_count
            # Convert to float in case database returns string
            average = float(raw_average) if raw_average else 0
            if average:
                size_stats["average_size_bytes"] = round(average, 2)
                size_stats["average_size_kb"] = round(average / 1024, 2)
                size_stats["average_size_mb"] = round(average / (1024 * 1024), 2)
        retval["photo_size_stats"] = size_stats

        # For potential duplicate photos, we'll check for photos with the same format and size
        # Note: This is not definitive - different images can have the same format and size
        # but it's a good indicator of potential duplicates that might warrant investigation
        with connection.cursor() as cursor:
            cursor.execute("""
                SELECT
                    format,
                    LENGTH(data) as file_size,
                    COUNT(*) as count
                FROM ivataraccount_photo
                WHERE data IS NOT NULL
                GROUP BY format, LENGTH(data)
                HAVING COUNT(*) > 1
                ORDER BY count DESC
                LIMIT 10
            """)
            duplicate_rows = cursor.fetchall()

        # Convert to list of dictionaries for JSON serialization
        group_details = [
            {"format": fmt, "file_size": file_size, "count": count}
            for fmt, file_size, count in duplicate_rows
        ]
        retval["potential_duplicate_photos"] = {
            "potential_duplicate_groups": len(duplicate_rows),
            "total_potential_duplicate_photos": sum(row[2] for row in duplicate_rows),
            "potential_duplicate_groups_detail": group_details,
            "note": "Potential duplicates are identified by matching file format and size - not definitive duplicates",
        }

        return JsonResponse(retval)

805 

806 

# Thread-safe version cache - cached indefinitely since container restarts on changes
# Filled by _get_cached_version_info() on first use; None until then
_version_cache = None
# Guards every read and write of _version_cache
_version_cache_lock = threading.Lock()

810 

811 

def _resolve_git_ref(git_dir, ref):
    """
    Return the commit hash ``ref`` points to, or None if it is unknown

    Looks at the loose ref file first and falls back to ``packed-refs``.
    """
    ref_file = path.join(git_dir, ref)
    if path.exists(ref_file):
        with open(ref_file, encoding="utf-8") as handle:
            return handle.read().strip()

    packed_refs = path.join(git_dir, "packed-refs")
    if path.exists(packed_refs):
        with open(packed_refs, encoding="utf-8") as handle:
            for line in handle:
                line = line.strip()
                # Skip the header comment and peeled tag lines
                if not line or line.startswith(("#", "^")):
                    continue
                commit_hash, _sep, name = line.partition(" ")
                if name == ref:
                    return commit_hash
    return None


def _last_reflog_timestamp(log_file):
    """
    Return the Unix timestamp of the newest entry in a reflog file, or
    None if the file is missing or cannot be parsed
    """
    try:
        with open(log_file, "rb") as handle:
            # Only the tail of the file is needed to find the last line
            handle.seek(0, 2)
            handle.seek(max(0, handle.tell() - 1024))
            tail = handle.read().decode("utf-8", errors="ignore")
    except OSError:
        return None

    for line in reversed(tail.splitlines()):
        line = line.strip()
        if not line:
            continue
        # Reflog line: <old> <new> <name> <email> <timestamp> <tz>\t<message>
        # Cut the message off first, so digits in it are never mistaken
        # for the timestamp.
        fields = line.split("\t", 1)[0].rsplit(None, 2)
        if len(fields) == 3:
            with contextlib.suppress(ValueError):
                return int(fields[1])
        return None
    return None


def _format_git_timestamp(timestamp):
    """
    Format a Unix timestamp as "YYYY-MM-DD HH:MM:SS +0000" (UTC); None if
    the value is out of range
    """
    try:
        # Timezone-aware on purpose: %z is empty for naive datetimes
        moment = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
    except (OverflowError, OSError, ValueError):
        return None
    return moment.strftime("%Y-%m-%d %H:%M:%S %z")


def _get_git_info_from_files():
    """
    Safely extract git information from .git files without subprocess calls

    Returns a dict with the keys commit_hash, short_hash, branch,
    commit_date, deployment_date, deployment_status and version, or None
    if there is no usable .git directory next to the package.
    """
    try:
        # Get the project root directory
        project_root = path.dirname(path.dirname(path.abspath(__file__)))
        git_dir = path.join(project_root, ".git")

        # Read HEAD to get current branch/commit
        head_file = path.join(git_dir, "HEAD")
        if not path.exists(head_file):
            return None

        with open(head_file, encoding="utf-8") as handle:
            head_content = handle.read().strip()

        if head_content.startswith("ref: "):
            # We're on a branch
            branch_ref = head_content[5:].strip()
            heads_prefix = "refs/heads/"
            if branch_ref.startswith(heads_prefix):
                # Keep slashes in the branch name ("feature/x")
                branch_name = branch_ref[len(heads_prefix) :]
            else:
                branch_name = path.basename(branch_ref)

            commit_hash = _resolve_git_ref(git_dir, branch_ref)
            if commit_hash is None:
                return None
        else:
            # Detached HEAD state
            commit_hash = head_content
            branch_name = "detached"

        # Commit date: time of the newest reflog entry, if there is one
        commit_date = None
        timestamp = _last_reflog_timestamp(path.join(git_dir, "logs", "HEAD"))
        if timestamp is not None:
            commit_date = _format_git_timestamp(timestamp)

        # Get deployment date from directory modification time
        # Use .git directory as it's updated during deployment (checkout/pull)
        try:
            deployment_date = _format_git_timestamp(path.getmtime(git_dir))
        except OSError:
            deployment_date = None

        short_hash = commit_hash[:7]
        return {
            "commit_hash": commit_hash,
            "short_hash": short_hash,
            "branch": branch_name,
            "commit_date": commit_date or "unknown",
            "deployment_date": deployment_date or "unknown",
            "deployment_status": "active",
            "version": f"{branch_name}-{short_hash}",
        }

    except Exception as exc:  # pylint: disable=broad-except
        # Best effort only: version info must never break a request
        logger.warning(f"Failed to read git info from files: {exc}")
        return None

933 

934 

def _get_cached_version_info():
    """
    Return the version information, building it on the first call

    The result - including the error placeholder used when no git
    information is available - is kept for the lifetime of the process,
    since containers restart on content changes.
    """
    global _version_cache

    with _version_cache_lock:
        if _version_cache is not None:
            return _version_cache

        # Get version info from git files
        git_info = _get_git_info_from_files()
        if git_info is None:
            # If that fails, return error
            git_info = {
                "error": "Unable to determine version - .git directory not found",
                "deployment_status": "unknown",
            }
        _version_cache = git_info
        return _version_cache

955 

956 

class DeploymentVersionView(View):
    """
    View to return deployment version information for CI/CD verification
    Uses cached version info to prevent DDoS attacks and improve performance
    """

    def get(self, request, *args, **kwargs):
        """
        Return cached deployment version information including application version

        Responds with status 500 when the version could not be determined;
        the application version is included in either case.
        """
        # Work on a copy: the cached dict is shared between all requests
        # and threads and must not be modified here
        version_info = dict(_get_cached_version_info())
        version_info["application_version"] = getattr(
            settings, "IVATAR_VERSION", "unknown"
        )

        if "error" in version_info:
            return JsonResponse(version_info, status=500)
        return JsonResponse(version_info)