Coverage for ivatar/access_stats.py: 80%

106 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 23:07 +0000

1import time 

2import threading 

3import atexit 

4import logging 

5from typing import Set 

6from django.core.cache import cache 

7from django.db import models 

8from django.db.models import F 

9from django.conf import settings 

10from django.apps import apps 

11 

# Module logger, registered under an explicit fixed name rather than __name__.
logger: logging.Logger = logging.getLogger("ivatar.access_stats")

13 

14 

class AccessStatsManager:
    """
    Batch per-object access counters in the Django cache and flush them to the DB.

    Each access increments a cache counter keyed
    ``stats:access_count:<ModelName>:<pk>``. A background thread adds the
    accumulated count to the model's ``access_count`` column in two cases:
    when the counter reaches ``settings.STATS_BATCH_SIZE`` (default 100), or
    when an access finds that more than ``settings.STATS_FLUSH_TIMEOUT``
    seconds (default 300) have passed since the counter's clock was started.
    Counters this process touched are also flushed at interpreter exit.
    """

    def __init__(self):
        # Counter keys this process has incremented but not yet flushed.
        self._local_dirty_keys: Set[str] = set()
        # Guards _local_dirty_keys, which request and flush threads share.
        self._lock = threading.Lock()
        atexit.register(self.flush_all_dirty_keys)

    def _get_flush_timeout(self) -> int:
        return getattr(settings, "STATS_FLUSH_TIMEOUT", 300)

    def _get_batch_size(self) -> int:
        return getattr(settings, "STATS_BATCH_SIZE", 100)

    def _flush_key_sync(self, key: str) -> None:
        """
        Synchronously add the cached count for ``key`` to the database.

        A cache-based lock (``<key>:lock``, 60 s expiry) stops concurrent
        flushers from writing the same count twice. If the lock is already
        held, the flush is skipped. Errors are logged, never raised.

        Args:
            key: Counter key of the form ``stats:access_count:<ModelName>:<pk>``.
        """
        lock_key = f"{key}:lock"
        # cache.add only succeeds when the key is absent, so it doubles as a
        # lock. The 60 s expiry stops a crashed flusher from holding it forever.
        if not cache.add(lock_key, "locked", 60):
            logger.debug("Could not acquire lock for %s, skipping flush", key)
            return

        try:
            # maxsplit keeps a pk that itself contains ":" in one piece.
            parts = key.split(":", 3)
            if len(parts) != 4:
                return
            model_name, pk = parts[2], parts[3]

            try:
                count = int(cache.get(key) or 0)
            except (ValueError, TypeError):
                return
            if count <= 0:
                return

            # Counters are assumed to belong to models in the ivataraccount app.
            try:
                model_class = apps.get_model("ivataraccount", model_name)
            except LookupError:
                logger.warning("Could not find model %s", model_name)
                return

            logger.info(
                "Flushing %d access counts for %s:%s to database", count, model_name, pk
            )
            model_class.objects.filter(pk=pk).update(
                access_count=F("access_count") + count
            )

            # Subtract only what was written. Increments that arrived after the
            # read above stay in the cache for the next flush.
            try:
                cache.decr(key, count)
            except ValueError:
                # The counter vanished (evicted/expired) after it was read, so
                # there is nothing left to subtract. Do not recreate it: a
                # concurrent request may be creating it with a fresh count.
                pass

            # Stop the clock. The next access to this object starts a new one.
            cache.delete(f"stats:start_time:{model_name}:{pk}")

        except Exception as e:
            logger.exception("Error flushing key %s: %s", key, e)
        finally:
            cache.delete(lock_key)

    def _flush_worker(self, key: str) -> None:
        """
        Thread target: flush ``key`` and stop tracking it as dirty.
        """
        # Untrack the key *before* flushing. Any increment that lands while the
        # flush runs re-adds it, so the exit-time flush still sees that count.
        with self._lock:
            self._local_dirty_keys.discard(key)
        try:
            self._flush_key_sync(key)
        finally:
            # Django database connections are per-thread. Close the one this
            # flush may have opened instead of leaving it open after the
            # thread ends.
            from django.db import connections

            connections.close_all()

    def flush_all_dirty_keys(self) -> None:
        """
        Flush every key this process has marked dirty. Registered with atexit.
        """
        with self._lock:
            keys_to_flush = list(self._local_dirty_keys)
            self._local_dirty_keys.clear()

        for key in keys_to_flush:
            self._flush_key_sync(key)

    @staticmethod
    def _increment_in_db(obj: models.Model) -> None:
        """
        Add one to ``obj.access_count`` directly in the database.
        """
        # Use a queryset update, like the batched flush does, instead of
        # assigning F() to obj.access_count and calling save(). With save(),
        # the F() expression stays on the instance, and a later obj.save()
        # would apply the increment a second time.
        type(obj).objects.filter(pk=obj.pk).update(
            access_count=F("access_count") + 1
        )

    @staticmethod
    def _increment_counter(key: str) -> int:
        """
        Atomically add one to the cache counter ``key`` and return its new value.

        A missing counter is created with ``timeout=None`` (no expiry), so
        unflushed counts cannot expire out of the cache.
        """
        try:
            return cache.incr(key)
        except ValueError:
            pass  # the counter does not exist yet
        # add() only writes when the key is absent, so two requests racing to
        # create the counter cannot overwrite each other's first count.
        if cache.add(key, 1, timeout=None):
            return 1
        # Another request created it first; count this access on top of theirs.
        return cache.incr(key)

    def _flush_timeout_elapsed(self, start_time_key: str) -> bool:
        """
        Return True if the counter's clock has run past STATS_FLUSH_TIMEOUT.

        If no clock is running (first access since the counter was created or
        last flushed), start one and return False.
        """
        start_time = cache.get(start_time_key)
        if start_time is None:
            cache.add(start_time_key, time.time(), timeout=None)
            return False
        try:
            return time.time() - float(start_time) > self._get_flush_timeout()
        except (ValueError, TypeError):
            return False

    def update_access_count(self, obj: models.Model) -> None:
        """
        Count one access to ``obj`` (Photo, ConfirmedEmail, ConfirmedOpenId).

        If ``settings.ASYNC_ACCESS_COUNT`` is true (the default), the access
        is counted in the configured Django cache. A background thread then
        writes it to the database in batches. Otherwise ``access_count`` is
        incremented in the database immediately. If the cache increment
        itself fails, the database is updated directly instead.
        """
        if not getattr(settings, "ASYNC_ACCESS_COUNT", True):
            self._increment_in_db(obj)
            return

        # Key format: stats:access_count:<model_name>:<pk>
        model_name = obj.__class__.__name__
        pk = obj.pk
        key = f"stats:access_count:{model_name}:{pk}"
        start_time_key = f"stats:start_time:{model_name}:{pk}"

        with self._lock:
            self._local_dirty_keys.add(key)

        try:
            new_value = self._increment_counter(key)
        except Exception as e:
            # The access was not recorded in the cache, so record it in the DB.
            logger.warning("Failed to update access count cache: %s", e)
            self._increment_in_db(obj)
            return

        # From here on the access is already counted in the cache. Failures
        # must not reach the DB fallback, or the access would be counted twice.
        try:
            should_flush = new_value >= self._get_batch_size() or (
                self._flush_timeout_elapsed(start_time_key)
            )
            if should_flush:
                # Run the flush in a separate thread to avoid blocking the request.
                threading.Thread(target=self._flush_worker, args=(key,)).start()
        except Exception as e:
            logger.warning("Failed to schedule access count flush for %s: %s", key, e)

175 

176 

# Process-wide singleton. Constructing it registers flush_all_dirty_keys
# with atexit, so it is created exactly once, at import time.
stats_manager: AccessStatsManager = AccessStatsManager()