Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/tests/test_backend.py
# Copyright (C) 2017 The Software Heritage developers | # Copyright (C) 2017 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import contextlib | import contextlib | ||||
import datetime | import datetime | ||||
import psycopg2 | import psycopg2 | ||||
import unittest | |||||
from unittest.mock import patch | from unittest.mock import patch, MagicMock | ||||
import pytest | |||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.vault.tests.vault_testing import VaultTestFixture, hash_content | from swh.vault.tests.vault_testing import hash_content | ||||
class BaseTestBackend(VaultTestFixture): | |||||
@contextlib.contextmanager | @contextlib.contextmanager | ||||
def mock_cooking(self): | def mock_cooking(vault_backend): | ||||
with patch.object(self.vault_backend, '_send_task') as mt: | with patch.object(vault_backend, '_send_task') as mt: | ||||
mt.return_value = 42 | mt.return_value = 42 | ||||
with patch('swh.vault.backend.get_cooker') as mg: | with patch('swh.vault.backend.get_cooker') as mg: | ||||
mcc = unittest.mock.MagicMock() | mcc = MagicMock() | ||||
mc = unittest.mock.MagicMock() | mc = MagicMock() | ||||
mg.return_value = mcc | mg.return_value = mcc | ||||
mcc.return_value = mc | mcc.return_value = mc | ||||
mc.check_exists.return_value = True | mc.check_exists.return_value = True | ||||
yield {'send_task': mt, | yield {'send_task': mt, | ||||
'get_cooker': mg, | 'get_cooker': mg, | ||||
'cooker_cls': mcc, | 'cooker_cls': mcc, | ||||
'cooker': mc} | 'cooker': mc} | ||||
def assertTimestampAlmostNow(self, ts, tolerance_secs=1.0): # noqa | def assertTimestampAlmostNow(ts, tolerance_secs=1.0): # noqa | ||||
now = datetime.datetime.now(datetime.timezone.utc) | now = datetime.datetime.now(datetime.timezone.utc) | ||||
creation_delta_secs = (ts - now).total_seconds() | creation_delta_secs = (ts - now).total_seconds() | ||||
self.assertLess(creation_delta_secs, tolerance_secs) | assert creation_delta_secs < tolerance_secs | ||||
def fake_cook(self, obj_type, result_content, sticky=False): | |||||
def fake_cook(backend, obj_type, result_content, sticky=False): | |||||
content, obj_id = hash_content(result_content) | content, obj_id = hash_content(result_content) | ||||
with self.mock_cooking(): | with mock_cooking(backend): | ||||
self.vault_backend.create_task(obj_type, obj_id, sticky) | backend.create_task(obj_type, obj_id, sticky) | ||||
self.vault_backend.cache.add(obj_type, obj_id, b'content') | backend.cache.add(obj_type, obj_id, b'content') | ||||
self.vault_backend.set_status(obj_type, obj_id, 'done') | backend.set_status(obj_type, obj_id, 'done') | ||||
return obj_id, content | return obj_id, content | ||||
def fail_cook(self, obj_type, obj_id, failure_reason): | |||||
with self.mock_cooking(): | def fail_cook(backend, obj_type, obj_id, failure_reason): | ||||
self.vault_backend.create_task(obj_type, obj_id) | with mock_cooking(backend): | ||||
self.vault_backend.set_status(obj_type, obj_id, 'failed') | backend.create_task(obj_type, obj_id) | ||||
self.vault_backend.set_progress(obj_type, obj_id, failure_reason) | backend.set_status(obj_type, obj_id, 'failed') | ||||
backend.set_progress(obj_type, obj_id, failure_reason) | |||||
TEST_TYPE = 'revision_gitfast' | TEST_TYPE = 'revision_gitfast' | ||||
TEST_HEX_ID = '4a4b9771542143cf070386f86b4b92d42966bdbc' | TEST_HEX_ID = '4a4b9771542143cf070386f86b4b92d42966bdbc' | ||||
TEST_OBJ_ID = hashutil.hash_to_bytes(TEST_HEX_ID) | TEST_OBJ_ID = hashutil.hash_to_bytes(TEST_HEX_ID) | ||||
TEST_PROGRESS = ("Mr. White, You're telling me you're cooking again?" | TEST_PROGRESS = ("Mr. White, You're telling me you're cooking again?" | ||||
" \N{ASTONISHED FACE} ") | " \N{ASTONISHED FACE} ") | ||||
TEST_EMAIL = 'ouiche@example.com' | TEST_EMAIL = 'ouiche@lorraine.fr' | ||||
class TestBackend(BaseTestBackend, unittest.TestCase): | def test_create_task_simple(swh_vault): | ||||
def test_create_task_simple(self): | with mock_cooking(swh_vault) as m: | ||||
with self.mock_cooking() as m: | swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) | ||||
self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) | |||||
m['get_cooker'].assert_called_once_with(TEST_TYPE) | m['get_cooker'].assert_called_once_with(TEST_TYPE) | ||||
args = m['cooker_cls'].call_args[0] | args = m['cooker_cls'].call_args[0] | ||||
self.assertEqual(args[0], TEST_TYPE) | assert args[0] == TEST_TYPE | ||||
ardumont: At some point (prior 09/2018 i think), i don't remember exactly on which diff though (possibly… | |||||
vlorentzUnsubmitted Not Done Inline ActionsThese are pytest-style tests; there is no self. And pytest does introspection magic to print the values of both operands when the test fails. vlorentz: These are pytest-style tests; there is no `self`. And pytest does introspection magic to print… | |||||
ardumontUnsubmitted Not Done Inline ActionsRight, thx. I did not see that. ardumont: Right, thx. I did not see that. | |||||
self.assertEqual(args[1], TEST_HEX_ID) | assert args[1] == TEST_HEX_ID | ||||
self.assertEqual(m['cooker'].check_exists.call_count, 1) | assert m['cooker'].check_exists.call_count == 1 | ||||
assert m['send_task'].call_count == 1 | |||||
self.assertEqual(m['send_task'].call_count, 1) | |||||
args = m['send_task'].call_args[0][0] | args = m['send_task'].call_args[0][0] | ||||
self.assertEqual(args[0], TEST_TYPE) | assert args[0] == TEST_TYPE | ||||
self.assertEqual(args[1], TEST_HEX_ID) | assert args[1] == TEST_HEX_ID | ||||
info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) | |||||
assert info['object_id'] == TEST_OBJ_ID | |||||
assert info['type'] == TEST_TYPE | |||||
assert info['task_status'] == 'new' | |||||
assert info['task_id'] == 42 | |||||
assertTimestampAlmostNow(info['ts_created']) | |||||
info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) | assert info['ts_done'] is None | ||||
self.assertEqual(info['object_id'], TEST_OBJ_ID) | assert info['progress_msg'] is None | ||||
self.assertEqual(info['type'], TEST_TYPE) | |||||
self.assertEqual(info['task_status'], 'new') | |||||
self.assertEqual(info['task_id'], 42) | |||||
self.assertTimestampAlmostNow(info['ts_created']) | |||||
self.assertEqual(info['ts_done'], None) | |||||
self.assertEqual(info['progress_msg'], None) | |||||
def test_create_fail_duplicate_task(self): | |||||
with self.mock_cooking(): | |||||
self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) | |||||
with self.assertRaises(psycopg2.IntegrityError): | |||||
self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) | |||||
def test_create_fail_nonexisting_object(self): | |||||
with self.mock_cooking() as m: | def test_create_fail_duplicate_task(swh_vault): | ||||
with mock_cooking(swh_vault): | |||||
swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) | |||||
with pytest.raises(psycopg2.IntegrityError): | |||||
swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) | |||||
def test_create_fail_nonexisting_object(swh_vault): | |||||
with mock_cooking(swh_vault) as m: | |||||
m['cooker'].check_exists.side_effect = ValueError('Nothing here.') | m['cooker'].check_exists.side_effect = ValueError('Nothing here.') | ||||
with self.assertRaises(ValueError): | with pytest.raises(ValueError): | ||||
self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) | swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) | ||||
def test_create_set_progress(self): | def test_create_set_progress(swh_vault): | ||||
with self.mock_cooking(): | with mock_cooking(swh_vault): | ||||
self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) | swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) | ||||
info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) | info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) | ||||
self.assertEqual(info['progress_msg'], None) | assert info['progress_msg'] is None | ||||
self.vault_backend.set_progress(TEST_TYPE, TEST_OBJ_ID, | swh_vault.set_progress(TEST_TYPE, TEST_OBJ_ID, | ||||
TEST_PROGRESS) | TEST_PROGRESS) | ||||
info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) | info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) | ||||
self.assertEqual(info['progress_msg'], TEST_PROGRESS) | assert info['progress_msg'] == TEST_PROGRESS | ||||
def test_create_set_status(swh_vault): | |||||
with mock_cooking(swh_vault): | |||||
swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) | |||||
info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) | |||||
assert info['task_status'] == 'new' | |||||
assert info['ts_done'] is None | |||||
swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'pending') | |||||
info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) | |||||
assert info['task_status'] == 'pending' | |||||
assert info['ts_done'] is None | |||||
swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') | |||||
info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) | |||||
assert info['task_status'] == 'done' | |||||
assertTimestampAlmostNow(info['ts_done']) | |||||
def test_create_set_status(self): | def test_create_update_access_ts(swh_vault): | ||||
with self.mock_cooking(): | with mock_cooking(swh_vault): | ||||
self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) | swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) | ||||
info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) | |||||
self.assertEqual(info['task_status'], 'new') | |||||
self.assertEqual(info['ts_done'], None) | |||||
self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'pending') | |||||
info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) | |||||
self.assertEqual(info['task_status'], 'pending') | |||||
self.assertEqual(info['ts_done'], None) | |||||
self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') | |||||
info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) | |||||
self.assertEqual(info['task_status'], 'done') | |||||
self.assertTimestampAlmostNow(info['ts_done']) | |||||
def test_create_update_access_ts(self): | |||||
with self.mock_cooking(): | |||||
self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) | |||||
info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) | info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) | ||||
access_ts_1 = info['ts_last_access'] | access_ts_1 = info['ts_last_access'] | ||||
self.assertTimestampAlmostNow(access_ts_1) | assertTimestampAlmostNow(access_ts_1) | ||||
self.vault_backend.update_access_ts(TEST_TYPE, TEST_OBJ_ID) | swh_vault.update_access_ts(TEST_TYPE, TEST_OBJ_ID) | ||||
info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) | info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) | ||||
access_ts_2 = info['ts_last_access'] | access_ts_2 = info['ts_last_access'] | ||||
self.assertTimestampAlmostNow(access_ts_2) | assertTimestampAlmostNow(access_ts_2) | ||||
self.vault_backend.update_access_ts(TEST_TYPE, TEST_OBJ_ID) | swh_vault.update_access_ts(TEST_TYPE, TEST_OBJ_ID) | ||||
info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) | info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) | ||||
access_ts_3 = info['ts_last_access'] | access_ts_3 = info['ts_last_access'] | ||||
self.assertTimestampAlmostNow(access_ts_3) | assertTimestampAlmostNow(access_ts_3) | ||||
self.assertLess(access_ts_1, access_ts_2) | assert access_ts_1 < access_ts_2 | ||||
self.assertLess(access_ts_2, access_ts_3) | assert access_ts_2 < access_ts_3 | ||||
def test_cook_request_idempotent(self): | |||||
with self.mock_cooking(): | |||||
info1 = self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID) | |||||
info2 = self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID) | |||||
info3 = self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID) | |||||
self.assertEqual(info1, info2) | |||||
self.assertEqual(info1, info3) | |||||
def test_cook_email_pending_done(self): | |||||
with self.mock_cooking(), \ | |||||
patch.object(self.vault_backend, 'add_notif_email') as madd, \ | |||||
patch.object(self.vault_backend, 'send_notification') as msend: | |||||
self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID) | def test_cook_request_idempotent(swh_vault): | ||||
with mock_cooking(swh_vault): | |||||
info1 = swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) | |||||
info2 = swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) | |||||
info3 = swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) | |||||
assert info1 == info2 | |||||
assert info1 == info3 | |||||
def test_cook_email_pending_done(swh_vault): | |||||
with mock_cooking(swh_vault), \ | |||||
patch.object(swh_vault, 'add_notif_email') as madd, \ | |||||
patch.object(swh_vault, 'send_notification') as msend: | |||||
swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) | |||||
madd.assert_not_called() | madd.assert_not_called() | ||||
msend.assert_not_called() | msend.assert_not_called() | ||||
madd.reset_mock() | madd.reset_mock() | ||||
msend.reset_mock() | msend.reset_mock() | ||||
self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, | swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, | ||||
email=TEST_EMAIL) | email=TEST_EMAIL) | ||||
madd.assert_called_once_with(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL) | madd.assert_called_once_with(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL) | ||||
msend.assert_not_called() | msend.assert_not_called() | ||||
madd.reset_mock() | madd.reset_mock() | ||||
msend.reset_mock() | msend.reset_mock() | ||||
self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') | swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') | ||||
self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, | swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, | ||||
email=TEST_EMAIL) | email=TEST_EMAIL) | ||||
msend.assert_called_once_with(None, TEST_EMAIL, | msend.assert_called_once_with(None, TEST_EMAIL, | ||||
TEST_TYPE, TEST_OBJ_ID, 'done') | TEST_TYPE, TEST_OBJ_ID, 'done') | ||||
madd.assert_not_called() | madd.assert_not_called() | ||||
def test_send_all_emails(self): | |||||
with self.mock_cooking(): | def test_send_all_emails(swh_vault): | ||||
with mock_cooking(swh_vault): | |||||
emails = ('a@example.com', | emails = ('a@example.com', | ||||
'billg@example.com', | 'billg@example.com', | ||||
'test+42@example.org') | 'test+42@example.org') | ||||
for email in emails: | for email in emails: | ||||
self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, | swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, | ||||
email=email) | email=email) | ||||
self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') | swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') | ||||
with patch.object(self.vault_backend, 'smtp_server') as m: | with patch.object(swh_vault, 'smtp_server') as m: | ||||
self.vault_backend.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) | swh_vault.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) | ||||
sent_emails = {k[0][0] for k in m.send_message.call_args_list} | sent_emails = {k[0][0] for k in m.send_message.call_args_list} | ||||
self.assertEqual({k['To'] for k in sent_emails}, set(emails)) | assert {k['To'] for k in sent_emails} == set(emails) | ||||
for e in sent_emails: | for e in sent_emails: | ||||
self.assertIn('info@softwareheritage.org', e['From']) | assert 'info@softwareheritage.org' in e['From'] | ||||
self.assertIn(TEST_TYPE, e['Subject']) | assert TEST_TYPE in e['Subject'] | ||||
self.assertIn(TEST_HEX_ID[:5], e['Subject']) | assert TEST_HEX_ID[:5] in e['Subject'] | ||||
self.assertIn(TEST_TYPE, str(e)) | assert TEST_TYPE in str(e) | ||||
self.assertIn('https://archive.softwareheritage.org/', str(e)) | assert 'https://archive.softwareheritage.org/' in str(e) | ||||
self.assertIn(TEST_HEX_ID[:5], str(e)) | assert TEST_HEX_ID[:5] in str(e) | ||||
self.assertIn('--\x20\n', str(e)) # Well-formated signature!!! | assert '--\x20\n' in str(e) # Well-formated signature!!! | ||||
# Check that the entries have been deleted and recalling the | # Check that the entries have been deleted and recalling the | ||||
# function does not re-send the e-mails | # function does not re-send the e-mails | ||||
m.reset_mock() | m.reset_mock() | ||||
self.vault_backend.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) | swh_vault.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) | ||||
m.assert_not_called() | m.assert_not_called() | ||||
def test_available(self): | |||||
self.assertFalse(self.vault_backend.is_available(TEST_TYPE, | |||||
TEST_OBJ_ID)) | |||||
with self.mock_cooking(): | |||||
self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) | |||||
self.assertFalse(self.vault_backend.is_available(TEST_TYPE, | |||||
TEST_OBJ_ID)) | |||||
self.vault_backend.cache.add(TEST_TYPE, TEST_OBJ_ID, b'content') | |||||
self.assertFalse(self.vault_backend.is_available(TEST_TYPE, | |||||
TEST_OBJ_ID)) | |||||
self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') | |||||
self.assertTrue(self.vault_backend.is_available(TEST_TYPE, | |||||
TEST_OBJ_ID)) | |||||
def test_fetch(self): | |||||
self.assertEqual(self.vault_backend.fetch(TEST_TYPE, TEST_OBJ_ID), | |||||
None) | |||||
obj_id, content = self.fake_cook(TEST_TYPE, b'content') | |||||
info = self.vault_backend.task_info(TEST_TYPE, obj_id) | def test_available(swh_vault): | ||||
assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID) | |||||
with mock_cooking(swh_vault): | |||||
swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) | |||||
assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID) | |||||
swh_vault.cache.add(TEST_TYPE, TEST_OBJ_ID, b'content') | |||||
assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID) | |||||
swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') | |||||
assert swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID) | |||||
def test_fetch(swh_vault): | |||||
assert swh_vault.fetch(TEST_TYPE, TEST_OBJ_ID) is None | |||||
obj_id, content = fake_cook(swh_vault, TEST_TYPE, b'content') | |||||
info = swh_vault.task_info(TEST_TYPE, obj_id) | |||||
access_ts_before = info['ts_last_access'] | access_ts_before = info['ts_last_access'] | ||||
self.assertEqual(self.vault_backend.fetch(TEST_TYPE, obj_id), | assert swh_vault.fetch(TEST_TYPE, obj_id) == b'content' | ||||
b'content') | |||||
info = self.vault_backend.task_info(TEST_TYPE, obj_id) | info = swh_vault.task_info(TEST_TYPE, obj_id) | ||||
access_ts_after = info['ts_last_access'] | access_ts_after = info['ts_last_access'] | ||||
self.assertTimestampAlmostNow(access_ts_after) | assertTimestampAlmostNow(access_ts_after) | ||||
self.assertLess(access_ts_before, access_ts_after) | assert access_ts_before < access_ts_after | ||||
def test_cache_expire_oldest(self): | |||||
def test_cache_expire_oldest(swh_vault): | |||||
r = range(1, 10) | r = range(1, 10) | ||||
inserted = {} | inserted = {} | ||||
for i in r: | for i in r: | ||||
sticky = (i == 5) | sticky = (i == 5) | ||||
content = b'content%s' % str(i).encode() | content = b'content%s' % str(i).encode() | ||||
obj_id, content = self.fake_cook(TEST_TYPE, content, sticky) | obj_id, content = fake_cook(swh_vault, TEST_TYPE, content, sticky) | ||||
inserted[i] = (obj_id, content) | inserted[i] = (obj_id, content) | ||||
self.vault_backend.update_access_ts(TEST_TYPE, inserted[2][0]) | swh_vault.update_access_ts(TEST_TYPE, inserted[2][0]) | ||||
self.vault_backend.update_access_ts(TEST_TYPE, inserted[3][0]) | swh_vault.update_access_ts(TEST_TYPE, inserted[3][0]) | ||||
self.vault_backend.cache_expire_oldest(n=4) | swh_vault.cache_expire_oldest(n=4) | ||||
should_be_still_here = {2, 3, 5, 8, 9} | should_be_still_here = {2, 3, 5, 8, 9} | ||||
for i in r: | for i in r: | ||||
self.assertEqual(self.vault_backend.is_available( | assert swh_vault.is_available( | ||||
TEST_TYPE, inserted[i][0]), i in should_be_still_here) | TEST_TYPE, inserted[i][0]) == (i in should_be_still_here) | ||||
def test_cache_expire_until(self): | def test_cache_expire_until(swh_vault): | ||||
r = range(1, 10) | r = range(1, 10) | ||||
inserted = {} | inserted = {} | ||||
for i in r: | for i in r: | ||||
sticky = (i == 5) | sticky = (i == 5) | ||||
content = b'content%s' % str(i).encode() | content = b'content%s' % str(i).encode() | ||||
obj_id, content = self.fake_cook(TEST_TYPE, content, sticky) | obj_id, content = fake_cook(swh_vault, TEST_TYPE, content, sticky) | ||||
inserted[i] = (obj_id, content) | inserted[i] = (obj_id, content) | ||||
if i == 7: | if i == 7: | ||||
cutoff_date = datetime.datetime.now() | cutoff_date = datetime.datetime.now() | ||||
self.vault_backend.update_access_ts(TEST_TYPE, inserted[2][0]) | swh_vault.update_access_ts(TEST_TYPE, inserted[2][0]) | ||||
self.vault_backend.update_access_ts(TEST_TYPE, inserted[3][0]) | swh_vault.update_access_ts(TEST_TYPE, inserted[3][0]) | ||||
self.vault_backend.cache_expire_until(date=cutoff_date) | swh_vault.cache_expire_until(date=cutoff_date) | ||||
should_be_still_here = {2, 3, 5, 8, 9} | should_be_still_here = {2, 3, 5, 8, 9} | ||||
for i in r: | for i in r: | ||||
self.assertEqual(self.vault_backend.is_available( | assert swh_vault.is_available( | ||||
TEST_TYPE, inserted[i][0]), i in should_be_still_here) | TEST_TYPE, inserted[i][0]) == (i in should_be_still_here) | ||||
def test_fail_cook_simple(swh_vault): | |||||
fail_cook(swh_vault, TEST_TYPE, TEST_OBJ_ID, 'error42') | |||||
assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID) | |||||
info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) | |||||
assert info['progress_msg'] == 'error42' | |||||
def test_fail_cook_simple(self): | def test_send_failure_email(swh_vault): | ||||
self.fail_cook(TEST_TYPE, TEST_OBJ_ID, 'error42') | with mock_cooking(swh_vault): | ||||
self.assertFalse(self.vault_backend.is_available(TEST_TYPE, | swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, email='a@example.com') | ||||
TEST_OBJ_ID)) | |||||
info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) | |||||
self.assertEqual(info['progress_msg'], 'error42') | |||||
def test_send_failure_email(self): | |||||
with self.mock_cooking(): | |||||
self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, | |||||
email='a@example.com') | |||||
self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'failed') | swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'failed') | ||||
self.vault_backend.set_progress(TEST_TYPE, TEST_OBJ_ID, 'test error') | swh_vault.set_progress(TEST_TYPE, TEST_OBJ_ID, 'test error') | ||||
with patch.object(self.vault_backend, 'smtp_server') as m: | with patch.object(swh_vault, 'smtp_server') as m: | ||||
self.vault_backend.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) | swh_vault.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) | ||||
e = [k[0][0] for k in m.send_message.call_args_list][0] | e = [k[0][0] for k in m.send_message.call_args_list][0] | ||||
self.assertEqual(e['To'], 'a@example.com') | assert e['To'] == 'a@example.com' | ||||
self.assertIn('info@softwareheritage.org', e['From']) | assert 'info@softwareheritage.org' in e['From'] | ||||
self.assertIn(TEST_TYPE, e['Subject']) | assert TEST_TYPE in e['Subject'] | ||||
self.assertIn(TEST_HEX_ID[:5], e['Subject']) | assert TEST_HEX_ID[:5] in e['Subject'] | ||||
self.assertIn('fail', e['Subject']) | assert 'fail' in e['Subject'] | ||||
self.assertIn(TEST_TYPE, str(e)) | assert TEST_TYPE in str(e) | ||||
self.assertIn(TEST_HEX_ID[:5], str(e)) | assert TEST_HEX_ID[:5] in str(e) | ||||
self.assertIn('test error', str(e)) | assert 'test error' in str(e) | ||||
self.assertIn('--\x20\n', str(e)) # Well-formated signature | assert '--\x20\n' in str(e) # Well-formated signature | ||||
def test_retry_failed_bundle(self): | |||||
self.fail_cook(TEST_TYPE, TEST_OBJ_ID, 'error42') | def test_retry_failed_bundle(swh_vault): | ||||
info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) | fail_cook(swh_vault, TEST_TYPE, TEST_OBJ_ID, 'error42') | ||||
self.assertEqual(info['task_status'], 'failed') | info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) | ||||
with self.mock_cooking(): | assert info['task_status'] == 'failed' | ||||
self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID) | with mock_cooking(swh_vault): | ||||
info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) | swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) | ||||
self.assertEqual(info['task_status'], 'new') | info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) | ||||
assert info['task_status'] == 'new' |
At some point (prior 09/2018 i think), i don't remember exactly on which diff though (possibly related to deposit), we agreed it was best to use self.assertEqual instead. Even though, that's not what is really tested here.
On the positive argument that message from self.assertEqual convey better information on the error than the assert one.