diff --git a/PKG-INFO b/PKG-INFO index 41bc178..d400df1 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.vault -Version: 0.0.4 +Version: 0.0.5 Summary: Software Heritage vault Home-page: https://forge.softwareheritage.org/diffusion/DVAU/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/sql/swh-vault-schema.sql b/sql/swh-vault-schema.sql index f41311f..5265f7a 100644 --- a/sql/swh-vault-schema.sql +++ b/sql/swh-vault-schema.sql @@ -1,43 +1,42 @@ create table dbversion ( version int primary key, release timestamptz not null, description text not null ); comment on table dbversion is 'Schema update tracking'; insert into dbversion (version, release, description) values (1, now(), 'Initial version'); create domain obj_hash as bytea; create type cook_type as enum ('directory', 'revision_gitfast'); comment on type cook_type is 'Type of the requested bundle'; create type cook_status as enum ('new', 'pending', 'done'); comment on type cook_status is 'Status of the cooking'; create table vault_bundle ( id bigserial primary key, type cook_type not null, -- requested cooking type object_id obj_hash not null, -- requested object ID - task_uuid uuid not null, -- celery UUID of the cooking task + task_id integer, -- scheduler task id task_status cook_status not null default 'new', -- status of the task - sticky boolean not null default false, -- bundle cannot expire ts_created timestamptz not null default now(), -- timestamp of creation ts_done timestamptz, -- timestamp of the cooking result ts_last_access timestamptz not null default now(), -- last access progress_msg text, -- progress message unique(type, object_id) ); create table vault_notif_email ( id bigserial primary key, email text not null, -- e-mail to notify bundle_id bigint not null references vault_bundle(id) ); diff --git a/swh.vault.egg-info/PKG-INFO b/swh.vault.egg-info/PKG-INFO index 41bc178..d400df1 100644 --- a/swh.vault.egg-info/PKG-INFO +++ b/swh.vault.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.vault -Version: 0.0.4 +Version: 0.0.5 Summary: Software Heritage vault Home-page: https://forge.softwareheritage.org/diffusion/DVAU/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh/vault/api/server.py b/swh/vault/api/server.py index 689c884..16bdca1 100644 --- a/swh/vault/api/server.py +++ b/swh/vault/api/server.py @@ -1,178 +1,179 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import aiohttp.web import click from swh.core import config from swh.core.api_async import (SWHRemoteAPI, encode_data_server as encode_data, decode_request) from swh.model import hashutil from swh.vault.cookers import COOKER_TYPES from swh.vault.backend import VaultBackend DEFAULT_CONFIG_PATH = 'vault/server' DEFAULT_CONFIG = { 'storage': ('dict', { 'cls': 'local', 'args': { 'db': 'dbname=softwareheritage-dev', 'objstorage': { 'cls': 'pathslicing', 'args': { 'root': '/srv/softwareheritage/objects', 'slicing': '0:2/2:4/4:6', }, }, }, }), 'cache': ('dict', { 'cls': 'pathslicing', 'args': { 'root': '/srv/softwareheritage/vault', 'slicing': '0:1/1:5', }, }), - 'db': ('str', 'dbname=swh-vault') + 'db': ('str', 'dbname=softwareheritage-vault-dev'), + 'scheduling_db': ('str', 'dbname=softwareheritage-scheduler-dev'), } @asyncio.coroutine def index(request): return aiohttp.web.Response(body="SWH Vault API server") # Web API endpoints @asyncio.coroutine def vault_fetch(request): obj_type = request.match_info['type'] obj_id = request.match_info['id'] if not request.app['backend'].is_available(obj_type, obj_id): raise aiohttp.web.HTTPNotFound return encode_data(request.app['backend'].fetch(obj_type, obj_id)) def user_info(task_info): - return {'task_uuid': str(task_info['task_uuid']), + return {'id': task_info['id'], 'status': task_info['task_status'], 'progress_message': task_info['progress_msg'], 'obj_type': task_info['type'], 'obj_id': hashutil.hash_to_hex(task_info['object_id'])} @asyncio.coroutine def vault_cook(request): obj_type = request.match_info['type'] obj_id = request.match_info['id'] email = request.query.get('email') sticky = request.query.get('sticky') in ('true', '1') if obj_type not in COOKER_TYPES: raise aiohttp.web.HTTPNotFound info = request.app['backend'].cook_request(obj_type, obj_id, email=email, sticky=sticky) # TODO: return 201 status (Created) once the api supports it return encode_data(user_info(info)) @asyncio.coroutine def vault_progress(request): obj_type = request.match_info['type'] obj_id = request.match_info['id'] info = request.app['backend'].task_info(obj_type, obj_id) if not info: raise aiohttp.web.HTTPNotFound return encode_data(user_info(info)) # Cookers endpoints @asyncio.coroutine def set_progress(request): obj_type = request.match_info['type'] obj_id = request.match_info['id'] progress = yield from decode_request(request) request.app['backend'].set_progress(obj_type, obj_id, progress) return encode_data(True) # FIXME: success value? @asyncio.coroutine def set_status(request): obj_type = request.match_info['type'] obj_id = request.match_info['id'] status = yield from decode_request(request) request.app['backend'].set_status(obj_type, obj_id, status) return encode_data(True) # FIXME: success value? @asyncio.coroutine def put_bundle(request): obj_type = request.match_info['type'] obj_id = request.match_info['id'] # TODO: handle streaming properly content = yield from decode_request(request) request.app['backend'].cache.add(obj_type, obj_id, content) return encode_data(True) # FIXME: success value? @asyncio.coroutine def send_notif(request): obj_type = request.match_info['type'] obj_id = request.match_info['id'] request.app['backend'].send_all_notifications(obj_type, obj_id) return encode_data(True) # FIXME: success value? # Web server def make_app(config, **kwargs): app = SWHRemoteAPI(**kwargs) app.router.add_route('GET', '/', index) # Endpoints used by the web API app.router.add_route('GET', '/fetch/{type}/{id}', vault_fetch) app.router.add_route('POST', '/cook/{type}/{id}', vault_cook) app.router.add_route('GET', '/progress/{type}/{id}', vault_progress) # Endpoints used by the Cookers app.router.add_route('POST', '/set_progress/{type}/{id}', set_progress) app.router.add_route('POST', '/set_status/{type}/{id}', set_status) app.router.add_route('POST', '/put_bundle/{type}/{id}', put_bundle) app.router.add_route('POST', '/send_notif/{type}/{id}', send_notif) app['backend'] = VaultBackend(config) return app def make_app_from_configfile(config_path=DEFAULT_CONFIG_PATH, **kwargs): cfg = config.load_named_config(config_path, DEFAULT_CONFIG) return make_app(cfg, **kwargs) @click.command() @click.argument('config-path', required=1) @click.option('--host', default='0.0.0.0', help="Host to run the server") @click.option('--port', default=5005, type=click.INT, help="Binding port of the server") @click.option('--debug/--nodebug', default=True, help="Indicates if the server should run in debug mode") def launch(config_path, host, port, debug): app = make_app(config.read(config_path, DEFAULT_CONFIG), debug=bool(debug)) aiohttp.web.run_app(app, host=host, port=int(port)) if __name__ == '__main__': launch() diff --git a/swh/vault/backend.py b/swh/vault/backend.py index c379e10..49bebd8 100644 --- a/swh/vault/backend.py +++ b/swh/vault/backend.py @@ -1,320 +1,327 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import smtplib -import celery import psycopg2 import psycopg2.extras from functools import wraps from email.mime.text import MIMEText from swh.model import hashutil -from swh.scheduler.utils import get_task +from swh.scheduler.backend import SchedulerBackend +from swh.scheduler.utils import create_oneshot_task_dict from swh.vault.cache import VaultCache from swh.vault.cookers import get_cooker from swh.vault.cooking_tasks import SWHCookingTask # noqa cooking_task_name = 'swh.vault.cooking_tasks.SWHCookingTask' NOTIF_EMAIL_FROM = ('"Software Heritage Vault" ' '') NOTIF_EMAIL_SUBJECT = ("Bundle ready: {obj_type} {short_id}") NOTIF_EMAIL_BODY = """ You have requested the following bundle from the Software Heritage Vault: Object Type: {obj_type} Object ID: {hex_id} This bundle is now available for download at the following address: {url} Please keep in mind that this link might expire at some point, in which case you will need to request the bundle again. --\x20 The Software Heritage Developers """ # TODO: Imported from swh.scheduler.backend. Factorization needed. def autocommit(fn): @wraps(fn) def wrapped(self, *args, **kwargs): autocommit = False # TODO: I don't like using None, it's confusing for the user. how about # a NEW_CURSOR object()? if 'cursor' not in kwargs or not kwargs['cursor']: autocommit = True kwargs['cursor'] = self.cursor() try: ret = fn(self, *args, **kwargs) except: if autocommit: self.rollback() raise if autocommit: self.commit() return ret return wrapped # TODO: This has to be factorized with other database base classes and helpers # (swh.scheduler.backend.SchedulerBackend, swh.storage.db.BaseDb, ...) # The three first methods are imported from swh.scheduler.backend. class VaultBackend: """ Backend for the Software Heritage vault. """ def __init__(self, config): self.config = config self.cache = VaultCache(self.config['cache']) self.db = None self.reconnect() self.smtp_server = smtplib.SMTP('localhost', 25) + self.scheduler = SchedulerBackend( + scheduling_db=self.config['scheduling_db']) def reconnect(self): """Reconnect to the database.""" if not self.db or self.db.closed: self.db = psycopg2.connect( dsn=self.config['db'], cursor_factory=psycopg2.extras.RealDictCursor, ) def close(self): """Close the underlying database connection.""" self.db.close() def cursor(self): """Return a fresh cursor on the database, with auto-reconnection in case of failure""" cur = None # Get a fresh cursor and reconnect at most three times tries = 0 while True: tries += 1 try: cur = self.db.cursor() cur.execute('select 1') break except psycopg2.OperationalError: if tries < 3: self.reconnect() else: raise return cur def commit(self): """Commit a transaction""" self.db.commit() def rollback(self): """Rollback a transaction""" self.db.rollback() @autocommit def task_info(self, obj_type, obj_id, cursor=None): """Fetch information from a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' - SELECT id, type, object_id, task_uuid, task_status, sticky, + SELECT id, type, object_id, task_id, task_status, sticky, ts_created, ts_done, ts_last_access, progress_msg FROM vault_bundle WHERE type = %s AND object_id = %s''', (obj_type, obj_id)) res = cursor.fetchone() if res: res['object_id'] = bytes(res['object_id']) return res - @staticmethod - def _send_task(task_uuid, args): + def _send_task(self, args): """Send a cooking task to the celery scheduler""" - task = get_task(cooking_task_name) - task.apply_async(args, task_id=task_uuid) + task = create_oneshot_task_dict('swh-vault-cooking', *args) + added_tasks = self.scheduler.create_tasks([task]) + return added_tasks[0]['id'] @autocommit def create_task(self, obj_type, obj_id, sticky=False, cursor=None): """Create and send a cooking task""" obj_id = hashutil.hash_to_bytes(obj_id) - args = [obj_type, obj_id] + hex_id = hashutil.hash_to_hex(obj_id) + args = [obj_type, hex_id] + cooker_class = get_cooker(obj_type) cooker = cooker_class(*args) cooker.check_exists() - task_uuid = celery.uuid() cursor.execute(''' - INSERT INTO vault_bundle (type, object_id, task_uuid, sticky) - VALUES (%s, %s, %s, %s)''', - (obj_type, obj_id, task_uuid, sticky)) + INSERT INTO vault_bundle (type, object_id, sticky) + VALUES (%s, %s, %s)''', (obj_type, obj_id, sticky)) self.commit() - self._send_task(task_uuid, args) + task_id = self._send_task(args) + + cursor.execute(''' + UPDATE vault_bundle + SET task_id = %s + WHERE type = %s AND object_id = %s''', (task_id, obj_type, obj_id)) @autocommit def add_notif_email(self, obj_type, obj_id, email, cursor=None): """Add an e-mail address to notify when a given bundle is ready""" obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' INSERT INTO vault_notif_email (email, bundle_id) VALUES (%s, (SELECT id FROM vault_bundle WHERE type = %s AND object_id = %s))''', (email, obj_type, obj_id)) @autocommit def cook_request(self, obj_type, obj_id, *, sticky=False, email=None, cursor=None): """Main entry point for cooking requests. This starts a cooking task if needed, and add the given e-mail to the notify list""" info = self.task_info(obj_type, obj_id) if info is None: self.create_task(obj_type, obj_id, sticky) if email is not None: if info is not None and info['task_status'] == 'done': self.send_notification(None, email, obj_type, obj_id) else: self.add_notif_email(obj_type, obj_id, email) info = self.task_info(obj_type, obj_id) return info @autocommit def is_available(self, obj_type, obj_id, cursor=None): """Check whether a bundle is available for retrieval""" info = self.task_info(obj_type, obj_id, cursor=cursor) return (info is not None and info['task_status'] == 'done' and self.cache.is_cached(obj_type, obj_id)) @autocommit def fetch(self, obj_type, obj_id, cursor=None): """Retrieve a bundle from the cache""" if not self.is_available(obj_type, obj_id, cursor=cursor): return None self.update_access_ts(obj_type, obj_id, cursor=cursor) return self.cache.get(obj_type, obj_id) @autocommit def update_access_ts(self, obj_type, obj_id, cursor=None): """Update the last access timestamp of a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' UPDATE vault_bundle SET ts_last_access = NOW() WHERE type = %s AND object_id = %s''', (obj_type, obj_id)) @autocommit def set_status(self, obj_type, obj_id, status, cursor=None): """Set the cooking status of a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) req = (''' UPDATE vault_bundle SET task_status = %s ''' + (''', ts_done = NOW() ''' if status == 'done' else '') + '''WHERE type = %s AND object_id = %s''') cursor.execute(req, (status, obj_type, obj_id)) @autocommit def set_progress(self, obj_type, obj_id, progress, cursor=None): """Set the cooking progress of a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' UPDATE vault_bundle SET progress_msg = %s WHERE type = %s AND object_id = %s''', (progress, obj_type, obj_id)) @autocommit def send_all_notifications(self, obj_type, obj_id, cursor=None): """Send all the e-mails in the notification list of a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' SELECT vault_notif_email.id AS id, email FROM vault_notif_email INNER JOIN vault_bundle ON bundle_id = vault_bundle.id WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s''', (obj_type, obj_id)) for d in cursor: self.send_notification(d['id'], d['email'], obj_type, obj_id) @autocommit def send_notification(self, n_id, email, obj_type, obj_id, cursor=None): """Send the notification of a bundle to a specific e-mail""" hex_id = hashutil.hash_to_hex(obj_id) short_id = hex_id[:7] # TODO: instead of hardcoding this, we should probably: # * add a "fetch_url" field in the vault_notif_email table # * generate the url with flask.url_for() on the web-ui side # * send this url as part of the cook request and store it in # the table # * use this url for the notification e-mail url = ('https://archive.softwareheritage.org/api/1/vault/{}/{}/' 'raw'.format(obj_type, hex_id)) text = NOTIF_EMAIL_BODY.strip() text = text.format(obj_type=obj_type, hex_id=hex_id, url=url) msg = MIMEText(text) msg['Subject'] = (NOTIF_EMAIL_SUBJECT .format(obj_type=obj_type, short_id=short_id)) msg['From'] = NOTIF_EMAIL_FROM msg['To'] = email self._smtp_send(msg) if n_id is not None: cursor.execute(''' DELETE FROM vault_notif_email WHERE id = %s''', (n_id,)) def _smtp_send(self, msg): # Reconnect if needed try: status = self.smtp_server.noop()[0] except: # smtplib.SMTPServerDisconnected status = -1 if status != 250: self.smtp_server.connect() # Send the message self.smtp_server.send_message(msg) @autocommit def _cache_expire(self, cond, *args, cursor=None): """Low-level expiration method, used by cache_expire_* methods""" # Embedded SELECT query to be able to use ORDER BY and LIMIT cursor.execute(''' DELETE FROM vault_bundle WHERE ctid IN ( SELECT ctid FROM vault_bundle WHERE sticky = false {} ) RETURNING type, object_id '''.format(cond), args) for d in cursor: self.cache.delete(d['type'], bytes(d['object_id'])) @autocommit def cache_expire_oldest(self, n=1, by='last_access', cursor=None): """Expire the `n` oldest bundles""" assert by in ('created', 'done', 'last_access') filter = '''ORDER BY ts_{} LIMIT {}'''.format(by, n) return self._cache_expire(filter) @autocommit def cache_expire_until(self, date, by='last_access', cursor=None): """Expire all the bundles until a certain date""" assert by in ('created', 'done', 'last_access') filter = '''AND ts_{} <= %s'''.format(by) return self._cache_expire(filter, date) diff --git a/swh/vault/tests/test_backend.py b/swh/vault/tests/test_backend.py index 4d8a7d7..5ba8425 100644 --- a/swh/vault/tests/test_backend.py +++ b/swh/vault/tests/test_backend.py @@ -1,284 +1,284 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import contextlib import datetime import psycopg2 import unittest from unittest.mock import patch from swh.core.tests.db_testing import DbTestFixture from swh.model import hashutil from swh.storage.tests.storage_testing import StorageTestFixture from swh.vault.tests.vault_testing import VaultTestFixture, hash_content class BaseTestBackend(VaultTestFixture, StorageTestFixture, DbTestFixture): @contextlib.contextmanager def mock_cooking(self): with patch.object(self.vault_backend, '_send_task') as mt: + mt.return_value = 42 with patch('swh.vault.backend.get_cooker') as mg: mcc = unittest.mock.MagicMock() mc = unittest.mock.MagicMock() mg.return_value = mcc mcc.return_value = mc mc.check_exists.return_value = True yield {'send_task': mt, 'get_cooker': mg, 'cooker_cls': mcc, 'cooker': mc} def assertTimestampAlmostNow(self, ts, tolerance_secs=1.0): # noqa now = datetime.datetime.now(datetime.timezone.utc) creation_delta_secs = (ts - now).total_seconds() self.assertLess(creation_delta_secs, tolerance_secs) def fake_cook(self, obj_type, result_content, sticky=False): content, obj_id = hash_content(result_content) with self.mock_cooking(): self.vault_backend.create_task(obj_type, obj_id, sticky) self.vault_backend.cache.add(obj_type, obj_id, b'content') self.vault_backend.set_status(obj_type, obj_id, 'done') return obj_id, content TEST_TYPE = 'revision_gitfast' TEST_HEX_ID = '4a4b9771542143cf070386f86b4b92d42966bdbc' TEST_OBJ_ID = hashutil.hash_to_bytes(TEST_HEX_ID) TEST_PROGRESS = ("Mr. White, You're telling me you're cooking again?" " \N{ASTONISHED FACE} ") TEST_EMAIL = 'ouiche@example.com' class TestBackend(BaseTestBackend, unittest.TestCase): def test_create_task_simple(self): with self.mock_cooking() as m: self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) m['get_cooker'].assert_called_once_with(TEST_TYPE) args = m['cooker_cls'].call_args[0] self.assertEqual(args[0], TEST_TYPE) - self.assertEqual(args[1], TEST_OBJ_ID) + self.assertEqual(args[1], TEST_HEX_ID) self.assertEqual(m['cooker'].check_exists.call_count, 1) self.assertEqual(m['send_task'].call_count, 1) - args = m['send_task'].call_args[0][1] + args = m['send_task'].call_args[0][0] self.assertEqual(args[0], TEST_TYPE) - self.assertEqual(args[1], TEST_OBJ_ID) + self.assertEqual(args[1], TEST_HEX_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['object_id'], TEST_OBJ_ID) self.assertEqual(info['type'], TEST_TYPE) - self.assertEqual(str(info['task_uuid']), - m['send_task'].call_args[0][0]) self.assertEqual(info['task_status'], 'new') + self.assertEqual(info['task_id'], 42) self.assertTimestampAlmostNow(info['ts_created']) self.assertEqual(info['ts_done'], None) self.assertEqual(info['progress_msg'], None) def test_create_fail_duplicate_task(self): with self.mock_cooking(): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) with self.assertRaises(psycopg2.IntegrityError): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) def test_create_fail_nonexisting_object(self): with self.mock_cooking() as m: m['cooker'].check_exists.side_effect = ValueError('Nothing here.') with self.assertRaises(ValueError): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) def test_create_set_progress(self): with self.mock_cooking(): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['progress_msg'], None) self.vault_backend.set_progress(TEST_TYPE, TEST_OBJ_ID, TEST_PROGRESS) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['progress_msg'], TEST_PROGRESS) def test_create_set_status(self): with self.mock_cooking(): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['task_status'], 'new') self.assertEqual(info['ts_done'], None) self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'pending') info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['task_status'], 'pending') self.assertEqual(info['ts_done'], None) self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['task_status'], 'done') self.assertTimestampAlmostNow(info['ts_done']) def test_create_update_access_ts(self): with self.mock_cooking(): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) access_ts_1 = info['ts_last_access'] self.assertTimestampAlmostNow(access_ts_1) self.vault_backend.update_access_ts(TEST_TYPE, TEST_OBJ_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) access_ts_2 = info['ts_last_access'] self.assertTimestampAlmostNow(access_ts_2) self.vault_backend.update_access_ts(TEST_TYPE, TEST_OBJ_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) access_ts_3 = info['ts_last_access'] self.assertTimestampAlmostNow(access_ts_3) self.assertLess(access_ts_1, access_ts_2) self.assertLess(access_ts_2, access_ts_3) def test_cook_request_idempotent(self): with self.mock_cooking(): info1 = self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID) info2 = self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID) info3 = self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info1, info2) self.assertEqual(info1, info3) def test_cook_email_pending_done(self): with self.mock_cooking(), \ patch.object(self.vault_backend, 'add_notif_email') as madd, \ patch.object(self.vault_backend, 'send_notification') as msend: self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID) madd.assert_not_called() msend.assert_not_called() madd.reset_mock() msend.reset_mock() self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, email=TEST_EMAIL) madd.assert_called_once_with(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL) msend.assert_not_called() madd.reset_mock() msend.reset_mock() self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, email=TEST_EMAIL) msend.assert_called_once_with(None, TEST_EMAIL, TEST_TYPE, TEST_OBJ_ID) madd.assert_not_called() def test_send_all_emails(self): with self.mock_cooking(): emails = ('a@example.com', 'billg@example.com', 'test+42@example.org') for email in emails: self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, email=email) self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') with patch.object(self.vault_backend, 'smtp_server') as m: self.vault_backend.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) sent_emails = {k[0][0] for k in m.send_message.call_args_list} self.assertEqual({k['To'] for k in sent_emails}, set(emails)) for e in sent_emails: self.assertIn('info@softwareheritage.org', e['From']) self.assertIn(TEST_TYPE, e['Subject']) self.assertIn(TEST_HEX_ID[:5], e['Subject']) self.assertIn(TEST_TYPE, str(e)) self.assertIn('https://archive.softwareheritage.org/', str(e)) self.assertIn(TEST_HEX_ID[:5], str(e)) self.assertIn('--\x20\n', str(e)) # Well-formated signature!!! # Check that the entries have been deleted and recalling the # function does not re-send the e-mails m.reset_mock() self.vault_backend.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) m.assert_not_called() def test_available(self): self.assertFalse(self.vault_backend.is_available(TEST_TYPE, TEST_OBJ_ID)) with self.mock_cooking(): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) self.assertFalse(self.vault_backend.is_available(TEST_TYPE, TEST_OBJ_ID)) self.vault_backend.cache.add(TEST_TYPE, TEST_OBJ_ID, b'content') self.assertFalse(self.vault_backend.is_available(TEST_TYPE, TEST_OBJ_ID)) self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') self.assertTrue(self.vault_backend.is_available(TEST_TYPE, TEST_OBJ_ID)) def test_fetch(self): self.assertEqual(self.vault_backend.fetch(TEST_TYPE, TEST_OBJ_ID), None) obj_id, content = self.fake_cook(TEST_TYPE, b'content') info = self.vault_backend.task_info(TEST_TYPE, obj_id) access_ts_before = info['ts_last_access'] self.assertEqual(self.vault_backend.fetch(TEST_TYPE, obj_id), b'content') info = self.vault_backend.task_info(TEST_TYPE, obj_id) access_ts_after = info['ts_last_access'] self.assertTimestampAlmostNow(access_ts_after) self.assertLess(access_ts_before, access_ts_after) def test_cache_expire_oldest(self): r = range(1, 10) inserted = {} for i in r: sticky = (i == 5) content = b'content%s' % str(i).encode() obj_id, content = self.fake_cook(TEST_TYPE, content, sticky) inserted[i] = (obj_id, content) self.vault_backend.update_access_ts(TEST_TYPE, inserted[2][0]) self.vault_backend.update_access_ts(TEST_TYPE, inserted[3][0]) self.vault_backend.cache_expire_oldest(n=4) should_be_still_here = {2, 3, 5, 8, 9} for i in r: self.assertEqual(self.vault_backend.is_available( TEST_TYPE, inserted[i][0]), i in should_be_still_here) def test_cache_expire_until(self): r = range(1, 10) inserted = {} for i in r: sticky = (i == 5) content = b'content%s' % str(i).encode() obj_id, content = self.fake_cook(TEST_TYPE, content, sticky) inserted[i] = (obj_id, content) if i == 7: cutoff_date = datetime.datetime.now() self.vault_backend.update_access_ts(TEST_TYPE, inserted[2][0]) self.vault_backend.update_access_ts(TEST_TYPE, inserted[3][0]) self.vault_backend.cache_expire_until(date=cutoff_date) should_be_still_here = {2, 3, 5, 8, 9} for i in r: self.assertEqual(self.vault_backend.is_available( TEST_TYPE, inserted[i][0]), i in should_be_still_here) diff --git a/version.txt b/version.txt index 701351a..e2b857e 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.4-0-g5e0fc9b \ No newline at end of file +v0.0.5-0-ge7ff2ad \ No newline at end of file