Coverage for ivatar/access_stats.py: 80%
106 statements
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 23:07 +0000
1import time
2import threading
3import atexit
4import logging
5from typing import Set
6from django.core.cache import cache
7from django.db import models
8from django.db.models import F
9from django.conf import settings
10from django.apps import apps
# Logger for the access-count batching code, using the fixed channel name
# "ivatar.access_stats".
logger: logging.Logger = logging.getLogger("ivatar.access_stats")
class AccessStatsManager:
    """
    Batch per-object access counters in the Django cache and flush them to the DB.

    Each call to :meth:`update_access_count` increments a cache counter keyed
    ``stats:access_count:<ModelName>:<pk>``. The pending count is added to the
    model's ``access_count`` column (models are looked up in the
    ``ivataraccount`` app) in a background thread once ``STATS_BATCH_SIZE``
    accesses are pending, or once ``STATS_FLUSH_TIMEOUT`` seconds have passed
    since the pending window started. Keys incremented by this process are
    tracked locally so :meth:`flush_all_dirty_keys` can flush them at exit.
    """

    def __init__(self):
        # Cache keys this process has incremented and not yet handed to a flush.
        self._local_dirty_keys: Set[str] = set()
        # Guards _local_dirty_keys, which request threads and flush worker
        # threads both modify.
        self._lock = threading.Lock()
        atexit.register(self.flush_all_dirty_keys)

    def _get_flush_timeout(self) -> int:
        """Return the time-based flush threshold in seconds (default 300)."""
        return getattr(settings, "STATS_FLUSH_TIMEOUT", 300)

    def _get_batch_size(self) -> int:
        """Return the count-based flush threshold (default 100)."""
        return getattr(settings, "STATS_BATCH_SIZE", 100)

    @staticmethod
    def _increment_in_db(obj: models.Model) -> None:
        """
        Add one to ``obj.access_count`` directly in the database.

        Uses a queryset ``update()`` (like the batched flush) rather than
        assigning ``F("access_count") + 1`` to ``obj`` and calling ``save()``:
        an ``F()`` assigned to an instance persists after ``save()``, so any
        later ``obj.save()`` would apply the increment a second time.
        """
        type(obj)._default_manager.filter(pk=obj.pk).update(
            access_count=F("access_count") + 1
        )

    @staticmethod
    def _incr_counter(key: str) -> int:
        """
        Atomically add one to the cache counter ``key`` and return the new value.

        A missing counter is created with ``cache.add`` (a no-op if a concurrent
        request created it first) and then incremented, so two simultaneous
        first accesses both count instead of both writing 1. The counter is
        stored with ``timeout=None`` so it does not expire before being flushed.

        :raises Exception: whatever the cache backend raises; the caller falls
            back to a direct database update.
        """
        try:
            return cache.incr(key)
        except ValueError:
            # incr raises ValueError when the key does not exist yet.
            cache.add(key, 0, timeout=None)
            return cache.incr(key)

    def _should_flush(self, new_value: int, start_time_key: str) -> bool:
        """
        Decide whether a counter that just reached ``new_value`` is due a flush.

        Returns True when ``new_value`` reaches the batch size, or when the
        pending window recorded under ``start_time_key`` is older than the flush
        timeout. If no window start is recorded (first pending access, or a
        flush just cleared it) a new window is started and False is returned.
        """
        if new_value >= self._get_batch_size():
            return True

        start_time = cache.get(start_time_key)
        if start_time is None:
            # cache.add: keep a start time another request may have just set.
            cache.add(start_time_key, time.time(), timeout=None)
            return False

        try:
            return time.time() - float(start_time) > self._get_flush_timeout()
        except (ValueError, TypeError):
            return False

    def _flush_key_sync(self, key: str) -> None:
        """
        Synchronously write one pending cache counter to the database.

        A cache-based lock (``<key>:lock``, expiring after 60 s) keeps two
        workers from flushing the same key at once; if it is already held the
        flush is skipped. Errors are logged and swallowed, leaving the pending
        count in the cache for a later attempt.

        :param key: cache key of the form ``stats:access_count:<ModelName>:<pk>``.
        """
        lock_key = f"{key}:lock"
        # cache.add only succeeds when the key is absent, so it works as a
        # try-lock; the 60 s expiry frees it if this process dies mid-flush.
        if not cache.add(lock_key, "locked", 60):
            logger.debug("Could not acquire lock for %s, skipping flush", key)
            return

        try:
            # maxsplit keeps the pk in one piece even if it contains ":".
            parts = key.split(":", 3)
            if len(parts) != 4:
                return
            model_name, pk = parts[2], parts[3]

            try:
                count = int(cache.get(key) or 0)
            except (ValueError, TypeError):
                return
            # A non-positive value has nothing to add (and must not subtract).
            if count <= 0:
                return

            try:
                model_class = apps.get_model("ivataraccount", model_name)
            except LookupError:
                logger.warning("Could not find model %s", model_name)
                return

            logger.info(
                "Flushing %d access counts for %s:%s to database", count, model_name, pk
            )
            model_class.objects.filter(pk=pk).update(
                access_count=F("access_count") + count
            )

            # Subtract only what was written: increments that arrived after the
            # cache.get above stay pending in the counter.
            try:
                cache.decr(key, count)
            except ValueError:
                # The counter vanished (expired/evicted) after it was read. Do
                # not recreate it: a cache.set(key, 0) here could wipe an
                # increment a concurrent request has just added.
                logger.debug("Counter %s vanished before decrement", key)

            # Close the pending window; _should_flush starts a new one on the
            # next access that finds it missing.
            cache.delete(f"stats:start_time:{model_name}:{pk}")
        except Exception as e:
            logger.exception("Error flushing key %s: %s", key, e)
        finally:
            cache.delete(lock_key)

    def _flush_worker(self, key: str) -> None:
        """
        Background-thread entry point: flush ``key`` and release DB connections.

        The dirty mark is cleared *before* flushing. Because
        :meth:`update_access_count` marks a key dirty only after its increment
        succeeds, an increment this flush does not see re-marks the key for
        the exit flush instead of being forgotten.
        """
        from django.db import connections  # only needed in worker threads

        with self._lock:
            self._local_dirty_keys.discard(key)
        try:
            self._flush_key_sync(key)
        finally:
            # This thread is not a request thread, so Django's request-finished
            # connection cleanup never runs for it; close its connections here.
            connections.close_all()

    def flush_all_dirty_keys(self) -> None:
        """
        Flush every key this process has marked dirty. Registered with atexit.

        The dirty set is snapshotted and cleared under the lock, then each key
        is flushed synchronously in the calling thread.
        """
        with self._lock:
            pending = list(self._local_dirty_keys)
            self._local_dirty_keys.clear()

        for key in pending:
            self._flush_key_sync(key)

    def update_access_count(self, obj: models.Model) -> None:
        """
        Record one access of ``obj`` (Photo, ConfirmedEmail, ConfirmedOpenId).

        With ``ASYNC_ACCESS_COUNT`` enabled (the default) the access is counted
        in the cache and flushed to the database in a background thread once a
        batch-size or time threshold is hit. When it is disabled, or when the
        cache increment fails, the database row is incremented directly.

        :param obj: model instance with an ``access_count`` field.
        """
        if not getattr(settings, "ASYNC_ACCESS_COUNT", True):
            self._increment_in_db(obj)
            return

        model_name = obj.__class__.__name__
        pk = obj.pk
        key = f"stats:access_count:{model_name}:{pk}"
        start_time_key = f"stats:start_time:{model_name}:{pk}"

        try:
            new_value = self._incr_counter(key)
        except Exception as e:
            # Fallback: update DB directly if the cache is unavailable.
            logger.warning("Failed to update access count cache: %s", e)
            self._increment_in_db(obj)
            return

        # Mark dirty only after the increment succeeded (see _flush_worker for
        # why this ordering matters).
        with self._lock:
            self._local_dirty_keys.add(key)

        try:
            if self._should_flush(new_value, start_time_key):
                # Flush in a separate thread to avoid blocking the request.
                threading.Thread(target=self._flush_worker, args=(key,)).start()
        except Exception as e:
            # The access is already counted in the cache; falling back to the
            # database here would count it twice.
            logger.warning("Failed to check access count flush triggers: %s", e)
# Module-level manager instance; constructing it registers
# flush_all_dirty_keys with atexit.
stats_manager: AccessStatsManager = AccessStatsManager()