D1108.id3584.diff

diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,4 +1,5 @@
-pytest
+pytest < 4
+pytest-postgresql
dulwich >= 0.18.7
swh.loader.git >= 0.0.48
swh.storage[testing]
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -50,6 +50,10 @@
vcversioner={},
include_package_data=True,
zip_safe=False,
+ entry_points='''
+ [console_scripts]
+ swh-vault=swh.vault.cli:main
+ ''',
classifiers=[
"Programming Language :: Python :: 3",
"Intended Audience :: Developers",
diff --git a/swh/vault/__init__.py b/swh/vault/__init__.py
--- a/swh/vault/__init__.py
+++ b/swh/vault/__init__.py
@@ -2,9 +2,12 @@
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import logging
+logger = logging.getLogger(__name__)
-def get_vault(cls, args):
+
+def get_vault(cls='remote', args={}):
"""
Get a vault object of class `vault_class` with arguments
`vault_args`.
@@ -15,16 +18,23 @@
- args (dict): dictionary with keys
Returns:
- an instance of swh.storage.Storage (either local or remote)
+ an instance of VaultBackend (either local or remote)
Raises:
ValueError if passed an unknown storage class.
"""
-
if cls == 'remote':
from .api.client import RemoteVaultClient as Vault
+ elif cls == 'local':
+ from swh.scheduler import get_scheduler
+ from swh.storage import get_storage
+ from swh.vault.cache import VaultCache
+ from swh.vault.backend import VaultBackend as Vault
+ args['cache'] = VaultCache(**args['cache'])
+ args['storage'] = get_storage(**args['storage'])
+ args['scheduler'] = get_scheduler(**args['scheduler'])
else:
raise ValueError('Unknown storage class `%s`' % cls)
-
+ logger.debug('Instantiating %s with %s' % (Vault, args))
return Vault(**args)
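
With the 'local' branch in place, a configuration dictionary is enough to assemble a full backend. A minimal sketch of calling the factory; the connection strings and paths are illustrative placeholders, with shapes taken from DEFAULT_CONFIG and the test fixture further down:

    from swh.vault import get_vault

    # All values below are illustrative placeholders, not part of this diff.
    vault = get_vault('local', {
        'db': 'dbname=softwareheritage-vault-dev',
        'cache': {'cls': 'pathslicing',
                  'args': {'root': '/tmp/vault-cache', 'slicing': '0:1/1:5'}},
        'storage': {'cls': 'remote',
                    'args': {'url': 'http://localhost:5002/'}},
        'scheduler': {'cls': 'remote',
                      'args': {'url': 'http://localhost:5008/'}},
    })
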
diff --git a/swh/vault/api/client.py b/swh/vault/api/client.py
--- a/swh/vault/api/client.py
+++ b/swh/vault/api/client.py
@@ -17,9 +17,9 @@
class RemoteVaultClient(SWHRemoteAPI):
"""Client to the Software Heritage vault cache."""
- def __init__(self, base_url, timeout=None):
+ def __init__(self, url, timeout=None):
super().__init__(
- api_exception=VaultAPIError, url=base_url, timeout=timeout)
+ api_exception=VaultAPIError, url=url, timeout=timeout)
# Web API endpoints
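
The base_url to url rename aligns the client with the key used in 'remote' vault configurations, so a config dict can be passed straight through get_vault. A sketch, with a placeholder address:

    from swh.vault import get_vault

    # 'url' below is a placeholder; point it at a running vault API server.
    client = get_vault('remote', {'url': 'http://localhost:5005/'})
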
diff --git a/swh/vault/api/server.py b/swh/vault/api/server.py
--- a/swh/vault/api/server.py
+++ b/swh/vault/api/server.py
@@ -5,7 +5,6 @@
import aiohttp.web
import asyncio
-import click
import collections
from swh.core import config
@@ -13,8 +12,9 @@
encode_data_server as encode_data,
decode_request)
from swh.model import hashutil
+from swh.vault import get_vault
from swh.vault.cookers import COOKER_TYPES
-from swh.vault.backend import VaultBackend, NotFoundExc
+from swh.vault.backend import NotFoundExc
DEFAULT_CONFIG_PATH = 'vault/server'
@@ -33,7 +33,12 @@
},
}),
'client_max_size': ('int', 1024 ** 3),
- 'db': ('str', 'dbname=softwareheritage-vault-dev'),
+ 'vault': ('dict', {
+ 'cls': 'local',
+ 'args': {
+ 'db': 'dbname=softwareheritage-vault-dev',
+ },
+ }),
'scheduler': ('dict', {
'cls': 'remote',
'args': {
@@ -168,10 +173,7 @@
# Web server
-def make_app(config, **kwargs):
- if 'client_max_size' in config:
- kwargs['client_max_size'] = config['client_max_size']
-
+def make_app(backend, **kwargs):
app = SWHRemoteAPI(**kwargs)
app.router.add_route('GET', '/', index)
@@ -190,26 +192,40 @@
app.router.add_route('POST', '/batch_cook', batch_cook)
app.router.add_route('GET', '/batch_progress/{batch_id}', batch_progress)
- app['backend'] = VaultBackend(config)
+ app['backend'] = backend
return app
-def make_app_from_configfile(config_path=DEFAULT_CONFIG_PATH, **kwargs):
- cfg = config.load_named_config(config_path, DEFAULT_CONFIG)
- return make_app(cfg, **kwargs)
+def get_local_backend(config_file):
+ cfg = config.read(config_file, DEFAULT_CONFIG)
+ if 'vault' not in cfg:
+ raise ValueError("missing '%vault' configuration")
+
+ vcfg = cfg['vault']
+ if vcfg['cls'] != 'local':
+ raise EnvironmentError(
+ "The vault backend can only be started with a 'local' "
+ "configuration", err=True)
+ args = vcfg['args']
+ if 'cache' not in args:
+ args['cache'] = cfg.get('cache')
+ if 'storage' not in args:
+ args['storage'] = cfg.get('storage')
+ if 'scheduler' not in args:
+ args['scheduler'] = cfg.get('scheduler')
+
+ for key in ('cache', 'storage', 'scheduler'):
+ if not args.get(key):
+ raise ValueError(
+ "invalid configuration; missing %s config entry." % key)
+
+ return get_vault('local', args)
-@click.command()
-@click.argument('config-path', required=1)
-@click.option('--host', default='0.0.0.0', help="Host to run the server")
-@click.option('--port', default=5005, type=click.INT,
- help="Binding port of the server")
-@click.option('--debug/--nodebug', default=True,
- help="Indicates if the server should run in debug mode")
-def launch(config_path, host, port, debug):
- app = make_app(config.read(config_path, DEFAULT_CONFIG), debug=bool(debug))
- aiohttp.web.run_app(app, host=host, port=int(port))
+def make_app_from_configfile(config_file, **kwargs):
+ vault = get_local_backend(config_file)
+ return make_app(backend=vault, **kwargs)
if __name__ == '__main__':
- launch()
+ print('Deprecated. Use the swh-vault command instead.')
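
make_app_from_configfile replaces the old click launcher; the CLI below (swh/vault/cli.py) wraps it, but it can also be driven directly. A sketch, where the config path is a placeholder and the file must carry the 'vault' (cls: local), 'cache', 'storage' and 'scheduler' entries that get_local_backend checks:

    import aiohttp.web

    from swh.vault.api.server import make_app_from_configfile

    # 'vault-server.yml' is a placeholder path.
    app = make_app_from_configfile('vault-server.yml', debug=True)
    aiohttp.web.run_app(app, host='0.0.0.0', port=5005)
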
diff --git a/swh/vault/backend.py b/swh/vault/backend.py
--- a/swh/vault/backend.py
+++ b/swh/vault/backend.py
@@ -4,17 +4,15 @@
# See top-level LICENSE file for more information
import smtplib
-import psycopg2
-from psycopg2.extras import RealDictCursor
+import psycopg2.extras
+import psycopg2.pool
-from functools import wraps
from email.mime.text import MIMEText
+from swh.core.db import BaseDb
+from swh.core.db.common import db_transaction
from swh.model import hashutil
-from swh.scheduler import get_scheduler
from swh.scheduler.utils import create_oneshot_task_dict
-from swh.vault.cache import VaultCache
-from swh.vault.cookers import get_cooker
+from swh.vault.cookers import get_cooker_cls
cooking_task_name = 'swh.vault.cooking_tasks.SWHCookingTask'
@@ -69,146 +67,87 @@
for obj_type, obj_id in batch]
-# TODO: Imported from swh.scheduler.backend. Factorization needed.
-def autocommit(fn):
- @wraps(fn)
- def wrapped(self, *args, **kwargs):
- autocommit = False
- # TODO: I don't like using None, it's confusing for the user. how about
- # a NEW_CURSOR object()?
- if 'cursor' not in kwargs or not kwargs['cursor']:
- autocommit = True
- kwargs['cursor'] = self.cursor()
-
- try:
- ret = fn(self, *args, **kwargs)
- except BaseException:
- if autocommit:
- self.rollback()
- raise
-
- if autocommit:
- self.commit()
-
- return ret
-
- return wrapped
-
-
-# TODO: This has to be factorized with other database base classes and helpers
-# (swh.scheduler.backend.SchedulerBackend, swh.storage.db.BaseDb, ...)
-# The three first methods are imported from swh.scheduler.backend.
class VaultBackend:
"""
Backend for the Software Heritage vault.
"""
- def __init__(self, config):
+ def __init__(self, db, cache, scheduler, storage=None, **config):
self.config = config
- self.cache = VaultCache(self.config['cache'])
- self.db = None
- self.reconnect()
+ self.cache = cache
+ self.scheduler = scheduler
+ self.storage = storage
self.smtp_server = smtplib.SMTP()
- if self.config['scheduler'] is not None:
- self.scheduler = get_scheduler(**self.config['scheduler'])
-
- def reconnect(self):
- """Reconnect to the database."""
- if not self.db or self.db.closed:
- self.db = psycopg2.connect(
- dsn=self.config['db'],
- cursor_factory=RealDictCursor,
- )
- def close(self):
- """Close the underlying database connection."""
- self.db.close()
-
- def cursor(self):
- """Return a fresh cursor on the database, with auto-reconnection in
- case of failure"""
- cur = None
-
- # Get a fresh cursor and reconnect at most three times
- tries = 0
- while True:
- tries += 1
- try:
- cur = self.db.cursor()
- cur.execute('select 1')
- break
- except psycopg2.OperationalError:
- if tries < 3:
- self.reconnect()
- else:
- raise
- return cur
-
- def commit(self):
- """Commit a transaction"""
- self.db.commit()
-
- def rollback(self):
- """Rollback a transaction"""
- self.db.rollback()
-
- @autocommit
- def task_info(self, obj_type, obj_id, cursor=None):
+ self._pool = psycopg2.pool.ThreadedConnectionPool(
+ config.get('min_pool_conns', 1),
+ config.get('max_pool_conns', 10),
+ db,
+ cursor_factory=psycopg2.extras.RealDictCursor,
+ )
+ self._db = None
+
+ def get_db(self):
+ if self._db:
+ return self._db
+ return BaseDb.from_pool(self._pool)
+
+ @db_transaction()
+ def task_info(self, obj_type, obj_id, db=None, cur=None):
"""Fetch information from a bundle"""
obj_id = hashutil.hash_to_bytes(obj_id)
- cursor.execute('''
+ cur.execute('''
SELECT id, type, object_id, task_id, task_status, sticky,
ts_created, ts_done, ts_last_access, progress_msg
FROM vault_bundle
WHERE type = %s AND object_id = %s''', (obj_type, obj_id))
- res = cursor.fetchone()
+ res = cur.fetchone()
if res:
res['object_id'] = bytes(res['object_id'])
return res
- def _send_task(self, args):
+ def _send_task(self, *args):
"""Send a cooking task to the celery scheduler"""
task = create_oneshot_task_dict('swh-vault-cooking', *args)
added_tasks = self.scheduler.create_tasks([task])
return added_tasks[0]['id']
- @autocommit
- def create_task(self, obj_type, obj_id, sticky=False, cursor=None):
+ @db_transaction()
+ def create_task(self, obj_type, obj_id, sticky=False, db=None, cur=None):
"""Create and send a cooking task"""
obj_id = hashutil.hash_to_bytes(obj_id)
hex_id = hashutil.hash_to_hex(obj_id)
- args = [obj_type, hex_id]
- backend_storage_config = {'storage': self.config['storage']}
- cooker_class = get_cooker(obj_type)
- cooker = cooker_class(*args, override_cfg=backend_storage_config)
+ cooker_class = get_cooker_cls(obj_type)
+ cooker = cooker_class(obj_type, hex_id,
+ backend=self, storage=self.storage)
if not cooker.check_exists():
raise NotFoundExc("Object {} was not found.".format(hex_id))
- cursor.execute('''
+ cur.execute('''
INSERT INTO vault_bundle (type, object_id, sticky)
VALUES (%s, %s, %s)''', (obj_type, obj_id, sticky))
- self.commit()
+ db.conn.commit()
- task_id = self._send_task(args)
+ task_id = self._send_task(obj_type, hex_id)
- cursor.execute('''
+ cur.execute('''
UPDATE vault_bundle
SET task_id = %s
WHERE type = %s AND object_id = %s''', (task_id, obj_type, obj_id))
- @autocommit
- def add_notif_email(self, obj_type, obj_id, email, cursor=None):
+ @db_transaction()
+ def add_notif_email(self, obj_type, obj_id, email, db=None, cur=None):
"""Add an e-mail address to notify when a given bundle is ready"""
obj_id = hashutil.hash_to_bytes(obj_id)
- cursor.execute('''
+ cur.execute('''
INSERT INTO vault_notif_email (email, bundle_id)
VALUES (%s, (SELECT id FROM vault_bundle
WHERE type = %s AND object_id = %s))''',
- (email, obj_type, obj_id))
+ (email, obj_type, obj_id))
- @autocommit
+ @db_transaction()
def cook_request(self, obj_type, obj_id, *, sticky=False,
- email=None, cursor=None):
+ email=None, db=None, cur=None):
"""Main entry point for cooking requests. This starts a cooking task if
needed, and add the given e-mail to the notify list"""
obj_id = hashutil.hash_to_bytes(obj_id)
@@ -216,10 +155,10 @@
# If there's a failed bundle entry, delete it first.
if info is not None and info['task_status'] == 'failed':
- cursor.execute('''DELETE FROM vault_bundle
+ cur.execute('''DELETE FROM vault_bundle
WHERE type = %s AND object_id = %s''',
- (obj_type, obj_id))
- self.commit()
+ (obj_type, obj_id))
+ db.conn.commit()
info = None
# If there's no bundle entry, create the task.
@@ -238,44 +177,44 @@
info = self.task_info(obj_type, obj_id)
return info
- @autocommit
- def batch_cook(self, batch, cursor=None):
+ @db_transaction()
+ def batch_cook(self, batch, db=None, cur=None):
"""Cook a batch of bundles and returns the cooking id."""
# Import execute_values at runtime only, because it requires
# psycopg2 >= 2.7 (only available on postgresql servers)
from psycopg2.extras import execute_values
- cursor.execute('''
+ cur.execute('''
INSERT INTO vault_batch (id)
VALUES (DEFAULT)
RETURNING id''')
- batch_id = cursor.fetchone()['id']
+ batch_id = cur.fetchone()['id']
batch = batch_to_bytes(batch)
# Delete all failed bundles from the batch
- cursor.execute('''
+ cur.execute('''
DELETE FROM vault_bundle
WHERE task_status = 'failed'
AND (type, object_id) IN %s''', (tuple(batch),))
# Insert all the bundles, return the new ones
- execute_values(cursor, '''
+ execute_values(cur, '''
INSERT INTO vault_bundle (type, object_id)
VALUES %s ON CONFLICT DO NOTHING''', batch)
# Get the bundle ids and task status
- cursor.execute('''
+ cur.execute('''
SELECT id, type, object_id, task_id FROM vault_bundle
WHERE (type, object_id) IN %s''', (tuple(batch),))
- bundles = cursor.fetchall()
+ bundles = cur.fetchall()
# Insert the batch-bundle entries
batch_id_bundle_ids = [(batch_id, row['id']) for row in bundles]
- execute_values(cursor, '''
+ execute_values(cur, '''
INSERT INTO vault_batch_bundle (batch_id, bundle_id)
VALUES %s ON CONFLICT DO NOTHING''',
batch_id_bundle_ids)
- self.commit()
+ db.conn.commit()
# Get the tasks to fetch
batch_new = [(row['type'], bytes(row['object_id']))
@@ -296,7 +235,7 @@
in tasks_ids_bundle_ids]
# Update the task ids
- execute_values(cursor, '''
+ execute_values(cur, '''
UPDATE vault_bundle
SET task_id = s_task_id
FROM (VALUES %s) AS sub (s_task_id, s_type, s_object_id)
@@ -304,50 +243,50 @@
tasks_ids_bundle_ids)
return batch_id
- @autocommit
- def batch_info(self, batch_id, cursor=None):
+ @db_transaction()
+ def batch_info(self, batch_id, db=None, cur=None):
"""Fetch information from a batch of bundles"""
- cursor.execute('''
+ cur.execute('''
SELECT vault_bundle.id as id,
type, object_id, task_id, task_status, sticky,
ts_created, ts_done, ts_last_access, progress_msg
FROM vault_batch_bundle
LEFT JOIN vault_bundle ON vault_bundle.id = bundle_id
WHERE batch_id = %s''', (batch_id,))
- res = cursor.fetchall()
+ res = cur.fetchall()
if res:
for d in res:
d['object_id'] = bytes(d['object_id'])
return res
- @autocommit
- def is_available(self, obj_type, obj_id, cursor=None):
+ @db_transaction()
+ def is_available(self, obj_type, obj_id, db=None, cur=None):
"""Check whether a bundle is available for retrieval"""
- info = self.task_info(obj_type, obj_id, cursor=cursor)
+ info = self.task_info(obj_type, obj_id, cur=cur)
return (info is not None
and info['task_status'] == 'done'
and self.cache.is_cached(obj_type, obj_id))
- @autocommit
- def fetch(self, obj_type, obj_id, cursor=None):
+ @db_transaction()
+ def fetch(self, obj_type, obj_id, db=None, cur=None):
"""Retrieve a bundle from the cache"""
- if not self.is_available(obj_type, obj_id, cursor=cursor):
+ if not self.is_available(obj_type, obj_id, cur=cur):
return None
- self.update_access_ts(obj_type, obj_id, cursor=cursor)
+ self.update_access_ts(obj_type, obj_id, cur=cur)
return self.cache.get(obj_type, obj_id)
- @autocommit
- def update_access_ts(self, obj_type, obj_id, cursor=None):
+ @db_transaction()
+ def update_access_ts(self, obj_type, obj_id, db=None, cur=None):
"""Update the last access timestamp of a bundle"""
obj_id = hashutil.hash_to_bytes(obj_id)
- cursor.execute('''
+ cur.execute('''
UPDATE vault_bundle
SET ts_last_access = NOW()
WHERE type = %s AND object_id = %s''',
- (obj_type, obj_id))
+ (obj_type, obj_id))
- @autocommit
- def set_status(self, obj_type, obj_id, status, cursor=None):
+ @db_transaction()
+ def set_status(self, obj_type, obj_id, status, db=None, cur=None):
"""Set the cooking status of a bundle"""
obj_id = hashutil.hash_to_bytes(obj_id)
req = ('''
@@ -355,36 +294,36 @@
SET task_status = %s '''
+ (''', ts_done = NOW() ''' if status == 'done' else '')
+ '''WHERE type = %s AND object_id = %s''')
- cursor.execute(req, (status, obj_type, obj_id))
+ cur.execute(req, (status, obj_type, obj_id))
- @autocommit
- def set_progress(self, obj_type, obj_id, progress, cursor=None):
+ @db_transaction()
+ def set_progress(self, obj_type, obj_id, progress, db=None, cur=None):
"""Set the cooking progress of a bundle"""
obj_id = hashutil.hash_to_bytes(obj_id)
- cursor.execute('''
+ cur.execute('''
UPDATE vault_bundle
SET progress_msg = %s
WHERE type = %s AND object_id = %s''',
- (progress, obj_type, obj_id))
+ (progress, obj_type, obj_id))
- @autocommit
- def send_all_notifications(self, obj_type, obj_id, cursor=None):
+ @db_transaction()
+ def send_all_notifications(self, obj_type, obj_id, db=None, cur=None):
"""Send all the e-mails in the notification list of a bundle"""
obj_id = hashutil.hash_to_bytes(obj_id)
- cursor.execute('''
+ cur.execute('''
SELECT vault_notif_email.id AS id, email, task_status, progress_msg
FROM vault_notif_email
INNER JOIN vault_bundle ON bundle_id = vault_bundle.id
WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s''',
- (obj_type, obj_id))
- for d in cursor:
+ (obj_type, obj_id))
+ for d in cur:
self.send_notification(d['id'], d['email'], obj_type, obj_id,
status=d['task_status'],
progress_msg=d['progress_msg'])
- @autocommit
+ @db_transaction()
def send_notification(self, n_id, email, obj_type, obj_id, status,
- progress_msg=None, cursor=None):
+ progress_msg=None, db=None, cur=None):
"""Send the notification of a bundle to a specific e-mail"""
hex_id = hashutil.hash_to_hex(obj_id)
short_id = hex_id[:7]
@@ -421,7 +360,7 @@
self._smtp_send(msg)
if n_id is not None:
- cursor.execute('''
+ cur.execute('''
DELETE FROM vault_notif_email
WHERE id = %s''', (n_id,))
@@ -437,11 +376,11 @@
# Send the message
self.smtp_server.send_message(msg)
- @autocommit
- def _cache_expire(self, cond, *args, cursor=None):
+ @db_transaction()
+ def _cache_expire(self, cond, *args, db=None, cur=None):
"""Low-level expiration method, used by cache_expire_* methods"""
# Embedded SELECT query to be able to use ORDER BY and LIMIT
- cursor.execute('''
+ cur.execute('''
DELETE FROM vault_bundle
WHERE ctid IN (
SELECT ctid
@@ -452,18 +391,18 @@
RETURNING type, object_id
'''.format(cond), args)
- for d in cursor:
+ for d in cur:
self.cache.delete(d['type'], bytes(d['object_id']))
- @autocommit
- def cache_expire_oldest(self, n=1, by='last_access', cursor=None):
+ @db_transaction()
+ def cache_expire_oldest(self, n=1, by='last_access', db=None, cur=None):
"""Expire the `n` oldest bundles"""
assert by in ('created', 'done', 'last_access')
filter = '''ORDER BY ts_{} LIMIT {}'''.format(by, n)
return self._cache_expire(filter)
- @autocommit
- def cache_expire_until(self, date, by='last_access', cursor=None):
+ @db_transaction()
+ def cache_expire_until(self, date, by='last_access', db=None, cur=None):
"""Expire all the bundles until a certain date"""
assert by in ('created', 'done', 'last_access')
filter = '''AND ts_{} <= %s'''.format(by)
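
Every former @autocommit method now relies on swh.core.db.common.db_transaction, which fetches a connection via get_db() and injects `db` and `cur` keyword arguments, committing or rolling back when the outermost call returns. A schematic stand-in for the decorator (not the swh.core implementation) to show the calling convention:

    import functools

    def db_transaction():
        """Schematic stand-in for swh.core.db.common.db_transaction."""
        def decorator(meth):
            @functools.wraps(meth)
            def wrapper(self, *args, db=None, cur=None, **kwargs):
                if cur is not None:
                    # Nested call (e.g. is_available -> task_info):
                    # reuse the caller's transaction.
                    return meth(self, *args, db=db, cur=cur, **kwargs)
                db = self.get_db()
                with db.transaction() as cur:
                    # commits on success, rolls back on exception
                    return meth(self, *args, db=db, cur=cur, **kwargs)
            return wrapper
        return decorator
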
diff --git a/swh/vault/cache.py b/swh/vault/cache.py
--- a/swh/vault/cache.py
+++ b/swh/vault/cache.py
@@ -15,7 +15,7 @@
internal identifiers used in the underlying objstorage.
"""
- def __init__(self, objstorage):
+ def __init__(self, **objstorage):
self.objstorage = get_objstorage(**objstorage)
def add(self, obj_type, obj_id, content):
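
VaultCache now takes the objstorage configuration as keyword arguments and hands them to get_objstorage unchanged. A sketch mirroring the 'cache' entry used in the test fixture below (the root path is a placeholder):

    from swh.vault.cache import VaultCache

    cache = VaultCache(cls='pathslicing',
                       args={'root': '/tmp/vault-cache',
                             'slicing': '0:1/1:5'})
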
diff --git a/swh/vault/cli.py b/swh/vault/cli.py
new file mode 100644
--- /dev/null
+++ b/swh/vault/cli.py
@@ -0,0 +1,50 @@
+import logging
+import click
+import aiohttp.web
+
+from swh.vault.api.server import make_app_from_configfile
+
+
+CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
+
+
+@click.command(context_settings=CONTEXT_SETTINGS)
+@click.option('--config-file', '-C', default=None,
+ type=click.Path(exists=True, dir_okay=False,),
+ help="Configuration file.")
+@click.option('--log-level', '-l', default='INFO',
+ type=click.Choice(logging._nameToLevel.keys()),
+ help="Log level (default to INFO)")
+@click.option('--no-stdout', is_flag=True, default=False,
+ help="Do NOT output logs on the console")
+@click.option('--host', default='0.0.0.0', help="Host to run the server")
+@click.option('--port', default=5005, type=click.INT,
+ help="Binding port of the server")
+@click.option('--debug/--nodebug', default=True,
+ help="Indicates if the server should run in debug mode")
+@click.pass_context
+def cli(ctx, config_file, log_level, no_stdout, host, port, debug):
+ """Software Heritage Vault API server
+
+ """
+ from swh.scheduler.celery_backend.config import setup_log_handler
+ log_level = setup_log_handler(
+ loglevel=log_level, colorize=False,
+ format='[%(levelname)s] %(name)s -- %(message)s',
+ log_console=not no_stdout)
+
+ try:
+ app = make_app_from_configfile(config_file, debug=debug)
+ except EnvironmentError as e:
+ click.echo(e.msg, err=True)
+ ctx.exit(1)
+
+ aiohttp.web.run_app(app, host=host, port=int(port))
+
+
+def main():
+ return cli(auto_envvar_prefix='SWH_VAULT')
+
+
+if __name__ == '__main__':
+ main()
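
main() wraps the command with auto_envvar_prefix='SWH_VAULT', so every option can also be set through the environment (e.g. SWH_VAULT_CONFIG_FILE). For a quick smoke test without installing the console script, click's test runner works; a sketch:

    from click.testing import CliRunner

    from swh.vault.cli import cli

    # --help only exercises option parsing; no server is started.
    result = CliRunner().invoke(cli, ['--help'])
    assert result.exit_code == 0
    assert 'Software Heritage Vault API server' in result.output
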
diff --git a/swh/vault/cookers/__init__.py b/swh/vault/cookers/__init__.py
--- a/swh/vault/cookers/__init__.py
+++ b/swh/vault/cookers/__init__.py
@@ -2,7 +2,12 @@
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import os
+from swh.core.config import load_named_config, read as read_config
+from swh.storage import get_storage
+from swh.vault import get_vault
+from swh.vault.cookers.base import DEFAULT_CONFIG_PATH, DEFAULT_CONFIG
from swh.vault.cookers.directory import DirectoryCooker
from swh.vault.cookers.revision_flat import RevisionFlatCooker
from swh.vault.cookers.revision_gitfast import RevisionGitfastCooker
@@ -13,4 +18,36 @@
'revision_gitfast': RevisionGitfastCooker,
}
-get_cooker = COOKER_TYPES.__getitem__
+
+def get_cooker_cls(obj_type):
+ return COOKER_TYPES[obj_type]
+
+
+def get_cooker(obj_type, obj_id):
+ if 'SWH_CONFIG_FILENAME' in os.environ:
+ cfg = read_config(os.environ['SWH_CONFIG_FILENAME'], DEFAULT_CONFIG)
+ else:
+ cfg = load_named_config(DEFAULT_CONFIG_PATH, DEFAULT_CONFIG)
+ cooker_cls = get_cooker_cls(obj_type)
+ if 'vault' not in cfg:
+ raise ValueError("missing '%vault' configuration")
+
+ vcfg = cfg['vault']
+ if vcfg['cls'] != 'remote':
+ raise EnvironmentError(
+ "This vault backend can only be a 'remote' "
+ "configuration", err=True)
+ args = vcfg['args']
+ if 'storage' not in args:
+ args['storage'] = cfg.get('storage')
+
+ if not args.get('storage'):
+ raise ValueError(
+ "invalid configuration; missing 'storage' config entry.")
+
+ storage = get_storage(**args.pop('storage'))
+ backend = get_vault(**vcfg)
+
+ return cooker_cls(obj_type, obj_id,
+ backend=backend, storage=storage,
+ max_bundle_size=cfg['max_bundle_size'])
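
get_cooker() now resolves its configuration at call time, preferring $SWH_CONFIG_FILENAME over the named default, and requires a 'remote' vault plus a storage entry. A sketch of the expected shape as a Python dict (all values are placeholders, mirroring DEFAULT_CONFIG in swh/vault/cookers/base.py):

    cooker_config = {
        'storage': {
            'cls': 'remote',
            'args': {'url': 'http://localhost:5002/'},
        },
        'vault': {
            'cls': 'remote',
            'args': {'url': 'http://localhost:5005/'},
        },
        'max_bundle_size': 2 ** 29,  # 512 MiB
    }
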
diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py
--- a/swh/vault/cookers/base.py
+++ b/swh/vault/cookers/base.py
@@ -7,12 +7,9 @@
import io
import logging
-from swh.core import config
from swh.model import hashutil
-from swh.storage import get_storage
-from swh.vault.api.client import RemoteVaultClient
-
+MAX_BUNDLE_SIZE = 2 ** 29 # 512 MiB
DEFAULT_CONFIG_PATH = 'vault/cooker'
DEFAULT_CONFIG = {
'storage': ('dict', {
@@ -21,8 +18,13 @@
'url': 'http://localhost:5002/',
},
}),
- 'vault_url': ('str', 'http://localhost:5005/'),
- 'max_bundle_size': ('int', 2 ** 29), # 512 MiB
+ 'vault': ('dict', {
+ 'cls': 'remote',
+ 'args': {
+ 'url': 'http://localhost:5005/',
+ },
+ }),
+ 'max_bundle_size': ('int', MAX_BUNDLE_SIZE),
}
@@ -61,7 +63,8 @@
"""
CACHE_TYPE_KEY = None
- def __init__(self, obj_type, obj_id, *, override_cfg=None):
+ def __init__(self, obj_type, obj_id, backend, storage,
+ max_bundle_size=MAX_BUNDLE_SIZE):
"""Initialize the cooker.
The type of the object represented by the id depends on the
@@ -69,20 +72,17 @@
own cooker class.
Args:
- storage: the storage object
- cache: the cache where to store the bundle
+ obj_type: type of the object to be cooked into a bundle (directory,
+ revision_flat or revision_gitfast; see
+ swh.vault.cooker.COOKER_TYPES).
obj_id: id of the object to be cooked into a bundle.
+ backend: the vault backend (swh.vault.backend.VaultBackend).
"""
- self.config = config.load_named_config(DEFAULT_CONFIG_PATH,
- DEFAULT_CONFIG)
- if override_cfg is not None:
- self.config.update(override_cfg)
-
self.obj_type = obj_type
self.obj_id = hashutil.hash_to_bytes(obj_id)
- self.backend = RemoteVaultClient(self.config['vault_url'])
- self.storage = get_storage(**self.config['storage'])
- self.max_bundle_size = self.config['max_bundle_size']
+ self.backend = backend
+ self.storage = storage
+ self.max_bundle_size = max_bundle_size
@abc.abstractmethod
def check_exists(self):
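
With collaborators injected, writing a cooker comes down to implementing the two hooks. A hypothetical minimal subclass, assuming the abstract base defined in this module (BaseVaultCooker in swh.vault.cookers.base) and the prepare_bundle()/fileobj protocol exercised by the tests below:

    from swh.vault.cookers.base import BaseVaultCooker  # name assumed

    class TrivialCooker(BaseVaultCooker):
        """Hypothetical cooker bundling the raw object id."""
        CACHE_TYPE_KEY = 'trivial'

        def check_exists(self):
            # A real cooker would look the object up in self.storage.
            return True

        def prepare_bundle(self):
            self.fileobj.write(self.obj_id)
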
diff --git a/swh/vault/cooking_tasks.py b/swh/vault/cooking_tasks.py
--- a/swh/vault/cooking_tasks.py
+++ b/swh/vault/cooking_tasks.py
@@ -11,11 +11,11 @@
@app.task(name=__name__ + '.SWHCookingTask')
def cook_bundle(obj_type, obj_id):
"""Main task to cook a bundle."""
- get_cooker(obj_type)(obj_type, obj_id).cook()
+ get_cooker(obj_type, obj_id).cook()
# TODO: remove once the scheduler handles priority tasks
@app.task(name=__name__ + '.SWHBatchCookingTask')
def batch_cook_bundle(obj_type, obj_id):
"""Temporary task for the batch queue."""
- get_cooker(obj_type)(obj_type, obj_id).cook()
+ get_cooker(obj_type, obj_id).cook()
diff --git a/swh/vault/tests/conftest.py b/swh/vault/tests/conftest.py
new file mode 100644
--- /dev/null
+++ b/swh/vault/tests/conftest.py
@@ -0,0 +1,80 @@
+import pytest
+import glob
+import os
+import pkg_resources.extern.packaging.version
+
+from swh.core.utils import numfile_sortkey as sortkey
+from swh.vault import get_vault
+from swh.vault.tests import SQL_DIR
+from swh.storage.tests import SQL_DIR as STORAGE_SQL_DIR
+from pytest_postgresql import factories
+
+
+pytest_v = pkg_resources.get_distribution("pytest").parsed_version
+if pytest_v < pkg_resources.extern.packaging.version.parse('3.9'):
+ @pytest.fixture
+ def tmp_path(request):
+ import tempfile
+ import pathlib
+ with tempfile.TemporaryDirectory() as tmpdir:
+ yield pathlib.Path(tmpdir)
+
+
+def db_url(name, postgresql_proc):
+ return 'postgresql://{user}@{host}:{port}/{dbname}'.format(
+ host=postgresql_proc.host,
+ port=postgresql_proc.port,
+ user='postgres',
+ dbname=name)
+
+
+postgresql2 = factories.postgresql('postgresql_proc', 'tests2')
+
+
+@pytest.fixture
+def swh_vault(request, postgresql_proc, postgresql, postgresql2, tmp_path):
+
+ for sql_dir, pg in ((SQL_DIR, postgresql), (STORAGE_SQL_DIR, postgresql2)):
+ dump_files = os.path.join(sql_dir, '*.sql')
+ all_dump_files = sorted(glob.glob(dump_files), key=sortkey)
+
+ cursor = pg.cursor()
+ for fname in all_dump_files:
+ with open(fname) as fobj:
+ # disable concurrent index creation since we run in a
+ # transaction
+ cursor.execute(fobj.read().replace('concurrently', ''))
+ pg.commit()
+
+ vault_config = {
+ 'db': db_url('tests', postgresql_proc),
+ 'storage': {
+ 'cls': 'local',
+ 'args': {
+ 'db': db_url('tests2', postgresql_proc),
+ 'objstorage': {
+ 'cls': 'pathslicing',
+ 'args': {
+ 'root': str(tmp_path),
+ 'slicing': '0:1/1:5',
+ },
+ },
+ },
+ },
+ 'cache': {
+ 'cls': 'pathslicing',
+ 'args': {
+ 'root': str(tmp_path),
+ 'slicing': '0:1/1:5',
+ 'allow_delete': True,
+ }
+ },
+ 'scheduler': {
+ 'cls': 'remote',
+ 'args': {
+ 'url': 'http://swh-scheduler:5008',
+ },
+ },
+ }
+
+ return get_vault('local', vault_config)
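
The swh_vault fixture replaces the old VaultTestFixture class: it provisions both databases, loads the SQL, and returns a wired-up local backend. A sketch of consuming it, in the same function style as the ported tests below:

    def test_task_info_empty(swh_vault):
        # No bundle has been cooked yet, so there is no task info row.
        assert swh_vault.task_info('directory', '17' * 20) is None
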
diff --git a/swh/vault/tests/test_backend.py b/swh/vault/tests/test_backend.py
--- a/swh/vault/tests/test_backend.py
+++ b/swh/vault/tests/test_backend.py
@@ -6,49 +6,50 @@
import contextlib
import datetime
import psycopg2
-import unittest
-from unittest.mock import patch
+from unittest.mock import patch, MagicMock
+import pytest
from swh.model import hashutil
-from swh.vault.tests.vault_testing import VaultTestFixture, hash_content
-
-
-class BaseTestBackend(VaultTestFixture):
- @contextlib.contextmanager
- def mock_cooking(self):
- with patch.object(self.vault_backend, '_send_task') as mt:
- mt.return_value = 42
- with patch('swh.vault.backend.get_cooker') as mg:
- mcc = unittest.mock.MagicMock()
- mc = unittest.mock.MagicMock()
- mg.return_value = mcc
- mcc.return_value = mc
- mc.check_exists.return_value = True
-
- yield {'send_task': mt,
- 'get_cooker': mg,
- 'cooker_cls': mcc,
- 'cooker': mc}
-
- def assertTimestampAlmostNow(self, ts, tolerance_secs=1.0): # noqa
- now = datetime.datetime.now(datetime.timezone.utc)
- creation_delta_secs = (ts - now).total_seconds()
- self.assertLess(creation_delta_secs, tolerance_secs)
-
- def fake_cook(self, obj_type, result_content, sticky=False):
- content, obj_id = hash_content(result_content)
- with self.mock_cooking():
- self.vault_backend.create_task(obj_type, obj_id, sticky)
- self.vault_backend.cache.add(obj_type, obj_id, b'content')
- self.vault_backend.set_status(obj_type, obj_id, 'done')
- return obj_id, content
-
- def fail_cook(self, obj_type, obj_id, failure_reason):
- with self.mock_cooking():
- self.vault_backend.create_task(obj_type, obj_id)
- self.vault_backend.set_status(obj_type, obj_id, 'failed')
- self.vault_backend.set_progress(obj_type, obj_id, failure_reason)
+from swh.vault.tests.vault_testing import hash_content
+
+
+@contextlib.contextmanager
+def mock_cooking(vault_backend):
+ with patch.object(vault_backend, '_send_task') as mt:
+ mt.return_value = 42
+ with patch('swh.vault.backend.get_cooker') as mg:
+ mcc = MagicMock()
+ mc = MagicMock()
+ mg.return_value = mcc
+ mcc.return_value = mc
+ mc.check_exists.return_value = True
+
+ yield {'_send_task': mt,
+ 'get_cooker': mg,
+ 'cooker_cls': mcc,
+ 'cooker': mc}
+
+def assertTimestampAlmostNow(ts, tolerance_secs=1.0): # noqa
+ now = datetime.datetime.now(datetime.timezone.utc)
+ creation_delta_secs = (ts - now).total_seconds()
+ assert creation_delta_secs < tolerance_secs
+
+
+def fake_cook(backend, obj_type, result_content, sticky=False):
+ content, obj_id = hash_content(result_content)
+ with mock_cooking(backend):
+ backend.create_task(obj_type, obj_id, sticky)
+ backend.cache.add(obj_type, obj_id, b'content')
+ backend.set_status(obj_type, obj_id, 'done')
+ return obj_id, content
+
+
+def fail_cook(backend, obj_type, obj_id, failure_reason):
+ with mock_cooking(backend):
+ backend.create_task(obj_type, obj_id)
+ backend.set_status(obj_type, obj_id, 'failed')
+ backend.set_progress(obj_type, obj_id, failure_reason)
TEST_TYPE = 'revision_gitfast'
@@ -56,272 +57,281 @@
TEST_OBJ_ID = hashutil.hash_to_bytes(TEST_HEX_ID)
TEST_PROGRESS = ("Mr. White, You're telling me you're cooking again?"
" \N{ASTONISHED FACE} ")
-TEST_EMAIL = 'ouiche@example.com'
-
-
-class TestBackend(BaseTestBackend, unittest.TestCase):
- def test_create_task_simple(self):
- with self.mock_cooking() as m:
- self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID)
-
- m['get_cooker'].assert_called_once_with(TEST_TYPE)
-
- args = m['cooker_cls'].call_args[0]
- self.assertEqual(args[0], TEST_TYPE)
- self.assertEqual(args[1], TEST_HEX_ID)
-
- self.assertEqual(m['cooker'].check_exists.call_count, 1)
-
- self.assertEqual(m['send_task'].call_count, 1)
- args = m['send_task'].call_args[0][0]
- self.assertEqual(args[0], TEST_TYPE)
- self.assertEqual(args[1], TEST_HEX_ID)
-
- info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
- self.assertEqual(info['object_id'], TEST_OBJ_ID)
- self.assertEqual(info['type'], TEST_TYPE)
- self.assertEqual(info['task_status'], 'new')
- self.assertEqual(info['task_id'], 42)
-
- self.assertTimestampAlmostNow(info['ts_created'])
-
- self.assertEqual(info['ts_done'], None)
- self.assertEqual(info['progress_msg'], None)
-
- def test_create_fail_duplicate_task(self):
- with self.mock_cooking():
- self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID)
- with self.assertRaises(psycopg2.IntegrityError):
- self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID)
-
- def test_create_fail_nonexisting_object(self):
- with self.mock_cooking() as m:
- m['cooker'].check_exists.side_effect = ValueError('Nothing here.')
- with self.assertRaises(ValueError):
- self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID)
-
- def test_create_set_progress(self):
- with self.mock_cooking():
- self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID)
-
- info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
- self.assertEqual(info['progress_msg'], None)
- self.vault_backend.set_progress(TEST_TYPE, TEST_OBJ_ID,
- TEST_PROGRESS)
- info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
- self.assertEqual(info['progress_msg'], TEST_PROGRESS)
-
- def test_create_set_status(self):
- with self.mock_cooking():
- self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID)
-
- info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
- self.assertEqual(info['task_status'], 'new')
- self.assertEqual(info['ts_done'], None)
-
- self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'pending')
- info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
- self.assertEqual(info['task_status'], 'pending')
- self.assertEqual(info['ts_done'], None)
-
- self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done')
- info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
- self.assertEqual(info['task_status'], 'done')
- self.assertTimestampAlmostNow(info['ts_done'])
-
- def test_create_update_access_ts(self):
- with self.mock_cooking():
- self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID)
-
- info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
- access_ts_1 = info['ts_last_access']
- self.assertTimestampAlmostNow(access_ts_1)
-
- self.vault_backend.update_access_ts(TEST_TYPE, TEST_OBJ_ID)
- info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
- access_ts_2 = info['ts_last_access']
- self.assertTimestampAlmostNow(access_ts_2)
-
- self.vault_backend.update_access_ts(TEST_TYPE, TEST_OBJ_ID)
- info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
- access_ts_3 = info['ts_last_access']
- self.assertTimestampAlmostNow(access_ts_3)
-
- self.assertLess(access_ts_1, access_ts_2)
- self.assertLess(access_ts_2, access_ts_3)
-
- def test_cook_request_idempotent(self):
- with self.mock_cooking():
- info1 = self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID)
- info2 = self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID)
- info3 = self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID)
- self.assertEqual(info1, info2)
- self.assertEqual(info1, info3)
-
- def test_cook_email_pending_done(self):
- with self.mock_cooking(), \
- patch.object(self.vault_backend, 'add_notif_email') as madd, \
- patch.object(self.vault_backend, 'send_notification') as msend:
-
- self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID)
- madd.assert_not_called()
- msend.assert_not_called()
-
- madd.reset_mock()
- msend.reset_mock()
-
- self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID,
- email=TEST_EMAIL)
- madd.assert_called_once_with(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL)
- msend.assert_not_called()
-
- madd.reset_mock()
- msend.reset_mock()
-
- self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done')
- self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID,
- email=TEST_EMAIL)
- msend.assert_called_once_with(None, TEST_EMAIL,
- TEST_TYPE, TEST_OBJ_ID, 'done')
- madd.assert_not_called()
-
- def test_send_all_emails(self):
- with self.mock_cooking():
- emails = ('a@example.com',
- 'billg@example.com',
- 'test+42@example.org')
- for email in emails:
- self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID,
- email=email)
-
- self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done')
-
- with patch.object(self.vault_backend, 'smtp_server') as m:
- self.vault_backend.send_all_notifications(TEST_TYPE, TEST_OBJ_ID)
-
- sent_emails = {k[0][0] for k in m.send_message.call_args_list}
- self.assertEqual({k['To'] for k in sent_emails}, set(emails))
-
- for e in sent_emails:
- self.assertIn('info@softwareheritage.org', e['From'])
- self.assertIn(TEST_TYPE, e['Subject'])
- self.assertIn(TEST_HEX_ID[:5], e['Subject'])
- self.assertIn(TEST_TYPE, str(e))
- self.assertIn('https://archive.softwareheritage.org/', str(e))
- self.assertIn(TEST_HEX_ID[:5], str(e))
- self.assertIn('--\x20\n', str(e)) # Well-formated signature!!!
-
- # Check that the entries have been deleted and recalling the
- # function does not re-send the e-mails
- m.reset_mock()
- self.vault_backend.send_all_notifications(TEST_TYPE, TEST_OBJ_ID)
- m.assert_not_called()
-
- def test_available(self):
- self.assertFalse(self.vault_backend.is_available(TEST_TYPE,
- TEST_OBJ_ID))
- with self.mock_cooking():
- self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID)
- self.assertFalse(self.vault_backend.is_available(TEST_TYPE,
- TEST_OBJ_ID))
- self.vault_backend.cache.add(TEST_TYPE, TEST_OBJ_ID, b'content')
- self.assertFalse(self.vault_backend.is_available(TEST_TYPE,
- TEST_OBJ_ID))
- self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done')
- self.assertTrue(self.vault_backend.is_available(TEST_TYPE,
- TEST_OBJ_ID))
-
- def test_fetch(self):
- self.assertEqual(self.vault_backend.fetch(TEST_TYPE, TEST_OBJ_ID),
- None)
- obj_id, content = self.fake_cook(TEST_TYPE, b'content')
-
- info = self.vault_backend.task_info(TEST_TYPE, obj_id)
- access_ts_before = info['ts_last_access']
-
- self.assertEqual(self.vault_backend.fetch(TEST_TYPE, obj_id),
- b'content')
-
- info = self.vault_backend.task_info(TEST_TYPE, obj_id)
- access_ts_after = info['ts_last_access']
-
- self.assertTimestampAlmostNow(access_ts_after)
- self.assertLess(access_ts_before, access_ts_after)
-
- def test_cache_expire_oldest(self):
- r = range(1, 10)
- inserted = {}
- for i in r:
- sticky = (i == 5)
- content = b'content%s' % str(i).encode()
- obj_id, content = self.fake_cook(TEST_TYPE, content, sticky)
- inserted[i] = (obj_id, content)
-
- self.vault_backend.update_access_ts(TEST_TYPE, inserted[2][0])
- self.vault_backend.update_access_ts(TEST_TYPE, inserted[3][0])
- self.vault_backend.cache_expire_oldest(n=4)
-
- should_be_still_here = {2, 3, 5, 8, 9}
- for i in r:
- self.assertEqual(self.vault_backend.is_available(
- TEST_TYPE, inserted[i][0]), i in should_be_still_here)
-
- def test_cache_expire_until(self):
- r = range(1, 10)
- inserted = {}
- for i in r:
- sticky = (i == 5)
- content = b'content%s' % str(i).encode()
- obj_id, content = self.fake_cook(TEST_TYPE, content, sticky)
- inserted[i] = (obj_id, content)
-
- if i == 7:
- cutoff_date = datetime.datetime.now()
-
- self.vault_backend.update_access_ts(TEST_TYPE, inserted[2][0])
- self.vault_backend.update_access_ts(TEST_TYPE, inserted[3][0])
- self.vault_backend.cache_expire_until(date=cutoff_date)
-
- should_be_still_here = {2, 3, 5, 8, 9}
- for i in r:
- self.assertEqual(self.vault_backend.is_available(
- TEST_TYPE, inserted[i][0]), i in should_be_still_here)
-
- def test_fail_cook_simple(self):
- self.fail_cook(TEST_TYPE, TEST_OBJ_ID, 'error42')
- self.assertFalse(self.vault_backend.is_available(TEST_TYPE,
- TEST_OBJ_ID))
- info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
- self.assertEqual(info['progress_msg'], 'error42')
-
- def test_send_failure_email(self):
- with self.mock_cooking():
- self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID,
- email='a@example.com')
-
- self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'failed')
- self.vault_backend.set_progress(TEST_TYPE, TEST_OBJ_ID, 'test error')
-
- with patch.object(self.vault_backend, 'smtp_server') as m:
- self.vault_backend.send_all_notifications(TEST_TYPE, TEST_OBJ_ID)
-
- e = [k[0][0] for k in m.send_message.call_args_list][0]
- self.assertEqual(e['To'], 'a@example.com')
-
- self.assertIn('info@softwareheritage.org', e['From'])
- self.assertIn(TEST_TYPE, e['Subject'])
- self.assertIn(TEST_HEX_ID[:5], e['Subject'])
- self.assertIn('fail', e['Subject'])
- self.assertIn(TEST_TYPE, str(e))
- self.assertIn(TEST_HEX_ID[:5], str(e))
- self.assertIn('test error', str(e))
- self.assertIn('--\x20\n', str(e)) # Well-formated signature
-
- def test_retry_failed_bundle(self):
- self.fail_cook(TEST_TYPE, TEST_OBJ_ID, 'error42')
- info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
- self.assertEqual(info['task_status'], 'failed')
- with self.mock_cooking():
- self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID)
- info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
- self.assertEqual(info['task_status'], 'new')
+TEST_EMAIL = 'ouiche@lorraine.fr'
+
+
+def test_create_task_simple(swh_vault):
+ with mock_cooking(swh_vault) as m:
+ swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID)
+
+ m['get_cooker'].assert_called_once_with(TEST_TYPE)
+
+ args = m['cooker_cls'].call_args[0]
+ assert args[0] == TEST_TYPE
+ assert args[1] == TEST_HEX_ID
+
+ assert m['cooker'].check_exists.call_count == 1
+ assert m['_send_task'].call_count == 1
+
+ args = m['_send_task'].call_args[0]
+ assert args[0] == TEST_TYPE
+ assert args[1] == TEST_HEX_ID
+
+ info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
+ assert info['object_id'] == TEST_OBJ_ID
+ assert info['type'] == TEST_TYPE
+ assert info['task_status'] == 'new'
+ assert info['task_id'] == 42
+
+ assertTimestampAlmostNow(info['ts_created'])
+
+ assert info['ts_done'] is None
+ assert info['progress_msg'] is None
+
+
+def test_create_fail_duplicate_task(swh_vault):
+ with mock_cooking(swh_vault):
+ swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID)
+ with pytest.raises(psycopg2.IntegrityError):
+ swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID)
+
+
+def test_create_fail_nonexisting_object(swh_vault):
+ with mock_cooking(swh_vault) as m:
+ m['cooker'].check_exists.side_effect = ValueError('Nothing here.')
+ with pytest.raises(ValueError):
+ swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID)
+
+
+def test_create_set_progress(swh_vault):
+ with mock_cooking(swh_vault):
+ swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID)
+
+ info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
+ assert info['progress_msg'] is None
+ swh_vault.set_progress(TEST_TYPE, TEST_OBJ_ID,
+ TEST_PROGRESS)
+ info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
+ assert info['progress_msg'] == TEST_PROGRESS
+
+
+def test_create_set_status(swh_vault):
+ with mock_cooking(swh_vault):
+ swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID)
+
+ info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
+ assert info['task_status'] == 'new'
+ assert info['ts_done'] is None
+
+ swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'pending')
+ info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
+ assert info['task_status'] == 'pending'
+ assert info['ts_done'] is None
+
+ swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'done')
+ info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
+ assert info['task_status'] == 'done'
+ assertTimestampAlmostNow(info['ts_done'])
+
+
+def test_create_update_access_ts(swh_vault):
+ with mock_cooking(swh_vault):
+ swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID)
+
+ info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
+ access_ts_1 = info['ts_last_access']
+ assertTimestampAlmostNow(access_ts_1)
+
+ swh_vault.update_access_ts(TEST_TYPE, TEST_OBJ_ID)
+ info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
+ access_ts_2 = info['ts_last_access']
+ assertTimestampAlmostNow(access_ts_2)
+
+ swh_vault.update_access_ts(TEST_TYPE, TEST_OBJ_ID)
+ info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
+ access_ts_3 = info['ts_last_access']
+ assertTimestampAlmostNow(access_ts_3)
+
+ assert access_ts_1 < access_ts_2
+ assert access_ts_2 < access_ts_3
+
+
+def test_cook_request_idempotent(swh_vault):
+ with mock_cooking(swh_vault):
+ info1 = swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID)
+ info2 = swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID)
+ info3 = swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID)
+ assert info1 == info2
+ assert info1 == info3
+
+
+def test_cook_email_pending_done(swh_vault):
+ with mock_cooking(swh_vault), \
+ patch.object(swh_vault, 'add_notif_email') as madd, \
+ patch.object(swh_vault, 'send_notification') as msend:
+
+ swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID)
+ madd.assert_not_called()
+ msend.assert_not_called()
+
+ madd.reset_mock()
+ msend.reset_mock()
+
+ swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID,
+ email=TEST_EMAIL)
+ madd.assert_called_once_with(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL)
+ msend.assert_not_called()
+
+ madd.reset_mock()
+ msend.reset_mock()
+
+ swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'done')
+ swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID,
+ email=TEST_EMAIL)
+ msend.assert_called_once_with(None, TEST_EMAIL,
+ TEST_TYPE, TEST_OBJ_ID, 'done')
+ madd.assert_not_called()
+
+
+def test_send_all_emails(swh_vault):
+ with mock_cooking(swh_vault):
+ emails = ('a@example.com',
+ 'billg@example.com',
+ 'test+42@example.org')
+ for email in emails:
+ swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID,
+ email=email)
+
+ swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'done')
+
+ with patch.object(swh_vault, 'smtp_server') as m:
+ swh_vault.send_all_notifications(TEST_TYPE, TEST_OBJ_ID)
+
+ sent_emails = {k[0][0] for k in m.send_message.call_args_list}
+ assert {k['To'] for k in sent_emails} == set(emails)
+
+ for e in sent_emails:
+ assert 'info@softwareheritage.org' in e['From']
+ assert TEST_TYPE in e['Subject']
+ assert TEST_HEX_ID[:5] in e['Subject']
+ assert TEST_TYPE in str(e)
+ assert 'https://archive.softwareheritage.org/' in str(e)
+ assert TEST_HEX_ID[:5] in str(e)
+ assert '--\x20\n' in str(e) # Well-formatted signature!!!
+
+ # Check that the entries have been deleted and recalling the
+ # function does not re-send the e-mails
+ m.reset_mock()
+ swh_vault.send_all_notifications(TEST_TYPE, TEST_OBJ_ID)
+ m.assert_not_called()
+
+
+def test_available(swh_vault):
+ assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID)
+
+ with mock_cooking(swh_vault):
+ swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID)
+ assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID)
+
+ swh_vault.cache.add(TEST_TYPE, TEST_OBJ_ID, b'content')
+ assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID)
+
+ swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'done')
+ assert swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID)
+
+
+def test_fetch(swh_vault):
+ assert swh_vault.fetch(TEST_TYPE, TEST_OBJ_ID) is None
+ obj_id, content = fake_cook(swh_vault, TEST_TYPE, b'content')
+
+ info = swh_vault.task_info(TEST_TYPE, obj_id)
+ access_ts_before = info['ts_last_access']
+
+ assert swh_vault.fetch(TEST_TYPE, obj_id) == b'content'
+
+ info = swh_vault.task_info(TEST_TYPE, obj_id)
+ access_ts_after = info['ts_last_access']
+
+ assertTimestampAlmostNow(access_ts_after)
+ assert access_ts_before < access_ts_after
+
+
+def test_cache_expire_oldest(swh_vault):
+ r = range(1, 10)
+ inserted = {}
+ for i in r:
+ sticky = (i == 5)
+ content = b'content%s' % str(i).encode()
+ obj_id, content = fake_cook(swh_vault, TEST_TYPE, content, sticky)
+ inserted[i] = (obj_id, content)
+
+ swh_vault.update_access_ts(TEST_TYPE, inserted[2][0])
+ swh_vault.update_access_ts(TEST_TYPE, inserted[3][0])
+ swh_vault.cache_expire_oldest(n=4)
+
+ should_be_still_here = {2, 3, 5, 8, 9}
+ for i in r:
+ assert swh_vault.is_available(
+ TEST_TYPE, inserted[i][0]) == (i in should_be_still_here)
+
+
+def test_cache_expire_until(swh_vault):
+ r = range(1, 10)
+ inserted = {}
+ for i in r:
+ sticky = (i == 5)
+ content = b'content%s' % str(i).encode()
+ obj_id, content = fake_cook(swh_vault, TEST_TYPE, content, sticky)
+ inserted[i] = (obj_id, content)
+
+ if i == 7:
+ cutoff_date = datetime.datetime.now()
+
+ swh_vault.update_access_ts(TEST_TYPE, inserted[2][0])
+ swh_vault.update_access_ts(TEST_TYPE, inserted[3][0])
+ swh_vault.cache_expire_until(date=cutoff_date)
+
+ should_be_still_here = {2, 3, 5, 8, 9}
+ for i in r:
+ assert swh_vault.is_available(
+ TEST_TYPE, inserted[i][0]) == (i in should_be_still_here)
+
+
+def test_fail_cook_simple(swh_vault):
+ fail_cook(swh_vault, TEST_TYPE, TEST_OBJ_ID, 'error42')
+ assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID)
+ info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
+ assert info['progress_msg'] == 'error42'
+
+
+def test_send_failure_email(swh_vault):
+ with mock_cooking(swh_vault):
+ swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, email='a@example.com')
+
+ swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'failed')
+ swh_vault.set_progress(TEST_TYPE, TEST_OBJ_ID, 'test error')
+
+ with patch.object(swh_vault, 'smtp_server') as m:
+ swh_vault.send_all_notifications(TEST_TYPE, TEST_OBJ_ID)
+
+ e = [k[0][0] for k in m.send_message.call_args_list][0]
+ assert e['To'] == 'a@example.com'
+
+ assert 'info@softwareheritage.org' in e['From']
+ assert TEST_TYPE in e['Subject']
+ assert TEST_HEX_ID[:5] in e['Subject']
+ assert 'fail' in e['Subject']
+ assert TEST_TYPE in str(e)
+ assert TEST_HEX_ID[:5] in str(e)
+ assert 'test error' in str(e)
+ assert '--\x20\n' in str(e) # Well-formatted signature
+
+
+def test_retry_failed_bundle(swh_vault):
+ fail_cook(swh_vault, TEST_TYPE, TEST_OBJ_ID, 'error42')
+ info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
+ assert info['task_status'] == 'failed'
+ with mock_cooking(swh_vault):
+ swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID)
+ info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
+ assert info['task_status'] == 'new'
diff --git a/swh/vault/tests/test_cache.py b/swh/vault/tests/test_cache.py
--- a/swh/vault/tests/test_cache.py
+++ b/swh/vault/tests/test_cache.py
@@ -3,10 +3,8 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import unittest
from swh.model import hashutil
-from swh.vault.tests.vault_testing import VaultTestFixture
TEST_TYPE_1 = 'revision_gitfast'
TEST_TYPE_2 = 'directory'
@@ -21,54 +19,51 @@
TEST_CONTENT_2 = b'test content 2'
-class BaseTestVaultCache(VaultTestFixture):
- def setUp(self):
- super().setUp()
- self.cache = self.vault_backend.cache # little shortcut
-
-
-class TestVaultCache(BaseTestVaultCache, unittest.TestCase):
- # Let's try to avoid replicating edge-cases already tested in
- # swh-objstorage, and instead focus on testing behaviors specific to the
- # Vault cache here.
-
- def test_internal_id(self):
- sid = self.cache._get_internal_id(TEST_TYPE_1, TEST_OBJ_ID_1)
- self.assertEqual(hashutil.hash_to_hex(sid),
- '6829cda55b54c295aa043a611a4e0320239988d9')
-
- def test_simple_add_get(self):
- self.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1)
- self.assertEqual(self.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1),
- TEST_CONTENT_1)
- self.assertTrue(self.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1))
-
- def test_different_type_same_id(self):
- self.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1)
- self.cache.add(TEST_TYPE_2, TEST_OBJ_ID_1, TEST_CONTENT_2)
- self.assertEqual(self.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1),
- TEST_CONTENT_1)
- self.assertEqual(self.cache.get(TEST_TYPE_2, TEST_OBJ_ID_1),
- TEST_CONTENT_2)
- self.assertTrue(self.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1))
- self.assertTrue(self.cache.is_cached(TEST_TYPE_2, TEST_OBJ_ID_1))
-
- def test_different_type_same_content(self):
- self.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1)
- self.cache.add(TEST_TYPE_2, TEST_OBJ_ID_1, TEST_CONTENT_1)
- self.assertEqual(self.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1),
- TEST_CONTENT_1)
- self.assertEqual(self.cache.get(TEST_TYPE_2, TEST_OBJ_ID_1),
- TEST_CONTENT_1)
- self.assertTrue(self.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1))
- self.assertTrue(self.cache.is_cached(TEST_TYPE_2, TEST_OBJ_ID_1))
-
- def test_different_id_same_type(self):
- self.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1)
- self.cache.add(TEST_TYPE_1, TEST_OBJ_ID_2, TEST_CONTENT_2)
- self.assertEqual(self.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1),
- TEST_CONTENT_1)
- self.assertEqual(self.cache.get(TEST_TYPE_1, TEST_OBJ_ID_2),
- TEST_CONTENT_2)
- self.assertTrue(self.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1))
- self.assertTrue(self.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_2))
+# Let's try to avoid replicating edge-cases already tested in
+# swh-objstorage, and instead focus on testing behaviors specific to the
+# Vault cache here.
+
+def test_internal_id(swh_vault):
+ sid = swh_vault.cache._get_internal_id(TEST_TYPE_1, TEST_OBJ_ID_1)
+ assert hashutil.hash_to_hex(sid) == \
+ '6829cda55b54c295aa043a611a4e0320239988d9'
+
+
+def test_simple_add_get(swh_vault):
+ swh_vault.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1)
+ assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1) == \
+ TEST_CONTENT_1
+ assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1)
+
+
+def test_different_type_same_id(swh_vault):
+ swh_vault.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1)
+ swh_vault.cache.add(TEST_TYPE_2, TEST_OBJ_ID_1, TEST_CONTENT_2)
+ assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1) == \
+ TEST_CONTENT_1
+ assert swh_vault.cache.get(TEST_TYPE_2, TEST_OBJ_ID_1) == \
+ TEST_CONTENT_2
+ assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1)
+ assert swh_vault.cache.is_cached(TEST_TYPE_2, TEST_OBJ_ID_1)
+
+
+def test_different_type_same_content(swh_vault):
+ swh_vault.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1)
+ swh_vault.cache.add(TEST_TYPE_2, TEST_OBJ_ID_1, TEST_CONTENT_1)
+ assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1) == \
+ TEST_CONTENT_1
+ assert swh_vault.cache.get(TEST_TYPE_2, TEST_OBJ_ID_1) == \
+ TEST_CONTENT_1
+ assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1)
+ assert swh_vault.cache.is_cached(TEST_TYPE_2, TEST_OBJ_ID_1)
+
+
+def test_different_id_same_type(swh_vault):
+ swh_vault.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1)
+ swh_vault.cache.add(TEST_TYPE_1, TEST_OBJ_ID_2, TEST_CONTENT_2)
+ assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1) == \
+ TEST_CONTENT_1
+ assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_2) == \
+ TEST_CONTENT_2
+ assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1)
+ assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_2)
diff --git a/swh/vault/tests/test_cookers.py b/swh/vault/tests/test_cookers.py
--- a/swh/vault/tests/test_cookers.py
+++ b/swh/vault/tests/test_cookers.py
@@ -26,7 +26,7 @@
from swh.model import hashutil
from swh.model.from_disk import Directory
from swh.vault.cookers import DirectoryCooker, RevisionGitfastCooker
-from swh.vault.tests.vault_testing import VaultTestFixture, hash_content
+from swh.vault.tests.vault_testing import hash_content
from swh.vault.to_disk import SKIPPED_MESSAGE, HIDDEN_MESSAGE
@@ -101,61 +101,60 @@
self.git_shell(*args, stdout=None)
-@pytest.mark.config_issue
-class BaseTestCookers(VaultTestFixture):
- """Base class of cookers unit tests"""
- def setUp(self):
- super().setUp()
- self.loader = GitLoaderFromDisk()
- self.loader.storage = self.storage
-
- def tearDown(self):
- self.loader = None
- super().tearDown()
-
- def load(self, repo_path):
- """Load a repository in the test storage"""
- self.loader.load('fake_origin', repo_path, datetime.datetime.now())
-
- @contextlib.contextmanager
- def cook_extract_directory(self, obj_id):
- """Context manager that cooks a directory and extract it."""
- cooker = DirectoryCooker('directory', obj_id)
- cooker.storage = self.storage
- cooker.backend = unittest.mock.MagicMock()
- cooker.fileobj = io.BytesIO()
- assert cooker.check_exists()
- cooker.prepare_bundle()
- cooker.fileobj.seek(0)
- with tempfile.TemporaryDirectory(prefix='tmp-vault-extract-') as td:
- with tarfile.open(fileobj=cooker.fileobj, mode='r') as tar:
- tar.extractall(td)
- yield pathlib.Path(td) / hashutil.hash_to_hex(obj_id)
- cooker.storage = None
-
- @contextlib.contextmanager
- def cook_stream_revision_gitfast(self, obj_id):
- """Context manager that cooks a revision and stream its fastexport."""
- cooker = RevisionGitfastCooker('revision_gitfast', obj_id)
- cooker.storage = self.storage
- cooker.backend = unittest.mock.MagicMock()
- cooker.fileobj = io.BytesIO()
- assert cooker.check_exists()
- cooker.prepare_bundle()
- cooker.fileobj.seek(0)
- fastexport_stream = gzip.GzipFile(fileobj=cooker.fileobj)
- yield fastexport_stream
- cooker.storage = None
-
- @contextlib.contextmanager
- def cook_extract_revision_gitfast(self, obj_id):
- """Context manager that cooks a revision and extract it."""
- test_repo = TestRepo()
- with self.cook_stream_revision_gitfast(obj_id) as stream, \
- test_repo as p:
- processor = dulwich.fastexport.GitImportProcessor(test_repo.repo)
- processor.import_stream(stream)
- yield test_repo, p
+@pytest.fixture
+def swh_git_loader(swh_vault):
+ loader = GitLoaderFromDisk()
+ loader.storage = swh_vault.storage
+ return loader
+
+
+def load(loader, repo_path):
+ """Load a repository in the test storage"""
+ loader.load('fake_origin', repo_path, datetime.datetime.now())
+
+
+@contextlib.contextmanager
+def cook_extract_directory(storage, obj_id):
+ """Context manager that cooks a directory and extract it."""
+ backend = unittest.mock.MagicMock()
+ backend.storage = storage
+ cooker = DirectoryCooker('directory', obj_id, backend=backend)
+ cooker.fileobj = io.BytesIO()
+ assert cooker.check_exists()
+ cooker.prepare_bundle()
+ cooker.fileobj.seek(0)
+ with tempfile.TemporaryDirectory(prefix='tmp-vault-extract-') as td:
+ with tarfile.open(fileobj=cooker.fileobj, mode='r') as tar:
+ tar.extractall(td)
+ yield pathlib.Path(td) / hashutil.hash_to_hex(obj_id)
+
+
+@contextlib.contextmanager
+def cook_stream_revision_gitfast(storage, obj_id):
+ """Context manager that cooks a revision and stream its fastexport."""
+ backend = unittest.mock.MagicMock()
+ backend.storage = storage
+ cooker = RevisionGitfastCooker(
+ 'revision_gitfast', obj_id, backend=backend)
+ cooker.fileobj = io.BytesIO()
+ assert cooker.check_exists()
+ cooker.prepare_bundle()
+ cooker.fileobj.seek(0)
+ fastexport_stream = gzip.GzipFile(fileobj=cooker.fileobj)
+ yield fastexport_stream
+
+
+@contextlib.contextmanager
+def cook_extract_revision_gitfast(storage, obj_id):
+ """Context manager that cooks a revision and extract it."""
+ test_repo = TestRepo()
+ with cook_stream_revision_gitfast(storage, obj_id) as stream, \
+ test_repo as p:
+ processor = dulwich.fastexport.GitImportProcessor(test_repo.repo)
+ processor.import_stream(stream)
+ yield test_repo, p
TEST_CONTENT = (" test content\n"
@@ -164,8 +163,8 @@
TEST_EXECUTABLE = b'\x42\x40\x00\x00\x05'
-class TestDirectoryCooker(BaseTestCookers, unittest.TestCase):
- def test_directory_simple(self):
+class TestDirectoryCooker:
+ def test_directory_simple(self, swh_git_loader):
repo = TestRepo()
with repo as rp:
(rp / 'file').write_text(TEST_CONTENT)
@@ -175,25 +174,25 @@
(rp / 'dir1/dir2').mkdir(parents=True)
(rp / 'dir1/dir2/file').write_text(TEST_CONTENT)
c = repo.commit()
- self.load(str(rp))
+ load(swh_git_loader, str(rp))
obj_id_hex = repo.repo[c].tree.decode()
obj_id = hashutil.hash_to_bytes(obj_id_hex)
- with self.cook_extract_directory(obj_id) as p:
- self.assertEqual((p / 'file').stat().st_mode, 0o100644)
- self.assertEqual((p / 'file').read_text(), TEST_CONTENT)
- self.assertEqual((p / 'executable').stat().st_mode, 0o100755)
- self.assertEqual((p / 'executable').read_bytes(), TEST_EXECUTABLE)
- self.assertTrue((p / 'link').is_symlink)
- self.assertEqual(os.readlink(str(p / 'link')), 'file')
- self.assertEqual((p / 'dir1/dir2/file').stat().st_mode, 0o100644)
- self.assertEqual((p / 'dir1/dir2/file').read_text(), TEST_CONTENT)
+ with cook_extract_directory(swh_git_loader.storage, obj_id) as p:
+ assert (p / 'file').stat().st_mode == 0o100644
+ assert (p / 'file').read_text() == TEST_CONTENT
+ assert (p / 'executable').stat().st_mode == 0o100755
+ assert (p / 'executable').read_bytes() == TEST_EXECUTABLE
+ assert (p / 'link').is_symlink()
+ assert os.readlink(str(p / 'link')) == 'file'
+ assert (p / 'dir1/dir2/file').stat().st_mode == 0o100644
+ assert (p / 'dir1/dir2/file').read_text() == TEST_CONTENT
directory = Directory.from_disk(path=bytes(p))
- self.assertEqual(obj_id_hex, hashutil.hash_to_hex(directory.hash))
+ assert obj_id_hex == hashutil.hash_to_hex(directory.hash)
- def test_directory_filtered_objects(self):
+ def test_directory_filtered_objects(self, swh_git_loader):
repo = TestRepo()
with repo as rp:
file_1, id_1 = hash_content(b'test1')
@@ -205,14 +204,14 @@
(rp / 'absent_file').write_bytes(file_3)
c = repo.commit()
- self.load(str(rp))
+ load(swh_git_loader, str(rp))
obj_id_hex = repo.repo[c].tree.decode()
obj_id = hashutil.hash_to_bytes(obj_id_hex)
# FIXME: storage.content_update() should be changed to allow things
# like that
- with self.storage.get_db().transaction() as cur:
+ with swh_git_loader.storage.get_db().transaction() as cur:
cur.execute("""update content set status = 'visible'
where sha1 = %s""", (id_1,))
cur.execute("""update content set status = 'hidden'
@@ -220,12 +219,12 @@
cur.execute("""update content set status = 'absent'
where sha1 = %s""", (id_3,))
- with self.cook_extract_directory(obj_id) as p:
- self.assertEqual((p / 'file').read_bytes(), b'test1')
- self.assertEqual((p / 'hidden_file').read_bytes(), HIDDEN_MESSAGE)
- self.assertEqual((p / 'absent_file').read_bytes(), SKIPPED_MESSAGE)
+ with cook_extract_directory(swh_git_loader.storage, obj_id) as p:
+ assert (p / 'file').read_bytes() == b'test1'
+ assert (p / 'hidden_file').read_bytes() == HIDDEN_MESSAGE
+ assert (p / 'absent_file').read_bytes() == SKIPPED_MESSAGE
- def test_directory_bogus_perms(self):
+ def test_directory_bogus_perms(self, swh_git_loader):
# Some early git repositories have 664/775 permissions... let's check
# if all the weird modes are properly normalized in the directory
# cooker.
@@ -238,17 +237,17 @@
(rp / 'wat').write_text(TEST_CONTENT)
(rp / 'wat').chmod(0o604)
c = repo.commit()
- self.load(str(rp))
+ load(swh_git_loader, str(rp))
obj_id_hex = repo.repo[c].tree.decode()
obj_id = hashutil.hash_to_bytes(obj_id_hex)
- with self.cook_extract_directory(obj_id) as p:
- self.assertEqual((p / 'file').stat().st_mode, 0o100644)
- self.assertEqual((p / 'executable').stat().st_mode, 0o100755)
- self.assertEqual((p / 'wat').stat().st_mode, 0o100644)
+ with cook_extract_directory(swh_git_loader.storage, obj_id) as p:
+ assert (p / 'file').stat().st_mode == 0o100644
+ assert (p / 'executable').stat().st_mode == 0o100755
+ assert (p / 'wat').stat().st_mode == 0o100644
- def test_directory_revision_data(self):
+ def test_directory_revision_data(self, swh_git_loader):
target_rev = '0e8a3ad980ec179856012b7eecf4327e99cd44cd'
d = hashutil.hash_to_bytes('17a3e48bce37be5226490e750202ad3a9a1a3fe9')
@@ -263,18 +262,19 @@
}
],
}
- self.storage.directory_add([dir])
+ swh_git_loader.storage.directory_add([dir])
- with self.cook_extract_directory(d) as p:
- self.assertTrue((p / 'submodule').is_symlink())
- self.assertEqual(os.readlink(str(p / 'submodule')), target_rev)
+ with cook_extract_directory(swh_git_loader.storage, d) as p:
+ assert (p / 'submodule').is_symlink()
+ assert os.readlink(str(p / 'submodule')) == target_rev
-class TestRevisionGitfastCooker(BaseTestCookers, unittest.TestCase):
- def test_revision_simple(self):
+class TestRevisionGitfastCooker:
+ def test_revision_simple(self, swh_git_loader):
#
# 1--2--3--4--5--6--7
#
+ storage = swh_git_loader.storage
repo = TestRepo()
with repo as rp:
(rp / 'file1').write_text(TEST_CONTENT)
@@ -293,28 +293,29 @@
repo.commit('remove file2')
(rp / 'bin1').rename(rp / 'bin')
repo.commit('rename bin1 to bin')
- self.load(str(rp))
+ load(swh_git_loader, str(rp))
obj_id_hex = repo.repo.refs[b'HEAD'].decode()
obj_id = hashutil.hash_to_bytes(obj_id_hex)
- with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
+ with cook_extract_revision_gitfast(storage, obj_id) as (ert, p):
ert.checkout(b'HEAD')
- self.assertEqual((p / 'file1').stat().st_mode, 0o100644)
- self.assertEqual((p / 'file1').read_text(), TEST_CONTENT)
- self.assertTrue((p / 'link1').is_symlink)
- self.assertEqual(os.readlink(str(p / 'link1')), 'file1')
- self.assertEqual((p / 'bin').stat().st_mode, 0o100755)
- self.assertEqual((p / 'bin').read_bytes(), TEST_EXECUTABLE)
- self.assertEqual((p / 'dir1/dir2/file').read_text(), TEST_CONTENT)
- self.assertEqual((p / 'dir1/dir2/file').stat().st_mode, 0o100644)
- self.assertEqual(ert.repo.refs[b'HEAD'].decode(), obj_id_hex)
-
- def test_revision_two_roots(self):
+ assert (p / 'file1').stat().st_mode == 0o100644
+ assert (p / 'file1').read_text() == TEST_CONTENT
+ assert (p / 'link1').is_symlink()
+ assert os.readlink(str(p / 'link1')) == 'file1'
+ assert (p / 'bin').stat().st_mode == 0o100755
+ assert (p / 'bin').read_bytes() == TEST_EXECUTABLE
+ assert (p / 'dir1/dir2/file').read_text() == TEST_CONTENT
+ assert (p / 'dir1/dir2/file').stat().st_mode == 0o100644
+ assert ert.repo.refs[b'HEAD'].decode() == obj_id_hex
+
+ def test_revision_two_roots(self, swh_git_loader):
#
# 1----3---4
# /
# 2----
#
+ storage = swh_git_loader.storage
repo = TestRepo()
with repo as rp:
(rp / 'file1').write_text(TEST_CONTENT)
@@ -327,17 +328,18 @@
repo.commit('add file3')
obj_id_hex = repo.repo.refs[b'HEAD'].decode()
obj_id = hashutil.hash_to_bytes(obj_id_hex)
- self.load(str(rp))
+ load(swh_git_loader, str(rp))
- with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
- self.assertEqual(ert.repo.refs[b'HEAD'].decode(), obj_id_hex)
+ with cook_extract_revision_gitfast(storage, obj_id) as (ert, p):
+ assert ert.repo.refs[b'HEAD'].decode() == obj_id_hex
- def test_revision_two_double_fork_merge(self):
+ def test_revision_two_double_fork_merge(self, swh_git_loader):
#
# 2---4---6
# / / /
# 1---3---5
#
+ storage = swh_git_loader.storage
repo = TestRepo()
with repo as rp:
(rp / 'file1').write_text(TEST_CONTENT)
@@ -360,12 +362,12 @@
obj_id_hex = repo.repo.refs[b'HEAD'].decode()
obj_id = hashutil.hash_to_bytes(obj_id_hex)
- self.load(str(rp))
+ load(swh_git_loader, str(rp))
- with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
- self.assertEqual(ert.repo.refs[b'HEAD'].decode(), obj_id_hex)
+ with cook_extract_revision_gitfast(storage, obj_id) as (ert, p):
+ assert ert.repo.refs[b'HEAD'].decode() == obj_id_hex
- def test_revision_triple_merge(self):
+ def test_revision_triple_merge(self, swh_git_loader):
#
# .---.---5
# / / /
@@ -373,6 +375,7 @@
# / / /
# 1---.---.
#
+ storage = swh_git_loader.storage
repo = TestRepo()
with repo as rp:
(rp / 'file1').write_text(TEST_CONTENT)
@@ -387,12 +390,13 @@
obj_id_hex = repo.repo.refs[b'HEAD'].decode()
obj_id = hashutil.hash_to_bytes(obj_id_hex)
- self.load(str(rp))
+ load(swh_git_loader, str(rp))
- with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
- self.assertEqual(ert.repo.refs[b'HEAD'].decode(), obj_id_hex)
+ with cook_extract_revision_gitfast(storage, obj_id) as (ert, p):
+ assert ert.repo.refs[b'HEAD'].decode() == obj_id_hex
- def test_revision_filtered_objects(self):
+ def test_revision_filtered_objects(self, swh_git_loader):
+ storage = swh_git_loader.storage
repo = TestRepo()
with repo as rp:
file_1, id_1 = hash_content(b'test1')
@@ -406,11 +410,11 @@
repo.commit()
obj_id_hex = repo.repo.refs[b'HEAD'].decode()
obj_id = hashutil.hash_to_bytes(obj_id_hex)
- self.load(str(rp))
+ load(swh_git_loader, str(rp))
# FIXME: storage.content_update() should be changed to allow things
# like that
- with self.storage.get_db().transaction() as cur:
+ with storage.get_db().transaction() as cur:
cur.execute("""update content set status = 'visible'
where sha1 = %s""", (id_1,))
cur.execute("""update content set status = 'hidden'
@@ -418,16 +422,17 @@
cur.execute("""update content set status = 'absent'
where sha1 = %s""", (id_3,))
- with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
+ with cook_extract_revision_gitfast(storage, obj_id) as (ert, p):
ert.checkout(b'HEAD')
- self.assertEqual((p / 'file').read_bytes(), b'test1')
- self.assertEqual((p / 'hidden_file').read_bytes(), HIDDEN_MESSAGE)
- self.assertEqual((p / 'absent_file').read_bytes(), SKIPPED_MESSAGE)
+ assert (p / 'file').read_bytes() == b'test1'
+ assert (p / 'hidden_file').read_bytes() == HIDDEN_MESSAGE
+ assert (p / 'absent_file').read_bytes() == SKIPPED_MESSAGE
- def test_revision_bogus_perms(self):
+ def test_revision_bogus_perms(self, swh_git_loader):
# Some early git repositories have 664/775 permissions... let's check
# if all the weird modes are properly normalized in the revision
# cooker.
+ storage = swh_git_loader.storage
repo = TestRepo()
with repo as rp:
(rp / 'file').write_text(TEST_CONTENT)
@@ -437,24 +442,25 @@
(rp / 'wat').write_text(TEST_CONTENT)
(rp / 'wat').chmod(0o604)
repo.commit('initial commit')
- self.load(str(rp))
+ load(swh_git_loader, str(rp))
obj_id_hex = repo.repo.refs[b'HEAD'].decode()
obj_id = hashutil.hash_to_bytes(obj_id_hex)
- with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
+ with cook_extract_revision_gitfast(storage, obj_id) as (ert, p):
ert.checkout(b'HEAD')
- self.assertEqual((p / 'file').stat().st_mode, 0o100644)
- self.assertEqual((p / 'executable').stat().st_mode, 0o100755)
- self.assertEqual((p / 'wat').stat().st_mode, 0o100644)
+ assert (p / 'file').stat().st_mode == 0o100644
+ assert (p / 'executable').stat().st_mode == 0o100755
+ assert (p / 'wat').stat().st_mode == 0o100644
- def test_revision_null_fields(self):
+ def test_revision_null_fields(self, swh_git_loader):
# Our schema doesn't enforce a lot of non-null revision fields. We need
# to check these cases don't break the cooker.
+ storage = swh_git_loader.storage
repo = TestRepo()
with repo as rp:
(rp / 'file').write_text(TEST_CONTENT)
c = repo.commit('initial commit')
- self.load(str(rp))
+ load(swh_git_loader, str(rp))
repo.repo.refs[b'HEAD'].decode()
dir_id_hex = repo.repo[c].tree.decode()
dir_id = hashutil.hash_to_bytes(dir_id_hex)
@@ -474,13 +480,14 @@
'synthetic': True
}
- self.storage.revision_add([test_revision])
+ storage.revision_add([test_revision])
- with self.cook_extract_revision_gitfast(test_id) as (ert, p):
+ with cook_extract_revision_gitfast(storage, test_id) as (ert, p):
ert.checkout(b'HEAD')
- self.assertEqual((p / 'file').stat().st_mode, 0o100644)
+ assert (p / 'file').stat().st_mode == 0o100644
- def test_revision_revision_data(self):
+ def test_revision_revision_data(self, swh_git_loader):
+ storage = swh_git_loader.storage
target_rev = '0e8a3ad980ec179856012b7eecf4327e99cd44cd'
d = hashutil.hash_to_bytes('17a3e48bce37be5226490e750202ad3a9a1a3fe9')
r = hashutil.hash_to_bytes('1ecc9270c4fc61cfddbc65a774e91ef5c425a6f0')
@@ -496,7 +503,7 @@
}
],
}
- self.storage.directory_add([dir])
+ storage.directory_add([dir])
rev = {
'id': r,
@@ -511,8 +518,8 @@
'metadata': {},
'synthetic': True
}
- self.storage.revision_add([rev])
+ storage.revision_add([rev])
- with self.cook_stream_revision_gitfast(r) as stream:
+ with cook_stream_revision_gitfast(storage, r) as stream:
pattern = 'M 160000 {} submodule'.format(target_rev).encode()
- self.assertIn(pattern, stream.read())
+ assert pattern in stream.read()
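With the BaseTestCookers mixin gone, a new cooker test simply requests the swh_git_loader fixture and calls the module-level helpers. A hypothetical example in the same vein as the tests above:

def test_directory_single_file(swh_git_loader):
    # hypothetical test following the fixture-based pattern introduced here
    repo = TestRepo()
    with repo as rp:
        (rp / 'file').write_text(TEST_CONTENT)
        c = repo.commit()
        load(swh_git_loader, str(rp))
        obj_id = hashutil.hash_to_bytes(repo.repo[c].tree.decode())
        with cook_extract_directory(swh_git_loader.storage, obj_id) as p:
            assert (p / 'file').read_text() == TEST_CONTENT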
diff --git a/swh/vault/tests/test_cookers_base.py b/swh/vault/tests/test_cookers_base.py
--- a/swh/vault/tests/test_cookers_base.py
+++ b/swh/vault/tests/test_cookers_base.py
@@ -3,7 +3,6 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import unittest
from unittest.mock import MagicMock
from swh.model import hashutil
@@ -22,10 +21,15 @@
class BaseVaultCookerMock(BaseVaultCooker):
CACHE_TYPE_KEY = TEST_OBJ_TYPE
- def __init__(self, *args, **kwargs):
- super().__init__(self.CACHE_TYPE_KEY, TEST_OBJ_ID, *args, **kwargs)
+ def __init__(self):
+ # we deliberately do not call super() here: it would build real db
+ # objects from the config, and those objects are mocked in this test
+ self.config = {}
self.storage = MagicMock()
self.backend = MagicMock()
+ self.obj_type = self.CACHE_TYPE_KEY
+ self.obj_id = hashutil.hash_to_bytes(TEST_OBJ_ID)
+ self.max_bundle_size = 1024
def check_exists(self):
return True
@@ -35,44 +39,43 @@
self.write(chunk)
-class TestBaseVaultCooker(unittest.TestCase):
- def test_simple_cook(self):
- cooker = BaseVaultCookerMock()
- cooker.cook()
- cooker.backend.put_bundle.assert_called_once_with(
- TEST_OBJ_TYPE, TEST_OBJ_ID, TEST_BUNDLE_CONTENT)
- cooker.backend.set_status.assert_called_with(
- TEST_OBJ_TYPE, TEST_OBJ_ID, 'done')
- cooker.backend.set_progress.assert_called_with(
- TEST_OBJ_TYPE, TEST_OBJ_ID, None)
- cooker.backend.send_notif.assert_called_with(
- TEST_OBJ_TYPE, TEST_OBJ_ID)
-
- def test_code_exception_cook(self):
- cooker = BaseVaultCookerMock()
- cooker.prepare_bundle = MagicMock()
- cooker.prepare_bundle.side_effect = RuntimeError("Nope")
- cooker.cook()
-
- # Potentially remove this when we have objstorage streaming
- cooker.backend.put_bundle.assert_not_called()
-
- cooker.backend.set_status.assert_called_with(
- TEST_OBJ_TYPE, TEST_OBJ_ID, 'failed')
- self.assertNotIn("Nope", cooker.backend.set_progress.call_args[0][2])
- cooker.backend.send_notif.assert_called_with(
- TEST_OBJ_TYPE, TEST_OBJ_ID)
-
- def test_policy_exception_cook(self):
- cooker = BaseVaultCookerMock()
- cooker.max_bundle_size = 8
- cooker.cook()
-
- # Potentially remove this when we have objstorage streaming
- cooker.backend.put_bundle.assert_not_called()
-
- cooker.backend.set_status.assert_called_with(
- TEST_OBJ_TYPE, TEST_OBJ_ID, 'failed')
- self.assertIn("exceeds", cooker.backend.set_progress.call_args[0][2])
- cooker.backend.send_notif.assert_called_with(
- TEST_OBJ_TYPE, TEST_OBJ_ID)
+def test_simple_cook():
+ cooker = BaseVaultCookerMock()
+ cooker.cook()
+ cooker.backend.put_bundle.assert_called_once_with(
+ TEST_OBJ_TYPE, TEST_OBJ_ID, TEST_BUNDLE_CONTENT)
+ cooker.backend.set_status.assert_called_with(
+ TEST_OBJ_TYPE, TEST_OBJ_ID, 'done')
+ cooker.backend.set_progress.assert_called_with(
+ TEST_OBJ_TYPE, TEST_OBJ_ID, None)
+ cooker.backend.send_notif.assert_called_with(
+ TEST_OBJ_TYPE, TEST_OBJ_ID)
+
+
+def test_code_exception_cook():
+ cooker = BaseVaultCookerMock()
+ cooker.prepare_bundle = MagicMock()
+ cooker.prepare_bundle.side_effect = RuntimeError("Nope")
+ cooker.cook()
+
+ # Potentially remove this when we have objstorage streaming
+ cooker.backend.put_bundle.assert_not_called()
+
+ cooker.backend.set_status.assert_called_with(
+ TEST_OBJ_TYPE, TEST_OBJ_ID, 'failed')
+ assert "Nope" not in cooker.backend.set_progress.call_args[0][2]
+ cooker.backend.send_notif.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID)
+
+
+def test_policy_exception_cook():
+ cooker = BaseVaultCookerMock()
+ cooker.max_bundle_size = 8
+ cooker.cook()
+
+ # Potentially remove this when we have objstorage streaming
+ cooker.backend.put_bundle.assert_not_called()
+
+ cooker.backend.set_status.assert_called_with(
+ TEST_OBJ_TYPE, TEST_OBJ_ID, 'failed')
+ assert "exceeds" in cooker.backend.set_progress.call_args[0][2]
+ cooker.backend.send_notif.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID)
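The call_args[0][2] indexing used in these assertions follows unittest.mock's convention: call_args is a (positional args, keyword args) pair, so call_args[0][2] is the third positional argument of the last call, here the progress message. A self-contained illustration:

from unittest.mock import MagicMock

set_progress = MagicMock()
set_progress('gitfast', b'\x42' * 20, 'Bundle cooking failed.')
# call_args[0] is the tuple of positional arguments of the last call
assert set_progress.call_args[0] == ('gitfast', b'\x42' * 20,
                                     'Bundle cooking failed.')
assert set_progress.call_args[0][2] == 'Bundle cooking failed.'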
diff --git a/swh/vault/tests/vault_testing.py b/swh/vault/tests/vault_testing.py
--- a/swh/vault/tests/vault_testing.py
+++ b/swh/vault/tests/vault_testing.py
@@ -3,60 +3,7 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import os
-import tempfile
-
from swh.model import hashutil
-from swh.vault.backend import VaultBackend
-
-from swh.storage.tests.storage_testing import StorageTestFixture
-from swh.vault.tests import SQL_DIR
-
-
-class VaultTestFixture(StorageTestFixture):
- """Mix this in a test subject class to get Vault Database testing support.
-
- This fixture requires to come before DbTestFixture and StorageTestFixture
- in the inheritance list as it uses their methods to setup its own internal
- components.
-
- Usage example:
-
- class TestVault(VaultTestFixture, unittest.TestCase):
- ...
- """
- TEST_DB_NAME = 'softwareheritage-test-vault'
- TEST_DB_DUMP = [StorageTestFixture.TEST_DB_DUMP,
- os.path.join(SQL_DIR, '*.sql')]
-
- def setUp(self):
- super().setUp()
- self.cache_root = tempfile.TemporaryDirectory('vault-cache-')
- self.vault_config = {
- 'storage': self.storage_config,
- 'cache': {
- 'cls': 'pathslicing',
- 'args': {
- 'root': self.cache_root.name,
- 'slicing': '0:1/1:5',
- 'allow_delete': True,
- }
- },
- 'db': 'postgresql:///' + self.TEST_DB_NAME,
- 'scheduler': None,
- }
- self.vault_backend = VaultBackend(self.vault_config)
-
- def tearDown(self):
- self.cache_root.cleanup()
- self.vault_backend.close()
- self.reset_storage_tables()
- self.reset_vault_tables()
- super().tearDown()
-
- def reset_vault_tables(self):
- excluded = {'dbversion'}
- self.reset_db_tables(self.TEST_DB_NAME, excluded=excluded)
def hash_content(content):
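The removed VaultTestFixture was also what loaded the vault schema (the *.sql files under SQL_DIR) into a throwaway database. Under pytest-postgresql an equivalent step could look like this sketch (the helper name and its placement in conftest.py are assumptions, not part of this diff):

import glob
import os.path

from swh.vault.tests import SQL_DIR


def load_vault_schema(postgresql):
    # replay every schema file shipped with the tests into the
    # database connection handed out by pytest-postgresql
    cur = postgresql.cursor()
    for fname in sorted(glob.glob(os.path.join(SQL_DIR, '*.sql'))):
        with open(fname) as f:
            cur.execute(f.read())
    postgresql.commit()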
diff --git a/tox.ini b/tox.ini
--- a/tox.ini
+++ b/tox.ini
@@ -5,9 +5,8 @@
deps =
.[testing]
pytest-cov
- pifpaf
commands =
- pifpaf run postgresql -- pytest --cov=swh --cov-branch {posargs} -m 'not config_issue'
+ pytest --cov=swh --cov-branch {posargs}
[testenv:flake8]
skip_install = true
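For reference, here is the [testenv] section as it reads after this change (reconstructed from the hunk above; the section header and unchanged lines are assumed). pifpaf is no longer needed to provision PostgreSQL, since pytest-postgresql starts its own server, and the 'not config_issue' deselection is gone because the marked tests were ported to fixtures:

[testenv]
deps =
  .[testing]
  pytest-cov
commands =
  pytest --cov=swh --cov-branch {posargs}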
