diff --git a/swh/vault/__init__.py b/swh/vault/__init__.py
index 11c6215..f9bcd02 100644
--- a/swh/vault/__init__.py
+++ b/swh/vault/__init__.py
@@ -1,30 +1,40 @@
 # Copyright (C) 2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import logging
+
+logger = logging.getLogger(__name__)
+
 
-def get_vault(cls, args):
+def get_vault(cls='remote', args={}):
     """
     Get a vault object of class `vault_class` with arguments
     `vault_args`.
 
     Args:
         vault (dict): dictionary with keys:
         - cls (str): vault's class, either 'remote'
         - args (dict): dictionary with keys
 
     Returns:
-        an instance of swh.storage.Storage (either local or remote)
+        an instance of VaultBackend (either local or remote)
 
     Raises:
         ValueError if passed an unknown storage class.
 
     """
-
     if cls == 'remote':
         from .api.client import RemoteVaultClient as Vault
+    elif cls == 'local':
+        from swh.scheduler import get_scheduler
+        from swh.storage import get_storage
+        from swh.vault.cache import VaultCache
+        from swh.vault.backend import VaultBackend as Vault
+        args['cache'] = VaultCache(**args['cache'])
+        args['storage'] = get_storage(**args['storage'])
+        args['scheduler'] = get_scheduler(**args['scheduler'])
     else:
         raise ValueError('Unknown storage class `%s`' % cls)
-
+    logger.debug('Instantiating %s with %s' % (Vault, args))
    return Vault(**args)
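For review purposes, a minimal sketch of how the reworked factory is meant to be driven for the new `cls='local'` case — the DSN, cache path and service URLs below are placeholders, not values from this patch:

```python
from swh.vault import get_vault

# Each sub-dict is expanded by get_vault() into the matching collaborator
# (VaultCache, get_storage, get_scheduler) before VaultBackend(**args) runs.
vault = get_vault(cls='local', args={
    'db': 'dbname=swh-vault',  # handed to VaultBackend's connection pool
    'cache': {'cls': 'pathslicing',
              'args': {'root': '/srv/vault-cache', 'slicing': '0:1/1:5'}},
    'storage': {'cls': 'remote', 'args': {'url': 'http://localhost:5002/'}},
    'scheduler': {'cls': 'remote', 'args': {'url': 'http://localhost:5008/'}},
})
```

One caveat worth flagging: the `args={}` default is a shared mutable object and the 'local' branch mutates it in place, so callers should always pass their own dict.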
diff --git a/swh/vault/api/client.py b/swh/vault/api/client.py
index 43cc0a3..fcdfb9d 100644
--- a/swh/vault/api/client.py
+++ b/swh/vault/api/client.py
@@ -1,69 +1,69 @@
 # Copyright (C) 2016-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from swh.model import hashutil
 from swh.core.api import SWHRemoteAPI
 
 
 class VaultAPIError(Exception):
     """Vault API Error"""
     def __str__(self):
         return ('An unexpected error occurred in the Vault backend: {}'
                 .format(self.args))
 
 
 class RemoteVaultClient(SWHRemoteAPI):
     """Client to the Software Heritage vault cache."""
 
-    def __init__(self, base_url, timeout=None):
+    def __init__(self, url, timeout=None):
         super().__init__(
-            api_exception=VaultAPIError, url=base_url, timeout=timeout)
+            api_exception=VaultAPIError, url=url, timeout=timeout)
 
     # Web API endpoints
 
     def fetch(self, obj_type, obj_id):
         hex_id = hashutil.hash_to_hex(obj_id)
         return self.get('fetch/{}/{}'.format(obj_type, hex_id))
 
     def cook(self, obj_type, obj_id, email=None):
         hex_id = hashutil.hash_to_hex(obj_id)
         return self.post('cook/{}/{}'.format(obj_type, hex_id), data={},
                          params=({'email': email} if email else None))
 
     def progress(self, obj_type, obj_id):
         hex_id = hashutil.hash_to_hex(obj_id)
         return self.get('progress/{}/{}'.format(obj_type, hex_id))
 
     # Cookers endpoints
 
     def set_progress(self, obj_type, obj_id, progress):
         hex_id = hashutil.hash_to_hex(obj_id)
         return self.post('set_progress/{}/{}'.format(obj_type, hex_id),
                          data=progress)
 
     def set_status(self, obj_type, obj_id, status):
         hex_id = hashutil.hash_to_hex(obj_id)
         return self.post('set_status/{}/{}'.format(obj_type, hex_id),
                          data=status)
 
     # TODO: handle streaming properly
     def put_bundle(self, obj_type, obj_id, bundle):
         hex_id = hashutil.hash_to_hex(obj_id)
         return self.post('put_bundle/{}/{}'.format(obj_type, hex_id),
                          data=bundle)
 
     def send_notif(self, obj_type, obj_id):
         hex_id = hashutil.hash_to_hex(obj_id)
         return self.post('send_notif/{}/{}'.format(obj_type, hex_id),
                          data=None)
 
     # Batch endpoints
 
     def batch_cook(self, batch):
         return self.post('batch_cook', data=batch)
 
     def batch_progress(self, batch_id):
         return self.get('batch_progress/{}'.format(batch_id))
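The rename from `base_url` to `url` aligns the client's parameter with the `SWHRemoteAPI` keyword it forwards to. A sketch of the intended cook/poll/fetch round-trip (URL and object id below are made up):

```python
from swh.vault.api.client import RemoteVaultClient

client = RemoteVaultClient(url='http://localhost:5005/')
obj_id = '4a1b6d7dd0a923ed90156c4e2f5db030095d8e08'  # hypothetical directory sha1

client.cook('directory', obj_id, email='user@example.com')  # schedule cooking
info = client.progress('directory', obj_id)                 # poll the task status
bundle = client.fetch('directory', obj_id)                  # bundle bytes once done
```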
""" - def __init__(self, **config): + def __init__(self, db, cache, scheduler, storage=None, **config): self.config = config - self.cache = VaultCache(self.config['cache']) + self.cache = cache + self.scheduler = scheduler + self.storage = storage self.smtp_server = smtplib.SMTP() - self.scheduler = get_scheduler(**self.config['scheduler']) - cfg = config['vault'] - assert cfg['cls'] == 'local' - args = cfg['args'] self._pool = psycopg2.pool.ThreadedConnectionPool( - args.get('min_pool_conns', 1), - args.get('max_pool_conns', 10), - args['db'], + config.get('min_pool_conns', 1), + config.get('max_pool_conns', 10), + db, cursor_factory=psycopg2.extras.RealDictCursor, ) self._db = None def get_db(self): if self._db: return self._db return BaseDb.from_pool(self._pool) @db_transaction() def task_info(self, obj_type, obj_id, db=None, cur=None): """Fetch information from a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) cur.execute(''' SELECT id, type, object_id, task_id, task_status, sticky, ts_created, ts_done, ts_last_access, progress_msg FROM vault_bundle WHERE type = %s AND object_id = %s''', (obj_type, obj_id)) res = cur.fetchone() if res: res['object_id'] = bytes(res['object_id']) return res - def _send_task(self, args): + def _send_task(self, *args): """Send a cooking task to the celery scheduler""" task = create_oneshot_task_dict('swh-vault-cooking', *args) added_tasks = self.scheduler.create_tasks([task]) return added_tasks[0]['id'] @db_transaction() def create_task(self, obj_type, obj_id, sticky=False, db=None, cur=None): """Create and send a cooking task""" obj_id = hashutil.hash_to_bytes(obj_id) hex_id = hashutil.hash_to_hex(obj_id) - args = [obj_type, hex_id] - backend_storage_config = {'storage': self.config['storage']} - cooker_class = get_cooker(obj_type) - cooker = cooker_class(*args, override_cfg=backend_storage_config) + cooker_class = get_cooker_cls(obj_type) + cooker = cooker_class(obj_type, hex_id, + backend=self, storage=self.storage) if not cooker.check_exists(): raise NotFoundExc("Object {} was not found.".format(hex_id)) cur.execute(''' INSERT INTO vault_bundle (type, object_id, sticky) VALUES (%s, %s, %s)''', (obj_type, obj_id, sticky)) db.conn.commit() - task_id = self._send_task(args) + task_id = self._send_task(obj_type, hex_id) cur.execute(''' UPDATE vault_bundle SET task_id = %s WHERE type = %s AND object_id = %s''', (task_id, obj_type, obj_id)) @db_transaction() def add_notif_email(self, obj_type, obj_id, email, db=None, cur=None): """Add an e-mail address to notify when a given bundle is ready""" obj_id = hashutil.hash_to_bytes(obj_id) cur.execute(''' INSERT INTO vault_notif_email (email, bundle_id) VALUES (%s, (SELECT id FROM vault_bundle WHERE type = %s AND object_id = %s))''', (email, obj_type, obj_id)) @db_transaction() def cook_request(self, obj_type, obj_id, *, sticky=False, email=None, db=None, cur=None): """Main entry point for cooking requests. This starts a cooking task if needed, and add the given e-mail to the notify list""" obj_id = hashutil.hash_to_bytes(obj_id) info = self.task_info(obj_type, obj_id) # If there's a failed bundle entry, delete it first. if info is not None and info['task_status'] == 'failed': cur.execute('''DELETE FROM vault_bundle WHERE type = %s AND object_id = %s''', (obj_type, obj_id)) db.conn.commit() info = None # If there's no bundle entry, create the task. 
     @db_transaction()
     def add_notif_email(self, obj_type, obj_id, email, db=None, cur=None):
         """Add an e-mail address to notify when a given bundle is ready"""
         obj_id = hashutil.hash_to_bytes(obj_id)
         cur.execute('''
             INSERT INTO vault_notif_email (email, bundle_id)
             VALUES (%s, (SELECT id FROM vault_bundle
                          WHERE type = %s AND object_id = %s))''',
                     (email, obj_type, obj_id))
 
     @db_transaction()
     def cook_request(self, obj_type, obj_id, *, sticky=False, email=None,
                      db=None, cur=None):
         """Main entry point for cooking requests. This starts a cooking task
         if needed, and adds the given e-mail to the notify list"""
         obj_id = hashutil.hash_to_bytes(obj_id)
         info = self.task_info(obj_type, obj_id)
 
         # If there's a failed bundle entry, delete it first.
         if info is not None and info['task_status'] == 'failed':
             cur.execute('''DELETE FROM vault_bundle
                            WHERE type = %s AND object_id = %s''',
                         (obj_type, obj_id))
             db.conn.commit()
             info = None
 
         # If there's no bundle entry, create the task.
         if info is None:
             self.create_task(obj_type, obj_id, sticky)
 
         if email is not None:
             # If the task is already done, send the email directly
             if info is not None and info['task_status'] == 'done':
                 self.send_notification(None, email, obj_type, obj_id,
                                        info['task_status'])
             # Else, add it to the notification queue
             else:
                 self.add_notif_email(obj_type, obj_id, email)
 
         info = self.task_info(obj_type, obj_id)
         return info
 
     @db_transaction()
     def batch_cook(self, batch, db=None, cur=None):
         """Cook a batch of bundles and returns the cooking id."""
         # Import execute_values at runtime only, because it requires
         # psycopg2 >= 2.7 (only available on postgresql servers)
         from psycopg2.extras import execute_values
 
         cur.execute('''
             INSERT INTO vault_batch (id)
             VALUES (DEFAULT)
             RETURNING id''')
         batch_id = cur.fetchone()['id']
         batch = batch_to_bytes(batch)
 
         # Delete all failed bundles from the batch
         cur.execute('''
             DELETE FROM vault_bundle
             WHERE task_status = 'failed'
               AND (type, object_id) IN %s''', (tuple(batch),))
 
         # Insert all the bundles, return the new ones
         execute_values(cur, '''
             INSERT INTO vault_bundle (type, object_id)
             VALUES %s ON CONFLICT DO NOTHING''', batch)
 
         # Get the bundle ids and task status
         cur.execute('''
             SELECT id, type, object_id, task_id FROM vault_bundle
             WHERE (type, object_id) IN %s''', (tuple(batch),))
         bundles = cur.fetchall()
 
         # Insert the batch-bundle entries
         batch_id_bundle_ids = [(batch_id, row['id']) for row in bundles]
         execute_values(cur, '''
             INSERT INTO vault_batch_bundle (batch_id, bundle_id)
             VALUES %s ON CONFLICT DO NOTHING''', batch_id_bundle_ids)
         db.conn.commit()
 
         # Get the tasks to fetch
         batch_new = [(row['type'], bytes(row['object_id']))
                      for row in bundles if row['task_id'] is None]
 
         # Send the tasks
         args_batch = [(obj_type, hashutil.hash_to_hex(obj_id))
                       for obj_type, obj_id in batch_new]
         # TODO: change once the scheduler handles priority tasks
         tasks = [create_oneshot_task_dict('swh-vault-batch-cooking', *args)
                  for args in args_batch]
 
         added_tasks = self.scheduler.create_tasks(tasks)
         tasks_ids_bundle_ids = zip([task['id'] for task in added_tasks],
                                    batch_new)
         tasks_ids_bundle_ids = [(task_id, obj_type, obj_id)
                                 for task_id, (obj_type, obj_id)
                                 in tasks_ids_bundle_ids]
 
         # Update the task ids
         execute_values(cur, '''
             UPDATE vault_bundle
             SET task_id = s_task_id
             FROM (VALUES %s) AS sub (s_task_id, s_type, s_object_id)
             WHERE type = s_type::cook_type AND object_id = s_object_id''',
                        tasks_ids_bundle_ids)
         return batch_id
 
     @db_transaction()
     def batch_info(self, batch_id, db=None, cur=None):
         """Fetch information from a batch of bundles"""
         cur.execute('''
             SELECT vault_bundle.id as id, type, object_id, task_id,
                    task_status, sticky, ts_created, ts_done, ts_last_access,
                    progress_msg
             FROM vault_batch_bundle
             LEFT JOIN vault_bundle ON vault_bundle.id = bundle_id
             WHERE batch_id = %s''', (batch_id,))
         res = cur.fetchall()
 
         if res:
             for d in res:
                 d['object_id'] = bytes(d['object_id'])
 
         return res
     @db_transaction()
     def is_available(self, obj_type, obj_id, db=None, cur=None):
         """Check whether a bundle is available for retrieval"""
         info = self.task_info(obj_type, obj_id, cur=cur)
         return (info is not None
                 and info['task_status'] == 'done'
                 and self.cache.is_cached(obj_type, obj_id))
 
     @db_transaction()
     def fetch(self, obj_type, obj_id, db=None, cur=None):
         """Retrieve a bundle from the cache"""
         if not self.is_available(obj_type, obj_id, cur=cur):
             return None
         self.update_access_ts(obj_type, obj_id, cur=cur)
         return self.cache.get(obj_type, obj_id)
 
     @db_transaction()
     def update_access_ts(self, obj_type, obj_id, db=None, cur=None):
         """Update the last access timestamp of a bundle"""
         obj_id = hashutil.hash_to_bytes(obj_id)
         cur.execute('''
             UPDATE vault_bundle
             SET ts_last_access = NOW()
             WHERE type = %s AND object_id = %s''',
                     (obj_type, obj_id))
 
     @db_transaction()
     def set_status(self, obj_type, obj_id, status, db=None, cur=None):
         """Set the cooking status of a bundle"""
         obj_id = hashutil.hash_to_bytes(obj_id)
         req = ('''
                UPDATE vault_bundle
                SET task_status = %s '''
                + (''', ts_done = NOW() ''' if status == 'done' else '')
                + '''WHERE type = %s AND object_id = %s''')
         cur.execute(req, (status, obj_type, obj_id))
 
     @db_transaction()
     def set_progress(self, obj_type, obj_id, progress, db=None, cur=None):
         """Set the cooking progress of a bundle"""
         obj_id = hashutil.hash_to_bytes(obj_id)
         cur.execute('''
             UPDATE vault_bundle
             SET progress_msg = %s
             WHERE type = %s AND object_id = %s''',
                     (progress, obj_type, obj_id))
 
     @db_transaction()
     def send_all_notifications(self, obj_type, obj_id, db=None, cur=None):
         """Send all the e-mails in the notification list of a bundle"""
         obj_id = hashutil.hash_to_bytes(obj_id)
         cur.execute('''
             SELECT vault_notif_email.id AS id, email, task_status,
                    progress_msg
             FROM vault_notif_email
             INNER JOIN vault_bundle ON bundle_id = vault_bundle.id
             WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s''',
                     (obj_type, obj_id))
         for d in cur:
             self.send_notification(d['id'], d['email'], obj_type, obj_id,
                                    status=d['task_status'],
                                    progress_msg=d['progress_msg'])
 
     @db_transaction()
     def send_notification(self, n_id, email, obj_type, obj_id, status,
                           progress_msg=None, db=None, cur=None):
         """Send the notification of a bundle to a specific e-mail"""
         hex_id = hashutil.hash_to_hex(obj_id)
         short_id = hex_id[:7]
 
         # TODO: instead of hardcoding this, we should probably:
         # * add a "fetch_url" field in the vault_notif_email table
         # * generate the url with flask.url_for() on the web-ui side
         # * send this url as part of the cook request and store it in
         #   the table
         # * use this url for the notification e-mail
         url = ('https://archive.softwareheritage.org/api/1/vault/{}/{}/'
                'raw'.format(obj_type, hex_id))
 
         if status == 'done':
             text = NOTIF_EMAIL_BODY_SUCCESS.strip()
             text = text.format(obj_type=obj_type, hex_id=hex_id, url=url)
             msg = MIMEText(text)
             msg['Subject'] = (NOTIF_EMAIL_SUBJECT_SUCCESS
                               .format(obj_type=obj_type, short_id=short_id))
         elif status == 'failed':
             text = NOTIF_EMAIL_BODY_FAILURE.strip()
             text = text.format(obj_type=obj_type, hex_id=hex_id,
                                progress_msg=progress_msg)
             msg = MIMEText(text)
             msg['Subject'] = (NOTIF_EMAIL_SUBJECT_FAILURE
                               .format(obj_type=obj_type, short_id=short_id))
         else:
             raise RuntimeError("send_notification called on a '{}' bundle"
                                .format(status))
 
         msg['From'] = NOTIF_EMAIL_FROM
         msg['To'] = email
 
         self._smtp_send(msg)
 
         if n_id is not None:
             cur.execute('''
                 DELETE FROM vault_notif_email
                 WHERE id = %s''', (n_id,))
 
     def _smtp_send(self, msg):
         # Reconnect if needed
         try:
             status = self.smtp_server.noop()[0]
         except smtplib.SMTPException:
             status = -1
         if status != 250:
             self.smtp_server.connect('localhost', 25)
 
         # Send the message
         self.smtp_server.send_message(msg)
 
     @db_transaction()
     def _cache_expire(self, cond, *args, db=None, cur=None):
         """Low-level expiration method, used by cache_expire_* methods"""
         # Embedded SELECT query to be able to use ORDER BY and LIMIT
         cur.execute('''
             DELETE FROM vault_bundle
             WHERE ctid IN (
                 SELECT ctid
                 FROM vault_bundle
                 WHERE sticky = false
                 {}
             )
             RETURNING type, object_id
             '''.format(cond), args)
 
         for d in cur:
             self.cache.delete(d['type'], bytes(d['object_id']))
     @db_transaction()
     def cache_expire_oldest(self, n=1, by='last_access', db=None, cur=None):
         """Expire the `n` oldest bundles"""
         assert by in ('created', 'done', 'last_access')
         filter = '''ORDER BY ts_{} LIMIT {}'''.format(by, n)
         return self._cache_expire(filter)
 
     @db_transaction()
     def cache_expire_until(self, date, by='last_access', db=None, cur=None):
         """Expire all the bundles until a certain date"""
         assert by in ('created', 'done', 'last_access')
         filter = '''AND ts_{} <= %s'''.format(by)
         return self._cache_expire(filter, date)
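Since the backend now receives its collaborators instead of building them from a nested 'vault' config section, it can be wired up directly in tests or scripts. A sketch with placeholder endpoints (the 'memory' objstorage class is assumed to be available):

```python
from swh.scheduler import get_scheduler
from swh.storage import get_storage
from swh.vault.backend import VaultBackend
from swh.vault.cache import VaultCache

backend = VaultBackend(
    db='dbname=swh-vault',                    # DSN for the psycopg2 pool
    cache=VaultCache(cls='memory', args={}),  # any objstorage configuration
    scheduler=get_scheduler(cls='remote', args={'url': 'http://localhost:5008/'}),
    storage=get_storage(cls='remote', args={'url': 'http://localhost:5002/'}),
)
info = backend.cook_request('directory', '4a1b6d7dd0a923ed90156c4e2f5db030095d8e08')
```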
diff --git a/swh/vault/cache.py b/swh/vault/cache.py
index 4c06004..2a6b231 100644
--- a/swh/vault/cache.py
+++ b/swh/vault/cache.py
@@ -1,47 +1,47 @@
 # Copyright (C) 2016-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from swh.model import hashutil
 from swh.objstorage import get_objstorage
 from swh.objstorage.objstorage import compute_hash
 
 
 class VaultCache:
     """The Vault cache is an object storage that stores Vault bundles.
 
     This implementation computes sha1('<obj_type>:<obj_id>') as the
     internal identifiers used in the underlying objstorage.
     """
 
-    def __init__(self, objstorage):
+    def __init__(self, **objstorage):
         self.objstorage = get_objstorage(**objstorage)
 
     def add(self, obj_type, obj_id, content):
         sid = self._get_internal_id(obj_type, obj_id)
         return self.objstorage.add(content, sid)
 
     def get(self, obj_type, obj_id):
         sid = self._get_internal_id(obj_type, obj_id)
         return self.objstorage.get(hashutil.hash_to_bytes(sid))
 
     def delete(self, obj_type, obj_id):
         sid = self._get_internal_id(obj_type, obj_id)
         return self.objstorage.delete(hashutil.hash_to_bytes(sid))
 
     def add_stream(self, obj_type, obj_id, content_iter):
         sid = self._get_internal_id(obj_type, obj_id)
         return self.objstorage.add_stream(content_iter, sid)
 
     def get_stream(self, obj_type, obj_id):
         sid = self._get_internal_id(obj_type, obj_id)
         return self.objstorage.get_stream(hashutil.hash_to_bytes(sid))
 
     def is_cached(self, obj_type, obj_id):
         sid = self._get_internal_id(obj_type, obj_id)
         return hashutil.hash_to_bytes(sid) in self.objstorage
 
     def _get_internal_id(self, obj_type, obj_id):
         obj_id = hashutil.hash_to_hex(obj_id)
         return compute_hash('{}:{}'.format(obj_type, obj_id).encode())
diff --git a/swh/vault/cli.py b/swh/vault/cli.py
index 3037962..98dc6c4 100644
--- a/swh/vault/cli.py
+++ b/swh/vault/cli.py
@@ -1,91 +1,50 @@
 import logging
 
 import click
 import aiohttp
 
-from swh.core import config
-from swh.vault import get_vault
-from swh.vault.api.server import make_app, DEFAULT_CONFIG
+from swh.vault.api.server import make_app_from_configfile
 
 CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
 
 
-@click.group(context_settings=CONTEXT_SETTINGS)
+@click.command(context_settings=CONTEXT_SETTINGS)
 @click.option('--config-file', '-C', default=None,
               type=click.Path(exists=True, dir_okay=False,),
               help="Configuration file.")
-@click.option('--database', '-d', default=None,
-              help="Scheduling database DSN (imply cls is 'local')")
-@click.option('--url', '-u', default=None,
-              help="Scheduler's url access (imply cls is 'remote')")
 @click.option('--log-level', '-l', default='INFO',
               type=click.Choice(logging._nameToLevel.keys()),
               help="Log level (default to INFO)")
 @click.option('--no-stdout', is_flag=True, default=False,
               help="Do NOT output logs on the console")
+@click.option('--host', default='0.0.0.0', help="Host to run the server")
+@click.option('--port', default=5005, type=click.INT,
+              help="Binding port of the server")
+@click.option('--debug/--nodebug', default=True,
+              help="Indicates if the server should run in debug mode")
 @click.pass_context
-def cli(ctx, config_file, database, url, log_level, no_stdout):
-    """Software Heritage Scheduler CLI interface
-
-    Default to use the the local scheduler instance (plugged to the
-    main scheduler db).
+def cli(ctx, config_file, log_level, no_stdout, host, port, debug):
+    """Software Heritage Vault API server
     """
     from swh.scheduler.celery_backend.config import setup_log_handler
     log_level = setup_log_handler(
         loglevel=log_level, colorize=False,
         format='[%(levelname)s] %(name)s -- %(message)s',
         log_console=not no_stdout)
 
-    ctx.ensure_object(dict)
-
-    logger = logging.getLogger(__name__)
-    vault = None
-    conf = config.read(config_file, DEFAULT_CONFIG)
-    if 'vault' not in conf:
-        raise ValueError("missing 'vault' configuration")
-
-    if database:
-        conf['vault']['cls'] = 'local'
-        conf['vault']['args']['db'] = database
-    elif url:
-        conf['vault']['cls'] = 'remote'
-        conf['vault']['args'] = {'url': url}
-    sched_conf = conf['vault']
     try:
-        logger.debug('Instanciating vault with %s' % (
-            sched_conf))
-        vault = get_vault(**conf)
-    except ValueError:
-        # it's the subcommand to decide whether not having a proper
-        # vault instance is a problem.
-        pass
-
-    ctx.obj['vault'] = vault
-    ctx.obj['config'] = conf
-    ctx.obj['loglevel'] = log_level
-
-
-@cli.command('api-server')
-@click.option('--host', default='0.0.0.0', help="Host to run the server")
-@click.option('--port', default=5005, type=click.INT,
-              help="Binding port of the server")
-@click.option('--debug/--nodebug', default=True,
-              help="Indicates if the server should run in debug mode")
-@click.pass_context
-def serve(ctx, host, port, debug):
-    if ctx.obj['config']['vault']['cls'] == 'remote':
-        click.echo("The API server can only be started with a 'local' "
-                   "configuration", err=True)
+        app = make_app_from_configfile(config_file, debug=debug)
+    except EnvironmentError as e:
+        click.echo(str(e), err=True)
         ctx.exit(1)
 
-    app = make_app(ctx.obj['config'], backend=ctx.obj['vault'],
-                   debug=bool(debug))
+    aiohttp.web.run_app(app, host=host, port=int(port))
 
 
 def main():
     return cli(auto_envvar_prefix='SWH_VAULT')
 
 
 if __name__ == '__main__':
     main()
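The command is now a single entry point that defers all object construction to `make_app_from_configfile`. Assuming the console script is installed as `swh-vault`, running it should be equivalent to this hand-rolled launcher (config path is a placeholder):

```python
import aiohttp.web

from swh.vault.api.server import make_app_from_configfile

# Roughly: swh-vault -C vault.yml --host 127.0.0.1 --port 5005 --nodebug
app = make_app_from_configfile('vault.yml', debug=False)
aiohttp.web.run_app(app, host='127.0.0.1', port=5005)
```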
diff --git a/swh/vault/cookers/__init__.py b/swh/vault/cookers/__init__.py
index 6dc658a..f4124da 100644
--- a/swh/vault/cookers/__init__.py
+++ b/swh/vault/cookers/__init__.py
@@ -1,16 +1,53 @@
 # Copyright (C) 2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import os
+
+from swh.core.config import load_named_config, read as read_config
+from swh.storage import get_storage
+from swh.vault import get_vault
+from swh.vault.cookers.base import DEFAULT_CONFIG_PATH, DEFAULT_CONFIG
 from swh.vault.cookers.directory import DirectoryCooker
 from swh.vault.cookers.revision_flat import RevisionFlatCooker
 from swh.vault.cookers.revision_gitfast import RevisionGitfastCooker
 
 COOKER_TYPES = {
     'directory': DirectoryCooker,
     'revision_flat': RevisionFlatCooker,
     'revision_gitfast': RevisionGitfastCooker,
 }
 
-get_cooker = COOKER_TYPES.__getitem__
+
+def get_cooker_cls(obj_type):
+    return COOKER_TYPES[obj_type]
+
+
+def get_cooker(obj_type, obj_id):
+    if 'SWH_CONFIG_FILENAME' in os.environ:
+        cfg = read_config(os.environ['SWH_CONFIG_FILENAME'], DEFAULT_CONFIG)
+    else:
+        cfg = load_named_config(DEFAULT_CONFIG_PATH, DEFAULT_CONFIG)
+    cooker_cls = get_cooker_cls(obj_type)
+    if 'vault' not in cfg:
+        raise ValueError("missing 'vault' configuration")
+
+    vcfg = cfg['vault']
+    if vcfg['cls'] != 'remote':
+        raise EnvironmentError(
+            "This vault backend can only be a 'remote' "
+            "configuration")
+    args = vcfg['args']
+    if 'storage' not in args:
+        args['storage'] = cfg.get('storage')
+
+    if not args.get('storage'):
+        raise ValueError(
+            "invalid configuration; missing 'storage' config entry.")
+
+    storage = get_storage(**args.pop('storage'))
+    backend = get_vault(**vcfg)
+
+    return cooker_cls(obj_type, obj_id,
+                      backend=backend, storage=storage,
+                      max_bundle_size=cfg['max_bundle_size'])
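`get_cooker()` now resolves its configuration at call time, so a cooking worker only needs `SWH_CONFIG_FILENAME` in its environment. A sketch (the file path and object id are placeholders; the file is expected to carry the same keys as `DEFAULT_CONFIG` in `cookers/base.py`):

```python
import os

# The config file should define 'vault' (whose cls must be 'remote'),
# 'storage' and 'max_bundle_size', mirroring DEFAULT_CONFIG.
os.environ['SWH_CONFIG_FILENAME'] = '/etc/softwareheritage/vault/cooker.yml'

from swh.vault.cookers import get_cooker

cooker = get_cooker('directory', '4a1b6d7dd0a923ed90156c4e2f5db030095d8e08')
cooker.cook()
```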
""" - self.config = config.load_named_config(DEFAULT_CONFIG_PATH, - DEFAULT_CONFIG) - if override_cfg is not None: - self.config.update(override_cfg) - self.obj_type = obj_type self.obj_id = hashutil.hash_to_bytes(obj_id) - self.backend = RemoteVaultClient(self.config['vault_url']) - self.storage = get_storage(**self.config['storage']) - self.max_bundle_size = self.config['max_bundle_size'] + self.backend = backend + self.storage = storage + self.max_bundle_size = max_bundle_size @abc.abstractmethod def check_exists(self): """Checks that the requested object exists and can be cooked. Override this in the cooker implementation. """ raise NotImplementedError @abc.abstractmethod def prepare_bundle(self): """Implementation of the cooker. Yields chunks of the bundle bytes. Override this with the cooker implementation. """ raise NotImplementedError def write(self, chunk): self.fileobj.write(chunk) def cook(self): """Cook the requested object into a bundle """ self.backend.set_status(self.obj_type, self.obj_id, 'pending') self.backend.set_progress(self.obj_type, self.obj_id, 'Processing...') self.fileobj = BytesIOBundleSizeLimit(size_limit=self.max_bundle_size) try: self.prepare_bundle() bundle = self.fileobj.getvalue() # TODO: use proper content streaming instead of put_bundle() self.backend.put_bundle(self.CACHE_TYPE_KEY, self.obj_id, bundle) except PolicyError as e: self.backend.set_status(self.obj_type, self.obj_id, 'failed') self.backend.set_progress(self.obj_type, self.obj_id, str(e)) except Exception: self.backend.set_status(self.obj_type, self.obj_id, 'failed') self.backend.set_progress( self.obj_type, self.obj_id, "Internal Server Error. This incident will be reported.") logging.exception("Bundle cooking failed.") else: self.backend.set_status(self.obj_type, self.obj_id, 'done') self.backend.set_progress(self.obj_type, self.obj_id, None) finally: self.backend.send_notif(self.obj_type, self.obj_id) diff --git a/swh/vault/cooking_tasks.py b/swh/vault/cooking_tasks.py index 5e4254b..4cf9ced 100644 --- a/swh/vault/cooking_tasks.py +++ b/swh/vault/cooking_tasks.py @@ -1,21 +1,21 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from celery import current_app as app from swh.vault.cookers import get_cooker @app.task(name=__name__ + '.SWHCookingTask') def cook_bundle(obj_type, obj_id): """Main task to cook a bundle.""" - get_cooker(obj_type)(obj_type, obj_id).cook() + get_cooker(obj_type, obj_id).cook() # TODO: remove once the scheduler handles priority tasks @app.task(name=__name__ + '.SWHBatchCookingTask') def batch_cook_bundle(obj_type, obj_id): """Temporary task for the batch queue.""" - get_cooker(obj_type)(obj_type, obj_id).cook() + get_cooker(obj_type, obj_id).cook()