Coverage for ivatar/test_access_stats.py: 100%
81 statements
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 23:07 +0000
« prev ^ index » next coverage.py v7.14.2, created at 2026-06-21 23:07 +0000
1from django.test import TransactionTestCase, Client, override_settings
2from django.contrib.auth.models import User
3from ivatar.ivataraccount.models import ConfirmedEmail, Photo
4from ivatar.access_stats import stats_manager
5from django.core.cache import cache
6import time
7from unittest.mock import patch
10@override_settings(
11 CACHES={
12 "default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"},
13 "filesystem": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"},
14 },
15 CACHE_RESPONSE=False,
16 STATS_BATCH_SIZE=100,
17 STATS_FLUSH_TIMEOUT=5, # Short timeout for testing
18 ASYNC_ACCESS_COUNT=True,
19)
20class BatchStatsTest(TransactionTestCase):
21 def setUp(self):
22 self.username = "testuser_" + str(time.time())
23 self.password = "password"
24 self.user = User.objects.create_user(
25 username=self.username, password=self.password
26 )
27 self.client = Client()
29 # Create a confirmed email and photo
30 self.email_address = f"test_{time.time()}@example.com"
31 self.email = ConfirmedEmail.objects.create(
32 user=self.user, email=self.email_address, ip_address="127.0.0.1"
33 )
35 # Create a dummy photo
36 png_data = b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x02\x00\x00\x00\x90wS\xde\x00\x00\x00\tpHYs\x00\x00\x0b\x13\x00\x00\x0b\x13\x01\x00\x9a\x9c\x18\x00\x00\x00\nIDATx\x9cc```\x00\x00\x00\x04\x00\x01\xdd\x8d\xb4\x1c\x00\x00\x00\x00IEND\xaeB`\x82"
37 self.photo = Photo.objects.create(
38 user=self.user, data=png_data, format="png", ip_address="127.0.0.1"
39 )
40 self.email.photo = self.photo
41 self.email.save()
43 # Clear keys for this test
44 model_name = self.email.__class__.__name__
45 pk = self.email.pk
46 self.email_key = f"stats:access_count:{model_name}:{pk}"
47 self.start_time_key = f"stats:start_time:{model_name}:{pk}"
49 cache.delete(self.email_key)
50 cache.delete(self.start_time_key)
52 # Ensure stats_manager has clean state
53 stats_manager._local_dirty_keys.clear()
55 # Mock threading.Thread to run synchronously
56 self.thread_patcher = patch("ivatar.access_stats.threading.Thread")
57 self.mock_thread_cls = self.thread_patcher.start()
59 # Set up side effect to run target immediately
60 def side_effect(target=None, args=(), **kwargs):
61 class SyncThread:
62 def start(self):
63 if target:
64 target(*args)
66 return SyncThread()
68 self.mock_thread_cls.side_effect = side_effect
70 def tearDown(self):
71 self.thread_patcher.stop()
73 @override_settings(ASYNC_ACCESS_COUNT=False)
74 def test_sync_update_when_disabled(self):
75 """
76 Verify that updates are synchronous when feature flag is disabled.
77 """
78 stats_manager.update_access_count(self.email)
80 # Check DB updated immediately
81 self.email.refresh_from_db()
82 self.assertEqual(self.email.access_count, 1)
84 # Check cache should be empty (or at least not used for this logic)
85 self.assertIsNone(cache.get(self.email_key))
87 def test_normal_increment_cached(self):
88 """
89 Verify that a single request increments cache but does not update DB immediately.
90 """
91 stats_manager.update_access_count(self.email)
93 # Check cache
94 self.assertEqual(cache.get(self.email_key), 1)
96 # Check DB (should be 0)
97 self.email.refresh_from_db()
98 self.assertEqual(self.email.access_count, 0)
100 def test_batch_flush(self):
101 """
102 Verify that hitting the batch limit triggers a DB update.
103 """
104 # Increment 99 times (limit is 100)
105 for _ in range(99):
106 stats_manager.update_access_count(self.email)
108 self.assertEqual(cache.get(self.email_key), 99)
109 self.email.refresh_from_db()
110 self.assertEqual(self.email.access_count, 0)
112 # 100th increment -> Triggers flush (synchronously due to mock)
113 stats_manager.update_access_count(self.email)
115 # No wait needed
117 # Check DB
118 self.email.refresh_from_db()
119 self.assertEqual(self.email.access_count, 100)
121 # Check cache reset
122 self.assertEqual(cache.get(self.email_key), 0)
124 def test_time_flush(self):
125 """
126 Verify that the time-based flush works.
127 Timeout set to 5 seconds in override_settings.
128 """
129 # 1. First increment sets start time
130 stats_manager.update_access_count(self.email)
132 # Verify start time is set
133 self.assertIsNotNone(cache.get(self.start_time_key))
135 # Verify no DB update yet
136 self.email.refresh_from_db()
137 self.assertEqual(self.email.access_count, 0)
139 # 2. Wait for timeout > 5s
140 time.sleep(6)
142 # 3. Next increment should trigger flush due to timeout (synchronously)
143 stats_manager.update_access_count(self.email)
145 # No wait needed
147 # Check DB: should have 2 updates (1 from before wait + 1 from trigger)
148 self.email.refresh_from_db()
149 self.assertEqual(self.email.access_count, 2)
151 # Check cache reset
152 self.assertEqual(cache.get(self.email_key), 0)
153 # Start time should be cleared
154 self.assertIsNone(cache.get(self.start_time_key))
156 def test_atexit_handler(self):
157 """
158 Verify that the cleanup handler flushes remaining counts.
159 """
160 # Add some counts
161 stats_manager.update_access_count(self.email)
162 stats_manager.update_access_count(self.email)
163 stats_manager.update_access_count(self.email)
165 self.assertEqual(cache.get(self.email_key), 3)
166 self.email.refresh_from_db()
167 self.assertEqual(self.email.access_count, 0)
169 # Manually call the handler
170 stats_manager.flush_all_dirty_keys()
172 # Check DB
173 self.email.refresh_from_db()
174 self.assertEqual(self.email.access_count, 3)
176 # Check cache decremented
177 self.assertEqual(cache.get(self.email_key), 0)