Coverage for ivatar/test_access_stats.py: 100%

81 statements  

« prev     ^ index     » next       coverage.py v7.14.2, created at 2026-06-21 23:07 +0000

1from django.test import TransactionTestCase, Client, override_settings 

2from django.contrib.auth.models import User 

3from ivatar.ivataraccount.models import ConfirmedEmail, Photo 

4from ivatar.access_stats import stats_manager 

5from django.core.cache import cache 

6import time 

7from unittest.mock import patch 

8 

9 

@override_settings(
    CACHES={
        "default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"},
        "filesystem": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"},
    },
    CACHE_RESPONSE=False,
    STATS_BATCH_SIZE=100,
    STATS_FLUSH_TIMEOUT=5,  # Small value so the time-based test only waits a few seconds
    ASYNC_ACCESS_COUNT=True,
)
class BatchStatsTest(TransactionTestCase):
    """
    Exercise ``stats_manager`` access counting against a ``ConfirmedEmail``.

    The tests cover the synchronous path (``ASYNC_ACCESS_COUNT=False``), the
    cached increment, the flush at ``STATS_BATCH_SIZE``, the flush after
    ``STATS_FLUSH_TIMEOUT`` and the explicit ``flush_all_dirty_keys`` call.
    """

    # Dummy PNG payload for the photo fixture (its IHDR chunk declares 1x1).
    _DUMMY_PNG = b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x02\x00\x00\x00\x90wS\xde\x00\x00\x00\tpHYs\x00\x00\x0b\x13\x00\x00\x0b\x13\x01\x00\x9a\x9c\x18\x00\x00\x00\nIDATx\x9cc```\x00\x00\x00\x04\x00\x01\xdd\x8d\xb4\x1c\x00\x00\x00\x00IEND\xaeB`\x82"

    def setUp(self):
        """
        Build the user / email / photo fixture and reset all stats state.
        """
        # The current timestamp makes the username and address unique.
        self.username = f"testuser_{time.time()}"
        self.password = "password"
        self.user = User.objects.create_user(
            username=self.username,
            password=self.password,
        )
        self.client = Client()

        # Confirmed email that the counters are attached to
        self.email_address = f"test_{time.time()}@example.com"
        self.email = ConfirmedEmail.objects.create(
            user=self.user,
            email=self.email_address,
            ip_address="127.0.0.1",
        )

        # Photo linked to that email
        self.photo = Photo.objects.create(
            user=self.user,
            data=self._DUMMY_PNG,
            format="png",
            ip_address="127.0.0.1",
        )
        self.email.photo = self.photo
        self.email.save()

        # Cache keys the assertions inspect; drop anything left under them
        key_suffix = f"{self.email.__class__.__name__}:{self.email.pk}"
        self.email_key = f"stats:access_count:{key_suffix}"
        self.start_time_key = f"stats:start_time:{key_suffix}"
        for stale_key in (self.email_key, self.start_time_key):
            cache.delete(stale_key)

        # Start from an empty dirty-key set in the shared manager
        stats_manager._local_dirty_keys.clear()

        self._run_threads_inline()

    def _run_threads_inline(self):
        """
        Patch ``ivatar.access_stats.threading.Thread`` with a synchronous stand-in.

        Every "thread" built through the patched name runs its target inside
        ``start()``, so the tests never have to wait for background work.
        """
        self.thread_patcher = patch("ivatar.access_stats.threading.Thread")
        self.mock_thread_cls = self.thread_patcher.start()

        class InlineThread:
            def __init__(self, target=None, args=(), **_unused):
                # Any other Thread keyword arguments are accepted and ignored.
                self._target = target
                self._args = args

            def start(self):
                if self._target:
                    self._target(*self._args)

        # The mock calls this with the constructor arguments and returns the result.
        self.mock_thread_cls.side_effect = InlineThread

    def _hit(self, times=1):
        """Call ``stats_manager.update_access_count`` for the email ``times`` times."""
        for _ in range(times):
            stats_manager.update_access_count(self.email)

    def _assert_db_count(self, expected):
        """Reload the email from the database and compare its ``access_count``."""
        self.email.refresh_from_db()
        self.assertEqual(self.email.access_count, expected)

    def tearDown(self):
        """Undo the ``threading.Thread`` patch installed by ``setUp``."""
        self.thread_patcher.stop()

    @override_settings(ASYNC_ACCESS_COUNT=False)
    def test_sync_update_when_disabled(self):
        """
        With the feature flag off, the update must reach the DB right away.
        """
        self._hit()

        # The database row is updated immediately
        self._assert_db_count(1)

        # Nothing should be stored under the counter key on this path
        self.assertIsNone(cache.get(self.email_key))

    def test_normal_increment_cached(self):
        """
        One access bumps the cached counter and leaves the DB untouched.
        """
        self._hit()

        # Counter lives in the cache
        self.assertEqual(cache.get(self.email_key), 1)

        # Database still shows 0
        self._assert_db_count(0)

    def test_batch_flush(self):
        """
        Reaching the batch size must write the accumulated count to the DB.
        """
        # One short of the limit (STATS_BATCH_SIZE is 100)
        self._hit(99)

        self.assertEqual(cache.get(self.email_key), 99)
        self._assert_db_count(0)

        # Access number 100 triggers the flush; the patched Thread runs it inline
        self._hit()

        # Database received the whole batch
        self._assert_db_count(100)

        # Cached counter is back to zero
        self.assertEqual(cache.get(self.email_key), 0)

    def test_time_flush(self):
        """
        An access after the flush timeout must write the count to the DB.
        The timeout is 5 seconds (see the class-level override_settings).
        """
        # 1. The first access records a start time
        self._hit()
        self.assertIsNotNone(cache.get(self.start_time_key))

        # Nothing has been written to the database yet
        self._assert_db_count(0)

        # 2. Let more than the 5 second timeout pass
        time.sleep(6)

        # 3. The next access triggers the flush; the patched Thread runs it inline
        self._hit()

        # Database holds both accesses: one before the wait, one after
        self._assert_db_count(2)

        # Cached counter is reset and the start time is gone
        self.assertEqual(cache.get(self.email_key), 0)
        self.assertIsNone(cache.get(self.start_time_key))

    def test_atexit_handler(self):
        """
        ``flush_all_dirty_keys`` must write out counts that are still pending.
        """
        # Accumulate a few accesses, well below the batch size
        self._hit(3)

        self.assertEqual(cache.get(self.email_key), 3)
        self._assert_db_count(0)

        # Invoke the cleanup handler by hand
        stats_manager.flush_all_dirty_keys()

        # Database received the pending count
        self._assert_db_count(3)

        # Cached counter was brought back down to zero
        self.assertEqual(cache.get(self.email_key), 0)

175 

176 # Check cache decremented 

177 self.assertEqual(cache.get(self.email_key), 0)