diff --git a/swh/vault/backend.py b/swh/vault/backend.py index bc51160..23cd194 100644 --- a/swh/vault/backend.py +++ b/swh/vault/backend.py @@ -1,551 +1,551 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import collections from email.mime.text import MIMEText import logging import smtplib from typing import Any, Dict, List, Optional, Tuple import psycopg2.extras import psycopg2.pool import sentry_sdk from swh.core.db import BaseDb from swh.core.db.common import db_transaction from swh.model.swhids import CoreSWHID from swh.scheduler import get_scheduler from swh.scheduler.utils import create_oneshot_task_dict from swh.storage import get_storage from swh.vault.cache import VaultCache from swh.vault.cookers import COOKER_TYPES, get_cooker_cls from swh.vault.exc import NotFoundExc logger = logging.getLogger(__name__) cooking_task_name = "swh.vault.cooking_tasks.SWHCookingTask" NOTIF_EMAIL_FROM = '"Software Heritage Vault" ' "" NOTIF_EMAIL_SUBJECT_SUCCESS = "Bundle ready: {bundle_type} {short_id}" NOTIF_EMAIL_SUBJECT_FAILURE = "Bundle failed: {bundle_type} {short_id}" NOTIF_EMAIL_BODY_SUCCESS = """ You have requested the following bundle from the Software Heritage Vault: Bundle Type: {bundle_type} Object SWHID: {swhid} This bundle is now available for download at the following address: {url} Please keep in mind that this link might expire at some point, in which case you will need to request the bundle again. --\x20 The Software Heritage Developers """ NOTIF_EMAIL_BODY_FAILURE = """ You have requested the following bundle from the Software Heritage Vault: Bundle Type: {bundle_type} Object SWHID: {swhid} This bundle could not be cooked for the following reason: {progress_msg} We apologize for the inconvenience. --\x20 The Software Heritage Developers """ class VaultBackend: """ Backend for the Software Heritage Vault. """ current_version = 4 def __init__(self, **config): self.config = config self.cache = VaultCache(**config["cache"]) self.scheduler = get_scheduler(**config["scheduler"]) self.storage = get_storage(**config["storage"]) if "db" not in self.config: raise ValueError( "The 'db' configuration entry is missing " "in the vault configuration file" ) db_conn = config["db"] self._pool = psycopg2.pool.ThreadedConnectionPool( config.get("min_pool_conns", 1), config.get("max_pool_conns", 10), db_conn, cursor_factory=psycopg2.extras.RealDictCursor, ) self._db = None def get_db(self): if self._db: return self._db return BaseDb.from_pool(self._pool) def put_db(self, db): if db is not self._db: db.put_conn() @db_transaction() def progress( self, bundle_type: str, swhid: CoreSWHID, raise_notfound: bool = True, db=None, cur=None, ) -> Optional[Dict[str, Any]]: cur.execute( """ SELECT id, type, swhid, task_id, task_status, sticky, ts_created, ts_done, ts_last_access, progress_msg FROM vault_bundle WHERE type = %s AND swhid = %s""", (bundle_type, str(swhid)), ) res = cur.fetchone() if not res: if raise_notfound: raise NotFoundExc(f"{bundle_type} {swhid} was not found.") return None res["swhid"] = CoreSWHID.from_string(res["swhid"]) return res def _send_task(self, bundle_type: str, swhid: CoreSWHID): """Send a cooking task to the celery scheduler""" task = create_oneshot_task_dict("cook-vault-bundle", bundle_type, str(swhid)) added_tasks = self.scheduler.create_tasks([task]) return added_tasks[0]["id"] @db_transaction() def create_task( self, bundle_type: str, swhid: CoreSWHID, sticky: bool = False, db=None, cur=None, ): """Create and send a cooking task""" cooker_class = get_cooker_cls(bundle_type, swhid.object_type) cooker = cooker_class(swhid, backend=self, storage=self.storage) if not cooker.check_exists(): raise NotFoundExc(f"{bundle_type} {swhid} was not found.") cur.execute( """ INSERT INTO vault_bundle (type, swhid, sticky) VALUES (%s, %s, %s)""", (bundle_type, str(swhid), sticky), ) db.conn.commit() task_id = self._send_task(bundle_type, swhid) cur.execute( """ UPDATE vault_bundle SET task_id = %s WHERE type = %s AND swhid = %s""", (task_id, bundle_type, str(swhid)), ) @db_transaction() def add_notif_email( self, bundle_type: str, swhid: CoreSWHID, email: str, db=None, cur=None ): """Add an e-mail address to notify when a given bundle is ready""" cur.execute( """ INSERT INTO vault_notif_email (email, bundle_id) VALUES (%s, (SELECT id FROM vault_bundle WHERE type = %s AND swhid = %s))""", (email, bundle_type, str(swhid)), ) def put_bundle(self, bundle_type: str, swhid: CoreSWHID, bundle) -> bool: self.cache.add(bundle_type, swhid, bundle) return True @db_transaction() def cook( self, bundle_type: str, swhid: CoreSWHID, *, sticky: bool = False, email: Optional[str] = None, db=None, cur=None, ) -> Dict[str, Any]: info = self.progress(bundle_type, swhid, raise_notfound=False) if bundle_type not in COOKER_TYPES: raise NotFoundExc(f"{bundle_type} is an unknown type.") # If there's a failed bundle entry, delete it first. if info is not None and info["task_status"] == "failed": cur.execute( "DELETE FROM vault_bundle WHERE type = %s AND swhid = %s", (bundle_type, str(swhid)), ) db.conn.commit() info = None # If there's no bundle entry, create the task. if info is None: self.create_task(bundle_type, swhid, sticky) if email is not None: # If the task is already done, send the email directly if info is not None and info["task_status"] == "done": self.send_notification( None, email, bundle_type, swhid, info["task_status"] ) # Else, add it to the notification queue else: self.add_notif_email(bundle_type, swhid, email) return self.progress(bundle_type, swhid) @db_transaction() def batch_cook( self, batch: List[Tuple[str, str]], db=None, cur=None ) -> Dict[str, int]: # Import execute_values at runtime only, because it requires # psycopg2 >= 2.7 (only available on postgresql servers) from psycopg2.extras import execute_values for bundle_type, _ in batch: if bundle_type not in COOKER_TYPES: raise NotFoundExc(f"{bundle_type} is an unknown type.") cur.execute( """ INSERT INTO vault_batch (id) VALUES (DEFAULT) RETURNING id""" ) batch_id = cur.fetchone()["id"] # Delete all failed bundles from the batch cur.execute( """ DELETE FROM vault_bundle WHERE task_status = 'failed' AND (type, swhid) IN %s""", (tuple(batch),), ) # Insert all the bundles, return the new ones execute_values( cur, """ INSERT INTO vault_bundle (type, swhid) VALUES %s ON CONFLICT DO NOTHING""", batch, ) # Get the bundle ids and task status cur.execute( """ SELECT id, type, swhid, task_id FROM vault_bundle WHERE (type, swhid) IN %s""", (tuple(batch),), ) bundles = cur.fetchall() # Insert the batch-bundle entries batch_id_bundle_ids = [(batch_id, row["id"]) for row in bundles] execute_values( cur, """ INSERT INTO vault_batch_bundle (batch_id, bundle_id) VALUES %s ON CONFLICT DO NOTHING""", batch_id_bundle_ids, ) db.conn.commit() # Get the tasks to fetch batch_new = [ (row["type"], CoreSWHID.from_string(row["swhid"])) for row in bundles if row["task_id"] is None ] # Send the tasks args_batch = [(bundle_type, swhid) for bundle_type, swhid in batch_new] # TODO: change once the scheduler handles priority tasks tasks = [ create_oneshot_task_dict("swh-vault-batch-cooking", *args) for args in args_batch ] added_tasks = self.scheduler.create_tasks(tasks) tasks_ids_bundle_ids = [ (task_id, bundle_type, swhid) for task_id, (bundle_type, swhid) in zip( [task["id"] for task in added_tasks], batch_new ) ] # Update the task ids execute_values( cur, """ UPDATE vault_bundle SET task_id = s_task_id FROM (VALUES %s) AS sub (s_task_id, s_type, s_swhid) WHERE type = s_type::cook_type AND swhid = s_swhid """, tasks_ids_bundle_ids, ) return {"id": batch_id} @db_transaction() def batch_progress(self, batch_id: int, db=None, cur=None) -> Dict[str, Any]: cur.execute( """ SELECT vault_bundle.id as id, type, swhid, task_id, task_status, sticky, ts_created, ts_done, ts_last_access, progress_msg FROM vault_batch_bundle LEFT JOIN vault_bundle ON vault_bundle.id = bundle_id WHERE batch_id = %s""", (batch_id,), ) bundles = cur.fetchall() if not bundles: raise NotFoundExc(f"Batch {batch_id} does not exist.") for bundle in bundles: bundle["swhid"] = CoreSWHID.from_string(bundle["swhid"]) counter = collections.Counter(b["status"] for b in bundles) res = { "bundles": bundles, "total": len(bundles), **{k: 0 for k in ("new", "pending", "done", "failed")}, **dict(counter), } return res @db_transaction() def is_available(self, bundle_type: str, swhid: CoreSWHID, db=None, cur=None): """Check whether a bundle is available for retrieval""" info = self.progress(bundle_type, swhid, raise_notfound=False, cur=cur) return ( info is not None and info["task_status"] == "done" and self.cache.is_cached(bundle_type, swhid) ) @db_transaction() def fetch( self, bundle_type: str, swhid: CoreSWHID, raise_notfound=True, db=None, cur=None ) -> Optional[bytes]: """Retrieve a bundle from the cache""" available = self.is_available(bundle_type, swhid, cur=cur) if not available: if raise_notfound: raise NotFoundExc(f"{bundle_type} {swhid} is not available.") return None self.update_access_ts(bundle_type, swhid, cur=cur) return self.cache.get(bundle_type, swhid) @db_transaction() def update_access_ts(self, bundle_type: str, swhid: CoreSWHID, db=None, cur=None): """Update the last access timestamp of a bundle""" cur.execute( """ UPDATE vault_bundle SET ts_last_access = NOW() WHERE type = %s AND swhid = %s""", (bundle_type, str(swhid)), ) @db_transaction() def set_status( self, bundle_type: str, swhid: CoreSWHID, status: str, db=None, cur=None ) -> bool: req = ( """ UPDATE vault_bundle SET task_status = %s """ + (""", ts_done = NOW() """ if status == "done" else "") + """WHERE type = %s AND swhid = %s""" ) cur.execute(req, (status, bundle_type, str(swhid))) return True @db_transaction() def set_progress( self, bundle_type: str, swhid: CoreSWHID, progress: str, db=None, cur=None ) -> bool: cur.execute( """ UPDATE vault_bundle SET progress_msg = %s WHERE type = %s AND swhid = %s""", (progress, bundle_type, str(swhid)), ) return True @db_transaction() def send_notif(self, bundle_type: str, swhid: CoreSWHID, db=None, cur=None) -> bool: cur.execute( """ SELECT vault_notif_email.id AS id, email, task_status, progress_msg FROM vault_notif_email INNER JOIN vault_bundle ON bundle_id = vault_bundle.id WHERE vault_bundle.type = %s AND vault_bundle.swhid = %s""", (bundle_type, str(swhid)), ) for d in cur: self.send_notification( d["id"], d["email"], bundle_type, swhid, status=d["task_status"], progress_msg=d["progress_msg"], ) return True @db_transaction() def send_notification( self, n_id: Optional[int], email: str, bundle_type: str, swhid: CoreSWHID, status: str, progress_msg: Optional[str] = None, db=None, cur=None, ) -> None: """Send the notification of a bundle to a specific e-mail""" short_id = swhid.object_id.hex()[:7] # TODO: instead of hardcoding this, we should probably: # * add a "fetch_url" field in the vault_notif_email table # * generate the url with flask.url_for() on the web-ui side # * send this url as part of the cook request and store it in # the table # * use this url for the notification e-mail url = "https://archive.softwareheritage.org/api/1/vault/{}/{}/" "raw".format( - bundle_type, swhid + bundle_type.replace("_", "-"), swhid ) if status == "done": text = NOTIF_EMAIL_BODY_SUCCESS.strip() text = text.format(bundle_type=bundle_type, swhid=swhid, url=url) msg = MIMEText(text) msg["Subject"] = NOTIF_EMAIL_SUBJECT_SUCCESS.format( bundle_type=bundle_type, short_id=short_id ) elif status == "failed": text = NOTIF_EMAIL_BODY_FAILURE.strip() text = text.format( bundle_type=bundle_type, swhid=swhid, progress_msg=progress_msg ) msg = MIMEText(text) msg["Subject"] = NOTIF_EMAIL_SUBJECT_FAILURE.format( bundle_type=bundle_type, short_id=short_id ) else: raise RuntimeError( "send_notification called on a '{}' bundle".format(status) ) msg["From"] = NOTIF_EMAIL_FROM msg["To"] = email self._smtp_send(msg) if n_id is not None: cur.execute( """ DELETE FROM vault_notif_email WHERE id = %s""", (n_id,), ) def _smtp_send(self, msg: MIMEText): smtp_server = smtplib.SMTP(**self.config.get("smtp", {})) try: status = smtp_server.noop()[0] except smtplib.SMTPException: status = -1 if status != 250: error_message = ( f"Unable to send SMTP message '{msg['Subject']}' to " f"{msg['To']}: cannot connect to server" ) logger.error(error_message) sentry_sdk.capture_message(error_message, "error") else: try: # Send the message smtp_server.send_message(msg) except smtplib.SMTPException as exc: logger.exception(exc) error_message = ( f"Unable to send SMTP message '{msg['Subject']}' to " f"{msg['To']}: {exc}" ) sentry_sdk.capture_message(error_message, "error") @db_transaction() def _cache_expire(self, cond, *args, db=None, cur=None) -> None: """Low-level expiration method, used by cache_expire_* methods""" # Embedded SELECT query to be able to use ORDER BY and LIMIT cur.execute( """ DELETE FROM vault_bundle WHERE ctid IN ( SELECT ctid FROM vault_bundle WHERE sticky = false {} ) RETURNING type, swhid """.format( cond ), args, ) for d in cur: self.cache.delete(d["type"], CoreSWHID.from_string(d["swhid"])) @db_transaction() def cache_expire_oldest(self, n=1, by="last_access", db=None, cur=None) -> None: """Expire the `n` oldest bundles""" assert by in ("created", "done", "last_access") filter = """ORDER BY ts_{} LIMIT {}""".format(by, n) return self._cache_expire(filter) @db_transaction() def cache_expire_until(self, date, by="last_access", db=None, cur=None) -> None: """Expire all the bundles until a certain date""" assert by in ("created", "done", "last_access") filter = """AND ts_{} <= %s""".format(by) return self._cache_expire(filter, date) diff --git a/swh/vault/tests/test_backend.py b/swh/vault/tests/test_backend.py index b06b435..87bce1d 100644 --- a/swh/vault/tests/test_backend.py +++ b/swh/vault/tests/test_backend.py @@ -1,414 +1,419 @@ -# Copyright (C) 2017-2020 The Software Heritage developers +# Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import contextlib import datetime import re import smtplib from unittest.mock import MagicMock, patch import attr import psycopg2 import pytest from swh.core.sentry import init_sentry from swh.model.model import Content from swh.model.swhids import CoreSWHID from swh.vault.exc import NotFoundExc from swh.vault.tests.vault_testing import hash_content @contextlib.contextmanager def mock_cooking(vault_backend): with patch.object(vault_backend, "_send_task") as mt: mt.return_value = 42 with patch("swh.vault.backend.get_cooker_cls") as mg: mcc = MagicMock() mc = MagicMock() mg.return_value = mcc mcc.return_value = mc mc.check_exists.return_value = True yield { "_send_task": mt, "get_cooker_cls": mg, "cooker_cls": mcc, "cooker": mc, } def assertTimestampAlmostNow(ts, tolerance_secs=1.0): # noqa now = datetime.datetime.now(datetime.timezone.utc) creation_delta_secs = (ts - now).total_seconds() assert creation_delta_secs < tolerance_secs def fake_cook(backend, bundle_type, result_content, sticky=False): swhid = Content.from_data(result_content).swhid() content, obj_id = hash_content(result_content) with mock_cooking(backend): backend.create_task(bundle_type, swhid, sticky) backend.cache.add(bundle_type, swhid, b"content") backend.set_status(bundle_type, swhid, "done") return swhid, content def fail_cook(backend, bundle_type, swhid, failure_reason): with mock_cooking(backend): backend.create_task(bundle_type, swhid) backend.set_status(bundle_type, swhid, "failed") backend.set_progress(bundle_type, swhid, failure_reason) -TEST_TYPE = "gitfast" +TEST_TYPE = "git_bare" TEST_SWHID = CoreSWHID.from_string("swh:1:rev:4a4b9771542143cf070386f86b4b92d42966bdbc") TEST_PROGRESS = ( "Mr. White, You're telling me you're cooking again? \N{ASTONISHED FACE} " ) TEST_EMAIL = "ouiche@lorraine.fr" @pytest.fixture def swh_vault(swh_vault, sample_data): # make the vault's storage consistent with test data revision = attr.evolve(sample_data.revision, id=TEST_SWHID.object_id) swh_vault.storage.revision_add([revision]) return swh_vault def test_create_task_simple(swh_vault): with mock_cooking(swh_vault) as m: swh_vault.create_task(TEST_TYPE, TEST_SWHID) m["get_cooker_cls"].assert_called_once_with(TEST_TYPE, TEST_SWHID.object_type) args = m["cooker_cls"].call_args[0] assert args[0] == TEST_SWHID assert m["cooker"].check_exists.call_count == 1 assert m["_send_task"].call_count == 1 args = m["_send_task"].call_args[0] assert args[0] == TEST_TYPE assert args[1] == TEST_SWHID info = swh_vault.progress(TEST_TYPE, TEST_SWHID) assert info["swhid"] == TEST_SWHID assert info["type"] == TEST_TYPE assert info["task_status"] == "new" assert info["task_id"] == 42 assertTimestampAlmostNow(info["ts_created"]) assert info["ts_done"] is None assert info["progress_msg"] is None def test_create_fail_duplicate_task(swh_vault): with mock_cooking(swh_vault): swh_vault.create_task(TEST_TYPE, TEST_SWHID) with pytest.raises(psycopg2.IntegrityError): swh_vault.create_task(TEST_TYPE, TEST_SWHID) def test_create_fail_nonexisting_object(swh_vault): with mock_cooking(swh_vault) as m: m["cooker"].check_exists.side_effect = ValueError("Nothing here.") with pytest.raises(ValueError): swh_vault.create_task(TEST_TYPE, TEST_SWHID) def test_create_set_progress(swh_vault): with mock_cooking(swh_vault): swh_vault.create_task(TEST_TYPE, TEST_SWHID) info = swh_vault.progress(TEST_TYPE, TEST_SWHID) assert info["progress_msg"] is None swh_vault.set_progress(TEST_TYPE, TEST_SWHID, TEST_PROGRESS) info = swh_vault.progress(TEST_TYPE, TEST_SWHID) assert info["progress_msg"] == TEST_PROGRESS def test_create_set_status(swh_vault): with mock_cooking(swh_vault): swh_vault.create_task(TEST_TYPE, TEST_SWHID) info = swh_vault.progress(TEST_TYPE, TEST_SWHID) assert info["task_status"] == "new" assert info["ts_done"] is None swh_vault.set_status(TEST_TYPE, TEST_SWHID, "pending") info = swh_vault.progress(TEST_TYPE, TEST_SWHID) assert info["task_status"] == "pending" assert info["ts_done"] is None swh_vault.set_status(TEST_TYPE, TEST_SWHID, "done") info = swh_vault.progress(TEST_TYPE, TEST_SWHID) assert info["task_status"] == "done" assertTimestampAlmostNow(info["ts_done"]) def test_create_update_access_ts(swh_vault): with mock_cooking(swh_vault): swh_vault.create_task(TEST_TYPE, TEST_SWHID) info = swh_vault.progress(TEST_TYPE, TEST_SWHID) access_ts_1 = info["ts_last_access"] assertTimestampAlmostNow(access_ts_1) swh_vault.update_access_ts(TEST_TYPE, TEST_SWHID) info = swh_vault.progress(TEST_TYPE, TEST_SWHID) access_ts_2 = info["ts_last_access"] assertTimestampAlmostNow(access_ts_2) swh_vault.update_access_ts(TEST_TYPE, TEST_SWHID) info = swh_vault.progress(TEST_TYPE, TEST_SWHID) access_ts_3 = info["ts_last_access"] assertTimestampAlmostNow(access_ts_3) assert access_ts_1 < access_ts_2 assert access_ts_2 < access_ts_3 def test_cook_idempotent(swh_vault, sample_data): with mock_cooking(swh_vault): info1 = swh_vault.cook(TEST_TYPE, TEST_SWHID) info2 = swh_vault.cook(TEST_TYPE, TEST_SWHID) info3 = swh_vault.cook(TEST_TYPE, TEST_SWHID) assert info1 == info2 assert info1 == info3 def test_cook_email_pending_done(swh_vault): with mock_cooking(swh_vault), patch.object( swh_vault, "add_notif_email" ) as madd, patch.object(swh_vault, "send_notification") as msend: swh_vault.cook(TEST_TYPE, TEST_SWHID) madd.assert_not_called() msend.assert_not_called() madd.reset_mock() msend.reset_mock() swh_vault.cook(TEST_TYPE, TEST_SWHID, email=TEST_EMAIL) madd.assert_called_once_with(TEST_TYPE, TEST_SWHID, TEST_EMAIL) msend.assert_not_called() madd.reset_mock() msend.reset_mock() swh_vault.set_status(TEST_TYPE, TEST_SWHID, "done") swh_vault.cook(TEST_TYPE, TEST_SWHID, email=TEST_EMAIL) msend.assert_called_once_with(None, TEST_EMAIL, TEST_TYPE, TEST_SWHID, "done") madd.assert_not_called() def test_send_all_emails(swh_vault): with mock_cooking(swh_vault): emails = ("a@example.com", "billg@example.com", "test+42@example.org") for email in emails: swh_vault.cook(TEST_TYPE, TEST_SWHID, email=email) swh_vault.set_status(TEST_TYPE, TEST_SWHID, "done") with patch.object(swh_vault, "_smtp_send") as m: swh_vault.send_notif(TEST_TYPE, TEST_SWHID) sent_emails = {k[0][0] for k in m.call_args_list} assert {k["To"] for k in sent_emails} == set(emails) + download_url = ( + "https://archive.softwareheritage.org/api/1/vault/" + f"{TEST_TYPE.replace('_', '-')}/{str(TEST_SWHID)}/raw" + ) + for e in sent_emails: assert "bot@softwareheritage.org" in e["From"] assert TEST_TYPE in e["Subject"] assert TEST_SWHID.object_id.hex()[:5] in e["Subject"] assert TEST_TYPE in str(e) - assert "https://archive.softwareheritage.org/" in str(e) + assert download_url in str(e) assert TEST_SWHID.object_id.hex()[:5] in str(e) assert "--\x20\n" in str(e) # Well-formated signature!!! # Check that the entries have been deleted and recalling the # function does not re-send the e-mails m.reset_mock() swh_vault.send_notif(TEST_TYPE, TEST_SWHID) m.assert_not_called() def test_send_email_error_no_smtp(swh_vault): reports = [] init_sentry("http://example.org", extra_kwargs={"transport": reports.append}) emails = ("a@example.com", "billg@example.com", "test+42@example.org") with mock_cooking(swh_vault): for email in emails: swh_vault.cook(TEST_TYPE, TEST_SWHID, email=email) swh_vault.set_status(TEST_TYPE, TEST_SWHID, "done") swh_vault.send_notif(TEST_TYPE, TEST_SWHID) assert len(reports) == 6 for i, email in enumerate(emails): # first report is the logger.error assert reports[2 * i]["level"] == "error" assert reports[2 * i]["logger"] == "swh.vault.backend" reg = re.compile( - "Unable to send SMTP message 'Bundle ready: gitfast [0-9a-f]{7}' " + f"Unable to send SMTP message 'Bundle ready: {TEST_TYPE} [0-9a-f]{{7}}' " f"to {email.replace('+', '[+]')}: cannot connect to server" ) assert reg.match(reports[2 * i]["logentry"]["message"]) # second is the sentry_sdk.capture_message assert reports[2 * i + 1]["level"] == "error" assert reg.match(reports[2 * i + 1]["message"]) def test_send_email_error_send_failed(swh_vault): reports = [] init_sentry("http://example.org", extra_kwargs={"transport": reports.append}) emails = ("a@example.com", "billg@example.com", "test+42@example.org") with mock_cooking(swh_vault): for email in emails: swh_vault.cook(TEST_TYPE, TEST_SWHID, email=email) swh_vault.set_status(TEST_TYPE, TEST_SWHID, "done") with patch("smtplib.SMTP") as MockSMTP: smtp = MockSMTP.return_value smtp.noop.return_value = [250] smtp.send_message.side_effect = smtplib.SMTPHeloError(404, "HELO Failed") swh_vault.send_notif(TEST_TYPE, TEST_SWHID) assert len(reports) == 4 # first one is the captured exception assert reports[0]["level"] == "error" assert reports[0]["exception"]["values"][0]["type"] == "SMTPHeloError" # the following 3 ones are the sentry_sdk.capture_message() calls for i, email in enumerate(emails, start=1): assert reports[i]["level"] == "error" reg = re.compile( - "Unable to send SMTP message 'Bundle ready: gitfast [0-9a-f]{7}' " + f"Unable to send SMTP message 'Bundle ready: {TEST_TYPE} [0-9a-f]{{7}}' " f"to {email.replace('+', '[+]')}: [(]404, 'HELO Failed'[)]" ) assert reg.match(reports[i]["message"]) def test_available(swh_vault): assert not swh_vault.is_available(TEST_TYPE, TEST_SWHID) with mock_cooking(swh_vault): swh_vault.create_task(TEST_TYPE, TEST_SWHID) assert not swh_vault.is_available(TEST_TYPE, TEST_SWHID) swh_vault.cache.add(TEST_TYPE, TEST_SWHID, b"content") assert not swh_vault.is_available(TEST_TYPE, TEST_SWHID) swh_vault.set_status(TEST_TYPE, TEST_SWHID, "done") assert swh_vault.is_available(TEST_TYPE, TEST_SWHID) def test_fetch(swh_vault): assert swh_vault.fetch(TEST_TYPE, TEST_SWHID, raise_notfound=False) is None with pytest.raises( NotFoundExc, match=f"{TEST_TYPE} {TEST_SWHID} is not available." ): swh_vault.fetch(TEST_TYPE, TEST_SWHID) swhid, content = fake_cook(swh_vault, TEST_TYPE, b"content") info = swh_vault.progress(TEST_TYPE, swhid) access_ts_before = info["ts_last_access"] assert swh_vault.fetch(TEST_TYPE, swhid) == b"content" info = swh_vault.progress(TEST_TYPE, swhid) access_ts_after = info["ts_last_access"] assertTimestampAlmostNow(access_ts_after) assert access_ts_before < access_ts_after def test_cache_expire_oldest(swh_vault): r = range(1, 10) inserted = {} for i in r: sticky = i == 5 content = b"content%s" % str(i).encode() swhid, content = fake_cook(swh_vault, TEST_TYPE, content, sticky) inserted[i] = (swhid, content) swh_vault.update_access_ts(TEST_TYPE, inserted[2][0]) swh_vault.update_access_ts(TEST_TYPE, inserted[3][0]) swh_vault.cache_expire_oldest(n=4) should_be_still_here = {2, 3, 5, 8, 9} for i in r: assert swh_vault.is_available(TEST_TYPE, inserted[i][0]) == ( i in should_be_still_here ) def test_cache_expire_until(swh_vault): r = range(1, 10) inserted = {} for i in r: sticky = i == 5 content = b"content%s" % str(i).encode() swhid, content = fake_cook(swh_vault, TEST_TYPE, content, sticky) inserted[i] = (swhid, content) if i == 7: cutoff_date = datetime.datetime.now() swh_vault.update_access_ts(TEST_TYPE, inserted[2][0]) swh_vault.update_access_ts(TEST_TYPE, inserted[3][0]) swh_vault.cache_expire_until(date=cutoff_date) should_be_still_here = {2, 3, 5, 8, 9} for i in r: assert swh_vault.is_available(TEST_TYPE, inserted[i][0]) == ( i in should_be_still_here ) def test_fail_cook_simple(swh_vault): fail_cook(swh_vault, TEST_TYPE, TEST_SWHID, "error42") assert not swh_vault.is_available(TEST_TYPE, TEST_SWHID) info = swh_vault.progress(TEST_TYPE, TEST_SWHID) assert info["progress_msg"] == "error42" def test_send_failure_email(swh_vault): with mock_cooking(swh_vault): swh_vault.cook(TEST_TYPE, TEST_SWHID, email="a@example.com") swh_vault.set_status(TEST_TYPE, TEST_SWHID, "failed") swh_vault.set_progress(TEST_TYPE, TEST_SWHID, "test error") with patch.object(swh_vault, "_smtp_send") as m: swh_vault.send_notif(TEST_TYPE, TEST_SWHID) e = [k[0][0] for k in m.call_args_list][0] assert e["To"] == "a@example.com" assert "bot@softwareheritage.org" in e["From"] assert TEST_TYPE in e["Subject"] assert TEST_SWHID.object_id.hex()[:5] in e["Subject"] assert "fail" in e["Subject"] assert TEST_TYPE in str(e) assert TEST_SWHID.object_id.hex()[:5] in str(e) assert "test error" in str(e) assert "--\x20\n" in str(e) # Well-formated signature def test_retry_failed_bundle(swh_vault): fail_cook(swh_vault, TEST_TYPE, TEST_SWHID, "error42") info = swh_vault.progress(TEST_TYPE, TEST_SWHID) assert info["task_status"] == "failed" with mock_cooking(swh_vault): swh_vault.cook(TEST_TYPE, TEST_SWHID) info = swh_vault.progress(TEST_TYPE, TEST_SWHID) assert info["task_status"] == "new"