diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,4 +1,5 @@
-pytest
+pytest < 4
+pytest-postgresql
 dulwich >= 0.18.7
 swh.loader.git >= 0.0.48
 swh.storage[testing]
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -50,6 +50,10 @@
     vcversioner={},
     include_package_data=True,
     zip_safe=False,
+    entry_points='''
+        [console_scripts]
+        swh-vault=swh.vault.cli:main
+    ''',
     classifiers=[
         "Programming Language :: Python :: 3",
         "Intended Audience :: Developers",
diff --git a/swh/vault/__init__.py b/swh/vault/__init__.py
--- a/swh/vault/__init__.py
+++ b/swh/vault/__init__.py
@@ -2,9 +2,12 @@
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information
+import logging
+logger = logging.getLogger(__name__)
 
-def get_vault(cls, args):
+
+def get_vault(cls='remote', args={}):
     """
     Get a vault object of class `vault_class` with arguments
     `vault_args`.
@@ -15,16 +18,23 @@
     - args (dict): dictionary with keys
 
     Returns:
-        an instance of swh.storage.Storage (either local or remote)
+        an instance of VaultBackend (either local or remote)
 
     Raises:
         ValueError if passed an unknown storage class.
 
     """
-
     if cls == 'remote':
         from .api.client import RemoteVaultClient as Vault
+    elif cls == 'local':
+        from swh.scheduler import get_scheduler
+        from swh.storage import get_storage
+        from swh.vault.cache import VaultCache
+        from swh.vault.backend import VaultBackend as Vault
+        args['cache'] = VaultCache(**args['cache'])
+        args['storage'] = get_storage(**args['storage'])
+        args['scheduler'] = get_scheduler(**args['scheduler'])
     else:
         raise ValueError('Unknown storage class `%s`' % cls)
-
+    logger.debug('Instantiating %s with %s', Vault, args)
     return Vault(**args)
diff --git a/swh/vault/api/client.py b/swh/vault/api/client.py
--- a/swh/vault/api/client.py
+++ b/swh/vault/api/client.py
@@ -17,9 +17,9 @@
 class RemoteVaultClient(SWHRemoteAPI):
     """Client to the Software Heritage vault cache."""
 
-    def __init__(self, base_url, timeout=None):
+    def __init__(self, url, timeout=None):
         super().__init__(
-            api_exception=VaultAPIError, url=base_url, timeout=timeout)
+            api_exception=VaultAPIError, url=url, timeout=timeout)
 
 # Web API endpoints
diff --git a/swh/vault/api/server.py b/swh/vault/api/server.py
--- a/swh/vault/api/server.py
+++ b/swh/vault/api/server.py
@@ -1,11 +1,11 @@
-# Copyright (C) 2016-2018  The Software Heritage developers
+# Copyright (C) 2016-2019  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import os
 import aiohttp.web
 import asyncio
-import click
 import collections
 
 from swh.core import config
@@ -13,8 +13,9 @@
                           encode_data_server as encode_data, decode_request)
 from swh.model import hashutil
+from swh.vault import get_vault
 from swh.vault.cookers import COOKER_TYPES
-from swh.vault.backend import VaultBackend, NotFoundExc
+from swh.vault.backend import NotFoundExc
 
 DEFAULT_CONFIG_PATH = 'vault/server'
@@ -33,7 +34,12 @@
         },
     }),
     'client_max_size': ('int', 1024 ** 3),
-    'db': ('str', 'dbname=softwareheritage-vault-dev'),
+    'vault': ('dict', {
+        'cls': 'local',
+        'args': {
+            'db': 'dbname=softwareheritage-vault-dev',
+        },
+    }),
     'scheduler': ('dict', {
         'cls': 'remote',
         'args': {
@@ -168,10 +174,7 @@
 
 # Web server
 
-def make_app(config, **kwargs):
-    if 'client_max_size' in config:
-        kwargs['client_max_size'] = config['client_max_size']
-
+def make_app(backend, **kwargs):
     app = SWHRemoteAPI(**kwargs)
 
     app.router.add_route('GET', '/', index)
@@ -190,31 +193,45 @@
     app.router.add_route('POST', '/batch_cook', batch_cook)
     app.router.add_route('GET', '/batch_progress/{batch_id}', batch_progress)
 
-    app['backend'] = VaultBackend(config)
+    app['backend'] = backend
     return app
 
 
-api_cfg = None
+def get_local_backend(config_file):
+    if os.path.isfile(config_file):
+        cfg = config.read(config_file, DEFAULT_CONFIG)
+    else:
+        cfg = config.load_named_config(config_file, DEFAULT_CONFIG)
+
+    if 'vault' not in cfg:
+        raise ValueError("missing 'vault' configuration")
+
+    vcfg = cfg['vault']
+    if vcfg['cls'] != 'local':
+        raise EnvironmentError(
+            "The vault backend can only be started with a 'local' "
+            "configuration")
+    args = vcfg['args']
+    if 'cache' not in args:
+        args['cache'] = cfg.get('cache')
+    if 'storage' not in args:
+        args['storage'] = cfg.get('storage')
+    if 'scheduler' not in args:
+        args['scheduler'] = cfg.get('scheduler')
+    for key in ('cache', 'storage', 'scheduler'):
+        if not args.get(key):
+            raise ValueError(
+                "invalid configuration; missing %s config entry." % key)
 
-def make_app_from_configfile(config_path=DEFAULT_CONFIG_PATH, **kwargs):
-    global api_cfg
-    if not api_cfg:
-        api_cfg = config.read(config_path, DEFAULT_CONFIG)
-    return make_app(api_cfg, **kwargs)
+    return get_vault('local', args)
 
 
-@click.command()
-@click.argument('config-path', required=1)
-@click.option('--host', default='0.0.0.0', help="Host to run the server")
-@click.option('--port', default=5005, type=click.INT,
-              help="Binding port of the server")
-@click.option('--debug/--nodebug', default=True,
-              help="Indicates if the server should run in debug mode")
-def launch(config_path, host, port, debug):
-    app = make_app(config.read(config_path, DEFAULT_CONFIG), debug=bool(debug))
-    aiohttp.web.run_app(app, host=host, port=int(port))
+def make_app_from_configfile(config_file=DEFAULT_CONFIG_PATH, **kwargs):
+    config_file = os.environ.get('SWH_CONFIG_FILENAME', config_file)
+    vault = get_local_backend(config_file)
+    return make_app(backend=vault, **kwargs)
 
 
 if __name__ == '__main__':
-    launch()
+    print('Deprecated. Use swh-vault')
diff --git a/swh/vault/api/wsgi.py b/swh/vault/api/wsgi.py
new file mode 100644
--- /dev/null
+++ b/swh/vault/api/wsgi.py
@@ -0,0 +1,8 @@
+# Copyright (C) 2019  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from swh.vault.api.server import make_app_from_configfile
+
+app = make_app_from_configfile()
diff --git a/swh/vault/backend.py b/swh/vault/backend.py
--- a/swh/vault/backend.py
+++ b/swh/vault/backend.py
@@ -4,17 +4,16 @@
 # See top-level LICENSE file for more information
 
 import smtplib
-import psycopg2
-from psycopg2.extras import RealDictCursor
+import psycopg2.extras
+import psycopg2.pool
 
-from functools import wraps
 from email.mime.text import MIMEText
 
+from swh.core.db import BaseDb
+from swh.core.db.common import db_transaction
 from swh.model import hashutil
-from swh.scheduler import get_scheduler
 from swh.scheduler.utils import create_oneshot_task_dict
-from swh.vault.cache import VaultCache
-from swh.vault.cookers import get_cooker
+from swh.vault.cookers import get_cooker_cls
 
 cooking_task_name = 'swh.vault.cooking_tasks.SWHCookingTask'
 
@@ -69,146 +67,87 @@
                       for obj_type, obj_id in batch]
 
 
-# TODO: Imported from swh.scheduler.backend. Factorization needed.
-def autocommit(fn):
-    @wraps(fn)
-    def wrapped(self, *args, **kwargs):
-        autocommit = False
-        # TODO: I don't like using None, it's confusing for the user. how about
-        # a NEW_CURSOR object()?
-        if 'cursor' not in kwargs or not kwargs['cursor']:
-            autocommit = True
-            kwargs['cursor'] = self.cursor()
-
-        try:
-            ret = fn(self, *args, **kwargs)
-        except BaseException:
-            if autocommit:
-                self.rollback()
-            raise
-
-        if autocommit:
-            self.commit()
-
-        return ret
-
-    return wrapped
-
-
-# TODO: This has to be factorized with other database base classes and helpers
-# (swh.scheduler.backend.SchedulerBackend, swh.storage.db.BaseDb, ...)
-# The three first methods are imported from swh.scheduler.backend.
 class VaultBackend:
     """
     Backend for the Software Heritage vault.
     """
-    def __init__(self, config):
+    def __init__(self, db, cache, scheduler, storage=None, **config):
         self.config = config
-        self.cache = VaultCache(self.config['cache'])
-        self.db = None
-        self.reconnect()
+        self.cache = cache
+        self.scheduler = scheduler
+        self.storage = storage
         self.smtp_server = smtplib.SMTP()
-        if self.config['scheduler'] is not None:
-            self.scheduler = get_scheduler(**self.config['scheduler'])
-
-    def reconnect(self):
-        """Reconnect to the database."""
-        if not self.db or self.db.closed:
-            self.db = psycopg2.connect(
-                dsn=self.config['db'],
-                cursor_factory=RealDictCursor,
-            )
 
-    def close(self):
-        """Close the underlying database connection."""
-        self.db.close()
-
-    def cursor(self):
-        """Return a fresh cursor on the database, with auto-reconnection in
-        case of failure"""
-        cur = None
-
-        # Get a fresh cursor and reconnect at most three times
-        tries = 0
-        while True:
-            tries += 1
-            try:
-                cur = self.db.cursor()
-                cur.execute('select 1')
-                break
-            except psycopg2.OperationalError:
-                if tries < 3:
-                    self.reconnect()
-                else:
-                    raise
-        return cur
-
-    def commit(self):
-        """Commit a transaction"""
-        self.db.commit()
-
-    def rollback(self):
-        """Rollback a transaction"""
-        self.db.rollback()
-
-    @autocommit
-    def task_info(self, obj_type, obj_id, cursor=None):
+        self._pool = psycopg2.pool.ThreadedConnectionPool(
+            config.get('min_pool_conns', 1),
+            config.get('max_pool_conns', 10),
+            db,
+            cursor_factory=psycopg2.extras.RealDictCursor,
+        )
+        self._db = None
+
+    def get_db(self):
+        if self._db:
+            return self._db
+        return BaseDb.from_pool(self._pool)
+
+    @db_transaction()
+    def task_info(self, obj_type, obj_id, db=None, cur=None):
         """Fetch information from a bundle"""
         obj_id = hashutil.hash_to_bytes(obj_id)
-        cursor.execute('''
+        cur.execute('''
             SELECT id, type, object_id, task_id, task_status, sticky,
                    ts_created, ts_done, ts_last_access, progress_msg
             FROM vault_bundle
            WHERE type = %s AND object_id = %s''', (obj_type, obj_id))
-        res = cursor.fetchone()
+        res = cur.fetchone()
 
         if res:
             res['object_id'] = bytes(res['object_id'])
         return res
 
-    def _send_task(self, args):
+    def _send_task(self, *args):
        """Send a cooking task to the celery scheduler"""
        task = create_oneshot_task_dict('swh-vault-cooking', *args)
        added_tasks = self.scheduler.create_tasks([task])
        return added_tasks[0]['id']
 
-    @autocommit
-    def create_task(self, obj_type, obj_id, sticky=False, cursor=None):
+    @db_transaction()
+    def create_task(self, obj_type, obj_id, sticky=False, db=None, cur=None):
         """Create and send a cooking task"""
         obj_id = hashutil.hash_to_bytes(obj_id)
         hex_id = hashutil.hash_to_hex(obj_id)
 
-        args = [obj_type, hex_id]
-
-        backend_storage_config = {'storage': self.config['storage']}
-        cooker_class = get_cooker(obj_type)
-        cooker = cooker_class(*args, override_cfg=backend_storage_config)
+        cooker_class = get_cooker_cls(obj_type)
+        cooker = cooker_class(obj_type, hex_id,
+                              backend=self, storage=self.storage)
         if not cooker.check_exists():
             raise NotFoundExc("Object {} was not found.".format(hex_id))
 
-        cursor.execute('''
+        cur.execute('''
             INSERT INTO vault_bundle (type, object_id, sticky)
             VALUES (%s, %s, %s)''', (obj_type, obj_id, sticky))
-        self.commit()
+        db.conn.commit()
 
-        task_id = self._send_task(args)
+        task_id = self._send_task(obj_type, hex_id)
 
-        cursor.execute('''
+        cur.execute('''
             UPDATE vault_bundle
             SET task_id = %s
             WHERE type = %s AND object_id = %s''',
                     (task_id, obj_type, obj_id))
 
-    @autocommit
-    def add_notif_email(self, obj_type, obj_id, email, cursor=None):
+    @db_transaction()
+    def add_notif_email(self, obj_type, obj_id, email, db=None, cur=None):
         """Add an e-mail address to notify when a given bundle is ready"""
         obj_id = hashutil.hash_to_bytes(obj_id)
-        cursor.execute('''
+        cur.execute('''
             INSERT INTO vault_notif_email (email, bundle_id)
             VALUES (%s, (SELECT id FROM vault_bundle
                          WHERE type = %s AND object_id = %s))''',
-                (email, obj_type, obj_id))
+                    (email, obj_type, obj_id))
 
-    @autocommit
+    @db_transaction()
     def cook_request(self, obj_type, obj_id, *, sticky=False,
-                     email=None, cursor=None):
+                     email=None, db=None, cur=None):
         """Main entry point for cooking requests. This starts a cooking task if
             needed, and add the given e-mail to the notify list"""
         obj_id = hashutil.hash_to_bytes(obj_id)
@@ -216,10 +155,10 @@
 
         # If there's a failed bundle entry, delete it first.
         if info is not None and info['task_status'] == 'failed':
-            cursor.execute('''DELETE FROM vault_bundle
-                              WHERE type = %s AND object_id = %s''',
-                           (obj_type, obj_id))
-            self.commit()
+            cur.execute('''DELETE FROM vault_bundle
+                           WHERE type = %s AND object_id = %s''',
+                        (obj_type, obj_id))
+            db.conn.commit()
             info = None
 
         # If there's no bundle entry, create the task.
@@ -238,44 +177,44 @@
         info = self.task_info(obj_type, obj_id)
         return info
 
-    @autocommit
-    def batch_cook(self, batch, cursor=None):
+    @db_transaction()
+    def batch_cook(self, batch, db=None, cur=None):
         """Cook a batch of bundles and returns the cooking id."""
         # Import execute_values at runtime only, because it requires
         # psycopg2 >= 2.7 (only available on postgresql servers)
         from psycopg2.extras import execute_values
 
-        cursor.execute('''
+        cur.execute('''
             INSERT INTO vault_batch (id)
             VALUES (DEFAULT)
             RETURNING id''')
-        batch_id = cursor.fetchone()['id']
+        batch_id = cur.fetchone()['id']
         batch = batch_to_bytes(batch)
 
         # Delete all failed bundles from the batch
-        cursor.execute('''
+        cur.execute('''
             DELETE FROM vault_bundle
             WHERE task_status = 'failed'
             AND (type, object_id) IN %s''', (tuple(batch),))
 
         # Insert all the bundles, return the new ones
-        execute_values(cursor, '''
+        execute_values(cur, '''
            INSERT INTO vault_bundle (type, object_id)
            VALUES %s ON CONFLICT DO NOTHING''', batch)
 
         # Get the bundle ids and task status
-        cursor.execute('''
+        cur.execute('''
             SELECT id, type, object_id, task_id FROM vault_bundle
             WHERE (type, object_id) IN %s''', (tuple(batch),))
-        bundles = cursor.fetchall()
+        bundles = cur.fetchall()
 
         # Insert the batch-bundle entries
         batch_id_bundle_ids = [(batch_id, row['id']) for row in bundles]
-        execute_values(cursor, '''
+        execute_values(cur, '''
             INSERT INTO vault_batch_bundle (batch_id, bundle_id)
             VALUES %s ON CONFLICT DO NOTHING''', batch_id_bundle_ids)
-        self.commit()
+        db.conn.commit()
 
         # Get the tasks to fetch
         batch_new = [(row['type'], bytes(row['object_id']))
@@ -296,7 +235,7 @@
                     in tasks_ids_bundle_ids]
 
         # Update the task ids
-        execute_values(cursor, '''
+        execute_values(cur, '''
             UPDATE vault_bundle
             SET task_id = s_task_id
             FROM (VALUES %s) AS sub (s_task_id, s_type, s_object_id)
@@ -304,50 +243,50 @@
             tasks_ids_bundle_ids)
         return batch_id
 
-    @autocommit
-    def batch_info(self, batch_id, cursor=None):
+    @db_transaction()
+    def batch_info(self, batch_id, db=None, cur=None):
         """Fetch information from a batch of bundles"""
-        cursor.execute('''
+        cur.execute('''
             SELECT vault_bundle.id as id, type, object_id, task_id,
                    task_status, sticky, ts_created, ts_done, ts_last_access,
                    progress_msg
             FROM vault_batch_bundle
            LEFT JOIN vault_bundle ON vault_bundle.id = bundle_id
            WHERE batch_id = %s''', (batch_id,))
-        res = cursor.fetchall()
+        res = cur.fetchall()
         if res:
             for d in res:
                 d['object_id'] = bytes(d['object_id'])
         return res
 
-    @autocommit
-    def is_available(self, obj_type, obj_id, cursor=None):
+    @db_transaction()
+    def is_available(self, obj_type, obj_id, db=None, cur=None):
        """Check whether a bundle is available for retrieval"""
-        info = self.task_info(obj_type, obj_id, cursor=cursor)
+        info = self.task_info(obj_type, obj_id, cur=cur)
        return (info is not None
                and info['task_status'] == 'done'
                and self.cache.is_cached(obj_type, obj_id))
 
-    @autocommit
-    def fetch(self, obj_type, obj_id, cursor=None):
+    @db_transaction()
+    def fetch(self, obj_type, obj_id, db=None, cur=None):
        """Retrieve a bundle from the cache"""
-        if not self.is_available(obj_type, obj_id, cursor=cursor):
+        if not self.is_available(obj_type, obj_id, cur=cur):
            return None
-        self.update_access_ts(obj_type, obj_id, cursor=cursor)
+        self.update_access_ts(obj_type, obj_id, cur=cur)
        return self.cache.get(obj_type, obj_id)
 
-    @autocommit
-    def update_access_ts(self, obj_type, obj_id, cursor=None):
+    @db_transaction()
+    def update_access_ts(self, obj_type, obj_id, db=None, cur=None):
         """Update the last access timestamp of a bundle"""
         obj_id = hashutil.hash_to_bytes(obj_id)
-        cursor.execute('''
+        cur.execute('''
             UPDATE vault_bundle
             SET ts_last_access = NOW()
             WHERE type = %s AND object_id = %s''',
-                (obj_type, obj_id))
+                    (obj_type, obj_id))
 
-    @autocommit
-    def set_status(self, obj_type, obj_id, status, cursor=None):
+    @db_transaction()
+    def set_status(self, obj_type, obj_id, status, db=None, cur=None):
         """Set the cooking status of a bundle"""
         obj_id = hashutil.hash_to_bytes(obj_id)
         req = ('''
@@ -355,36 +294,36 @@
               SET task_status = %s '''
               + (''', ts_done = NOW() ''' if status == 'done' else '')
              + '''WHERE type = %s AND object_id = %s''')
-        cursor.execute(req, (status, obj_type, obj_id))
+        cur.execute(req, (status, obj_type, obj_id))
 
-    @autocommit
-    def set_progress(self, obj_type, obj_id, progress, cursor=None):
+    @db_transaction()
+    def set_progress(self, obj_type, obj_id, progress, db=None, cur=None):
         """Set the cooking progress of a bundle"""
         obj_id = hashutil.hash_to_bytes(obj_id)
-        cursor.execute('''
+        cur.execute('''
             UPDATE vault_bundle
             SET progress_msg = %s
             WHERE type = %s AND object_id = %s''',
-                (progress, obj_type, obj_id))
+                    (progress, obj_type, obj_id))
 
-    @autocommit
-    def send_all_notifications(self, obj_type, obj_id, cursor=None):
+    @db_transaction()
+    def send_all_notifications(self, obj_type, obj_id, db=None, cur=None):
         """Send all the e-mails in the notification list of a bundle"""
         obj_id = hashutil.hash_to_bytes(obj_id)
-        cursor.execute('''
+        cur.execute('''
             SELECT vault_notif_email.id AS id, email, task_status, progress_msg
             FROM vault_notif_email
             INNER JOIN vault_bundle ON bundle_id = vault_bundle.id
             WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s''',
-                (obj_type, obj_id))
-        for d in cursor:
+                    (obj_type, obj_id))
+        for d in cur:
             self.send_notification(d['id'], d['email'], obj_type, obj_id,
                                    status=d['task_status'],
                                    progress_msg=d['progress_msg'])
 
-    @autocommit
+    @db_transaction()
     def send_notification(self, n_id, email, obj_type, obj_id, status,
-                          progress_msg=None, cursor=None):
+                          progress_msg=None, db=None, cur=None):
         """Send the notification of a bundle to a specific e-mail"""
         hex_id = hashutil.hash_to_hex(obj_id)
         short_id = hex_id[:7]
@@ -421,7 +360,7 @@
         self._smtp_send(msg)
 
         if n_id is not None:
-            cursor.execute('''
+            cur.execute('''
                 DELETE FROM vault_notif_email
                 WHERE id = %s''', (n_id,))
@@ -437,11 +376,11 @@
         # Send the message
         self.smtp_server.send_message(msg)
 
-    @autocommit
-    def _cache_expire(self, cond, *args, cursor=None):
+    @db_transaction()
+    def _cache_expire(self, cond, *args, db=None, cur=None):
         """Low-level expiration method, used by cache_expire_* methods"""
         # Embedded SELECT query to be able to use ORDER BY and LIMIT
-        cursor.execute('''
+        cur.execute('''
             DELETE FROM vault_bundle
             WHERE ctid IN (
                 SELECT ctid
@@ -452,18 +391,18 @@
             RETURNING type, object_id
             '''.format(cond), args)
 
-        for d in cursor:
+        for d in cur:
             self.cache.delete(d['type'], bytes(d['object_id']))
 
-    @autocommit
-    def cache_expire_oldest(self, n=1, by='last_access', cursor=None):
+    @db_transaction()
+    def cache_expire_oldest(self, n=1, by='last_access', db=None, cur=None):
         """Expire the `n` oldest bundles"""
         assert by in ('created', 'done', 'last_access')
         filter = '''ORDER BY ts_{} LIMIT {}'''.format(by, n)
         return self._cache_expire(filter)
 
-    @autocommit
-    def cache_expire_until(self, date, by='last_access', cursor=None):
+    @db_transaction()
+    def cache_expire_until(self, date, by='last_access', db=None, cur=None):
         """Expire all the bundles until a certain date"""
         assert by in ('created', 'done', 'last_access')
         filter = '''AND ts_{} <= %s'''.format(by)
diff --git a/swh/vault/cache.py b/swh/vault/cache.py
--- a/swh/vault/cache.py
+++ b/swh/vault/cache.py
@@ -15,7 +15,7 @@
     internal identifiers used in the underlying objstorage.
     """
 
-    def __init__(self, objstorage):
+    def __init__(self, **objstorage):
         self.objstorage = get_objstorage(**objstorage)
 
     def add(self, obj_type, obj_id, content):
diff --git a/swh/vault/cli.py b/swh/vault/cli.py
new file mode 100644
--- /dev/null
+++ b/swh/vault/cli.py
@@ -0,0 +1,50 @@
+import logging
+import click
+import aiohttp.web
+
+from swh.vault.api.server import make_app_from_configfile
+
+
+CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
+
+
+@click.command(context_settings=CONTEXT_SETTINGS)
+@click.option('--config-file', '-C', default=None,
+              type=click.Path(exists=True, dir_okay=False,),
+              help="Configuration file.")
+@click.option('--log-level', '-l', default='INFO',
+              type=click.Choice(logging._nameToLevel.keys()),
+              help="Log level (default to INFO)")
+@click.option('--no-stdout', is_flag=True, default=False,
+              help="Do NOT output logs on the console")
+@click.option('--host', default='0.0.0.0', help="Host to run the server")
+@click.option('--port', default=5005, type=click.INT,
+              help="Binding port of the server")
+@click.option('--debug/--nodebug', default=True,
+              help="Indicates if the server should run in debug mode")
+@click.pass_context
+def cli(ctx, config_file, log_level, no_stdout, host, port, debug):
+    """Software Heritage Vault API server
+
+    """
+    from swh.scheduler.celery_backend.config import setup_log_handler
+    log_level = setup_log_handler(
+        loglevel=log_level, colorize=False,
+        format='[%(levelname)s] %(name)s -- %(message)s',
+        log_console=not no_stdout)
+
+    try:
+        app = make_app_from_configfile(config_file, debug=debug)
+    except EnvironmentError as e:
+        click.echo(str(e), err=True)
+        ctx.exit(1)
+
+    aiohttp.web.run_app(app, host=host, port=int(port))
+
+
+def main():
+    return cli(auto_envvar_prefix='SWH_VAULT')
+
+
+if __name__ == '__main__':
+    main()
diff --git a/swh/vault/cookers/__init__.py b/swh/vault/cookers/__init__.py
--- a/swh/vault/cookers/__init__.py
+++ b/swh/vault/cookers/__init__.py
@@ -2,7 +2,12 @@
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
+import os
 
+from swh.core.config import load_named_config, read as read_config
+from swh.storage import get_storage
+from swh.vault import get_vault
+from swh.vault.cookers.base import DEFAULT_CONFIG_PATH, DEFAULT_CONFIG
 from swh.vault.cookers.directory import DirectoryCooker
 from swh.vault.cookers.revision_flat import RevisionFlatCooker
 from swh.vault.cookers.revision_gitfast import RevisionGitfastCooker
@@ -13,4 +18,36 @@
     'revision_gitfast': RevisionGitfastCooker,
 }
 
-get_cooker = COOKER_TYPES.__getitem__
+
+def get_cooker_cls(obj_type):
+    return COOKER_TYPES[obj_type]
+
+
+def get_cooker(obj_type, obj_id):
+    if 'SWH_CONFIG_FILENAME' in os.environ:
+        cfg = read_config(os.environ['SWH_CONFIG_FILENAME'], DEFAULT_CONFIG)
+    else:
+        cfg = load_named_config(DEFAULT_CONFIG_PATH, DEFAULT_CONFIG)
+    cooker_cls = get_cooker_cls(obj_type)
+    if 'vault' not in cfg:
+        raise ValueError("missing 'vault' configuration")
+
+    vcfg = cfg['vault']
+    if vcfg['cls'] != 'remote':
+        raise EnvironmentError(
+            "This vault backend can only be a 'remote' "
+            "configuration")
+    args = vcfg['args']
+    if 'storage' not in args:
+        args['storage'] = cfg.get('storage')
+
+    if not args.get('storage'):
+        raise ValueError(
+            "invalid configuration; missing 'storage' config entry.")
+
+    storage = get_storage(**args.pop('storage'))
+    backend = get_vault(**vcfg)
+
+    return cooker_cls(obj_type, obj_id,
+                      backend=backend, storage=storage,
+                      max_bundle_size=cfg['max_bundle_size'])
diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py
--- a/swh/vault/cookers/base.py
+++ b/swh/vault/cookers/base.py
@@ -7,12 +7,9 @@
 import io
 import logging
 
-from swh.core import config
 from swh.model import hashutil
-from swh.storage import get_storage
-from swh.vault.api.client import RemoteVaultClient
-
 
+MAX_BUNDLE_SIZE = 2 ** 29  # 512 MiB
 DEFAULT_CONFIG_PATH = 'vault/cooker'
 DEFAULT_CONFIG = {
     'storage': ('dict', {
@@ -21,8 +18,13 @@
             'url': 'http://localhost:5002/',
         },
     }),
-    'vault_url': ('str', 'http://localhost:5005/'),
-    'max_bundle_size': ('int', 2 ** 29),  # 512 MiB
+    'vault': ('dict', {
+        'cls': 'remote',
+        'args': {
+            'url': 'http://localhost:5005/',
+        },
+    }),
+    'max_bundle_size': ('int', MAX_BUNDLE_SIZE),
 }
@@ -61,7 +63,8 @@
     """
     CACHE_TYPE_KEY = None
 
-    def __init__(self, obj_type, obj_id, *, override_cfg=None):
+    def __init__(self, obj_type, obj_id, backend, storage,
+                 max_bundle_size=MAX_BUNDLE_SIZE):
         """Initialize the cooker.
 
         The type of the object represented by the id depends on the
@@ -69,20 +72,22 @@
         own cooker class.
 
         Args:
-            storage: the storage object
-            cache: the cache where to store the bundle
+            obj_type: type of the object to be cooked into a bundle
+                (directory, revision_flat or revision_gitfast; see
+                swh.vault.cookers.COOKER_TYPES).
             obj_id: id of the object to be cooked into a bundle.
+            backend: the vault backend (swh.vault.backend.VaultBackend).
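+            storage: the storage object, from which the cooker fetches
+                the objects to be put in the bundle.
+            max_bundle_size: the upper bound (in bytes) on the size of
+                the cooked bundle, defaulting to MAX_BUNDLE_SIZE
+                (2 ** 29, i.e. 512 MiB).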
""" - self.config = config.load_named_config(DEFAULT_CONFIG_PATH, - DEFAULT_CONFIG) - if override_cfg is not None: - self.config.update(override_cfg) - self.obj_type = obj_type self.obj_id = hashutil.hash_to_bytes(obj_id) - self.backend = RemoteVaultClient(self.config['vault_url']) - self.storage = get_storage(**self.config['storage']) - self.max_bundle_size = self.config['max_bundle_size'] + self.backend = backend + self.storage = storage + self.max_bundle_size = max_bundle_size @abc.abstractmethod def check_exists(self): diff --git a/swh/vault/cooking_tasks.py b/swh/vault/cooking_tasks.py --- a/swh/vault/cooking_tasks.py +++ b/swh/vault/cooking_tasks.py @@ -11,11 +11,11 @@ @app.task(name=__name__ + '.SWHCookingTask') def cook_bundle(obj_type, obj_id): """Main task to cook a bundle.""" - get_cooker(obj_type)(obj_type, obj_id).cook() + get_cooker(obj_type, obj_id).cook() # TODO: remove once the scheduler handles priority tasks @app.task(name=__name__ + '.SWHBatchCookingTask') def batch_cook_bundle(obj_type, obj_id): """Temporary task for the batch queue.""" - get_cooker(obj_type)(obj_type, obj_id).cook() + get_cooker(obj_type, obj_id).cook() diff --git a/swh/vault/tests/conftest.py b/swh/vault/tests/conftest.py new file mode 100644 --- /dev/null +++ b/swh/vault/tests/conftest.py @@ -0,0 +1,80 @@ +import pytest +import glob +import os +import pkg_resources.extern.packaging.version + +from swh.core.utils import numfile_sortkey as sortkey +from swh.vault import get_vault +from swh.vault.tests import SQL_DIR +from swh.storage.tests import SQL_DIR as STORAGE_SQL_DIR +from pytest_postgresql import factories + + +pytest_v = pkg_resources.get_distribution("pytest").parsed_version +if pytest_v < pkg_resources.extern.packaging.version.parse('3.9'): + @pytest.fixture + def tmp_path(request): + import tempfile + import pathlib + with tempfile.TemporaryDirectory() as tmpdir: + yield pathlib.Path(tmpdir) + + +def db_url(name, postgresql_proc): + return 'postgresql://{user}@{host}:{port}/{dbname}'.format( + host=postgresql_proc.host, + port=postgresql_proc.port, + user='postgres', + dbname=name) + + +postgresql2 = factories.postgresql('postgresql_proc', 'tests2') + + +@pytest.fixture +def swh_vault(request, postgresql_proc, postgresql, postgresql2, tmp_path): + + for sql_dir, pg in ((SQL_DIR, postgresql), (STORAGE_SQL_DIR, postgresql2)): + dump_files = os.path.join(sql_dir, '*.sql') + all_dump_files = sorted(glob.glob(dump_files), key=sortkey) + + cursor = pg.cursor() + for fname in all_dump_files: + with open(fname) as fobj: + # disable concurrent index creation since we run in a + # transaction + cursor.execute(fobj.read().replace('concurrently', '')) + pg.commit() + + vault_config = { + 'db': db_url('tests', postgresql_proc), + 'storage': { + 'cls': 'local', + 'args': { + 'db': db_url('tests2', postgresql_proc), + 'objstorage': { + 'cls': 'pathslicing', + 'args': { + 'root': str(tmp_path), + 'slicing': '0:1/1:5', + }, + }, + }, + }, + 'cache': { + 'cls': 'pathslicing', + 'args': { + 'root': str(tmp_path), + 'slicing': '0:1/1:5', + 'allow_delete': True, + } + }, + 'scheduler': { + 'cls': 'remote', + 'args': { + 'url': 'http://swh-scheduler:5008', + }, + }, + } + + return get_vault('local', vault_config) diff --git a/swh/vault/tests/test_backend.py b/swh/vault/tests/test_backend.py --- a/swh/vault/tests/test_backend.py +++ b/swh/vault/tests/test_backend.py @@ -6,49 +6,50 @@ import contextlib import datetime import psycopg2 -import unittest -from unittest.mock import patch +from unittest.mock import 
patch, MagicMock +import pytest from swh.model import hashutil -from swh.vault.tests.vault_testing import VaultTestFixture, hash_content - - -class BaseTestBackend(VaultTestFixture): - @contextlib.contextmanager - def mock_cooking(self): - with patch.object(self.vault_backend, '_send_task') as mt: - mt.return_value = 42 - with patch('swh.vault.backend.get_cooker') as mg: - mcc = unittest.mock.MagicMock() - mc = unittest.mock.MagicMock() - mg.return_value = mcc - mcc.return_value = mc - mc.check_exists.return_value = True - - yield {'send_task': mt, - 'get_cooker': mg, - 'cooker_cls': mcc, - 'cooker': mc} - - def assertTimestampAlmostNow(self, ts, tolerance_secs=1.0): # noqa - now = datetime.datetime.now(datetime.timezone.utc) - creation_delta_secs = (ts - now).total_seconds() - self.assertLess(creation_delta_secs, tolerance_secs) - - def fake_cook(self, obj_type, result_content, sticky=False): - content, obj_id = hash_content(result_content) - with self.mock_cooking(): - self.vault_backend.create_task(obj_type, obj_id, sticky) - self.vault_backend.cache.add(obj_type, obj_id, b'content') - self.vault_backend.set_status(obj_type, obj_id, 'done') - return obj_id, content - - def fail_cook(self, obj_type, obj_id, failure_reason): - with self.mock_cooking(): - self.vault_backend.create_task(obj_type, obj_id) - self.vault_backend.set_status(obj_type, obj_id, 'failed') - self.vault_backend.set_progress(obj_type, obj_id, failure_reason) +from swh.vault.tests.vault_testing import hash_content + + +@contextlib.contextmanager +def mock_cooking(vault_backend): + with patch.object(vault_backend, '_send_task') as mt: + mt.return_value = 42 + with patch('swh.vault.backend.get_cooker') as mg: + mcc = MagicMock() + mc = MagicMock() + mg.return_value = mcc + mcc.return_value = mc + mc.check_exists.return_value = True + + yield {'_send_task': mt, + 'get_cooker': mg, + 'cooker_cls': mcc, + 'cooker': mc} + +def assertTimestampAlmostNow(ts, tolerance_secs=1.0): # noqa + now = datetime.datetime.now(datetime.timezone.utc) + creation_delta_secs = (ts - now).total_seconds() + assert creation_delta_secs < tolerance_secs + + +def fake_cook(backend, obj_type, result_content, sticky=False): + content, obj_id = hash_content(result_content) + with mock_cooking(backend): + backend.create_task(obj_type, obj_id, sticky) + backend.cache.add(obj_type, obj_id, b'content') + backend.set_status(obj_type, obj_id, 'done') + return obj_id, content + + +def fail_cook(backend, obj_type, obj_id, failure_reason): + with mock_cooking(backend): + backend.create_task(obj_type, obj_id) + backend.set_status(obj_type, obj_id, 'failed') + backend.set_progress(obj_type, obj_id, failure_reason) TEST_TYPE = 'revision_gitfast' @@ -56,272 +57,281 @@ TEST_OBJ_ID = hashutil.hash_to_bytes(TEST_HEX_ID) TEST_PROGRESS = ("Mr. White, You're telling me you're cooking again?" 
" \N{ASTONISHED FACE} ") -TEST_EMAIL = 'ouiche@example.com' - - -class TestBackend(BaseTestBackend, unittest.TestCase): - def test_create_task_simple(self): - with self.mock_cooking() as m: - self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) - - m['get_cooker'].assert_called_once_with(TEST_TYPE) - - args = m['cooker_cls'].call_args[0] - self.assertEqual(args[0], TEST_TYPE) - self.assertEqual(args[1], TEST_HEX_ID) - - self.assertEqual(m['cooker'].check_exists.call_count, 1) - - self.assertEqual(m['send_task'].call_count, 1) - args = m['send_task'].call_args[0][0] - self.assertEqual(args[0], TEST_TYPE) - self.assertEqual(args[1], TEST_HEX_ID) - - info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) - self.assertEqual(info['object_id'], TEST_OBJ_ID) - self.assertEqual(info['type'], TEST_TYPE) - self.assertEqual(info['task_status'], 'new') - self.assertEqual(info['task_id'], 42) - - self.assertTimestampAlmostNow(info['ts_created']) - - self.assertEqual(info['ts_done'], None) - self.assertEqual(info['progress_msg'], None) - - def test_create_fail_duplicate_task(self): - with self.mock_cooking(): - self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) - with self.assertRaises(psycopg2.IntegrityError): - self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) - - def test_create_fail_nonexisting_object(self): - with self.mock_cooking() as m: - m['cooker'].check_exists.side_effect = ValueError('Nothing here.') - with self.assertRaises(ValueError): - self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) - - def test_create_set_progress(self): - with self.mock_cooking(): - self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) - - info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) - self.assertEqual(info['progress_msg'], None) - self.vault_backend.set_progress(TEST_TYPE, TEST_OBJ_ID, - TEST_PROGRESS) - info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) - self.assertEqual(info['progress_msg'], TEST_PROGRESS) - - def test_create_set_status(self): - with self.mock_cooking(): - self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) - - info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) - self.assertEqual(info['task_status'], 'new') - self.assertEqual(info['ts_done'], None) - - self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'pending') - info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) - self.assertEqual(info['task_status'], 'pending') - self.assertEqual(info['ts_done'], None) - - self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') - info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) - self.assertEqual(info['task_status'], 'done') - self.assertTimestampAlmostNow(info['ts_done']) - - def test_create_update_access_ts(self): - with self.mock_cooking(): - self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) - - info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) - access_ts_1 = info['ts_last_access'] - self.assertTimestampAlmostNow(access_ts_1) - - self.vault_backend.update_access_ts(TEST_TYPE, TEST_OBJ_ID) - info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) - access_ts_2 = info['ts_last_access'] - self.assertTimestampAlmostNow(access_ts_2) - - self.vault_backend.update_access_ts(TEST_TYPE, TEST_OBJ_ID) - info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) - access_ts_3 = info['ts_last_access'] - self.assertTimestampAlmostNow(access_ts_3) - - self.assertLess(access_ts_1, access_ts_2) - self.assertLess(access_ts_2, access_ts_3) - - def test_cook_request_idempotent(self): - with self.mock_cooking(): - info1 
= self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID) - info2 = self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID) - info3 = self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID) - self.assertEqual(info1, info2) - self.assertEqual(info1, info3) - - def test_cook_email_pending_done(self): - with self.mock_cooking(), \ - patch.object(self.vault_backend, 'add_notif_email') as madd, \ - patch.object(self.vault_backend, 'send_notification') as msend: - - self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID) - madd.assert_not_called() - msend.assert_not_called() - - madd.reset_mock() - msend.reset_mock() - - self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, - email=TEST_EMAIL) - madd.assert_called_once_with(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL) - msend.assert_not_called() - - madd.reset_mock() - msend.reset_mock() - - self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') - self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, - email=TEST_EMAIL) - msend.assert_called_once_with(None, TEST_EMAIL, - TEST_TYPE, TEST_OBJ_ID, 'done') - madd.assert_not_called() - - def test_send_all_emails(self): - with self.mock_cooking(): - emails = ('a@example.com', - 'billg@example.com', - 'test+42@example.org') - for email in emails: - self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, - email=email) - - self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') - - with patch.object(self.vault_backend, 'smtp_server') as m: - self.vault_backend.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) - - sent_emails = {k[0][0] for k in m.send_message.call_args_list} - self.assertEqual({k['To'] for k in sent_emails}, set(emails)) - - for e in sent_emails: - self.assertIn('info@softwareheritage.org', e['From']) - self.assertIn(TEST_TYPE, e['Subject']) - self.assertIn(TEST_HEX_ID[:5], e['Subject']) - self.assertIn(TEST_TYPE, str(e)) - self.assertIn('https://archive.softwareheritage.org/', str(e)) - self.assertIn(TEST_HEX_ID[:5], str(e)) - self.assertIn('--\x20\n', str(e)) # Well-formated signature!!! 
- - # Check that the entries have been deleted and recalling the - # function does not re-send the e-mails - m.reset_mock() - self.vault_backend.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) - m.assert_not_called() - - def test_available(self): - self.assertFalse(self.vault_backend.is_available(TEST_TYPE, - TEST_OBJ_ID)) - with self.mock_cooking(): - self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) - self.assertFalse(self.vault_backend.is_available(TEST_TYPE, - TEST_OBJ_ID)) - self.vault_backend.cache.add(TEST_TYPE, TEST_OBJ_ID, b'content') - self.assertFalse(self.vault_backend.is_available(TEST_TYPE, - TEST_OBJ_ID)) - self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') - self.assertTrue(self.vault_backend.is_available(TEST_TYPE, - TEST_OBJ_ID)) - - def test_fetch(self): - self.assertEqual(self.vault_backend.fetch(TEST_TYPE, TEST_OBJ_ID), - None) - obj_id, content = self.fake_cook(TEST_TYPE, b'content') - - info = self.vault_backend.task_info(TEST_TYPE, obj_id) - access_ts_before = info['ts_last_access'] - - self.assertEqual(self.vault_backend.fetch(TEST_TYPE, obj_id), - b'content') - - info = self.vault_backend.task_info(TEST_TYPE, obj_id) - access_ts_after = info['ts_last_access'] - - self.assertTimestampAlmostNow(access_ts_after) - self.assertLess(access_ts_before, access_ts_after) - - def test_cache_expire_oldest(self): - r = range(1, 10) - inserted = {} - for i in r: - sticky = (i == 5) - content = b'content%s' % str(i).encode() - obj_id, content = self.fake_cook(TEST_TYPE, content, sticky) - inserted[i] = (obj_id, content) - - self.vault_backend.update_access_ts(TEST_TYPE, inserted[2][0]) - self.vault_backend.update_access_ts(TEST_TYPE, inserted[3][0]) - self.vault_backend.cache_expire_oldest(n=4) - - should_be_still_here = {2, 3, 5, 8, 9} - for i in r: - self.assertEqual(self.vault_backend.is_available( - TEST_TYPE, inserted[i][0]), i in should_be_still_here) - - def test_cache_expire_until(self): - r = range(1, 10) - inserted = {} - for i in r: - sticky = (i == 5) - content = b'content%s' % str(i).encode() - obj_id, content = self.fake_cook(TEST_TYPE, content, sticky) - inserted[i] = (obj_id, content) - - if i == 7: - cutoff_date = datetime.datetime.now() - - self.vault_backend.update_access_ts(TEST_TYPE, inserted[2][0]) - self.vault_backend.update_access_ts(TEST_TYPE, inserted[3][0]) - self.vault_backend.cache_expire_until(date=cutoff_date) - - should_be_still_here = {2, 3, 5, 8, 9} - for i in r: - self.assertEqual(self.vault_backend.is_available( - TEST_TYPE, inserted[i][0]), i in should_be_still_here) - - def test_fail_cook_simple(self): - self.fail_cook(TEST_TYPE, TEST_OBJ_ID, 'error42') - self.assertFalse(self.vault_backend.is_available(TEST_TYPE, - TEST_OBJ_ID)) - info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) - self.assertEqual(info['progress_msg'], 'error42') - - def test_send_failure_email(self): - with self.mock_cooking(): - self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, - email='a@example.com') - - self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'failed') - self.vault_backend.set_progress(TEST_TYPE, TEST_OBJ_ID, 'test error') - - with patch.object(self.vault_backend, 'smtp_server') as m: - self.vault_backend.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) - - e = [k[0][0] for k in m.send_message.call_args_list][0] - self.assertEqual(e['To'], 'a@example.com') - - self.assertIn('info@softwareheritage.org', e['From']) - self.assertIn(TEST_TYPE, e['Subject']) - self.assertIn(TEST_HEX_ID[:5], e['Subject']) - self.assertIn('fail', 
e['Subject']) - self.assertIn(TEST_TYPE, str(e)) - self.assertIn(TEST_HEX_ID[:5], str(e)) - self.assertIn('test error', str(e)) - self.assertIn('--\x20\n', str(e)) # Well-formated signature - - def test_retry_failed_bundle(self): - self.fail_cook(TEST_TYPE, TEST_OBJ_ID, 'error42') - info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) - self.assertEqual(info['task_status'], 'failed') - with self.mock_cooking(): - self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID) - info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) - self.assertEqual(info['task_status'], 'new') +TEST_EMAIL = 'ouiche@lorraine.fr' + + +def test_create_task_simple(swh_vault): + with mock_cooking(swh_vault) as m: + swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) + + m['get_cooker'].assert_called_once_with(TEST_TYPE) + + args = m['cooker_cls'].call_args[0] + assert args[0] == TEST_TYPE + assert args[1] == TEST_HEX_ID + + assert m['cooker'].check_exists.call_count == 1 + assert m['_send_task'].call_count == 1 + + args = m['_send_task'].call_args[0] + assert args[0] == TEST_TYPE + assert args[1] == TEST_HEX_ID + + info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + assert info['object_id'] == TEST_OBJ_ID + assert info['type'] == TEST_TYPE + assert info['task_status'] == 'new' + assert info['task_id'] == 42 + + assertTimestampAlmostNow(info['ts_created']) + + assert info['ts_done'] is None + assert info['progress_msg'] is None + + +def test_create_fail_duplicate_task(swh_vault): + with mock_cooking(swh_vault): + swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) + with pytest.raises(psycopg2.IntegrityError): + swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) + + +def test_create_fail_nonexisting_object(swh_vault): + with mock_cooking(swh_vault) as m: + m['cooker'].check_exists.side_effect = ValueError('Nothing here.') + with pytest.raises(ValueError): + swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) + + +def test_create_set_progress(swh_vault): + with mock_cooking(swh_vault): + swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) + + info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + assert info['progress_msg'] is None + swh_vault.set_progress(TEST_TYPE, TEST_OBJ_ID, + TEST_PROGRESS) + info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + assert info['progress_msg'] == TEST_PROGRESS + + +def test_create_set_status(swh_vault): + with mock_cooking(swh_vault): + swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) + + info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + assert info['task_status'] == 'new' + assert info['ts_done'] is None + + swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'pending') + info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + assert info['task_status'] == 'pending' + assert info['ts_done'] is None + + swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') + info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + assert info['task_status'] == 'done' + assertTimestampAlmostNow(info['ts_done']) + + +def test_create_update_access_ts(swh_vault): + with mock_cooking(swh_vault): + swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) + + info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + access_ts_1 = info['ts_last_access'] + assertTimestampAlmostNow(access_ts_1) + + swh_vault.update_access_ts(TEST_TYPE, TEST_OBJ_ID) + info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + access_ts_2 = info['ts_last_access'] + assertTimestampAlmostNow(access_ts_2) + + swh_vault.update_access_ts(TEST_TYPE, TEST_OBJ_ID) + info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + access_ts_3 = info['ts_last_access'] + 
assertTimestampAlmostNow(access_ts_3) + + assert access_ts_1 < access_ts_2 + assert access_ts_2 < access_ts_3 + + +def test_cook_request_idempotent(swh_vault): + with mock_cooking(swh_vault): + info1 = swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) + info2 = swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) + info3 = swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) + assert info1 == info2 + assert info1 == info3 + + +def test_cook_email_pending_done(swh_vault): + with mock_cooking(swh_vault), \ + patch.object(swh_vault, 'add_notif_email') as madd, \ + patch.object(swh_vault, 'send_notification') as msend: + + swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) + madd.assert_not_called() + msend.assert_not_called() + + madd.reset_mock() + msend.reset_mock() + + swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, + email=TEST_EMAIL) + madd.assert_called_once_with(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL) + msend.assert_not_called() + + madd.reset_mock() + msend.reset_mock() + + swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') + swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, + email=TEST_EMAIL) + msend.assert_called_once_with(None, TEST_EMAIL, + TEST_TYPE, TEST_OBJ_ID, 'done') + madd.assert_not_called() + + +def test_send_all_emails(swh_vault): + with mock_cooking(swh_vault): + emails = ('a@example.com', + 'billg@example.com', + 'test+42@example.org') + for email in emails: + swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, + email=email) + + swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') + + with patch.object(swh_vault, 'smtp_server') as m: + swh_vault.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) + + sent_emails = {k[0][0] for k in m.send_message.call_args_list} + assert {k['To'] for k in sent_emails} == set(emails) + + for e in sent_emails: + assert 'info@softwareheritage.org' in e['From'] + assert TEST_TYPE in e['Subject'] + assert TEST_HEX_ID[:5] in e['Subject'] + assert TEST_TYPE in str(e) + assert 'https://archive.softwareheritage.org/' in str(e) + assert TEST_HEX_ID[:5] in str(e) + assert '--\x20\n' in str(e) # Well-formated signature!!! 
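+            # ('--\x20\n' is the "-- " signature separator from RFC 3676;
+            # \x20 spells the trailing space out explicitly so it cannot
+            # be silently dropped by trailing-whitespace cleanups.)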
+
+        # Check that the entries have been deleted and recalling the
+        # function does not re-send the e-mails
+        m.reset_mock()
+        swh_vault.send_all_notifications(TEST_TYPE, TEST_OBJ_ID)
+        m.assert_not_called()
+
+
+def test_available(swh_vault):
+    assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID)
+
+    with mock_cooking(swh_vault):
+        swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID)
+    assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID)
+
+    swh_vault.cache.add(TEST_TYPE, TEST_OBJ_ID, b'content')
+    assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID)
+
+    swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'done')
+    assert swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID)
+
+
+def test_fetch(swh_vault):
+    assert swh_vault.fetch(TEST_TYPE, TEST_OBJ_ID) is None
+    obj_id, content = fake_cook(swh_vault, TEST_TYPE, b'content')
+
+    info = swh_vault.task_info(TEST_TYPE, obj_id)
+    access_ts_before = info['ts_last_access']
+
+    assert swh_vault.fetch(TEST_TYPE, obj_id) == b'content'
+
+    info = swh_vault.task_info(TEST_TYPE, obj_id)
+    access_ts_after = info['ts_last_access']
+
+    assertTimestampAlmostNow(access_ts_after)
+    assert access_ts_before < access_ts_after
+
+
+def test_cache_expire_oldest(swh_vault):
+    r = range(1, 10)
+    inserted = {}
+    for i in r:
+        sticky = (i == 5)
+        content = b'content%s' % str(i).encode()
+        obj_id, content = fake_cook(swh_vault, TEST_TYPE, content, sticky)
+        inserted[i] = (obj_id, content)
+
+    swh_vault.update_access_ts(TEST_TYPE, inserted[2][0])
+    swh_vault.update_access_ts(TEST_TYPE, inserted[3][0])
+    swh_vault.cache_expire_oldest(n=4)
+
+    should_be_still_here = {2, 3, 5, 8, 9}
+    for i in r:
+        assert swh_vault.is_available(
+            TEST_TYPE, inserted[i][0]) == (i in should_be_still_here)
+
+
+def test_cache_expire_until(swh_vault):
+    r = range(1, 10)
+    inserted = {}
+    for i in r:
+        sticky = (i == 5)
+        content = b'content%s' % str(i).encode()
+        obj_id, content = fake_cook(swh_vault, TEST_TYPE, content, sticky)
+        inserted[i] = (obj_id, content)
+
+        if i == 7:
+            cutoff_date = datetime.datetime.now()
+
+    swh_vault.update_access_ts(TEST_TYPE, inserted[2][0])
+    swh_vault.update_access_ts(TEST_TYPE, inserted[3][0])
+    swh_vault.cache_expire_until(date=cutoff_date)
+
+    should_be_still_here = {2, 3, 5, 8, 9}
+    for i in r:
+        assert swh_vault.is_available(
+            TEST_TYPE, inserted[i][0]) == (i in should_be_still_here)
+
+
+def test_fail_cook_simple(swh_vault):
+    fail_cook(swh_vault, TEST_TYPE, TEST_OBJ_ID, 'error42')
+    assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID)
+    info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
+    assert info['progress_msg'] == 'error42'
+
+
+def test_send_failure_email(swh_vault):
+    with mock_cooking(swh_vault):
+        swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, email='a@example.com')
+
+    swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'failed')
+    swh_vault.set_progress(TEST_TYPE, TEST_OBJ_ID, 'test error')
+
+    with patch.object(swh_vault, 'smtp_server') as m:
+        swh_vault.send_all_notifications(TEST_TYPE, TEST_OBJ_ID)
+
+        e = [k[0][0] for k in m.send_message.call_args_list][0]
+        assert e['To'] == 'a@example.com'
+
+        assert 'info@softwareheritage.org' in e['From']
+        assert TEST_TYPE in e['Subject']
+        assert TEST_HEX_ID[:5] in e['Subject']
+        assert 'fail' in e['Subject']
+        assert TEST_TYPE in str(e)
+        assert TEST_HEX_ID[:5] in str(e)
+        assert 'test error' in str(e)
+        assert '--\x20\n' in str(e)  # Well-formatted signature
+
+
+def test_retry_failed_bundle(swh_vault):
+    fail_cook(swh_vault, TEST_TYPE, TEST_OBJ_ID, 'error42')
+    info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
+    assert info['task_status'] == 'failed'
+    with mock_cooking(swh_vault):
+        swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID)
+    info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
+    assert info['task_status'] == 'new'
diff --git a/swh/vault/tests/test_cache.py b/swh/vault/tests/test_cache.py
--- a/swh/vault/tests/test_cache.py
+++ b/swh/vault/tests/test_cache.py
@@ -3,10 +3,8 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-import unittest
 
 from swh.model import hashutil
-from swh.vault.tests.vault_testing import VaultTestFixture
 
 TEST_TYPE_1 = 'revision_gitfast'
 TEST_TYPE_2 = 'directory'
@@ -21,54 +19,56 @@
 TEST_CONTENT_2 = b'test content 2'
 
 
-class BaseTestVaultCache(VaultTestFixture):
-    def setUp(self):
-        super().setUp()
-        self.cache = self.vault_backend.cache  # little shortcut
-
-
-class TestVaultCache(BaseTestVaultCache, unittest.TestCase):
-    # Let's try to avoid replicating edge-cases already tested in
-    # swh-objstorage, and instead focus on testing behaviors specific to the
-    # Vault cache here.
-
-    def test_internal_id(self):
-        sid = self.cache._get_internal_id(TEST_TYPE_1, TEST_OBJ_ID_1)
-        self.assertEqual(hashutil.hash_to_hex(sid),
-                         '6829cda55b54c295aa043a611a4e0320239988d9')
-
-    def test_simple_add_get(self):
-        self.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1)
-        self.assertEqual(self.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1),
-                         TEST_CONTENT_1)
-        self.assertTrue(self.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1))
-
-    def test_different_type_same_id(self):
-        self.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1)
-        self.cache.add(TEST_TYPE_2, TEST_OBJ_ID_1, TEST_CONTENT_2)
-        self.assertEqual(self.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1),
-                         TEST_CONTENT_1)
-        self.assertEqual(self.cache.get(TEST_TYPE_2, TEST_OBJ_ID_1),
-                         TEST_CONTENT_2)
-        self.assertTrue(self.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1))
-        self.assertTrue(self.cache.is_cached(TEST_TYPE_2, TEST_OBJ_ID_1))
-
-    def test_different_type_same_content(self):
-        self.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1)
-        self.cache.add(TEST_TYPE_2, TEST_OBJ_ID_1, TEST_CONTENT_1)
-        self.assertEqual(self.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1),
-                         TEST_CONTENT_1)
-        self.assertEqual(self.cache.get(TEST_TYPE_2, TEST_OBJ_ID_1),
-                         TEST_CONTENT_1)
-        self.assertTrue(self.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1))
-        self.assertTrue(self.cache.is_cached(TEST_TYPE_2, TEST_OBJ_ID_1))
-
-    def test_different_id_same_type(self):
-        self.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1)
-        self.cache.add(TEST_TYPE_1, TEST_OBJ_ID_2, TEST_CONTENT_2)
-        self.assertEqual(self.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1),
-                         TEST_CONTENT_1)
-        self.assertEqual(self.cache.get(TEST_TYPE_1, TEST_OBJ_ID_2),
-                         TEST_CONTENT_2)
-        self.assertTrue(self.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1))
-        self.assertTrue(self.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_2))
+# Let's try to avoid replicating edge-cases already tested in
+# swh-objstorage, and instead focus on testing behaviors specific to the
+# Vault cache here.
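+#
+# The fixed digest asserted in test_internal_id below pins the mapping from
+# (obj_type, obj_id) to the objstorage key: a change to
+# VaultCache._get_internal_id would make previously cached bundles
+# unreachable, and the pinned value makes such a change show up here.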
+ +def test_internal_id(swh_vault): + sid = swh_vault.cache._get_internal_id(TEST_TYPE_1, TEST_OBJ_ID_1) + assert hashutil.hash_to_hex(sid) == \ + '6829cda55b54c295aa043a611a4e0320239988d9' + + +def test_simple_add_get(swh_vault): + swh_vault.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1) + assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1) == \ + TEST_CONTENT_1 + assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1) + + +def test_different_type_same_id(swh_vault): + swh_vault.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1) + swh_vault.cache.add(TEST_TYPE_2, TEST_OBJ_ID_1, TEST_CONTENT_2) + assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1) == \ + TEST_CONTENT_1 + assert swh_vault.cache.get(TEST_TYPE_2, TEST_OBJ_ID_1) == \ + TEST_CONTENT_2 + assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1) + assert swh_vault.cache.is_cached(TEST_TYPE_2, TEST_OBJ_ID_1) + + +def test_different_type_same_content(swh_vault): + swh_vault.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1) + swh_vault.cache.add(TEST_TYPE_2, TEST_OBJ_ID_1, TEST_CONTENT_1) + assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1) == \ + TEST_CONTENT_1 + assert swh_vault.cache.get(TEST_TYPE_2, TEST_OBJ_ID_1) == \ + TEST_CONTENT_1 + assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1) + assert swh_vault.cache.is_cached(TEST_TYPE_2, TEST_OBJ_ID_1) + + +def test_different_id_same_type(swh_vault): + swh_vault.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1) + swh_vault.cache.add(TEST_TYPE_1, TEST_OBJ_ID_2, TEST_CONTENT_2) + assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1) == \ + TEST_CONTENT_1 + assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_2) == \ + TEST_CONTENT_2 + assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1) + assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_2) diff --git a/swh/vault/tests/test_cookers.py b/swh/vault/tests/test_cookers.py --- a/swh/vault/tests/test_cookers.py +++ b/swh/vault/tests/test_cookers.py @@ -26,7 +26,7 @@ from swh.model import hashutil from swh.model.from_disk import Directory from swh.vault.cookers import DirectoryCooker, RevisionGitfastCooker -from swh.vault.tests.vault_testing import VaultTestFixture, hash_content +from swh.vault.tests.vault_testing import hash_content from swh.vault.to_disk import SKIPPED_MESSAGE, HIDDEN_MESSAGE @@ -101,61 +101,60 @@ self.git_shell(*args, stdout=None) -@pytest.mark.config_issue -class BaseTestCookers(VaultTestFixture): - """Base class of cookers unit tests""" - def setUp(self): - super().setUp() - self.loader = GitLoaderFromDisk() - self.loader.storage = self.storage - - def tearDown(self): - self.loader = None - super().tearDown() - - def load(self, repo_path): - """Load a repository in the test storage""" - self.loader.load('fake_origin', repo_path, datetime.datetime.now()) - - @contextlib.contextmanager - def cook_extract_directory(self, obj_id): - """Context manager that cooks a directory and extract it.""" - cooker = DirectoryCooker('directory', obj_id) - cooker.storage = self.storage - cooker.backend = unittest.mock.MagicMock() - cooker.fileobj = io.BytesIO() - assert cooker.check_exists() - cooker.prepare_bundle() - cooker.fileobj.seek(0) - with tempfile.TemporaryDirectory(prefix='tmp-vault-extract-') as td: - with tarfile.open(fileobj=cooker.fileobj, mode='r') as tar: - tar.extractall(td) - yield pathlib.Path(td) / hashutil.hash_to_hex(obj_id) - cooker.storage = None - - @contextlib.contextmanager - def cook_stream_revision_gitfast(self, obj_id): - """Context manager that 
cooks a revision and stream its fastexport.""" - cooker = RevisionGitfastCooker('revision_gitfast', obj_id) - cooker.storage = self.storage - cooker.backend = unittest.mock.MagicMock() - cooker.fileobj = io.BytesIO() - assert cooker.check_exists() - cooker.prepare_bundle() - cooker.fileobj.seek(0) - fastexport_stream = gzip.GzipFile(fileobj=cooker.fileobj) - yield fastexport_stream - cooker.storage = None - - @contextlib.contextmanager - def cook_extract_revision_gitfast(self, obj_id): - """Context manager that cooks a revision and extract it.""" - test_repo = TestRepo() - with self.cook_stream_revision_gitfast(obj_id) as stream, \ - test_repo as p: - processor = dulwich.fastexport.GitImportProcessor(test_repo.repo) - processor.import_stream(stream) - yield test_repo, p +@pytest.fixture +def swh_git_loader(swh_vault): + loader = GitLoaderFromDisk() + loader.storage = swh_vault.storage + return loader + + +def load(loader, repo_path): + """Load a repository in the test storage""" + loader.load('fake_origin', repo_path, datetime.datetime.now()) + + +@contextlib.contextmanager +def cook_extract_directory(storage, obj_id): + """Context manager that cooks a directory and extract it.""" + backend = unittest.mock.MagicMock() + backend.storage = storage + cooker = DirectoryCooker('directory', obj_id, backend=backend) + cooker.fileobj = io.BytesIO() + assert cooker.check_exists() + cooker.prepare_bundle() + cooker.fileobj.seek(0) + with tempfile.TemporaryDirectory(prefix='tmp-vault-extract-') as td: + with tarfile.open(fileobj=cooker.fileobj, mode='r') as tar: + tar.extractall(td) + yield pathlib.Path(td) / hashutil.hash_to_hex(obj_id) + cooker.storage = None + + +@contextlib.contextmanager +def cook_stream_revision_gitfast(storage, obj_id): + """Context manager that cooks a revision and stream its fastexport.""" + backend = unittest.mock.MagicMock() + backend.storage = storage + cooker = RevisionGitfastCooker( + 'revision_gitfast', obj_id, backend=backend) + cooker.fileobj = io.BytesIO() + assert cooker.check_exists() + cooker.prepare_bundle() + cooker.fileobj.seek(0) + fastexport_stream = gzip.GzipFile(fileobj=cooker.fileobj) + yield fastexport_stream + cooker.storage = None + + +@contextlib.contextmanager +def cook_extract_revision_gitfast(storage, obj_id): + """Context manager that cooks a revision and extract it.""" + test_repo = TestRepo() + with cook_stream_revision_gitfast(storage, obj_id) as stream, \ + test_repo as p: + processor = dulwich.fastexport.GitImportProcessor(test_repo.repo) + processor.import_stream(stream) + yield test_repo, p TEST_CONTENT = (" test content\n" @@ -164,8 +163,8 @@ TEST_EXECUTABLE = b'\x42\x40\x00\x00\x05' -class TestDirectoryCooker(BaseTestCookers, unittest.TestCase): - def test_directory_simple(self): +class TestDirectoryCooker: + def test_directory_simple(self, swh_git_loader): repo = TestRepo() with repo as rp: (rp / 'file').write_text(TEST_CONTENT) @@ -175,25 +174,25 @@ (rp / 'dir1/dir2').mkdir(parents=True) (rp / 'dir1/dir2/file').write_text(TEST_CONTENT) c = repo.commit() - self.load(str(rp)) + load(swh_git_loader, str(rp)) obj_id_hex = repo.repo[c].tree.decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) - with self.cook_extract_directory(obj_id) as p: - self.assertEqual((p / 'file').stat().st_mode, 0o100644) - self.assertEqual((p / 'file').read_text(), TEST_CONTENT) - self.assertEqual((p / 'executable').stat().st_mode, 0o100755) - self.assertEqual((p / 'executable').read_bytes(), TEST_EXECUTABLE) - self.assertTrue((p / 'link').is_symlink) - 
@@ -164,8 +163,8 @@
 TEST_EXECUTABLE = b'\x42\x40\x00\x00\x05'
 
 
-class TestDirectoryCooker(BaseTestCookers, unittest.TestCase):
-    def test_directory_simple(self):
+class TestDirectoryCooker:
+    def test_directory_simple(self, swh_git_loader):
         repo = TestRepo()
         with repo as rp:
             (rp / 'file').write_text(TEST_CONTENT)
@@ -175,25 +174,25 @@
             (rp / 'dir1/dir2').mkdir(parents=True)
             (rp / 'dir1/dir2/file').write_text(TEST_CONTENT)
             c = repo.commit()
-            self.load(str(rp))
+            load(swh_git_loader, str(rp))
 
         obj_id_hex = repo.repo[c].tree.decode()
         obj_id = hashutil.hash_to_bytes(obj_id_hex)
 
-        with self.cook_extract_directory(obj_id) as p:
-            self.assertEqual((p / 'file').stat().st_mode, 0o100644)
-            self.assertEqual((p / 'file').read_text(), TEST_CONTENT)
-            self.assertEqual((p / 'executable').stat().st_mode, 0o100755)
-            self.assertEqual((p / 'executable').read_bytes(), TEST_EXECUTABLE)
-            self.assertTrue((p / 'link').is_symlink)
-            self.assertEqual(os.readlink(str(p / 'link')), 'file')
-            self.assertEqual((p / 'dir1/dir2/file').stat().st_mode, 0o100644)
-            self.assertEqual((p / 'dir1/dir2/file').read_text(), TEST_CONTENT)
+        with cook_extract_directory(swh_git_loader.storage, obj_id) as p:
+            assert (p / 'file').stat().st_mode == 0o100644
+            assert (p / 'file').read_text() == TEST_CONTENT
+            assert (p / 'executable').stat().st_mode == 0o100755
+            assert (p / 'executable').read_bytes() == TEST_EXECUTABLE
+            assert (p / 'link').is_symlink()
+            assert os.readlink(str(p / 'link')) == 'file'
+            assert (p / 'dir1/dir2/file').stat().st_mode == 0o100644
+            assert (p / 'dir1/dir2/file').read_text() == TEST_CONTENT
 
             directory = Directory.from_disk(path=bytes(p))
-            self.assertEqual(obj_id_hex, hashutil.hash_to_hex(directory.hash))
+            assert obj_id_hex == hashutil.hash_to_hex(directory.hash)
 
-    def test_directory_filtered_objects(self):
+    def test_directory_filtered_objects(self, swh_git_loader):
         repo = TestRepo()
         with repo as rp:
             file_1, id_1 = hash_content(b'test1')
@@ -205,14 +204,14 @@
             (rp / 'absent_file').write_bytes(file_3)
             c = repo.commit()
-            self.load(str(rp))
+            load(swh_git_loader, str(rp))
 
         obj_id_hex = repo.repo[c].tree.decode()
         obj_id = hashutil.hash_to_bytes(obj_id_hex)
 
         # FIXME: storage.content_update() should be changed to allow things
         # like that
-        with self.storage.get_db().transaction() as cur:
+        with swh_git_loader.storage.get_db().transaction() as cur:
             cur.execute("""update content set status = 'visible'
                            where sha1 = %s""", (id_1,))
             cur.execute("""update content set status = 'hidden'
@@ -220,12 +219,12 @@
             cur.execute("""update content set status = 'absent'
                            where sha1 = %s""", (id_3,))
 
-        with self.cook_extract_directory(obj_id) as p:
-            self.assertEqual((p / 'file').read_bytes(), b'test1')
-            self.assertEqual((p / 'hidden_file').read_bytes(), HIDDEN_MESSAGE)
-            self.assertEqual((p / 'absent_file').read_bytes(), SKIPPED_MESSAGE)
+        with cook_extract_directory(swh_git_loader.storage, obj_id) as p:
+            assert (p / 'file').read_bytes() == b'test1'
+            assert (p / 'hidden_file').read_bytes() == HIDDEN_MESSAGE
+            assert (p / 'absent_file').read_bytes() == SKIPPED_MESSAGE
 
-    def test_directory_bogus_perms(self):
+    def test_directory_bogus_perms(self, swh_git_loader):
         # Some early git repositories have 664/775 permissions... let's check
         # if all the weird modes are properly normalized in the directory
         # cooker.
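About the FIXME above: the tests fall back to raw SQL because they need content rows switched to the hidden and absent statuses. If storage.content_update() were extended as the FIXME suggests, the workaround could presumably shrink to a call of this shape (hypothetical usage, not something the current swh.storage API is known to support):

    swh_git_loader.storage.content_update(
        [{'sha1': id_2, 'status': 'hidden'}], keys=['status'])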
@@ -238,17 +237,17 @@
             (rp / 'wat').write_text(TEST_CONTENT)
             (rp / 'wat').chmod(0o604)
             c = repo.commit()
-            self.load(str(rp))
+            load(swh_git_loader, str(rp))
 
         obj_id_hex = repo.repo[c].tree.decode()
         obj_id = hashutil.hash_to_bytes(obj_id_hex)
 
-        with self.cook_extract_directory(obj_id) as p:
-            self.assertEqual((p / 'file').stat().st_mode, 0o100644)
-            self.assertEqual((p / 'executable').stat().st_mode, 0o100755)
-            self.assertEqual((p / 'wat').stat().st_mode, 0o100644)
+        with cook_extract_directory(swh_git_loader.storage, obj_id) as p:
+            assert (p / 'file').stat().st_mode == 0o100644
+            assert (p / 'executable').stat().st_mode == 0o100755
+            assert (p / 'wat').stat().st_mode == 0o100644
 
-    def test_directory_revision_data(self):
+    def test_directory_revision_data(self, swh_git_loader):
         target_rev = '0e8a3ad980ec179856012b7eecf4327e99cd44cd'
         d = hashutil.hash_to_bytes('17a3e48bce37be5226490e750202ad3a9a1a3fe9')
@@ -263,18 +262,19 @@
                 }
             ],
         }
-        self.storage.directory_add([dir])
+        swh_git_loader.storage.directory_add([dir])
 
-        with self.cook_extract_directory(d) as p:
-            self.assertTrue((p / 'submodule').is_symlink())
-            self.assertEqual(os.readlink(str(p / 'submodule')), target_rev)
+        with cook_extract_directory(swh_git_loader.storage, d) as p:
+            assert (p / 'submodule').is_symlink()
+            assert os.readlink(str(p / 'submodule')) == target_rev
 
 
-class TestRevisionGitfastCooker(BaseTestCookers, unittest.TestCase):
-    def test_revision_simple(self):
+class TestRevisionGitfastCooker:
+    def test_revision_simple(self, swh_git_loader):
         #
         # 1--2--3--4--5--6--7
         #
+        storage = swh_git_loader.storage
         repo = TestRepo()
         with repo as rp:
             (rp / 'file1').write_text(TEST_CONTENT)
@@ -293,28 +293,29 @@
             repo.commit('remove file2')
             (rp / 'bin1').rename(rp / 'bin')
             repo.commit('rename bin1 to bin')
-            self.load(str(rp))
+            load(swh_git_loader, str(rp))
 
         obj_id_hex = repo.repo.refs[b'HEAD'].decode()
         obj_id = hashutil.hash_to_bytes(obj_id_hex)
 
-        with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
+        with cook_extract_revision_gitfast(storage, obj_id) as (ert, p):
             ert.checkout(b'HEAD')
-            self.assertEqual((p / 'file1').stat().st_mode, 0o100644)
-            self.assertEqual((p / 'file1').read_text(), TEST_CONTENT)
-            self.assertTrue((p / 'link1').is_symlink)
-            self.assertEqual(os.readlink(str(p / 'link1')), 'file1')
-            self.assertEqual((p / 'bin').stat().st_mode, 0o100755)
-            self.assertEqual((p / 'bin').read_bytes(), TEST_EXECUTABLE)
-            self.assertEqual((p / 'dir1/dir2/file').read_text(), TEST_CONTENT)
-            self.assertEqual((p / 'dir1/dir2/file').stat().st_mode, 0o100644)
-            self.assertEqual(ert.repo.refs[b'HEAD'].decode(), obj_id_hex)
-
-    def test_revision_two_roots(self):
+            assert (p / 'file1').stat().st_mode == 0o100644
+            assert (p / 'file1').read_text() == TEST_CONTENT
+            assert (p / 'link1').is_symlink()
+            assert os.readlink(str(p / 'link1')) == 'file1'
+            assert (p / 'bin').stat().st_mode == 0o100755
+            assert (p / 'bin').read_bytes() == TEST_EXECUTABLE
+            assert (p / 'dir1/dir2/file').read_text() == TEST_CONTENT
+            assert (p / 'dir1/dir2/file').stat().st_mode == 0o100644
+            assert ert.repo.refs[b'HEAD'].decode() == obj_id_hex
+
+    def test_revision_two_roots(self, swh_git_loader):
         #
         #    1----3---4
         #        /
         #   2----
         #
+        storage = swh_git_loader.storage
         repo = TestRepo()
         with repo as rp:
             (rp / 'file1').write_text(TEST_CONTENT)
@@ -327,17 +328,18 @@
             repo.commit('add file3')
         obj_id_hex = repo.repo.refs[b'HEAD'].decode()
         obj_id = hashutil.hash_to_bytes(obj_id_hex)
-        self.load(str(rp))
+        load(swh_git_loader, str(rp))
 
-        with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
-            self.assertEqual(ert.repo.refs[b'HEAD'].decode(), obj_id_hex)
+        with cook_extract_revision_gitfast(storage, obj_id) as (ert, p):
+            assert ert.repo.refs[b'HEAD'].decode() == obj_id_hex
 
-    def test_revision_two_double_fork_merge(self):
+    def test_revision_two_double_fork_merge(self, swh_git_loader):
         #
         #     2---4---6
         #    /   /   /
         #   1---3---5
         #
+        storage = swh_git_loader.storage
         repo = TestRepo()
         with repo as rp:
             (rp / 'file1').write_text(TEST_CONTENT)
@@ -360,12 +362,12 @@
         obj_id_hex = repo.repo.refs[b'HEAD'].decode()
         obj_id = hashutil.hash_to_bytes(obj_id_hex)
-        self.load(str(rp))
+        load(swh_git_loader, str(rp))
 
-        with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
-            self.assertEqual(ert.repo.refs[b'HEAD'].decode(), obj_id_hex)
+        with cook_extract_revision_gitfast(storage, obj_id) as (ert, p):
+            assert ert.repo.refs[b'HEAD'].decode() == obj_id_hex
 
-    def test_revision_triple_merge(self):
+    def test_revision_triple_merge(self, swh_git_loader):
         #
         #       .---.---5
         #      /   /   /
         #     4---.---.
         #    /   /   /
         #   1---.---.
         #
+        storage = swh_git_loader.storage
         repo = TestRepo()
         with repo as rp:
             (rp / 'file1').write_text(TEST_CONTENT)
@@ -387,12 +390,13 @@
         obj_id_hex = repo.repo.refs[b'HEAD'].decode()
         obj_id = hashutil.hash_to_bytes(obj_id_hex)
-        self.load(str(rp))
+        load(swh_git_loader, str(rp))
 
-        with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
-            self.assertEqual(ert.repo.refs[b'HEAD'].decode(), obj_id_hex)
+        with cook_extract_revision_gitfast(storage, obj_id) as (ert, p):
+            assert ert.repo.refs[b'HEAD'].decode() == obj_id_hex
 
-    def test_revision_filtered_objects(self):
+    def test_revision_filtered_objects(self, swh_git_loader):
+        storage = swh_git_loader.storage
         repo = TestRepo()
         with repo as rp:
             file_1, id_1 = hash_content(b'test1')
@@ -406,11 +410,11 @@
             repo.commit()
         obj_id_hex = repo.repo.refs[b'HEAD'].decode()
         obj_id = hashutil.hash_to_bytes(obj_id_hex)
-        self.load(str(rp))
+        load(swh_git_loader, str(rp))
 
         # FIXME: storage.content_update() should be changed to allow things
         # like that
-        with self.storage.get_db().transaction() as cur:
+        with storage.get_db().transaction() as cur:
             cur.execute("""update content set status = 'visible'
                            where sha1 = %s""", (id_1,))
             cur.execute("""update content set status = 'hidden'
@@ -418,16 +422,17 @@
             cur.execute("""update content set status = 'absent'
                            where sha1 = %s""", (id_3,))
 
-        with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
+        with cook_extract_revision_gitfast(storage, obj_id) as (ert, p):
             ert.checkout(b'HEAD')
-            self.assertEqual((p / 'file').read_bytes(), b'test1')
-            self.assertEqual((p / 'hidden_file').read_bytes(), HIDDEN_MESSAGE)
-            self.assertEqual((p / 'absent_file').read_bytes(), SKIPPED_MESSAGE)
+            assert (p / 'file').read_bytes() == b'test1'
+            assert (p / 'hidden_file').read_bytes() == HIDDEN_MESSAGE
+            assert (p / 'absent_file').read_bytes() == SKIPPED_MESSAGE
 
-    def test_revision_bogus_perms(self):
+    def test_revision_bogus_perms(self, swh_git_loader):
         # Some early git repositories have 664/775 permissions... let's check
         # if all the weird modes are properly normalized in the revision
         # cooker.
+        storage = swh_git_loader.storage
         repo = TestRepo()
         with repo as rp:
             (rp / 'file').write_text(TEST_CONTENT)
@@ -437,24 +442,25 @@
             (rp / 'wat').write_text(TEST_CONTENT)
             (rp / 'wat').chmod(0o604)
             repo.commit('initial commit')
-            self.load(str(rp))
+            load(swh_git_loader, str(rp))
 
         obj_id_hex = repo.repo.refs[b'HEAD'].decode()
         obj_id = hashutil.hash_to_bytes(obj_id_hex)
 
-        with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
+        with cook_extract_revision_gitfast(storage, obj_id) as (ert, p):
             ert.checkout(b'HEAD')
-            self.assertEqual((p / 'file').stat().st_mode, 0o100644)
-            self.assertEqual((p / 'executable').stat().st_mode, 0o100755)
-            self.assertEqual((p / 'wat').stat().st_mode, 0o100644)
+            assert (p / 'file').stat().st_mode == 0o100644
+            assert (p / 'executable').stat().st_mode == 0o100755
+            assert (p / 'wat').stat().st_mode == 0o100644
 
-    def test_revision_null_fields(self):
+    def test_revision_null_fields(self, swh_git_loader):
         # Our schema doesn't enforce a lot of non-null revision fields. We need
         # to check these cases don't break the cooker.
+        storage = swh_git_loader.storage
         repo = TestRepo()
         with repo as rp:
             (rp / 'file').write_text(TEST_CONTENT)
             c = repo.commit('initial commit')
-            self.load(str(rp))
+            load(swh_git_loader, str(rp))
 
         repo.repo.refs[b'HEAD'].decode()
         dir_id_hex = repo.repo[c].tree.decode()
         dir_id = hashutil.hash_to_bytes(dir_id_hex)
@@ -474,13 +480,14 @@
             'synthetic': True
         }
 
-        self.storage.revision_add([test_revision])
+        storage.revision_add([test_revision])
 
-        with self.cook_extract_revision_gitfast(test_id) as (ert, p):
+        with cook_extract_revision_gitfast(storage, test_id) as (ert, p):
             ert.checkout(b'HEAD')
-            self.assertEqual((p / 'file').stat().st_mode, 0o100644)
+            assert (p / 'file').stat().st_mode == 0o100644
 
-    def test_revision_revision_data(self):
+    def test_revision_revision_data(self, swh_git_loader):
+        storage = swh_git_loader.storage
         target_rev = '0e8a3ad980ec179856012b7eecf4327e99cd44cd'
         d = hashutil.hash_to_bytes('17a3e48bce37be5226490e750202ad3a9a1a3fe9')
         r = hashutil.hash_to_bytes('1ecc9270c4fc61cfddbc65a774e91ef5c425a6f0')
@@ -496,7 +503,7 @@
                 }
             ],
         }
-        self.storage.directory_add([dir])
+        storage.directory_add([dir])
 
         rev = {
             'id': r,
@@ -511,8 +518,8 @@
             'metadata': {},
             'synthetic': True
         }
-        self.storage.revision_add([rev])
+        storage.revision_add([rev])
 
-        with self.cook_stream_revision_gitfast(r) as stream:
+        with cook_stream_revision_gitfast(storage, r) as stream:
             pattern = 'M 160000 {} submodule'.format(target_rev).encode()
-            self.assertIn(pattern, stream.read())
+            assert pattern in stream.read()
diff --git a/swh/vault/tests/test_cookers_base.py b/swh/vault/tests/test_cookers_base.py
--- a/swh/vault/tests/test_cookers_base.py
+++ b/swh/vault/tests/test_cookers_base.py
@@ -3,7 +3,6 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-import unittest
 from unittest.mock import MagicMock
 
 from swh.model import hashutil
@@ -22,10 +21,15 @@
 class BaseVaultCookerMock(BaseVaultCooker):
     CACHE_TYPE_KEY = TEST_OBJ_TYPE
 
-    def __init__(self, *args, **kwargs):
-        super().__init__(self.CACHE_TYPE_KEY, TEST_OBJ_ID, *args, **kwargs)
+    def __init__(self):
+        # Do not call super(): it would build real db objects from the
+        # config, and those objects are mocked here instead.
+        self.config = {}
         self.storage = MagicMock()
         self.backend = MagicMock()
+        self.obj_type = self.CACHE_TYPE_KEY
+        self.obj_id = hashutil.hash_to_bytes(TEST_OBJ_ID)
+        self.max_bundle_size = 1024
 
     def check_exists(self):
         return True
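The mock above sets by hand the attributes the real BaseVaultCooker.__init__ would otherwise derive from its config and backend. For orientation only, the parent initializer presumably follows this shape (a sketch under that assumption; the signature and the max_bundle_size default are illustrative, not the actual implementation):

    class BaseVaultCooker:
        def __init__(self, obj_type, obj_id, backend, max_bundle_size=2 ** 29):
            self.config = {}  # in reality, loaded from the configuration
            self.obj_type = obj_type
            self.obj_id = hashutil.hash_to_bytes(obj_id)
            self.backend = backend
            self.storage = backend.storage
            self.max_bundle_size = max_bundle_size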
@@ -35,44 +39,43 @@
         self.write(chunk)
 
 
-class TestBaseVaultCooker(unittest.TestCase):
-    def test_simple_cook(self):
-        cooker = BaseVaultCookerMock()
-        cooker.cook()
-        cooker.backend.put_bundle.assert_called_once_with(
-            TEST_OBJ_TYPE, TEST_OBJ_ID, TEST_BUNDLE_CONTENT)
-        cooker.backend.set_status.assert_called_with(
-            TEST_OBJ_TYPE, TEST_OBJ_ID, 'done')
-        cooker.backend.set_progress.assert_called_with(
-            TEST_OBJ_TYPE, TEST_OBJ_ID, None)
-        cooker.backend.send_notif.assert_called_with(
-            TEST_OBJ_TYPE, TEST_OBJ_ID)
-
-    def test_code_exception_cook(self):
-        cooker = BaseVaultCookerMock()
-        cooker.prepare_bundle = MagicMock()
-        cooker.prepare_bundle.side_effect = RuntimeError("Nope")
-        cooker.cook()
-
-        # Potentially remove this when we have objstorage streaming
-        cooker.backend.put_bundle.assert_not_called()
-
-        cooker.backend.set_status.assert_called_with(
-            TEST_OBJ_TYPE, TEST_OBJ_ID, 'failed')
-        self.assertNotIn("Nope", cooker.backend.set_progress.call_args[0][2])
-        cooker.backend.send_notif.assert_called_with(
-            TEST_OBJ_TYPE, TEST_OBJ_ID)
-
-    def test_policy_exception_cook(self):
-        cooker = BaseVaultCookerMock()
-        cooker.max_bundle_size = 8
-        cooker.cook()
-
-        # Potentially remove this when we have objstorage streaming
-        cooker.backend.put_bundle.assert_not_called()
-
-        cooker.backend.set_status.assert_called_with(
-            TEST_OBJ_TYPE, TEST_OBJ_ID, 'failed')
-        self.assertIn("exceeds", cooker.backend.set_progress.call_args[0][2])
-        cooker.backend.send_notif.assert_called_with(
-            TEST_OBJ_TYPE, TEST_OBJ_ID)
+def test_simple_cook():
+    cooker = BaseVaultCookerMock()
+    cooker.cook()
+    cooker.backend.put_bundle.assert_called_once_with(
+        TEST_OBJ_TYPE, TEST_OBJ_ID, TEST_BUNDLE_CONTENT)
+    cooker.backend.set_status.assert_called_with(
+        TEST_OBJ_TYPE, TEST_OBJ_ID, 'done')
+    cooker.backend.set_progress.assert_called_with(
+        TEST_OBJ_TYPE, TEST_OBJ_ID, None)
+    cooker.backend.send_notif.assert_called_with(
+        TEST_OBJ_TYPE, TEST_OBJ_ID)
+
+
+def test_code_exception_cook():
+    cooker = BaseVaultCookerMock()
+    cooker.prepare_bundle = MagicMock()
+    cooker.prepare_bundle.side_effect = RuntimeError("Nope")
+    cooker.cook()
+
+    # Potentially remove this when we have objstorage streaming
+    cooker.backend.put_bundle.assert_not_called()
+
+    cooker.backend.set_status.assert_called_with(
+        TEST_OBJ_TYPE, TEST_OBJ_ID, 'failed')
+    assert "Nope" not in cooker.backend.set_progress.call_args[0][2]
+    cooker.backend.send_notif.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID)
+
+
+def test_policy_exception_cook():
+    cooker = BaseVaultCookerMock()
+    cooker.max_bundle_size = 8
+    cooker.cook()
+
+    # Potentially remove this when we have objstorage streaming
+    cooker.backend.put_bundle.assert_not_called()
+
+    cooker.backend.set_status.assert_called_with(
+        TEST_OBJ_TYPE, TEST_OBJ_ID, 'failed')
+    assert "exceeds" in cooker.backend.set_progress.call_args[0][2]
+    cooker.backend.send_notif.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID)
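The assertions on set_progress.call_args rely on unittest.mock call recording: call_args holds the arguments of the most recent call, and call_args[0] is the tuple of positional arguments, so call_args[0][2] is the third positional argument, here the progress message. A self-contained illustration:

    from unittest.mock import MagicMock

    backend = MagicMock()
    backend.set_progress('some-type', b'some-id', 'bundle exceeds size limit')
    args, kwargs = backend.set_progress.call_args
    assert args[2] == 'bundle exceeds size limit'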
diff --git a/swh/vault/tests/vault_testing.py b/swh/vault/tests/vault_testing.py
--- a/swh/vault/tests/vault_testing.py
+++ b/swh/vault/tests/vault_testing.py
@@ -3,60 +3,7 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-import os
-import tempfile
-
 from swh.model import hashutil
-from swh.vault.backend import VaultBackend
-
-from swh.storage.tests.storage_testing import StorageTestFixture
-from swh.vault.tests import SQL_DIR
-
-
-class VaultTestFixture(StorageTestFixture):
-    """Mix this in a test subject class to get Vault Database testing support.
-
-    This fixture requires to come before DbTestFixture and StorageTestFixture
-    in the inheritance list as it uses their methods to setup its own internal
-    components.
-
-    Usage example:
-
-        class TestVault(VaultTestFixture, unittest.TestCase):
-            ...
-    """
-    TEST_DB_NAME = 'softwareheritage-test-vault'
-    TEST_DB_DUMP = [StorageTestFixture.TEST_DB_DUMP,
-                    os.path.join(SQL_DIR, '*.sql')]
-
-    def setUp(self):
-        super().setUp()
-        self.cache_root = tempfile.TemporaryDirectory('vault-cache-')
-        self.vault_config = {
-            'storage': self.storage_config,
-            'cache': {
-                'cls': 'pathslicing',
-                'args': {
-                    'root': self.cache_root.name,
-                    'slicing': '0:1/1:5',
-                    'allow_delete': True,
-                }
-            },
-            'db': 'postgresql:///' + self.TEST_DB_NAME,
-            'scheduler': None,
-        }
-        self.vault_backend = VaultBackend(self.vault_config)
-
-    def tearDown(self):
-        self.cache_root.cleanup()
-        self.vault_backend.close()
-        self.reset_storage_tables()
-        self.reset_vault_tables()
-        super().tearDown()
-
-    def reset_vault_tables(self):
-        excluded = {'dbversion'}
-        self.reset_db_tables(self.TEST_DB_NAME, excluded=excluded)
 
 
 def hash_content(content):
diff --git a/tox.ini b/tox.ini
--- a/tox.ini
+++ b/tox.ini
@@ -5,9 +5,8 @@
 deps =
   .[testing]
   pytest-cov
-  pifpaf
 commands =
-  pifpaf run postgresql -- pytest --cov=swh --cov-branch {posargs} -m 'not config_issue'
+  pytest --cov=swh --cov-branch {posargs}
 
 [testenv:flake8]
 skip_install = true
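With pifpaf dropped from the tox environment, the PostgreSQL instance is expected to be provisioned by the pytest-postgresql plugin added to the test requirements. For reference, that plugin exposes fixture factories roughly like this; the project's conftest may well configure them differently:

    from pytest_postgresql import factories

    # A throwaway PostgreSQL server process, and a connection fixture on it.
    postgresql_proc = factories.postgresql_proc()
    postgresql = factories.postgresql('postgresql_proc')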