diff --git a/swh/vault/backend.py b/swh/vault/backend.py
index 3eb0670..4dd88cd 100644
--- a/swh/vault/backend.py
+++ b/swh/vault/backend.py
@@ -1,409 +1,409 @@
 # Copyright (C) 2017-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import smtplib
 import psycopg2.extras

 from email.mime.text import MIMEText

 from swh.core.db import BaseDb
 from swh.core.db.common import db_transaction
 from swh.model import hashutil
 from swh.scheduler.utils import create_oneshot_task_dict
 from swh.vault.cookers import get_cooker_cls

 cooking_task_name = 'swh.vault.cooking_tasks.SWHCookingTask'

 NOTIF_EMAIL_FROM = ('"Software Heritage Vault" '
-                    '<info@softwareheritage.org>')
+                    '<bot@softwareheritage.org>')
 NOTIF_EMAIL_SUBJECT_SUCCESS = ("Bundle ready: {obj_type} {short_id}")
 NOTIF_EMAIL_SUBJECT_FAILURE = ("Bundle failed: {obj_type} {short_id}")

 NOTIF_EMAIL_BODY_SUCCESS = """
 You have requested the following bundle from the Software Heritage
 Vault:

 Object Type: {obj_type}
 Object ID: {hex_id}

 This bundle is now available for download at the following address:

 {url}

 Please keep in mind that this link might expire at some point, in which
 case you will need to request the bundle again.

 --\x20
 The Software Heritage Developers
 """

 NOTIF_EMAIL_BODY_FAILURE = """
 You have requested the following bundle from the Software Heritage
 Vault:

 Object Type: {obj_type}
 Object ID: {hex_id}

 This bundle could not be cooked for the following reason:

 {progress_msg}

 We apologize for the inconvenience.

 --\x20
 The Software Heritage Developers
 """


 class NotFoundExc(Exception):
     """Bundle was not found."""
     pass


 def batch_to_bytes(batch):
     return [(obj_type, hashutil.hash_to_bytes(obj_id))
             for obj_type, obj_id in batch]


 class VaultBackend:
     """
     Backend for the Software Heritage vault.
""" def __init__(self, db, cache, scheduler, storage=None, **config): self.config = config self.cache = cache self.scheduler = scheduler self.storage = storage self.smtp_server = smtplib.SMTP() self._pool = psycopg2.pool.ThreadedConnectionPool( config.get('min_pool_conns', 1), config.get('max_pool_conns', 10), db, cursor_factory=psycopg2.extras.RealDictCursor, ) self._db = None def get_db(self): if self._db: return self._db return BaseDb.from_pool(self._pool) @db_transaction() def task_info(self, obj_type, obj_id, db=None, cur=None): """Fetch information from a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) cur.execute(''' SELECT id, type, object_id, task_id, task_status, sticky, ts_created, ts_done, ts_last_access, progress_msg FROM vault_bundle WHERE type = %s AND object_id = %s''', (obj_type, obj_id)) res = cur.fetchone() if res: res['object_id'] = bytes(res['object_id']) return res def _send_task(self, *args): """Send a cooking task to the celery scheduler""" task = create_oneshot_task_dict('swh-vault-cooking', *args) added_tasks = self.scheduler.create_tasks([task]) return added_tasks[0]['id'] @db_transaction() def create_task(self, obj_type, obj_id, sticky=False, db=None, cur=None): """Create and send a cooking task""" obj_id = hashutil.hash_to_bytes(obj_id) hex_id = hashutil.hash_to_hex(obj_id) cooker_class = get_cooker_cls(obj_type) cooker = cooker_class(obj_type, hex_id, backend=self, storage=self.storage) if not cooker.check_exists(): raise NotFoundExc("Object {} was not found.".format(hex_id)) cur.execute(''' INSERT INTO vault_bundle (type, object_id, sticky) VALUES (%s, %s, %s)''', (obj_type, obj_id, sticky)) db.conn.commit() task_id = self._send_task(obj_type, hex_id) cur.execute(''' UPDATE vault_bundle SET task_id = %s WHERE type = %s AND object_id = %s''', (task_id, obj_type, obj_id)) @db_transaction() def add_notif_email(self, obj_type, obj_id, email, db=None, cur=None): """Add an e-mail address to notify when a given bundle is ready""" obj_id = hashutil.hash_to_bytes(obj_id) cur.execute(''' INSERT INTO vault_notif_email (email, bundle_id) VALUES (%s, (SELECT id FROM vault_bundle WHERE type = %s AND object_id = %s))''', (email, obj_type, obj_id)) @db_transaction() def cook_request(self, obj_type, obj_id, *, sticky=False, email=None, db=None, cur=None): """Main entry point for cooking requests. This starts a cooking task if needed, and add the given e-mail to the notify list""" obj_id = hashutil.hash_to_bytes(obj_id) info = self.task_info(obj_type, obj_id) # If there's a failed bundle entry, delete it first. if info is not None and info['task_status'] == 'failed': cur.execute('''DELETE FROM vault_bundle WHERE type = %s AND object_id = %s''', (obj_type, obj_id)) db.conn.commit() info = None # If there's no bundle entry, create the task. 
         if info is None:
             self.create_task(obj_type, obj_id, sticky)

         if email is not None:
             # If the task is already done, send the email directly
             if info is not None and info['task_status'] == 'done':
                 self.send_notification(None, email, obj_type, obj_id,
                                        info['task_status'])
             # Else, add it to the notification queue
             else:
                 self.add_notif_email(obj_type, obj_id, email)

         info = self.task_info(obj_type, obj_id)
         return info

     @db_transaction()
     def batch_cook(self, batch, db=None, cur=None):
         """Cook a batch of bundles and return the cooking id."""
         # Import execute_values at runtime only, because it requires
         # psycopg2 >= 2.7 (only available on postgresql servers)
         from psycopg2.extras import execute_values

         cur.execute('''
             INSERT INTO vault_batch (id)
             VALUES (DEFAULT)
             RETURNING id''')
         batch_id = cur.fetchone()['id']
         batch = batch_to_bytes(batch)

         # Delete all failed bundles from the batch
         cur.execute('''
             DELETE FROM vault_bundle
             WHERE task_status = 'failed'
               AND (type, object_id) IN %s''', (tuple(batch),))

         # Insert all the bundles, return the new ones
         execute_values(cur, '''
             INSERT INTO vault_bundle (type, object_id)
             VALUES %s ON CONFLICT DO NOTHING''', batch)

         # Get the bundle ids and task status
         cur.execute('''
             SELECT id, type, object_id, task_id FROM vault_bundle
             WHERE (type, object_id) IN %s''', (tuple(batch),))
         bundles = cur.fetchall()

         # Insert the batch-bundle entries
         batch_id_bundle_ids = [(batch_id, row['id']) for row in bundles]
         execute_values(cur, '''
             INSERT INTO vault_batch_bundle (batch_id, bundle_id)
             VALUES %s ON CONFLICT DO NOTHING''', batch_id_bundle_ids)
         db.conn.commit()

         # Get the tasks to fetch
         batch_new = [(row['type'], bytes(row['object_id']))
                      for row in bundles if row['task_id'] is None]

         # Send the tasks
         args_batch = [(obj_type, hashutil.hash_to_hex(obj_id))
                       for obj_type, obj_id in batch_new]
         # TODO: change once the scheduler handles priority tasks
         tasks = [create_oneshot_task_dict('swh-vault-batch-cooking', *args)
                  for args in args_batch]

         added_tasks = self.scheduler.create_tasks(tasks)
         tasks_ids_bundle_ids = zip([task['id'] for task in added_tasks],
                                    batch_new)
         tasks_ids_bundle_ids = [(task_id, obj_type, obj_id)
                                 for task_id, (obj_type, obj_id)
                                 in tasks_ids_bundle_ids]

         # Update the task ids
         execute_values(cur, '''
             UPDATE vault_bundle
             SET task_id = s_task_id
             FROM (VALUES %s) AS sub (s_task_id, s_type, s_object_id)
             WHERE type = s_type::cook_type AND object_id = s_object_id ''',
                        tasks_ids_bundle_ids)
         return batch_id

     @db_transaction()
     def batch_info(self, batch_id, db=None, cur=None):
         """Fetch information from a batch of bundles"""
         cur.execute('''
             SELECT vault_bundle.id as id, type, object_id, task_id,
                    task_status, sticky, ts_created, ts_done, ts_last_access,
                    progress_msg
             FROM vault_batch_bundle
             LEFT JOIN vault_bundle ON vault_bundle.id = bundle_id
             WHERE batch_id = %s''', (batch_id,))
         res = cur.fetchall()

         if res:
             for d in res:
                 d['object_id'] = bytes(d['object_id'])

         return res

     @db_transaction()
     def is_available(self, obj_type, obj_id, db=None, cur=None):
         """Check whether a bundle is available for retrieval"""
         info = self.task_info(obj_type, obj_id, cur=cur)
         return (info is not None
                 and info['task_status'] == 'done'
                 and self.cache.is_cached(obj_type, obj_id))

     @db_transaction()
     def fetch(self, obj_type, obj_id, db=None, cur=None):
         """Retrieve a bundle from the cache"""
         if not self.is_available(obj_type, obj_id, cur=cur):
             return None
         self.update_access_ts(obj_type, obj_id, cur=cur)
         return self.cache.get(obj_type, obj_id)

     @db_transaction()
     def update_access_ts(self, obj_type, obj_id, db=None,
                          cur=None):
         """Update the last access timestamp of a bundle"""
         obj_id = hashutil.hash_to_bytes(obj_id)
         cur.execute('''
             UPDATE vault_bundle
             SET ts_last_access = NOW()
             WHERE type = %s AND object_id = %s''',
                     (obj_type, obj_id))

     @db_transaction()
     def set_status(self, obj_type, obj_id, status, db=None, cur=None):
         """Set the cooking status of a bundle"""
         obj_id = hashutil.hash_to_bytes(obj_id)
         req = ('''
                UPDATE vault_bundle
                SET task_status = %s '''
                + (''', ts_done = NOW() ''' if status == 'done' else '')
                + '''WHERE type = %s AND object_id = %s''')
         cur.execute(req, (status, obj_type, obj_id))

     @db_transaction()
     def set_progress(self, obj_type, obj_id, progress, db=None, cur=None):
         """Set the cooking progress of a bundle"""
         obj_id = hashutil.hash_to_bytes(obj_id)
         cur.execute('''
             UPDATE vault_bundle
             SET progress_msg = %s
             WHERE type = %s AND object_id = %s''',
                     (progress, obj_type, obj_id))

     @db_transaction()
     def send_all_notifications(self, obj_type, obj_id, db=None, cur=None):
         """Send all the e-mails in the notification list of a bundle"""
         obj_id = hashutil.hash_to_bytes(obj_id)
         cur.execute('''
             SELECT vault_notif_email.id AS id, email, task_status, progress_msg
             FROM vault_notif_email
             INNER JOIN vault_bundle ON bundle_id = vault_bundle.id
             WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s''',
                     (obj_type, obj_id))

         for d in cur:
             self.send_notification(d['id'], d['email'], obj_type, obj_id,
                                    status=d['task_status'],
                                    progress_msg=d['progress_msg'])

     @db_transaction()
     def send_notification(self, n_id, email, obj_type, obj_id, status,
                           progress_msg=None, db=None, cur=None):
         """Send the notification of a bundle to a specific e-mail"""
         hex_id = hashutil.hash_to_hex(obj_id)
         short_id = hex_id[:7]

         # TODO: instead of hardcoding this, we should probably:
         # * add a "fetch_url" field in the vault_notif_email table
         # * generate the url with flask.url_for() on the web-ui side
         # * send this url as part of the cook request and store it in
         #   the table
         # * use this url for the notification e-mail
         url = ('https://archive.softwareheritage.org/api/1/vault/{}/{}/'
                'raw'.format(obj_type, hex_id))

         if status == 'done':
             text = NOTIF_EMAIL_BODY_SUCCESS.strip()
             text = text.format(obj_type=obj_type, hex_id=hex_id, url=url)
             msg = MIMEText(text)
             msg['Subject'] = (NOTIF_EMAIL_SUBJECT_SUCCESS
                               .format(obj_type=obj_type, short_id=short_id))
         elif status == 'failed':
             text = NOTIF_EMAIL_BODY_FAILURE.strip()
             text = text.format(obj_type=obj_type, hex_id=hex_id,
                                progress_msg=progress_msg)
             msg = MIMEText(text)
             msg['Subject'] = (NOTIF_EMAIL_SUBJECT_FAILURE
                               .format(obj_type=obj_type, short_id=short_id))
         else:
             raise RuntimeError("send_notification called on a '{}' bundle"
                                .format(status))

         msg['From'] = NOTIF_EMAIL_FROM
         msg['To'] = email

         self._smtp_send(msg)

         if n_id is not None:
             cur.execute('''
                 DELETE FROM vault_notif_email
                 WHERE id = %s''', (n_id,))

     def _smtp_send(self, msg):
         # Reconnect if needed
         try:
             status = self.smtp_server.noop()[0]
         except smtplib.SMTPException:
             status = -1
         if status != 250:
             self.smtp_server.connect('localhost', 25)

         # Send the message
         self.smtp_server.send_message(msg)

     @db_transaction()
     def _cache_expire(self, cond, *args, db=None, cur=None):
         """Low-level expiration method, used by cache_expire_* methods"""
         # Embedded SELECT query to be able to use ORDER BY and LIMIT
         cur.execute('''
             DELETE FROM vault_bundle
             WHERE ctid IN (
                 SELECT ctid
                 FROM vault_bundle
                 WHERE sticky = false
                 {}
             )
             RETURNING type, object_id
             '''.format(cond), args)

         for d in cur:
             self.cache.delete(d['type'], bytes(d['object_id']))

     @db_transaction()
     def cache_expire_oldest(self, n=1, by='last_access',
                             db=None, cur=None):
         """Expire the `n` oldest bundles"""
         assert by in ('created', 'done', 'last_access')
         filter = '''ORDER BY ts_{} LIMIT {}'''.format(by, n)
         return self._cache_expire(filter)

     @db_transaction()
     def cache_expire_until(self, date, by='last_access', db=None, cur=None):
         """Expire all the bundles until a certain date"""
         assert by in ('created', 'done', 'last_access')
         filter = '''AND ts_{} <= %s'''.format(by)
         return self._cache_expire(filter, date)
diff --git a/swh/vault/tests/test_backend.py b/swh/vault/tests/test_backend.py
index dd44a9a..fed6097 100644
--- a/swh/vault/tests/test_backend.py
+++ b/swh/vault/tests/test_backend.py
@@ -1,337 +1,337 @@
 # Copyright (C) 2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import contextlib
 import datetime
 import psycopg2

 from unittest.mock import patch, MagicMock

 import pytest

 from swh.model import hashutil
 from swh.vault.tests.vault_testing import hash_content


 @contextlib.contextmanager
 def mock_cooking(vault_backend):
     with patch.object(vault_backend, '_send_task') as mt:
         mt.return_value = 42
         with patch('swh.vault.backend.get_cooker_cls') as mg:
             mcc = MagicMock()
             mc = MagicMock()
             mg.return_value = mcc
             mcc.return_value = mc
             mc.check_exists.return_value = True
             yield {'_send_task': mt,
                    'get_cooker_cls': mg,
                    'cooker_cls': mcc,
                    'cooker': mc}


 def assertTimestampAlmostNow(ts, tolerance_secs=1.0):  # noqa
     now = datetime.datetime.now(datetime.timezone.utc)
     creation_delta_secs = (ts - now).total_seconds()
     assert creation_delta_secs < tolerance_secs


 def fake_cook(backend, obj_type, result_content, sticky=False):
     content, obj_id = hash_content(result_content)
     with mock_cooking(backend):
         backend.create_task(obj_type, obj_id, sticky)
     backend.cache.add(obj_type, obj_id, b'content')
     backend.set_status(obj_type, obj_id, 'done')
     return obj_id, content


 def fail_cook(backend, obj_type, obj_id, failure_reason):
     with mock_cooking(backend):
         backend.create_task(obj_type, obj_id)
     backend.set_status(obj_type, obj_id, 'failed')
     backend.set_progress(obj_type, obj_id, failure_reason)


 TEST_TYPE = 'revision_gitfast'
 TEST_HEX_ID = '4a4b9771542143cf070386f86b4b92d42966bdbc'
 TEST_OBJ_ID = hashutil.hash_to_bytes(TEST_HEX_ID)
 TEST_PROGRESS = ("Mr. White, You're telling me you're cooking again?"
" \N{ASTONISHED FACE} ") TEST_EMAIL = 'ouiche@lorraine.fr' def test_create_task_simple(swh_vault): with mock_cooking(swh_vault) as m: swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) m['get_cooker_cls'].assert_called_once_with(TEST_TYPE) args = m['cooker_cls'].call_args[0] assert args[0] == TEST_TYPE assert args[1] == TEST_HEX_ID assert m['cooker'].check_exists.call_count == 1 assert m['_send_task'].call_count == 1 args = m['_send_task'].call_args[0] assert args[0] == TEST_TYPE assert args[1] == TEST_HEX_ID info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) assert info['object_id'] == TEST_OBJ_ID assert info['type'] == TEST_TYPE assert info['task_status'] == 'new' assert info['task_id'] == 42 assertTimestampAlmostNow(info['ts_created']) assert info['ts_done'] is None assert info['progress_msg'] is None def test_create_fail_duplicate_task(swh_vault): with mock_cooking(swh_vault): swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) with pytest.raises(psycopg2.IntegrityError): swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) def test_create_fail_nonexisting_object(swh_vault): with mock_cooking(swh_vault) as m: m['cooker'].check_exists.side_effect = ValueError('Nothing here.') with pytest.raises(ValueError): swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) def test_create_set_progress(swh_vault): with mock_cooking(swh_vault): swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) assert info['progress_msg'] is None swh_vault.set_progress(TEST_TYPE, TEST_OBJ_ID, TEST_PROGRESS) info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) assert info['progress_msg'] == TEST_PROGRESS def test_create_set_status(swh_vault): with mock_cooking(swh_vault): swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) assert info['task_status'] == 'new' assert info['ts_done'] is None swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'pending') info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) assert info['task_status'] == 'pending' assert info['ts_done'] is None swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) assert info['task_status'] == 'done' assertTimestampAlmostNow(info['ts_done']) def test_create_update_access_ts(swh_vault): with mock_cooking(swh_vault): swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) access_ts_1 = info['ts_last_access'] assertTimestampAlmostNow(access_ts_1) swh_vault.update_access_ts(TEST_TYPE, TEST_OBJ_ID) info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) access_ts_2 = info['ts_last_access'] assertTimestampAlmostNow(access_ts_2) swh_vault.update_access_ts(TEST_TYPE, TEST_OBJ_ID) info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) access_ts_3 = info['ts_last_access'] assertTimestampAlmostNow(access_ts_3) assert access_ts_1 < access_ts_2 assert access_ts_2 < access_ts_3 def test_cook_request_idempotent(swh_vault): with mock_cooking(swh_vault): info1 = swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) info2 = swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) info3 = swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) assert info1 == info2 assert info1 == info3 def test_cook_email_pending_done(swh_vault): with mock_cooking(swh_vault), \ patch.object(swh_vault, 'add_notif_email') as madd, \ patch.object(swh_vault, 'send_notification') as msend: swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) madd.assert_not_called() msend.assert_not_called() madd.reset_mock() msend.reset_mock() swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, email=TEST_EMAIL) 
         madd.assert_called_once_with(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL)
         msend.assert_not_called()

         madd.reset_mock()
         msend.reset_mock()

         swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'done')
         swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, email=TEST_EMAIL)
         msend.assert_called_once_with(None, TEST_EMAIL,
                                       TEST_TYPE, TEST_OBJ_ID, 'done')
         madd.assert_not_called()


 def test_send_all_emails(swh_vault):
     with mock_cooking(swh_vault):
         emails = ('a@example.com',
                   'billg@example.com',
                   'test+42@example.org')
         for email in emails:
             swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, email=email)

         swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'done')

     with patch.object(swh_vault, 'smtp_server') as m:
         swh_vault.send_all_notifications(TEST_TYPE, TEST_OBJ_ID)

         sent_emails = {k[0][0] for k in m.send_message.call_args_list}
         assert {k['To'] for k in sent_emails} == set(emails)

         for e in sent_emails:
-            assert 'info@softwareheritage.org' in e['From']
+            assert 'bot@softwareheritage.org' in e['From']
             assert TEST_TYPE in e['Subject']
             assert TEST_HEX_ID[:5] in e['Subject']
             assert TEST_TYPE in str(e)
             assert 'https://archive.softwareheritage.org/' in str(e)
             assert TEST_HEX_ID[:5] in str(e)
             assert '--\x20\n' in str(e)  # Well-formatted signature!!!

         # Check that the entries have been deleted and that calling the
         # function again does not re-send the e-mails
         m.reset_mock()
         swh_vault.send_all_notifications(TEST_TYPE, TEST_OBJ_ID)
         m.assert_not_called()


 def test_available(swh_vault):
     assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID)

     with mock_cooking(swh_vault):
         swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID)
     assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID)

     swh_vault.cache.add(TEST_TYPE, TEST_OBJ_ID, b'content')
     assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID)

     swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'done')
     assert swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID)


 def test_fetch(swh_vault):
     assert swh_vault.fetch(TEST_TYPE, TEST_OBJ_ID) is None
     obj_id, content = fake_cook(swh_vault, TEST_TYPE, b'content')

     info = swh_vault.task_info(TEST_TYPE, obj_id)
     access_ts_before = info['ts_last_access']

     assert swh_vault.fetch(TEST_TYPE, obj_id) == b'content'

     info = swh_vault.task_info(TEST_TYPE, obj_id)
     access_ts_after = info['ts_last_access']

     assertTimestampAlmostNow(access_ts_after)
     assert access_ts_before < access_ts_after


 def test_cache_expire_oldest(swh_vault):
     r = range(1, 10)
     inserted = {}
     for i in r:
         sticky = (i == 5)
         content = b'content%s' % str(i).encode()
         obj_id, content = fake_cook(swh_vault, TEST_TYPE, content, sticky)
         inserted[i] = (obj_id, content)

     swh_vault.update_access_ts(TEST_TYPE, inserted[2][0])
     swh_vault.update_access_ts(TEST_TYPE, inserted[3][0])
     swh_vault.cache_expire_oldest(n=4)

     should_be_still_here = {2, 3, 5, 8, 9}
     for i in r:
         assert swh_vault.is_available(
             TEST_TYPE, inserted[i][0]) == (i in should_be_still_here)


 def test_cache_expire_until(swh_vault):
     r = range(1, 10)
     inserted = {}
     for i in r:
         sticky = (i == 5)
         content = b'content%s' % str(i).encode()
         obj_id, content = fake_cook(swh_vault, TEST_TYPE, content, sticky)
         inserted[i] = (obj_id, content)

         if i == 7:
             cutoff_date = datetime.datetime.now()

     swh_vault.update_access_ts(TEST_TYPE, inserted[2][0])
     swh_vault.update_access_ts(TEST_TYPE, inserted[3][0])
     swh_vault.cache_expire_until(date=cutoff_date)

     should_be_still_here = {2, 3, 5, 8, 9}
     for i in r:
         assert swh_vault.is_available(
             TEST_TYPE, inserted[i][0]) == (i in should_be_still_here)


 def test_fail_cook_simple(swh_vault):
     fail_cook(swh_vault, TEST_TYPE, TEST_OBJ_ID, 'error42')
     assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID)
     info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
     assert info['progress_msg'] == 'error42'


 def test_send_failure_email(swh_vault):
     with mock_cooking(swh_vault):
         swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, email='a@example.com')

     swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'failed')
     swh_vault.set_progress(TEST_TYPE, TEST_OBJ_ID, 'test error')

     with patch.object(swh_vault, 'smtp_server') as m:
         swh_vault.send_all_notifications(TEST_TYPE, TEST_OBJ_ID)

         e = [k[0][0] for k in m.send_message.call_args_list][0]
         assert e['To'] == 'a@example.com'
-        assert 'info@softwareheritage.org' in e['From']
+        assert 'bot@softwareheritage.org' in e['From']
         assert TEST_TYPE in e['Subject']
         assert TEST_HEX_ID[:5] in e['Subject']
         assert 'fail' in e['Subject']
         assert TEST_TYPE in str(e)
         assert TEST_HEX_ID[:5] in str(e)
         assert 'test error' in str(e)
         assert '--\x20\n' in str(e)  # Well-formatted signature


 def test_retry_failed_bundle(swh_vault):
     fail_cook(swh_vault, TEST_TYPE, TEST_OBJ_ID, 'error42')
     info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
     assert info['task_status'] == 'failed'
     with mock_cooking(swh_vault):
         swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID)
     info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
     assert info['task_status'] == 'new'
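Note: the user-visible effect of this diff is the sender address in vault notification e-mails. Below is a minimal, self-contained sketch of the message that send_notification() builds after the change; the constants are copied from swh/vault/backend.py, while the simplified body text, the sample object id, and the recipient address are illustrative values taken from the tests, not part of the module.

    # Illustrative sketch only: mirrors how send_notification() assembles the
    # notification message, to show the new From header in isolation.
    from email.mime.text import MIMEText

    NOTIF_EMAIL_FROM = ('"Software Heritage Vault" '
                        '<bot@softwareheritage.org>')
    NOTIF_EMAIL_SUBJECT_SUCCESS = "Bundle ready: {obj_type} {short_id}"

    obj_type = 'revision_gitfast'
    hex_id = '4a4b9771542143cf070386f86b4b92d42966bdbc'
    url = ('https://archive.softwareheritage.org/api/1/vault/{}/{}/raw'
           .format(obj_type, hex_id))

    # Simplified body; the real backend formats NOTIF_EMAIL_BODY_SUCCESS.
    msg = MIMEText('Bundle {} {} is available at {}'.format(obj_type, hex_id, url))
    msg['Subject'] = NOTIF_EMAIL_SUBJECT_SUCCESS.format(
        obj_type=obj_type, short_id=hex_id[:7])
    msg['From'] = NOTIF_EMAIL_FROM
    msg['To'] = 'ouiche@lorraine.fr'

    # This is exactly what the updated tests assert about the From header.
    assert 'bot@softwareheritage.org' in msg['From']
    print(msg.as_string())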