Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/tests/test_backend.py
Show All 31 Lines | def mock_cooking(self): | ||||
'cooker_cls': mcc, | 'cooker_cls': mcc, | ||||
'cooker': mc} | 'cooker': mc} | ||||
def assertTimestampAlmostNow(self, ts, tolerance_secs=1.0): | def assertTimestampAlmostNow(self, ts, tolerance_secs=1.0): | ||||
now = datetime.datetime.now(datetime.timezone.utc) | now = datetime.datetime.now(datetime.timezone.utc) | ||||
creation_delta_secs = (ts - now).total_seconds() | creation_delta_secs = (ts - now).total_seconds() | ||||
self.assertLess(creation_delta_secs, tolerance_secs) | self.assertLess(creation_delta_secs, tolerance_secs) | ||||
def hash_content(self, content): | |||||
obj_id = hashutil.hash_data(content)['sha1'] | |||||
return content, obj_id | |||||
def fake_cook(self, obj_type, result_content, sticky=False): | |||||
content, obj_id = self.hash_content(result_content) | |||||
with self.mock_cooking(): | |||||
self.vault_backend.create_task(obj_type, obj_id, sticky) | |||||
self.vault_backend.cache.add(obj_type, obj_id, b'content') | |||||
self.vault_backend.set_status(obj_type, obj_id, 'done') | |||||
return obj_id, content | |||||
TEST_TYPE = 'revision_gitfast' | TEST_TYPE = 'revision_gitfast' | ||||
TEST_HEX_ID = '4a4b9771542143cf070386f86b4b92d42966bdbc' | TEST_HEX_ID = '4a4b9771542143cf070386f86b4b92d42966bdbc' | ||||
TEST_OBJ_ID = hashutil.hash_to_bytes(TEST_HEX_ID) | TEST_OBJ_ID = hashutil.hash_to_bytes(TEST_HEX_ID) | ||||
TEST_PROGRESS = ("Mr. White, You're telling me you're cooking again?" | TEST_PROGRESS = ("Mr. White, You're telling me you're cooking again?" | ||||
" \N{ASTONISHED FACE} ") | " \N{ASTONISHED FACE} ") | ||||
TEST_EMAIL = 'ouiche@example.com' | TEST_EMAIL = 'ouiche@example.com' | ||||
▲ Show 20 Lines • Show All 107 Lines • ▼ Show 20 Lines | def test_cook_email_pending_done(self): | ||||
self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID) | self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID) | ||||
madd.assert_not_called() | madd.assert_not_called() | ||||
msend.assert_not_called() | msend.assert_not_called() | ||||
madd.reset_mock() | madd.reset_mock() | ||||
msend.reset_mock() | msend.reset_mock() | ||||
self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL) | self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, | ||||
email=TEST_EMAIL) | |||||
madd.assert_called_once_with(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL) | madd.assert_called_once_with(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL) | ||||
msend.assert_not_called() | msend.assert_not_called() | ||||
madd.reset_mock() | madd.reset_mock() | ||||
msend.reset_mock() | msend.reset_mock() | ||||
self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') | self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') | ||||
self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL) | self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, | ||||
email=TEST_EMAIL) | |||||
msend.assert_called_once_with(None, TEST_EMAIL, | msend.assert_called_once_with(None, TEST_EMAIL, | ||||
TEST_TYPE, TEST_OBJ_ID) | TEST_TYPE, TEST_OBJ_ID) | ||||
madd.assert_not_called() | madd.assert_not_called() | ||||
def test_send_all_emails(self): | def test_send_all_emails(self): | ||||
with self.mock_cooking(): | with self.mock_cooking(): | ||||
emails = ('a@example.com', | emails = ('a@example.com', | ||||
'billg@example.com', | 'billg@example.com', | ||||
'test+42@example.org') | 'test+42@example.org') | ||||
for email in emails: | for email in emails: | ||||
self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, email) | self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, | ||||
email=email) | |||||
self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') | self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') | ||||
with patch.object(self.vault_backend, 'smtp_server') as m: | with patch.object(self.vault_backend, 'smtp_server') as m: | ||||
self.vault_backend.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) | self.vault_backend.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) | ||||
sent_emails = {k[0][0] for k in m.send_message.call_args_list} | sent_emails = {k[0][0] for k in m.send_message.call_args_list} | ||||
self.assertEqual({k['To'] for k in sent_emails}, set(emails)) | self.assertEqual({k['To'] for k in sent_emails}, set(emails)) | ||||
Show All 25 Lines | def test_available(self): | ||||
TEST_OBJ_ID)) | TEST_OBJ_ID)) | ||||
self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') | self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') | ||||
self.assertTrue(self.vault_backend.is_available(TEST_TYPE, | self.assertTrue(self.vault_backend.is_available(TEST_TYPE, | ||||
TEST_OBJ_ID)) | TEST_OBJ_ID)) | ||||
def test_fetch(self): | def test_fetch(self): | ||||
self.assertEqual(self.vault_backend.fetch(TEST_TYPE, TEST_OBJ_ID), | self.assertEqual(self.vault_backend.fetch(TEST_TYPE, TEST_OBJ_ID), | ||||
None) | None) | ||||
with self.mock_cooking(): | obj_id, content = self.fake_cook(TEST_TYPE, b'content') | ||||
self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) | |||||
self.vault_backend.cache.add(TEST_TYPE, TEST_OBJ_ID, b'content') | |||||
self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') | |||||
info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) | info = self.vault_backend.task_info(TEST_TYPE, obj_id) | ||||
access_ts_before = info['ts_last_access'] | access_ts_before = info['ts_last_access'] | ||||
self.assertEqual(self.vault_backend.fetch(TEST_TYPE, TEST_OBJ_ID), | self.assertEqual(self.vault_backend.fetch(TEST_TYPE, obj_id), | ||||
b'content') | b'content') | ||||
info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) | info = self.vault_backend.task_info(TEST_TYPE, obj_id) | ||||
access_ts_after = info['ts_last_access'] | access_ts_after = info['ts_last_access'] | ||||
self.assertTimestampAlmostNow(access_ts_after) | self.assertTimestampAlmostNow(access_ts_after) | ||||
self.assertLess(access_ts_before, access_ts_after) | self.assertLess(access_ts_before, access_ts_after) | ||||
def test_cache_expire_oldest(self): | |||||
r = range(1, 10) | |||||
inserted = {} | |||||
for i in r: | |||||
sticky = (i == 5) | |||||
content = b'content%s' % str(i).encode() | |||||
obj_id, content = self.fake_cook(TEST_TYPE, content, sticky) | |||||
inserted[i] = (obj_id, content) | |||||
self.vault_backend.update_access_ts(TEST_TYPE, inserted[2][0]) | |||||
self.vault_backend.update_access_ts(TEST_TYPE, inserted[3][0]) | |||||
self.vault_backend.cache_expire_oldest(n=4) | |||||
should_be_still_here = {2, 3, 5, 8, 9} | |||||
for i in r: | |||||
self.assertEqual(self.vault_backend.is_available( | |||||
TEST_TYPE, inserted[i][0]), i in should_be_still_here) | |||||
def test_cache_expire_until(self): | |||||
r = range(1, 10) | |||||
inserted = {} | |||||
for i in r: | |||||
sticky = (i == 5) | |||||
content = b'content%s' % str(i).encode() | |||||
obj_id, content = self.fake_cook(TEST_TYPE, content, sticky) | |||||
inserted[i] = (obj_id, content) | |||||
if i == 7: | |||||
cutoff_date = datetime.datetime.now() | |||||
self.vault_backend.update_access_ts(TEST_TYPE, inserted[2][0]) | |||||
self.vault_backend.update_access_ts(TEST_TYPE, inserted[3][0]) | |||||
self.vault_backend.cache_expire_until(date=cutoff_date) | |||||
should_be_still_here = {2, 3, 5, 8, 9} | |||||
for i in r: | |||||
self.assertEqual(self.vault_backend.is_available( | |||||
TEST_TYPE, inserted[i][0]), i in should_be_still_here) |