diff --git a/sql/swh-vault-schema.sql b/sql/swh-vault-schema.sql index 5265f7a..3f5640e 100644 --- a/sql/swh-vault-schema.sql +++ b/sql/swh-vault-schema.sql @@ -1,42 +1,42 @@ create table dbversion ( version int primary key, release timestamptz not null, description text not null ); comment on table dbversion is 'Schema update tracking'; insert into dbversion (version, release, description) values (1, now(), 'Initial version'); create domain obj_hash as bytea; create type cook_type as enum ('directory', 'revision_gitfast'); comment on type cook_type is 'Type of the requested bundle'; -create type cook_status as enum ('new', 'pending', 'done'); +create type cook_status as enum ('new', 'pending', 'done', 'failed'); comment on type cook_status is 'Status of the cooking'; create table vault_bundle ( id bigserial primary key, type cook_type not null, -- requested cooking type object_id obj_hash not null, -- requested object ID task_id integer, -- scheduler task id task_status cook_status not null default 'new', -- status of the task sticky boolean not null default false, -- bundle cannot expire ts_created timestamptz not null default now(), -- timestamp of creation ts_done timestamptz, -- timestamp of the cooking result ts_last_access timestamptz not null default now(), -- last access progress_msg text, -- progress message unique(type, object_id) ); create table vault_notif_email ( id bigserial primary key, email text not null, -- e-mail to notify bundle_id bigint not null references vault_bundle(id) ); diff --git a/swh/vault/backend.py b/swh/vault/backend.py index 19f53db..8fdbaa8 100644 --- a/swh/vault/backend.py +++ b/swh/vault/backend.py @@ -1,333 +1,382 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import smtplib import psycopg2 import psycopg2.extras from functools import wraps from email.mime.text import MIMEText from swh.model import hashutil from swh.scheduler.backend import SchedulerBackend from swh.scheduler.utils import create_oneshot_task_dict from swh.vault.cache import VaultCache from swh.vault.cookers import get_cooker from swh.vault.cooking_tasks import SWHCookingTask # noqa cooking_task_name = 'swh.vault.cooking_tasks.SWHCookingTask' NOTIF_EMAIL_FROM = ('"Software Heritage Vault" ' '') -NOTIF_EMAIL_SUBJECT = ("Bundle ready: {obj_type} {short_id}") -NOTIF_EMAIL_BODY = """ +NOTIF_EMAIL_SUBJECT_SUCCESS = ("Bundle ready: {obj_type} {short_id}") +NOTIF_EMAIL_SUBJECT_FAILURE = ("Bundle failed: {obj_type} {short_id}") + +NOTIF_EMAIL_BODY_SUCCESS = """ You have requested the following bundle from the Software Heritage Vault: Object Type: {obj_type} Object ID: {hex_id} This bundle is now available for download at the following address: {url} Please keep in mind that this link might expire at some point, in which case you will need to request the bundle again. --\x20 The Software Heritage Developers """ +NOTIF_EMAIL_BODY_FAILURE = """ +You have requested the following bundle from the Software Heritage +Vault: + +Object Type: {obj_type} +Object ID: {hex_id} + +This bundle could not be cooked for the following reason: + +{progress_msg} + +We apologize for the inconvenience. + +--\x20 +The Software Heritage Developers +""" + class NotFoundExc(Exception): """Bundle was not found.""" pass # TODO: Imported from swh.scheduler.backend. Factorization needed. def autocommit(fn): @wraps(fn) def wrapped(self, *args, **kwargs): autocommit = False # TODO: I don't like using None, it's confusing for the user. how about # a NEW_CURSOR object()? if 'cursor' not in kwargs or not kwargs['cursor']: autocommit = True kwargs['cursor'] = self.cursor() try: ret = fn(self, *args, **kwargs) except: if autocommit: self.rollback() raise if autocommit: self.commit() return ret return wrapped # TODO: This has to be factorized with other database base classes and helpers # (swh.scheduler.backend.SchedulerBackend, swh.storage.db.BaseDb, ...) # The three first methods are imported from swh.scheduler.backend. class VaultBackend: """ Backend for the Software Heritage vault. """ def __init__(self, config): self.config = config self.cache = VaultCache(self.config['cache']) self.db = None self.reconnect() self.smtp_server = smtplib.SMTP('localhost', 25) - self.scheduler = SchedulerBackend( - scheduling_db=self.config['scheduling_db']) + if self.config['scheduling_db'] is not None: + self.scheduler = SchedulerBackend( + scheduling_db=self.config['scheduling_db']) def reconnect(self): """Reconnect to the database.""" if not self.db or self.db.closed: self.db = psycopg2.connect( dsn=self.config['db'], cursor_factory=psycopg2.extras.RealDictCursor, ) def close(self): """Close the underlying database connection.""" self.db.close() def cursor(self): """Return a fresh cursor on the database, with auto-reconnection in case of failure""" cur = None # Get a fresh cursor and reconnect at most three times tries = 0 while True: tries += 1 try: cur = self.db.cursor() cur.execute('select 1') break except psycopg2.OperationalError: if tries < 3: self.reconnect() else: raise return cur def commit(self): """Commit a transaction""" self.db.commit() def rollback(self): """Rollback a transaction""" self.db.rollback() @autocommit def task_info(self, obj_type, obj_id, cursor=None): """Fetch information from a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' SELECT id, type, object_id, task_id, task_status, sticky, ts_created, ts_done, ts_last_access, progress_msg FROM vault_bundle WHERE type = %s AND object_id = %s''', (obj_type, obj_id)) res = cursor.fetchone() if res: res['object_id'] = bytes(res['object_id']) return res def _send_task(self, args): """Send a cooking task to the celery scheduler""" task = create_oneshot_task_dict('swh-vault-cooking', *args) added_tasks = self.scheduler.create_tasks([task]) return added_tasks[0]['id'] @autocommit def create_task(self, obj_type, obj_id, sticky=False, cursor=None): """Create and send a cooking task""" obj_id = hashutil.hash_to_bytes(obj_id) hex_id = hashutil.hash_to_hex(obj_id) args = [obj_type, hex_id] cooker_class = get_cooker(obj_type) cooker = cooker_class(*args) if not cooker.check_exists(): raise NotFoundExc("Object {} was not found.".format(hex_id)) cursor.execute(''' INSERT INTO vault_bundle (type, object_id, sticky) VALUES (%s, %s, %s)''', (obj_type, obj_id, sticky)) self.commit() task_id = self._send_task(args) cursor.execute(''' UPDATE vault_bundle SET task_id = %s WHERE type = %s AND object_id = %s''', (task_id, obj_type, obj_id)) @autocommit def add_notif_email(self, obj_type, obj_id, email, cursor=None): """Add an e-mail address to notify when a given bundle is ready""" obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' INSERT INTO vault_notif_email (email, bundle_id) VALUES (%s, (SELECT id FROM vault_bundle WHERE type = %s AND object_id = %s))''', (email, obj_type, obj_id)) @autocommit def cook_request(self, obj_type, obj_id, *, sticky=False, email=None, cursor=None): """Main entry point for cooking requests. This starts a cooking task if needed, and add the given e-mail to the notify list""" info = self.task_info(obj_type, obj_id) + + # If there's a failed bundle entry, delete it first. + if info is not None and info['task_status'] == 'failed': + cursor.execute('''DELETE FROM vault_bundle + WHERE obj_type = %s AND obj_id = %s''', + (obj_type, obj_id)) + self.commit() + info = None + + # If there's no bundle entry, create the task. if info is None: self.create_task(obj_type, obj_id, sticky) + if email is not None: + # If the task is already done, send the email directly if info is not None and info['task_status'] == 'done': self.send_notification(None, email, obj_type, obj_id) + # Else, add it to the notification queue else: self.add_notif_email(obj_type, obj_id, email) + info = self.task_info(obj_type, obj_id) return info @autocommit def is_available(self, obj_type, obj_id, cursor=None): """Check whether a bundle is available for retrieval""" info = self.task_info(obj_type, obj_id, cursor=cursor) return (info is not None and info['task_status'] == 'done' and self.cache.is_cached(obj_type, obj_id)) @autocommit def fetch(self, obj_type, obj_id, cursor=None): """Retrieve a bundle from the cache""" if not self.is_available(obj_type, obj_id, cursor=cursor): return None self.update_access_ts(obj_type, obj_id, cursor=cursor) return self.cache.get(obj_type, obj_id) @autocommit def update_access_ts(self, obj_type, obj_id, cursor=None): """Update the last access timestamp of a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' UPDATE vault_bundle SET ts_last_access = NOW() WHERE type = %s AND object_id = %s''', (obj_type, obj_id)) @autocommit def set_status(self, obj_type, obj_id, status, cursor=None): """Set the cooking status of a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) req = (''' UPDATE vault_bundle SET task_status = %s ''' + (''', ts_done = NOW() ''' if status == 'done' else '') + '''WHERE type = %s AND object_id = %s''') cursor.execute(req, (status, obj_type, obj_id)) @autocommit def set_progress(self, obj_type, obj_id, progress, cursor=None): """Set the cooking progress of a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' UPDATE vault_bundle SET progress_msg = %s WHERE type = %s AND object_id = %s''', (progress, obj_type, obj_id)) @autocommit def send_all_notifications(self, obj_type, obj_id, cursor=None): """Send all the e-mails in the notification list of a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' - SELECT vault_notif_email.id AS id, email + SELECT vault_notif_email.id AS id, email, task_status, progress_msg FROM vault_notif_email INNER JOIN vault_bundle ON bundle_id = vault_bundle.id WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s''', (obj_type, obj_id)) for d in cursor: - self.send_notification(d['id'], d['email'], obj_type, obj_id) + self.send_notification(d['id'], d['email'], obj_type, obj_id, + status=d['task_status'], + progress_msg=d['progress_msg']) @autocommit - def send_notification(self, n_id, email, obj_type, obj_id, cursor=None): + def send_notification(self, n_id, email, obj_type, obj_id, status, + progress_msg=None, cursor=None): """Send the notification of a bundle to a specific e-mail""" hex_id = hashutil.hash_to_hex(obj_id) short_id = hex_id[:7] # TODO: instead of hardcoding this, we should probably: # * add a "fetch_url" field in the vault_notif_email table # * generate the url with flask.url_for() on the web-ui side # * send this url as part of the cook request and store it in # the table # * use this url for the notification e-mail url = ('https://archive.softwareheritage.org/api/1/vault/{}/{}/' 'raw'.format(obj_type, hex_id)) - text = NOTIF_EMAIL_BODY.strip() - text = text.format(obj_type=obj_type, hex_id=hex_id, url=url) - msg = MIMEText(text) - msg['Subject'] = (NOTIF_EMAIL_SUBJECT - .format(obj_type=obj_type, short_id=short_id)) + if status == 'done': + text = NOTIF_EMAIL_BODY_SUCCESS.strip() + text = text.format(obj_type=obj_type, hex_id=hex_id, url=url) + msg = MIMEText(text) + msg['Subject'] = (NOTIF_EMAIL_SUBJECT_SUCCESS + .format(obj_type=obj_type, short_id=short_id)) + elif status == 'failed': + text = NOTIF_EMAIL_BODY_FAILURE.strip() + text = text.format(obj_type=obj_type, hex_id=hex_id, + progress_msg=progress_msg) + msg = MIMEText(text) + msg['Subject'] = (NOTIF_EMAIL_SUBJECT_FAILURE + .format(obj_type=obj_type, short_id=short_id)) + else: + raise RuntimeError("send_notification called on a '{}' bundle" + .format(status)) + msg['From'] = NOTIF_EMAIL_FROM msg['To'] = email self._smtp_send(msg) if n_id is not None: cursor.execute(''' DELETE FROM vault_notif_email WHERE id = %s''', (n_id,)) def _smtp_send(self, msg): # Reconnect if needed try: status = self.smtp_server.noop()[0] except: # smtplib.SMTPServerDisconnected status = -1 if status != 250: self.smtp_server.connect() # Send the message self.smtp_server.send_message(msg) @autocommit def _cache_expire(self, cond, *args, cursor=None): """Low-level expiration method, used by cache_expire_* methods""" # Embedded SELECT query to be able to use ORDER BY and LIMIT cursor.execute(''' DELETE FROM vault_bundle WHERE ctid IN ( SELECT ctid FROM vault_bundle WHERE sticky = false {} ) RETURNING type, object_id '''.format(cond), args) for d in cursor: self.cache.delete(d['type'], bytes(d['object_id'])) @autocommit def cache_expire_oldest(self, n=1, by='last_access', cursor=None): """Expire the `n` oldest bundles""" assert by in ('created', 'done', 'last_access') filter = '''ORDER BY ts_{} LIMIT {}'''.format(by, n) return self._cache_expire(filter) @autocommit def cache_expire_until(self, date, by='last_access', cursor=None): """Expire all the bundles until a certain date""" assert by in ('created', 'done', 'last_access') filter = '''AND ts_{} <= %s'''.format(by) return self._cache_expire(filter, date) diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py index 7fc6d6b..261a35a 100644 --- a/swh/vault/cookers/base.py +++ b/swh/vault/cookers/base.py @@ -1,229 +1,234 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import io import itertools import os import tarfile import tempfile from pathlib import Path from swh.core import config from swh.model import hashutil from swh.model.from_disk import mode_to_perms, DentryPerms from swh.storage import get_storage from swh.vault.api.client import RemoteVaultClient DEFAULT_CONFIG_PATH = 'vault/cooker' DEFAULT_CONFIG = { 'storage': ('dict', { 'cls': 'remote', 'args': { 'url': 'http://localhost:5002/', }, }), 'vault_url': ('str', 'http://localhost:5005/') } class BaseVaultCooker(metaclass=abc.ABCMeta): """Abstract base class for the vault's bundle creators This class describes a common API for the cookers. To define a new cooker, inherit from this class and override: - CACHE_TYPE_KEY: key to use for the bundle to reference in cache - def cook(): cook the object into a bundle """ CACHE_TYPE_KEY = None def __init__(self, obj_type, obj_id): """Initialize the cooker. The type of the object represented by the id depends on the concrete class. Very likely, each type of bundle will have its own cooker class. Args: storage: the storage object cache: the cache where to store the bundle obj_id: id of the object to be cooked into a bundle. """ self.config = config.load_named_config(DEFAULT_CONFIG_PATH, DEFAULT_CONFIG) self.obj_type = obj_type self.obj_id = hashutil.hash_to_bytes(obj_id) self.backend = RemoteVaultClient(self.config['vault_url']) self.storage = get_storage(**self.config['storage']) @abc.abstractmethod def check_exists(self): """Checks that the requested object exists and can be cooked. Override this in the cooker implementation. """ raise NotImplemented @abc.abstractmethod def prepare_bundle(self): """Implementation of the cooker. Yields chunks of the bundle bytes. Override this with the cooker implementation. """ raise NotImplemented def cook(self): """Cook the requested object into a bundle """ self.backend.set_status(self.obj_type, self.obj_id, 'pending') self.backend.set_progress(self.obj_type, self.obj_id, 'Processing...') content_iter = self.prepare_bundle() # TODO: use proper content streaming - bundle = b''.join(content_iter) - self.backend.put_bundle(self.CACHE_TYPE_KEY, self.obj_id, bundle) - - self.backend.set_status(self.obj_type, self.obj_id, 'done') - self.backend.set_progress(self.obj_type, self.obj_id, None) - self.backend.send_notif(self.obj_type, self.obj_id) + try: + bundle = b''.join(content_iter) + except Exception as e: + self.backend.set_status(self.obj_type, self.obj_id, 'failed') + self.backend.set_progress(self.obj_type, self.obj_id, e.message) + else: + self.backend.put_bundle(self.CACHE_TYPE_KEY, self.obj_id, bundle) + self.backend.set_status(self.obj_type, self.obj_id, 'done') + self.backend.set_progress(self.obj_type, self.obj_id, None) + finally: + self.backend.send_notif(self.obj_type, self.obj_id) SKIPPED_MESSAGE = (b'This content has not been retrieved in the ' b'Software Heritage archive due to its size.') HIDDEN_MESSAGE = (b'This content is hidden.') def get_filtered_file_content(storage, file_data): """Retrieve the file specified by file_data and apply filters for skipped and missing contents. Args: storage: the storage from which to retrieve the object file_data: file entry descriptor as returned by directory_ls() Returns: Bytes containing the specified content. The content will be replaced by a specific message to indicate that the content could not be retrieved (either due to privacy policy or because its size was too big for us to archive it). """ assert file_data['type'] == 'file' if file_data['status'] == 'absent': return SKIPPED_MESSAGE elif file_data['status'] == 'hidden': return HIDDEN_MESSAGE else: return list(storage.content_get([file_data['sha1']]))[0]['data'] def get_tar_bytes(path, arcname=None): path = Path(path) if not arcname: arcname = path.name tar_buffer = io.BytesIO() tar = tarfile.open(fileobj=tar_buffer, mode='w') tar.add(str(path), arcname=arcname) return tar_buffer.getbuffer() class DirectoryBuilder: """Creates a cooked directory from its sha1_git in the db. Warning: This is NOT a directly accessible cooker, but a low-level one that executes the manipulations. """ def __init__(self, storage): self.storage = storage def get_directory_bytes(self, dir_id): # Create temporary folder to retrieve the files into. root = bytes(tempfile.mkdtemp(prefix='directory.', suffix='.cook'), 'utf8') self.build_directory(dir_id, root) # Use the created directory to make a bundle with the data as # a compressed directory. bundle_content = self._create_bundle_content( root, hashutil.hash_to_hex(dir_id)) return bundle_content def build_directory(self, dir_id, root): # Retrieve data from the database. data = self.storage.directory_ls(dir_id, recursive=True) # Split into files and directory data. # TODO(seirl): also handle revision data. data1, data2 = itertools.tee(data, 2) dir_data = (entry['name'] for entry in data1 if entry['type'] == 'dir') file_data = (entry for entry in data2 if entry['type'] == 'file') # Recreate the directory's subtree and then the files into it. self._create_tree(root, dir_data) self._create_files(root, file_data) def _create_tree(self, root, directory_paths): """Create a directory tree from the given paths The tree is created from `root` and each given path in `directory_paths` will be created. """ # Directories are sorted by depth so they are created in the # right order bsep = bytes(os.path.sep, 'utf8') dir_names = sorted( directory_paths, key=lambda x: len(x.split(bsep))) for dir_name in dir_names: os.makedirs(os.path.join(root, dir_name)) def _create_files(self, root, file_datas): """Create the files according to their status. """ # Then create the files for file_data in file_datas: path = os.path.join(root, file_data['name']) content = get_filtered_file_content(self.storage, file_data) self._create_file(path, content, file_data['perms']) def _create_file(self, path, content, mode=0o100644): """Create the given file and fill it with content. """ perms = mode_to_perms(mode) if perms == DentryPerms.symlink: os.symlink(content, path) else: with open(path, 'wb') as f: f.write(content) os.chmod(path, perms.value) def _get_file_content(self, obj_id): """Get the content of the given file. """ content = list(self.storage.content_get([obj_id]))[0]['data'] return content def _create_bundle_content(self, path, hex_dir_id): """Create a bundle from the given directory Args: path: location of the directory to package. hex_dir_id: hex representation of the directory id Returns: bytes that represent the compressed directory as a bundle. """ return get_tar_bytes(path.decode(), hex_dir_id) diff --git a/swh/vault/tests/test_backend.py b/swh/vault/tests/test_backend.py index 5ba8425..d2e6892 100644 --- a/swh/vault/tests/test_backend.py +++ b/swh/vault/tests/test_backend.py @@ -1,284 +1,320 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import contextlib import datetime import psycopg2 import unittest from unittest.mock import patch from swh.core.tests.db_testing import DbTestFixture from swh.model import hashutil from swh.storage.tests.storage_testing import StorageTestFixture from swh.vault.tests.vault_testing import VaultTestFixture, hash_content class BaseTestBackend(VaultTestFixture, StorageTestFixture, DbTestFixture): @contextlib.contextmanager def mock_cooking(self): with patch.object(self.vault_backend, '_send_task') as mt: mt.return_value = 42 with patch('swh.vault.backend.get_cooker') as mg: mcc = unittest.mock.MagicMock() mc = unittest.mock.MagicMock() mg.return_value = mcc mcc.return_value = mc mc.check_exists.return_value = True yield {'send_task': mt, 'get_cooker': mg, 'cooker_cls': mcc, 'cooker': mc} def assertTimestampAlmostNow(self, ts, tolerance_secs=1.0): # noqa now = datetime.datetime.now(datetime.timezone.utc) creation_delta_secs = (ts - now).total_seconds() self.assertLess(creation_delta_secs, tolerance_secs) def fake_cook(self, obj_type, result_content, sticky=False): content, obj_id = hash_content(result_content) with self.mock_cooking(): self.vault_backend.create_task(obj_type, obj_id, sticky) self.vault_backend.cache.add(obj_type, obj_id, b'content') self.vault_backend.set_status(obj_type, obj_id, 'done') return obj_id, content + def fail_cook(self, obj_type, obj_id, failure_reason): + with self.mock_cooking(): + self.vault_backend.create_task(obj_type, obj_id) + self.vault_backend.set_status(obj_type, obj_id, 'failed') + self.vault_backend.set_progress(obj_type, obj_id, failure_reason) + TEST_TYPE = 'revision_gitfast' TEST_HEX_ID = '4a4b9771542143cf070386f86b4b92d42966bdbc' TEST_OBJ_ID = hashutil.hash_to_bytes(TEST_HEX_ID) TEST_PROGRESS = ("Mr. White, You're telling me you're cooking again?" " \N{ASTONISHED FACE} ") TEST_EMAIL = 'ouiche@example.com' class TestBackend(BaseTestBackend, unittest.TestCase): def test_create_task_simple(self): with self.mock_cooking() as m: self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) m['get_cooker'].assert_called_once_with(TEST_TYPE) args = m['cooker_cls'].call_args[0] self.assertEqual(args[0], TEST_TYPE) self.assertEqual(args[1], TEST_HEX_ID) self.assertEqual(m['cooker'].check_exists.call_count, 1) self.assertEqual(m['send_task'].call_count, 1) args = m['send_task'].call_args[0][0] self.assertEqual(args[0], TEST_TYPE) self.assertEqual(args[1], TEST_HEX_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['object_id'], TEST_OBJ_ID) self.assertEqual(info['type'], TEST_TYPE) self.assertEqual(info['task_status'], 'new') self.assertEqual(info['task_id'], 42) self.assertTimestampAlmostNow(info['ts_created']) self.assertEqual(info['ts_done'], None) self.assertEqual(info['progress_msg'], None) def test_create_fail_duplicate_task(self): with self.mock_cooking(): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) with self.assertRaises(psycopg2.IntegrityError): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) def test_create_fail_nonexisting_object(self): with self.mock_cooking() as m: m['cooker'].check_exists.side_effect = ValueError('Nothing here.') with self.assertRaises(ValueError): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) def test_create_set_progress(self): with self.mock_cooking(): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['progress_msg'], None) self.vault_backend.set_progress(TEST_TYPE, TEST_OBJ_ID, TEST_PROGRESS) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['progress_msg'], TEST_PROGRESS) def test_create_set_status(self): with self.mock_cooking(): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['task_status'], 'new') self.assertEqual(info['ts_done'], None) self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'pending') info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['task_status'], 'pending') self.assertEqual(info['ts_done'], None) self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['task_status'], 'done') self.assertTimestampAlmostNow(info['ts_done']) def test_create_update_access_ts(self): with self.mock_cooking(): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) access_ts_1 = info['ts_last_access'] self.assertTimestampAlmostNow(access_ts_1) self.vault_backend.update_access_ts(TEST_TYPE, TEST_OBJ_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) access_ts_2 = info['ts_last_access'] self.assertTimestampAlmostNow(access_ts_2) self.vault_backend.update_access_ts(TEST_TYPE, TEST_OBJ_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) access_ts_3 = info['ts_last_access'] self.assertTimestampAlmostNow(access_ts_3) self.assertLess(access_ts_1, access_ts_2) self.assertLess(access_ts_2, access_ts_3) def test_cook_request_idempotent(self): with self.mock_cooking(): info1 = self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID) info2 = self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID) info3 = self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info1, info2) self.assertEqual(info1, info3) def test_cook_email_pending_done(self): with self.mock_cooking(), \ patch.object(self.vault_backend, 'add_notif_email') as madd, \ patch.object(self.vault_backend, 'send_notification') as msend: self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID) madd.assert_not_called() msend.assert_not_called() madd.reset_mock() msend.reset_mock() self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, email=TEST_EMAIL) madd.assert_called_once_with(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL) msend.assert_not_called() madd.reset_mock() msend.reset_mock() self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, email=TEST_EMAIL) msend.assert_called_once_with(None, TEST_EMAIL, TEST_TYPE, TEST_OBJ_ID) madd.assert_not_called() def test_send_all_emails(self): with self.mock_cooking(): emails = ('a@example.com', 'billg@example.com', 'test+42@example.org') for email in emails: self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, email=email) self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') with patch.object(self.vault_backend, 'smtp_server') as m: self.vault_backend.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) sent_emails = {k[0][0] for k in m.send_message.call_args_list} self.assertEqual({k['To'] for k in sent_emails}, set(emails)) for e in sent_emails: self.assertIn('info@softwareheritage.org', e['From']) self.assertIn(TEST_TYPE, e['Subject']) self.assertIn(TEST_HEX_ID[:5], e['Subject']) self.assertIn(TEST_TYPE, str(e)) self.assertIn('https://archive.softwareheritage.org/', str(e)) self.assertIn(TEST_HEX_ID[:5], str(e)) self.assertIn('--\x20\n', str(e)) # Well-formated signature!!! # Check that the entries have been deleted and recalling the # function does not re-send the e-mails m.reset_mock() self.vault_backend.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) m.assert_not_called() def test_available(self): self.assertFalse(self.vault_backend.is_available(TEST_TYPE, TEST_OBJ_ID)) with self.mock_cooking(): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) self.assertFalse(self.vault_backend.is_available(TEST_TYPE, TEST_OBJ_ID)) self.vault_backend.cache.add(TEST_TYPE, TEST_OBJ_ID, b'content') self.assertFalse(self.vault_backend.is_available(TEST_TYPE, TEST_OBJ_ID)) self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') self.assertTrue(self.vault_backend.is_available(TEST_TYPE, TEST_OBJ_ID)) def test_fetch(self): self.assertEqual(self.vault_backend.fetch(TEST_TYPE, TEST_OBJ_ID), None) obj_id, content = self.fake_cook(TEST_TYPE, b'content') info = self.vault_backend.task_info(TEST_TYPE, obj_id) access_ts_before = info['ts_last_access'] self.assertEqual(self.vault_backend.fetch(TEST_TYPE, obj_id), b'content') info = self.vault_backend.task_info(TEST_TYPE, obj_id) access_ts_after = info['ts_last_access'] self.assertTimestampAlmostNow(access_ts_after) self.assertLess(access_ts_before, access_ts_after) def test_cache_expire_oldest(self): r = range(1, 10) inserted = {} for i in r: sticky = (i == 5) content = b'content%s' % str(i).encode() obj_id, content = self.fake_cook(TEST_TYPE, content, sticky) inserted[i] = (obj_id, content) self.vault_backend.update_access_ts(TEST_TYPE, inserted[2][0]) self.vault_backend.update_access_ts(TEST_TYPE, inserted[3][0]) self.vault_backend.cache_expire_oldest(n=4) should_be_still_here = {2, 3, 5, 8, 9} for i in r: self.assertEqual(self.vault_backend.is_available( TEST_TYPE, inserted[i][0]), i in should_be_still_here) def test_cache_expire_until(self): r = range(1, 10) inserted = {} for i in r: sticky = (i == 5) content = b'content%s' % str(i).encode() obj_id, content = self.fake_cook(TEST_TYPE, content, sticky) inserted[i] = (obj_id, content) if i == 7: cutoff_date = datetime.datetime.now() self.vault_backend.update_access_ts(TEST_TYPE, inserted[2][0]) self.vault_backend.update_access_ts(TEST_TYPE, inserted[3][0]) self.vault_backend.cache_expire_until(date=cutoff_date) should_be_still_here = {2, 3, 5, 8, 9} for i in r: self.assertEqual(self.vault_backend.is_available( TEST_TYPE, inserted[i][0]), i in should_be_still_here) + + def test_fail_cook_simple(self): + self.fail_cook(TEST_TYPE, TEST_OBJ_ID, 'error42') + self.assertFalse(self.vault_backend.is_available(TEST_TYPE, + TEST_OBJ_ID)) + info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) + self.assertEqual(info['progress_msg'], 'error42') + + def test_send_failure_email(self): + with self.mock_cooking(): + self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, + email='a@example.com') + + self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'failed') + self.vault_backend.set_progress(TEST_TYPE, TEST_OBJ_ID, 'test error') + + with patch.object(self.vault_backend, 'smtp_server') as m: + self.vault_backend.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) + + e = [k[0][0] for k in m.send_message.call_args_list][0] + self.assertEqual(e['To'], 'a@example.com') + + self.assertIn('info@softwareheritage.org', e['From']) + self.assertIn(TEST_TYPE, e['Subject']) + self.assertIn(TEST_HEX_ID[:5], e['Subject']) + self.assertIn('fail', e['Subject']) + self.assertIn(TEST_TYPE, str(e)) + self.assertIn(TEST_HEX_ID[:5], str(e)) + self.assertIn('test error', str(e)) + self.assertIn('--\x20\n', str(e)) # Well-formated signature diff --git a/swh/vault/tests/vault_testing.py b/swh/vault/tests/vault_testing.py index 8e1de1e..561c42d 100644 --- a/swh/vault/tests/vault_testing.py +++ b/swh/vault/tests/vault_testing.py @@ -1,70 +1,71 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import tempfile import pathlib from swh.model import hashutil from swh.vault.backend import VaultBackend class VaultTestFixture: """Mix this in a test subject class to get Vault Database testing support. This fixture requires to come before DbTestFixture and StorageTestFixture in the inheritance list as it uses their methods to setup its own internal components. Usage example: class TestVault(VaultTestFixture, StorageTestFixture, DbTestFixture): ... """ TEST_VAULT_DB_NAME = 'softwareheritage-test-vault' @classmethod def setUpClass(cls): if not hasattr(cls, 'DB_TEST_FIXTURE_IMPORTED'): raise RuntimeError("VaultTestFixture needs to be followed by " "DbTestFixture in the inheritance list.") test_dir = pathlib.Path(__file__).absolute().parent test_db_dump = test_dir / '../../../sql/swh-vault-schema.sql' test_db_dump = test_db_dump.absolute() cls.add_db(cls.TEST_VAULT_DB_NAME, str(test_db_dump), 'psql') super().setUpClass() def setUp(self): super().setUp() self.cache_root = tempfile.TemporaryDirectory('vault-cache-') self.vault_config = { 'storage': self.storage_config, - 'db': 'postgresql:///' + self.TEST_VAULT_DB_NAME, 'cache': { 'cls': 'pathslicing', 'args': { 'root': self.cache_root.name, 'slicing': '0:1/1:5', 'allow_delete': True, } - } + }, + 'db': 'postgresql:///' + self.TEST_VAULT_DB_NAME, + 'scheduling_db': None, } self.vault_backend = VaultBackend(self.vault_config) def tearDown(self): self.cache_root.cleanup() self.vault_backend.close() self.reset_storage_tables() self.reset_vault_tables() super().tearDown() def reset_vault_tables(self): excluded = {'dbversion'} self.reset_db_tables(self.TEST_VAULT_DB_NAME, excluded=excluded) def hash_content(content): obj_id = hashutil.hash_data(content)['sha1'] return content, obj_id