D1108.id3584.diff
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,4 +1,5 @@
-pytest
+pytest < 4
+pytest-postgresql
dulwich >= 0.18.7
swh.loader.git >= 0.0.48
swh.storage[testing]
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -50,6 +50,10 @@
vcversioner={},
include_package_data=True,
zip_safe=False,
+ entry_points='''
+ [console_scripts]
+ swh-vault=swh.vault.cli:main
+ ''',
classifiers=[
"Programming Language :: Python :: 3",
"Intended Audience :: Developers",
diff --git a/swh/vault/__init__.py b/swh/vault/__init__.py
--- a/swh/vault/__init__.py
+++ b/swh/vault/__init__.py
@@ -2,9 +2,12 @@
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import logging
+logger = logging.getLogger(__name__)
-def get_vault(cls, args):
+
+def get_vault(cls='remote', args={}):
"""
Get a vault object of class `vault_class` with arguments
`vault_args`.
@@ -15,16 +18,23 @@
- args (dict): dictionary with keys
Returns:
- an instance of swh.storage.Storage (either local or remote)
+ an instance of VaultBackend (either local or remote)
Raises:
ValueError if passed an unknown storage class.
"""
-
if cls == 'remote':
from .api.client import RemoteVaultClient as Vault
+ elif cls == 'local':
+ from swh.scheduler import get_scheduler
+ from swh.storage import get_storage
+ from swh.vault.cache import VaultCache
+ from swh.vault.backend import VaultBackend as Vault
+ args['cache'] = VaultCache(**args['cache'])
+ args['storage'] = get_storage(**args['storage'])
+ args['scheduler'] = get_scheduler(**args['scheduler'])
else:
raise ValueError('Unknown storage class `%s`' % cls)
-
+ logger.debug('Instantiating %s with %s' % (Vault, args))
return Vault(**args)
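The new get_vault() factory mirrors swh.storage.get_storage(): 'remote' returns an HTTP client, while 'local' builds the full backend from nested cache, storage and scheduler configurations. A minimal usage sketch (not part of the patch; the DSN, paths and URLs are placeholders taken from defaults elsewhere in this diff):

    # Sketch only; all connection values below are placeholders.
    from swh.vault import get_vault

    vault = get_vault('local', {
        'db': 'dbname=softwareheritage-vault-dev',
        'cache': {'cls': 'pathslicing',
                  'args': {'root': '/srv/vault-cache', 'slicing': '0:1/1:5'}},
        'storage': {'cls': 'remote',
                    'args': {'url': 'http://localhost:5002/'}},
        'scheduler': {'cls': 'remote',
                      'args': {'url': 'http://localhost:5008/'}},
    })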
diff --git a/swh/vault/api/client.py b/swh/vault/api/client.py
--- a/swh/vault/api/client.py
+++ b/swh/vault/api/client.py
@@ -17,9 +17,9 @@
class RemoteVaultClient(SWHRemoteAPI):
"""Client to the Software Heritage vault cache."""
- def __init__(self, base_url, timeout=None):
+ def __init__(self, url, timeout=None):
super().__init__(
- api_exception=VaultAPIError, url=base_url, timeout=timeout)
+ api_exception=VaultAPIError, url=url, timeout=timeout)
# Web API endpoints
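Renaming base_url to url lets get_vault('remote', args) expand the configuration dictionary straight into the client constructor. Sketch (the URL is a placeholder):

    # Sketch only; the URL is a placeholder.
    from swh.vault.api.client import RemoteVaultClient

    client = RemoteVaultClient(url='http://localhost:5005/')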
diff --git a/swh/vault/api/server.py b/swh/vault/api/server.py
--- a/swh/vault/api/server.py
+++ b/swh/vault/api/server.py
@@ -5,7 +5,6 @@
import aiohttp.web
import asyncio
-import click
import collections
from swh.core import config
@@ -13,8 +12,9 @@
encode_data_server as encode_data,
decode_request)
from swh.model import hashutil
+from swh.vault import get_vault
from swh.vault.cookers import COOKER_TYPES
-from swh.vault.backend import VaultBackend, NotFoundExc
+from swh.vault.backend import NotFoundExc
DEFAULT_CONFIG_PATH = 'vault/server'
@@ -33,7 +33,12 @@
},
}),
'client_max_size': ('int', 1024 ** 3),
- 'db': ('str', 'dbname=softwareheritage-vault-dev'),
+ 'vault': ('dict', {
+ 'cls': 'local',
+ 'args': {
+ 'db': 'dbname=softwareheritage-vault-dev',
+ },
+ }),
'scheduler': ('dict', {
'cls': 'remote',
'args': {
@@ -168,10 +173,7 @@
# Web server
-def make_app(config, **kwargs):
- if 'client_max_size' in config:
- kwargs['client_max_size'] = config['client_max_size']
-
+def make_app(backend, **kwargs):
app = SWHRemoteAPI(**kwargs)
app.router.add_route('GET', '/', index)
@@ -190,26 +192,40 @@
app.router.add_route('POST', '/batch_cook', batch_cook)
app.router.add_route('GET', '/batch_progress/{batch_id}', batch_progress)
- app['backend'] = VaultBackend(config)
+ app['backend'] = backend
return app
-def make_app_from_configfile(config_path=DEFAULT_CONFIG_PATH, **kwargs):
- cfg = config.load_named_config(config_path, DEFAULT_CONFIG)
- return make_app(cfg, **kwargs)
+def get_local_backend(config_file):
+ cfg = config.read(config_file, DEFAULT_CONFIG)
+ if 'vault' not in cfg:
+        raise ValueError("missing 'vault' configuration")
+
+ vcfg = cfg['vault']
+ if vcfg['cls'] != 'local':
+        raise EnvironmentError(
+            "The vault backend can only be started with a 'local' "
+            "configuration")
+ args = vcfg['args']
+ if 'cache' not in args:
+ args['cache'] = cfg.get('cache')
+ if 'storage' not in args:
+ args['storage'] = cfg.get('storage')
+ if 'scheduler' not in args:
+ args['scheduler'] = cfg.get('scheduler')
+
+ for key in ('cache', 'storage', 'scheduler'):
+ if not args.get(key):
+ raise ValueError(
+ "invalid configuration; missing %s config entry." % key)
+
+ return get_vault('local', args)
-@click.command()
-@click.argument('config-path', required=1)
-@click.option('--host', default='0.0.0.0', help="Host to run the server")
-@click.option('--port', default=5005, type=click.INT,
- help="Binding port of the server")
-@click.option('--debug/--nodebug', default=True,
- help="Indicates if the server should run in debug mode")
-def launch(config_path, host, port, debug):
- app = make_app(config.read(config_path, DEFAULT_CONFIG), debug=bool(debug))
- aiohttp.web.run_app(app, host=host, port=int(port))
+def make_app_from_configfile(config_file, **kwargs):
+ vault = get_local_backend(config_file)
+ return make_app(backend=vault, **kwargs)
if __name__ == '__main__':
- launch()
+ print('Deprecated. Use swh-vault ')
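With the flat 'db' entry replaced by a nested 'vault' section, get_local_backend() expects a configuration shaped as follows; the cache, storage and scheduler entries may live either under vault/args or at the top level. A sketch of the equivalent structure as a Python dict (values are placeholders):

    # Sketch only; the DSN, path and URLs are placeholders.
    {
        'vault': {
            'cls': 'local',
            'args': {'db': 'dbname=softwareheritage-vault-dev'},
        },
        'cache': {'cls': 'pathslicing',
                  'args': {'root': '/srv/vault-cache', 'slicing': '0:1/1:5'}},
        'storage': {'cls': 'remote',
                    'args': {'url': 'http://localhost:5002/'}},
        'scheduler': {'cls': 'remote',
                      'args': {'url': 'http://localhost:5008/'}},
    }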
diff --git a/swh/vault/backend.py b/swh/vault/backend.py
--- a/swh/vault/backend.py
+++ b/swh/vault/backend.py
@@ -4,17 +4,15 @@
# See top-level LICENSE file for more information
import smtplib
-import psycopg2
-from psycopg2.extras import RealDictCursor
+import psycopg2.extras
+import psycopg2.pool
-from functools import wraps
from email.mime.text import MIMEText
+from swh.core.db import BaseDb
+from swh.core.db.common import db_transaction
from swh.model import hashutil
-from swh.scheduler import get_scheduler
from swh.scheduler.utils import create_oneshot_task_dict
-from swh.vault.cache import VaultCache
-from swh.vault.cookers import get_cooker
+from swh.vault.cookers import get_cooker_cls
cooking_task_name = 'swh.vault.cooking_tasks.SWHCookingTask'
@@ -69,146 +67,87 @@
for obj_type, obj_id in batch]
-# TODO: Imported from swh.scheduler.backend. Factorization needed.
-def autocommit(fn):
- @wraps(fn)
- def wrapped(self, *args, **kwargs):
- autocommit = False
- # TODO: I don't like using None, it's confusing for the user. how about
- # a NEW_CURSOR object()?
- if 'cursor' not in kwargs or not kwargs['cursor']:
- autocommit = True
- kwargs['cursor'] = self.cursor()
-
- try:
- ret = fn(self, *args, **kwargs)
- except BaseException:
- if autocommit:
- self.rollback()
- raise
-
- if autocommit:
- self.commit()
-
- return ret
-
- return wrapped
-
-
-# TODO: This has to be factorized with other database base classes and helpers
-# (swh.scheduler.backend.SchedulerBackend, swh.storage.db.BaseDb, ...)
-# The three first methods are imported from swh.scheduler.backend.
class VaultBackend:
"""
Backend for the Software Heritage vault.
"""
- def __init__(self, config):
+ def __init__(self, db, cache, scheduler, storage=None, **config):
self.config = config
- self.cache = VaultCache(self.config['cache'])
- self.db = None
- self.reconnect()
+ self.cache = cache
+ self.scheduler = scheduler
+ self.storage = storage
self.smtp_server = smtplib.SMTP()
- if self.config['scheduler'] is not None:
- self.scheduler = get_scheduler(**self.config['scheduler'])
-
- def reconnect(self):
- """Reconnect to the database."""
- if not self.db or self.db.closed:
- self.db = psycopg2.connect(
- dsn=self.config['db'],
- cursor_factory=RealDictCursor,
- )
- def close(self):
- """Close the underlying database connection."""
- self.db.close()
-
- def cursor(self):
- """Return a fresh cursor on the database, with auto-reconnection in
- case of failure"""
- cur = None
-
- # Get a fresh cursor and reconnect at most three times
- tries = 0
- while True:
- tries += 1
- try:
- cur = self.db.cursor()
- cur.execute('select 1')
- break
- except psycopg2.OperationalError:
- if tries < 3:
- self.reconnect()
- else:
- raise
- return cur
-
- def commit(self):
- """Commit a transaction"""
- self.db.commit()
-
- def rollback(self):
- """Rollback a transaction"""
- self.db.rollback()
-
- @autocommit
- def task_info(self, obj_type, obj_id, cursor=None):
+ self._pool = psycopg2.pool.ThreadedConnectionPool(
+ config.get('min_pool_conns', 1),
+ config.get('max_pool_conns', 10),
+ db,
+ cursor_factory=psycopg2.extras.RealDictCursor,
+ )
+ self._db = None
+
+ def get_db(self):
+ if self._db:
+ return self._db
+ return BaseDb.from_pool(self._pool)
+
+ @db_transaction()
+ def task_info(self, obj_type, obj_id, db=None, cur=None):
"""Fetch information from a bundle"""
obj_id = hashutil.hash_to_bytes(obj_id)
- cursor.execute('''
+ cur.execute('''
SELECT id, type, object_id, task_id, task_status, sticky,
ts_created, ts_done, ts_last_access, progress_msg
FROM vault_bundle
WHERE type = %s AND object_id = %s''', (obj_type, obj_id))
- res = cursor.fetchone()
+ res = cur.fetchone()
if res:
res['object_id'] = bytes(res['object_id'])
return res
- def _send_task(self, args):
+ def _send_task(self, *args):
"""Send a cooking task to the celery scheduler"""
task = create_oneshot_task_dict('swh-vault-cooking', *args)
added_tasks = self.scheduler.create_tasks([task])
return added_tasks[0]['id']
- @autocommit
- def create_task(self, obj_type, obj_id, sticky=False, cursor=None):
+ @db_transaction()
+ def create_task(self, obj_type, obj_id, sticky=False, db=None, cur=None):
"""Create and send a cooking task"""
obj_id = hashutil.hash_to_bytes(obj_id)
hex_id = hashutil.hash_to_hex(obj_id)
- args = [obj_type, hex_id]
- backend_storage_config = {'storage': self.config['storage']}
- cooker_class = get_cooker(obj_type)
- cooker = cooker_class(*args, override_cfg=backend_storage_config)
+ cooker_class = get_cooker_cls(obj_type)
+ cooker = cooker_class(obj_type, hex_id,
+ backend=self, storage=self.storage)
if not cooker.check_exists():
raise NotFoundExc("Object {} was not found.".format(hex_id))
- cursor.execute('''
+ cur.execute('''
INSERT INTO vault_bundle (type, object_id, sticky)
VALUES (%s, %s, %s)''', (obj_type, obj_id, sticky))
- self.commit()
+ db.conn.commit()
- task_id = self._send_task(args)
+ task_id = self._send_task(obj_type, hex_id)
- cursor.execute('''
+ cur.execute('''
UPDATE vault_bundle
SET task_id = %s
WHERE type = %s AND object_id = %s''', (task_id, obj_type, obj_id))
- @autocommit
- def add_notif_email(self, obj_type, obj_id, email, cursor=None):
+ @db_transaction()
+ def add_notif_email(self, obj_type, obj_id, email, db=None, cur=None):
"""Add an e-mail address to notify when a given bundle is ready"""
obj_id = hashutil.hash_to_bytes(obj_id)
- cursor.execute('''
+ cur.execute('''
INSERT INTO vault_notif_email (email, bundle_id)
VALUES (%s, (SELECT id FROM vault_bundle
WHERE type = %s AND object_id = %s))''',
- (email, obj_type, obj_id))
+ (email, obj_type, obj_id))
- @autocommit
+ @db_transaction()
def cook_request(self, obj_type, obj_id, *, sticky=False,
- email=None, cursor=None):
+ email=None, db=None, cur=None):
"""Main entry point for cooking requests. This starts a cooking task if
needed, and add the given e-mail to the notify list"""
obj_id = hashutil.hash_to_bytes(obj_id)
@@ -216,10 +155,10 @@
# If there's a failed bundle entry, delete it first.
if info is not None and info['task_status'] == 'failed':
- cursor.execute('''DELETE FROM vault_bundle
+ cur.execute('''DELETE FROM vault_bundle
WHERE type = %s AND object_id = %s''',
- (obj_type, obj_id))
- self.commit()
+ (obj_type, obj_id))
+ db.conn.commit()
info = None
# If there's no bundle entry, create the task.
@@ -238,44 +177,44 @@
info = self.task_info(obj_type, obj_id)
return info
- @autocommit
- def batch_cook(self, batch, cursor=None):
+ @db_transaction()
+ def batch_cook(self, batch, db=None, cur=None):
"""Cook a batch of bundles and returns the cooking id."""
# Import execute_values at runtime only, because it requires
# psycopg2 >= 2.7 (only available on postgresql servers)
from psycopg2.extras import execute_values
- cursor.execute('''
+ cur.execute('''
INSERT INTO vault_batch (id)
VALUES (DEFAULT)
RETURNING id''')
- batch_id = cursor.fetchone()['id']
+ batch_id = cur.fetchone()['id']
batch = batch_to_bytes(batch)
# Delete all failed bundles from the batch
- cursor.execute('''
+ cur.execute('''
DELETE FROM vault_bundle
WHERE task_status = 'failed'
AND (type, object_id) IN %s''', (tuple(batch),))
# Insert all the bundles, return the new ones
- execute_values(cursor, '''
+ execute_values(cur, '''
INSERT INTO vault_bundle (type, object_id)
VALUES %s ON CONFLICT DO NOTHING''', batch)
# Get the bundle ids and task status
- cursor.execute('''
+ cur.execute('''
SELECT id, type, object_id, task_id FROM vault_bundle
WHERE (type, object_id) IN %s''', (tuple(batch),))
- bundles = cursor.fetchall()
+ bundles = cur.fetchall()
# Insert the batch-bundle entries
batch_id_bundle_ids = [(batch_id, row['id']) for row in bundles]
- execute_values(cursor, '''
+ execute_values(cur, '''
INSERT INTO vault_batch_bundle (batch_id, bundle_id)
VALUES %s ON CONFLICT DO NOTHING''',
batch_id_bundle_ids)
- self.commit()
+ db.conn.commit()
# Get the tasks to fetch
batch_new = [(row['type'], bytes(row['object_id']))
@@ -296,7 +235,7 @@
in tasks_ids_bundle_ids]
# Update the task ids
- execute_values(cursor, '''
+ execute_values(cur, '''
UPDATE vault_bundle
SET task_id = s_task_id
FROM (VALUES %s) AS sub (s_task_id, s_type, s_object_id)
@@ -304,50 +243,50 @@
tasks_ids_bundle_ids)
return batch_id
- @autocommit
- def batch_info(self, batch_id, cursor=None):
+ @db_transaction()
+ def batch_info(self, batch_id, db=None, cur=None):
"""Fetch information from a batch of bundles"""
- cursor.execute('''
+ cur.execute('''
SELECT vault_bundle.id as id,
type, object_id, task_id, task_status, sticky,
ts_created, ts_done, ts_last_access, progress_msg
FROM vault_batch_bundle
LEFT JOIN vault_bundle ON vault_bundle.id = bundle_id
WHERE batch_id = %s''', (batch_id,))
- res = cursor.fetchall()
+ res = cur.fetchall()
if res:
for d in res:
d['object_id'] = bytes(d['object_id'])
return res
- @autocommit
- def is_available(self, obj_type, obj_id, cursor=None):
+ @db_transaction()
+ def is_available(self, obj_type, obj_id, db=None, cur=None):
"""Check whether a bundle is available for retrieval"""
- info = self.task_info(obj_type, obj_id, cursor=cursor)
+ info = self.task_info(obj_type, obj_id, cur=cur)
return (info is not None
and info['task_status'] == 'done'
and self.cache.is_cached(obj_type, obj_id))
- @autocommit
- def fetch(self, obj_type, obj_id, cursor=None):
+ @db_transaction()
+ def fetch(self, obj_type, obj_id, db=None, cur=None):
"""Retrieve a bundle from the cache"""
- if not self.is_available(obj_type, obj_id, cursor=cursor):
+ if not self.is_available(obj_type, obj_id, cur=cur):
return None
- self.update_access_ts(obj_type, obj_id, cursor=cursor)
+ self.update_access_ts(obj_type, obj_id, cur=cur)
return self.cache.get(obj_type, obj_id)
- @autocommit
- def update_access_ts(self, obj_type, obj_id, cursor=None):
+ @db_transaction()
+ def update_access_ts(self, obj_type, obj_id, db=None, cur=None):
"""Update the last access timestamp of a bundle"""
obj_id = hashutil.hash_to_bytes(obj_id)
- cursor.execute('''
+ cur.execute('''
UPDATE vault_bundle
SET ts_last_access = NOW()
WHERE type = %s AND object_id = %s''',
- (obj_type, obj_id))
+ (obj_type, obj_id))
- @autocommit
- def set_status(self, obj_type, obj_id, status, cursor=None):
+ @db_transaction()
+ def set_status(self, obj_type, obj_id, status, db=None, cur=None):
"""Set the cooking status of a bundle"""
obj_id = hashutil.hash_to_bytes(obj_id)
req = ('''
@@ -355,36 +294,36 @@
SET task_status = %s '''
+ (''', ts_done = NOW() ''' if status == 'done' else '')
+ '''WHERE type = %s AND object_id = %s''')
- cursor.execute(req, (status, obj_type, obj_id))
+ cur.execute(req, (status, obj_type, obj_id))
- @autocommit
- def set_progress(self, obj_type, obj_id, progress, cursor=None):
+ @db_transaction()
+ def set_progress(self, obj_type, obj_id, progress, db=None, cur=None):
"""Set the cooking progress of a bundle"""
obj_id = hashutil.hash_to_bytes(obj_id)
- cursor.execute('''
+ cur.execute('''
UPDATE vault_bundle
SET progress_msg = %s
WHERE type = %s AND object_id = %s''',
- (progress, obj_type, obj_id))
+ (progress, obj_type, obj_id))
- @autocommit
- def send_all_notifications(self, obj_type, obj_id, cursor=None):
+ @db_transaction()
+ def send_all_notifications(self, obj_type, obj_id, db=None, cur=None):
"""Send all the e-mails in the notification list of a bundle"""
obj_id = hashutil.hash_to_bytes(obj_id)
- cursor.execute('''
+ cur.execute('''
SELECT vault_notif_email.id AS id, email, task_status, progress_msg
FROM vault_notif_email
INNER JOIN vault_bundle ON bundle_id = vault_bundle.id
WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s''',
- (obj_type, obj_id))
- for d in cursor:
+ (obj_type, obj_id))
+ for d in cur:
self.send_notification(d['id'], d['email'], obj_type, obj_id,
status=d['task_status'],
progress_msg=d['progress_msg'])
- @autocommit
+ @db_transaction()
def send_notification(self, n_id, email, obj_type, obj_id, status,
- progress_msg=None, cursor=None):
+ progress_msg=None, db=None, cur=None):
"""Send the notification of a bundle to a specific e-mail"""
hex_id = hashutil.hash_to_hex(obj_id)
short_id = hex_id[:7]
@@ -421,7 +360,7 @@
self._smtp_send(msg)
if n_id is not None:
- cursor.execute('''
+ cur.execute('''
DELETE FROM vault_notif_email
WHERE id = %s''', (n_id,))
@@ -437,11 +376,11 @@
# Send the message
self.smtp_server.send_message(msg)
- @autocommit
- def _cache_expire(self, cond, *args, cursor=None):
+ @db_transaction()
+ def _cache_expire(self, cond, *args, db=None, cur=None):
"""Low-level expiration method, used by cache_expire_* methods"""
# Embedded SELECT query to be able to use ORDER BY and LIMIT
- cursor.execute('''
+ cur.execute('''
DELETE FROM vault_bundle
WHERE ctid IN (
SELECT ctid
@@ -452,18 +391,18 @@
RETURNING type, object_id
'''.format(cond), args)
- for d in cursor:
+ for d in cur:
self.cache.delete(d['type'], bytes(d['object_id']))
- @autocommit
- def cache_expire_oldest(self, n=1, by='last_access', cursor=None):
+ @db_transaction()
+ def cache_expire_oldest(self, n=1, by='last_access', db=None, cur=None):
"""Expire the `n` oldest bundles"""
assert by in ('created', 'done', 'last_access')
filter = '''ORDER BY ts_{} LIMIT {}'''.format(by, n)
return self._cache_expire(filter)
- @autocommit
- def cache_expire_until(self, date, by='last_access', cursor=None):
+ @db_transaction()
+ def cache_expire_until(self, date, by='last_access', db=None, cur=None):
"""Expire all the bundles until a certain date"""
assert by in ('created', 'done', 'last_access')
filter = '''AND ts_{} <= %s'''.format(by)
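The hand-rolled autocommit decorator is replaced by swh.core.db.common.db_transaction, which obtains a connection from get_db() and injects db/cur keyword arguments; passing an explicit cur, as is_available() does when calling task_info(), keeps several calls inside one transaction. A minimal sketch of the calling convention, using a hypothetical extra method:

    # Sketch only; `progress_of` is hypothetical and not part of the patch.
    from swh.core.db.common import db_transaction
    from swh.vault.backend import VaultBackend

    class ExampleBackend(VaultBackend):
        @db_transaction()
        def progress_of(self, obj_type, obj_id, db=None, cur=None):
            # `db` and `cur` are supplied by the decorator; forwarding `cur`
            # runs task_info() in the same transaction.
            info = self.task_info(obj_type, obj_id, cur=cur)
            return info['progress_msg'] if info else None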
diff --git a/swh/vault/cache.py b/swh/vault/cache.py
--- a/swh/vault/cache.py
+++ b/swh/vault/cache.py
@@ -15,7 +15,7 @@
internal identifiers used in the underlying objstorage.
"""
- def __init__(self, objstorage):
+ def __init__(self, **objstorage):
self.objstorage = get_objstorage(**objstorage)
def add(self, obj_type, obj_id, content):
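VaultCache now receives the objstorage configuration as keyword arguments, matching how get_vault('local', ...) expands args['cache']. Sketch (the root path is a placeholder):

    # Sketch only; the root path is a placeholder.
    from swh.vault.cache import VaultCache

    cache = VaultCache(cls='pathslicing',
                       args={'root': '/srv/vault-cache', 'slicing': '0:1/1:5'})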
diff --git a/swh/vault/cli.py b/swh/vault/cli.py
new file mode 100644
--- /dev/null
+++ b/swh/vault/cli.py
@@ -0,0 +1,50 @@
+import logging
+import click
+import aiohttp
+
+from swh.vault.api.server import make_app_from_configfile
+
+
+CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
+
+
+@click.command(context_settings=CONTEXT_SETTINGS)
+@click.option('--config-file', '-C', default=None,
+ type=click.Path(exists=True, dir_okay=False,),
+ help="Configuration file.")
+@click.option('--log-level', '-l', default='INFO',
+ type=click.Choice(logging._nameToLevel.keys()),
+ help="Log level (default to INFO)")
+@click.option('--no-stdout', is_flag=True, default=False,
+ help="Do NOT output logs on the console")
+@click.option('--host', default='0.0.0.0', help="Host to run the server")
+@click.option('--port', default=5005, type=click.INT,
+ help="Binding port of the server")
+@click.option('--debug/--nodebug', default=True,
+ help="Indicates if the server should run in debug mode")
+@click.pass_context
+def cli(ctx, config_file, log_level, no_stdout, host, port, debug):
+ """Software Heritage Vault API server
+
+ """
+ from swh.scheduler.celery_backend.config import setup_log_handler
+ log_level = setup_log_handler(
+ loglevel=log_level, colorize=False,
+ format='[%(levelname)s] %(name)s -- %(message)s',
+ log_console=not no_stdout)
+
+ try:
+ app = make_app_from_configfile(config_file, debug=debug)
+ except EnvironmentError as e:
+        click.echo(e, err=True)
+ ctx.exit(1)
+
+ aiohttp.web.run_app(app, host=host, port=int(port))
+
+
+def main():
+ return cli(auto_envvar_prefix='SWH_VAULT')
+
+
+if __name__ == '__main__':
+ main()
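The click launcher removed from swh/vault/api/server.py now lives behind the swh-vault console script declared in setup.py; the same wiring can be reproduced programmatically. Sketch (the configuration file path is a placeholder):

    # Sketch only; the configuration file path is a placeholder.
    import aiohttp.web

    from swh.vault.api.server import make_app_from_configfile

    app = make_app_from_configfile('/etc/softwareheritage/vault/server.yml')
    aiohttp.web.run_app(app, host='0.0.0.0', port=5005)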
diff --git a/swh/vault/cookers/__init__.py b/swh/vault/cookers/__init__.py
--- a/swh/vault/cookers/__init__.py
+++ b/swh/vault/cookers/__init__.py
@@ -2,7 +2,12 @@
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import os
+from swh.core.config import load_named_config, read as read_config
+from swh.storage import get_storage
+from swh.vault import get_vault
+from swh.vault.cookers.base import DEFAULT_CONFIG_PATH, DEFAULT_CONFIG
from swh.vault.cookers.directory import DirectoryCooker
from swh.vault.cookers.revision_flat import RevisionFlatCooker
from swh.vault.cookers.revision_gitfast import RevisionGitfastCooker
@@ -13,4 +18,36 @@
'revision_gitfast': RevisionGitfastCooker,
}
-get_cooker = COOKER_TYPES.__getitem__
+
+def get_cooker_cls(obj_type):
+ return COOKER_TYPES[obj_type]
+
+
+def get_cooker(obj_type, obj_id):
+ if 'SWH_CONFIG_FILENAME' in os.environ:
+ cfg = read_config(os.environ['SWH_CONFIG_FILENAME'], DEFAULT_CONFIG)
+ else:
+ cfg = load_named_config(DEFAULT_CONFIG_PATH, DEFAULT_CONFIG)
+ cooker_cls = get_cooker_cls(obj_type)
+ if 'vault' not in cfg:
+        raise ValueError("missing 'vault' configuration")
+
+ vcfg = cfg['vault']
+ if vcfg['cls'] != 'remote':
+        raise EnvironmentError(
+            "This vault backend can only be a 'remote' "
+            "configuration")
+ args = vcfg['args']
+ if 'storage' not in args:
+ args['storage'] = cfg.get('storage')
+
+ if not args.get('storage'):
+ raise ValueError(
+ "invalid configuration; missing 'storage' config entry.")
+
+ storage = get_storage(**args.pop('storage'))
+ backend = get_vault(**vcfg)
+
+ return cooker_cls(obj_type, obj_id,
+ backend=backend, storage=storage,
+ max_bundle_size=cfg['max_bundle_size'])
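Cookers no longer build their own RemoteVaultClient and storage from a vault_url string; get_cooker() reads the configuration (from SWH_CONFIG_FILENAME or the named default) and requires a 'remote' vault plus a storage entry. A configuration mirroring the defaults in swh/vault/cookers/base.py satisfies it; as a Python dict (sketch, URLs are placeholders):

    # Sketch only; URLs are placeholders.
    {
        'storage': {'cls': 'remote', 'args': {'url': 'http://localhost:5002/'}},
        'vault': {'cls': 'remote', 'args': {'url': 'http://localhost:5005/'}},
        'max_bundle_size': 2 ** 29,  # 512 MiB
    }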
diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py
--- a/swh/vault/cookers/base.py
+++ b/swh/vault/cookers/base.py
@@ -7,12 +7,9 @@
import io
import logging
-from swh.core import config
from swh.model import hashutil
-from swh.storage import get_storage
-from swh.vault.api.client import RemoteVaultClient
-
+MAX_BUNDLE_SIZE = 2 ** 29 # 512 MiB
DEFAULT_CONFIG_PATH = 'vault/cooker'
DEFAULT_CONFIG = {
'storage': ('dict', {
@@ -21,8 +18,13 @@
'url': 'http://localhost:5002/',
},
}),
- 'vault_url': ('str', 'http://localhost:5005/'),
- 'max_bundle_size': ('int', 2 ** 29), # 512 MiB
+ 'vault': ('dict', {
+ 'cls': 'remote',
+ 'args': {
+ 'url': 'http://localhost:5005/',
+ },
+ }),
+ 'max_bundle_size': ('int', MAX_BUNDLE_SIZE),
}
@@ -61,7 +63,8 @@
"""
CACHE_TYPE_KEY = None
- def __init__(self, obj_type, obj_id, *, override_cfg=None):
+ def __init__(self, obj_type, obj_id, backend, storage,
+ max_bundle_size=MAX_BUNDLE_SIZE):
"""Initialize the cooker.
The type of the object represented by the id depends on the
@@ -69,20 +72,17 @@
own cooker class.
Args:
- storage: the storage object
- cache: the cache where to store the bundle
+ obj_type: type of the object to be cooked into a bundle (directory,
+ revision_flat or revision_gitfast; see
+ swh.vault.cooker.COOKER_TYPES).
obj_id: id of the object to be cooked into a bundle.
+ backend: the vault backend (swh.vault.backend.VaultBackend).
"""
- self.config = config.load_named_config(DEFAULT_CONFIG_PATH,
- DEFAULT_CONFIG)
- if override_cfg is not None:
- self.config.update(override_cfg)
-
self.obj_type = obj_type
self.obj_id = hashutil.hash_to_bytes(obj_id)
- self.backend = RemoteVaultClient(self.config['vault_url'])
- self.storage = get_storage(**self.config['storage'])
- self.max_bundle_size = self.config['max_bundle_size']
+ self.backend = backend
+ self.storage = storage
+ self.max_bundle_size = max_bundle_size
@abc.abstractmethod
def check_exists(self):
diff --git a/swh/vault/cooking_tasks.py b/swh/vault/cooking_tasks.py
--- a/swh/vault/cooking_tasks.py
+++ b/swh/vault/cooking_tasks.py
@@ -11,11 +11,11 @@
@app.task(name=__name__ + '.SWHCookingTask')
def cook_bundle(obj_type, obj_id):
"""Main task to cook a bundle."""
- get_cooker(obj_type)(obj_type, obj_id).cook()
+ get_cooker(obj_type, obj_id).cook()
# TODO: remove once the scheduler handles priority tasks
@app.task(name=__name__ + '.SWHBatchCookingTask')
def batch_cook_bundle(obj_type, obj_id):
"""Temporary task for the batch queue."""
- get_cooker(obj_type)(obj_type, obj_id).cook()
+ get_cooker(obj_type, obj_id).cook()
diff --git a/swh/vault/tests/conftest.py b/swh/vault/tests/conftest.py
new file mode 100644
--- /dev/null
+++ b/swh/vault/tests/conftest.py
@@ -0,0 +1,80 @@
+import pytest
+import glob
+import os
+import pkg_resources.extern.packaging.version
+
+from swh.core.utils import numfile_sortkey as sortkey
+from swh.vault import get_vault
+from swh.vault.tests import SQL_DIR
+from swh.storage.tests import SQL_DIR as STORAGE_SQL_DIR
+from pytest_postgresql import factories
+
+
+pytest_v = pkg_resources.get_distribution("pytest").parsed_version
+if pytest_v < pkg_resources.extern.packaging.version.parse('3.9'):
+ @pytest.fixture
+ def tmp_path(request):
+ import tempfile
+ import pathlib
+ with tempfile.TemporaryDirectory() as tmpdir:
+ yield pathlib.Path(tmpdir)
+
+
+def db_url(name, postgresql_proc):
+ return 'postgresql://{user}@{host}:{port}/{dbname}'.format(
+ host=postgresql_proc.host,
+ port=postgresql_proc.port,
+ user='postgres',
+ dbname=name)
+
+
+postgresql2 = factories.postgresql('postgresql_proc', 'tests2')
+
+
+@pytest.fixture
+def swh_vault(request, postgresql_proc, postgresql, postgresql2, tmp_path):
+
+ for sql_dir, pg in ((SQL_DIR, postgresql), (STORAGE_SQL_DIR, postgresql2)):
+ dump_files = os.path.join(sql_dir, '*.sql')
+ all_dump_files = sorted(glob.glob(dump_files), key=sortkey)
+
+ cursor = pg.cursor()
+ for fname in all_dump_files:
+ with open(fname) as fobj:
+ # disable concurrent index creation since we run in a
+ # transaction
+ cursor.execute(fobj.read().replace('concurrently', ''))
+ pg.commit()
+
+ vault_config = {
+ 'db': db_url('tests', postgresql_proc),
+ 'storage': {
+ 'cls': 'local',
+ 'args': {
+ 'db': db_url('tests2', postgresql_proc),
+ 'objstorage': {
+ 'cls': 'pathslicing',
+ 'args': {
+ 'root': str(tmp_path),
+ 'slicing': '0:1/1:5',
+ },
+ },
+ },
+ },
+ 'cache': {
+ 'cls': 'pathslicing',
+ 'args': {
+ 'root': str(tmp_path),
+ 'slicing': '0:1/1:5',
+ 'allow_delete': True,
+ }
+ },
+ 'scheduler': {
+ 'cls': 'remote',
+ 'args': {
+ 'url': 'http://swh-scheduler:5008',
+ },
+ },
+ }
+
+ return get_vault('local', vault_config)
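The old VaultTestFixture mixin is replaced by this function-scoped swh_vault fixture backed by two pytest-postgresql databases (one for the vault schema, one for the storage schema); tests simply declare it as an argument. Sketch of a hypothetical test (the object id is made up):

    # Sketch only; hypothetical test with a made-up object id.
    def test_unknown_bundle_has_no_info(swh_vault):
        missing_id = '17a3e48bce37be5226490e750202ad3a9a1a3fe9'
        assert swh_vault.task_info('directory', missing_id) is None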
diff --git a/swh/vault/tests/test_backend.py b/swh/vault/tests/test_backend.py
--- a/swh/vault/tests/test_backend.py
+++ b/swh/vault/tests/test_backend.py
@@ -6,49 +6,50 @@
import contextlib
import datetime
import psycopg2
-import unittest
-from unittest.mock import patch
+from unittest.mock import patch, MagicMock
+import pytest
from swh.model import hashutil
-from swh.vault.tests.vault_testing import VaultTestFixture, hash_content
-
-
-class BaseTestBackend(VaultTestFixture):
- @contextlib.contextmanager
- def mock_cooking(self):
- with patch.object(self.vault_backend, '_send_task') as mt:
- mt.return_value = 42
- with patch('swh.vault.backend.get_cooker') as mg:
- mcc = unittest.mock.MagicMock()
- mc = unittest.mock.MagicMock()
- mg.return_value = mcc
- mcc.return_value = mc
- mc.check_exists.return_value = True
-
- yield {'send_task': mt,
- 'get_cooker': mg,
- 'cooker_cls': mcc,
- 'cooker': mc}
-
- def assertTimestampAlmostNow(self, ts, tolerance_secs=1.0): # noqa
- now = datetime.datetime.now(datetime.timezone.utc)
- creation_delta_secs = (ts - now).total_seconds()
- self.assertLess(creation_delta_secs, tolerance_secs)
-
- def fake_cook(self, obj_type, result_content, sticky=False):
- content, obj_id = hash_content(result_content)
- with self.mock_cooking():
- self.vault_backend.create_task(obj_type, obj_id, sticky)
- self.vault_backend.cache.add(obj_type, obj_id, b'content')
- self.vault_backend.set_status(obj_type, obj_id, 'done')
- return obj_id, content
-
- def fail_cook(self, obj_type, obj_id, failure_reason):
- with self.mock_cooking():
- self.vault_backend.create_task(obj_type, obj_id)
- self.vault_backend.set_status(obj_type, obj_id, 'failed')
- self.vault_backend.set_progress(obj_type, obj_id, failure_reason)
+from swh.vault.tests.vault_testing import hash_content
+
+
+@contextlib.contextmanager
+def mock_cooking(vault_backend):
+ with patch.object(vault_backend, '_send_task') as mt:
+ mt.return_value = 42
+ with patch('swh.vault.backend.get_cooker') as mg:
+ mcc = MagicMock()
+ mc = MagicMock()
+ mg.return_value = mcc
+ mcc.return_value = mc
+ mc.check_exists.return_value = True
+
+ yield {'_send_task': mt,
+ 'get_cooker': mg,
+ 'cooker_cls': mcc,
+ 'cooker': mc}
+
+def assertTimestampAlmostNow(ts, tolerance_secs=1.0): # noqa
+ now = datetime.datetime.now(datetime.timezone.utc)
+ creation_delta_secs = (ts - now).total_seconds()
+ assert creation_delta_secs < tolerance_secs
+
+
+def fake_cook(backend, obj_type, result_content, sticky=False):
+ content, obj_id = hash_content(result_content)
+ with mock_cooking(backend):
+ backend.create_task(obj_type, obj_id, sticky)
+ backend.cache.add(obj_type, obj_id, b'content')
+ backend.set_status(obj_type, obj_id, 'done')
+ return obj_id, content
+
+
+def fail_cook(backend, obj_type, obj_id, failure_reason):
+ with mock_cooking(backend):
+ backend.create_task(obj_type, obj_id)
+ backend.set_status(obj_type, obj_id, 'failed')
+ backend.set_progress(obj_type, obj_id, failure_reason)
TEST_TYPE = 'revision_gitfast'
@@ -56,272 +57,281 @@
TEST_OBJ_ID = hashutil.hash_to_bytes(TEST_HEX_ID)
TEST_PROGRESS = ("Mr. White, You're telling me you're cooking again?"
" \N{ASTONISHED FACE} ")
-TEST_EMAIL = 'ouiche@example.com'
-
-
-class TestBackend(BaseTestBackend, unittest.TestCase):
- def test_create_task_simple(self):
- with self.mock_cooking() as m:
- self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID)
-
- m['get_cooker'].assert_called_once_with(TEST_TYPE)
-
- args = m['cooker_cls'].call_args[0]
- self.assertEqual(args[0], TEST_TYPE)
- self.assertEqual(args[1], TEST_HEX_ID)
-
- self.assertEqual(m['cooker'].check_exists.call_count, 1)
-
- self.assertEqual(m['send_task'].call_count, 1)
- args = m['send_task'].call_args[0][0]
- self.assertEqual(args[0], TEST_TYPE)
- self.assertEqual(args[1], TEST_HEX_ID)
-
- info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
- self.assertEqual(info['object_id'], TEST_OBJ_ID)
- self.assertEqual(info['type'], TEST_TYPE)
- self.assertEqual(info['task_status'], 'new')
- self.assertEqual(info['task_id'], 42)
-
- self.assertTimestampAlmostNow(info['ts_created'])
-
- self.assertEqual(info['ts_done'], None)
- self.assertEqual(info['progress_msg'], None)
-
- def test_create_fail_duplicate_task(self):
- with self.mock_cooking():
- self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID)
- with self.assertRaises(psycopg2.IntegrityError):
- self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID)
-
- def test_create_fail_nonexisting_object(self):
- with self.mock_cooking() as m:
- m['cooker'].check_exists.side_effect = ValueError('Nothing here.')
- with self.assertRaises(ValueError):
- self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID)
-
- def test_create_set_progress(self):
- with self.mock_cooking():
- self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID)
-
- info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
- self.assertEqual(info['progress_msg'], None)
- self.vault_backend.set_progress(TEST_TYPE, TEST_OBJ_ID,
- TEST_PROGRESS)
- info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
- self.assertEqual(info['progress_msg'], TEST_PROGRESS)
-
- def test_create_set_status(self):
- with self.mock_cooking():
- self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID)
-
- info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
- self.assertEqual(info['task_status'], 'new')
- self.assertEqual(info['ts_done'], None)
-
- self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'pending')
- info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
- self.assertEqual(info['task_status'], 'pending')
- self.assertEqual(info['ts_done'], None)
-
- self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done')
- info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
- self.assertEqual(info['task_status'], 'done')
- self.assertTimestampAlmostNow(info['ts_done'])
-
- def test_create_update_access_ts(self):
- with self.mock_cooking():
- self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID)
-
- info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
- access_ts_1 = info['ts_last_access']
- self.assertTimestampAlmostNow(access_ts_1)
-
- self.vault_backend.update_access_ts(TEST_TYPE, TEST_OBJ_ID)
- info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
- access_ts_2 = info['ts_last_access']
- self.assertTimestampAlmostNow(access_ts_2)
-
- self.vault_backend.update_access_ts(TEST_TYPE, TEST_OBJ_ID)
- info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
- access_ts_3 = info['ts_last_access']
- self.assertTimestampAlmostNow(access_ts_3)
-
- self.assertLess(access_ts_1, access_ts_2)
- self.assertLess(access_ts_2, access_ts_3)
-
- def test_cook_request_idempotent(self):
- with self.mock_cooking():
- info1 = self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID)
- info2 = self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID)
- info3 = self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID)
- self.assertEqual(info1, info2)
- self.assertEqual(info1, info3)
-
- def test_cook_email_pending_done(self):
- with self.mock_cooking(), \
- patch.object(self.vault_backend, 'add_notif_email') as madd, \
- patch.object(self.vault_backend, 'send_notification') as msend:
-
- self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID)
- madd.assert_not_called()
- msend.assert_not_called()
-
- madd.reset_mock()
- msend.reset_mock()
-
- self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID,
- email=TEST_EMAIL)
- madd.assert_called_once_with(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL)
- msend.assert_not_called()
-
- madd.reset_mock()
- msend.reset_mock()
-
- self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done')
- self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID,
- email=TEST_EMAIL)
- msend.assert_called_once_with(None, TEST_EMAIL,
- TEST_TYPE, TEST_OBJ_ID, 'done')
- madd.assert_not_called()
-
- def test_send_all_emails(self):
- with self.mock_cooking():
- emails = ('a@example.com',
- 'billg@example.com',
- 'test+42@example.org')
- for email in emails:
- self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID,
- email=email)
-
- self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done')
-
- with patch.object(self.vault_backend, 'smtp_server') as m:
- self.vault_backend.send_all_notifications(TEST_TYPE, TEST_OBJ_ID)
-
- sent_emails = {k[0][0] for k in m.send_message.call_args_list}
- self.assertEqual({k['To'] for k in sent_emails}, set(emails))
-
- for e in sent_emails:
- self.assertIn('info@softwareheritage.org', e['From'])
- self.assertIn(TEST_TYPE, e['Subject'])
- self.assertIn(TEST_HEX_ID[:5], e['Subject'])
- self.assertIn(TEST_TYPE, str(e))
- self.assertIn('https://archive.softwareheritage.org/', str(e))
- self.assertIn(TEST_HEX_ID[:5], str(e))
- self.assertIn('--\x20\n', str(e)) # Well-formated signature!!!
-
- # Check that the entries have been deleted and recalling the
- # function does not re-send the e-mails
- m.reset_mock()
- self.vault_backend.send_all_notifications(TEST_TYPE, TEST_OBJ_ID)
- m.assert_not_called()
-
- def test_available(self):
- self.assertFalse(self.vault_backend.is_available(TEST_TYPE,
- TEST_OBJ_ID))
- with self.mock_cooking():
- self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID)
- self.assertFalse(self.vault_backend.is_available(TEST_TYPE,
- TEST_OBJ_ID))
- self.vault_backend.cache.add(TEST_TYPE, TEST_OBJ_ID, b'content')
- self.assertFalse(self.vault_backend.is_available(TEST_TYPE,
- TEST_OBJ_ID))
- self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done')
- self.assertTrue(self.vault_backend.is_available(TEST_TYPE,
- TEST_OBJ_ID))
-
- def test_fetch(self):
- self.assertEqual(self.vault_backend.fetch(TEST_TYPE, TEST_OBJ_ID),
- None)
- obj_id, content = self.fake_cook(TEST_TYPE, b'content')
-
- info = self.vault_backend.task_info(TEST_TYPE, obj_id)
- access_ts_before = info['ts_last_access']
-
- self.assertEqual(self.vault_backend.fetch(TEST_TYPE, obj_id),
- b'content')
-
- info = self.vault_backend.task_info(TEST_TYPE, obj_id)
- access_ts_after = info['ts_last_access']
-
- self.assertTimestampAlmostNow(access_ts_after)
- self.assertLess(access_ts_before, access_ts_after)
-
- def test_cache_expire_oldest(self):
- r = range(1, 10)
- inserted = {}
- for i in r:
- sticky = (i == 5)
- content = b'content%s' % str(i).encode()
- obj_id, content = self.fake_cook(TEST_TYPE, content, sticky)
- inserted[i] = (obj_id, content)
-
- self.vault_backend.update_access_ts(TEST_TYPE, inserted[2][0])
- self.vault_backend.update_access_ts(TEST_TYPE, inserted[3][0])
- self.vault_backend.cache_expire_oldest(n=4)
-
- should_be_still_here = {2, 3, 5, 8, 9}
- for i in r:
- self.assertEqual(self.vault_backend.is_available(
- TEST_TYPE, inserted[i][0]), i in should_be_still_here)
-
- def test_cache_expire_until(self):
- r = range(1, 10)
- inserted = {}
- for i in r:
- sticky = (i == 5)
- content = b'content%s' % str(i).encode()
- obj_id, content = self.fake_cook(TEST_TYPE, content, sticky)
- inserted[i] = (obj_id, content)
-
- if i == 7:
- cutoff_date = datetime.datetime.now()
-
- self.vault_backend.update_access_ts(TEST_TYPE, inserted[2][0])
- self.vault_backend.update_access_ts(TEST_TYPE, inserted[3][0])
- self.vault_backend.cache_expire_until(date=cutoff_date)
-
- should_be_still_here = {2, 3, 5, 8, 9}
- for i in r:
- self.assertEqual(self.vault_backend.is_available(
- TEST_TYPE, inserted[i][0]), i in should_be_still_here)
-
- def test_fail_cook_simple(self):
- self.fail_cook(TEST_TYPE, TEST_OBJ_ID, 'error42')
- self.assertFalse(self.vault_backend.is_available(TEST_TYPE,
- TEST_OBJ_ID))
- info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
- self.assertEqual(info['progress_msg'], 'error42')
-
- def test_send_failure_email(self):
- with self.mock_cooking():
- self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID,
- email='a@example.com')
-
- self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'failed')
- self.vault_backend.set_progress(TEST_TYPE, TEST_OBJ_ID, 'test error')
-
- with patch.object(self.vault_backend, 'smtp_server') as m:
- self.vault_backend.send_all_notifications(TEST_TYPE, TEST_OBJ_ID)
-
- e = [k[0][0] for k in m.send_message.call_args_list][0]
- self.assertEqual(e['To'], 'a@example.com')
-
- self.assertIn('info@softwareheritage.org', e['From'])
- self.assertIn(TEST_TYPE, e['Subject'])
- self.assertIn(TEST_HEX_ID[:5], e['Subject'])
- self.assertIn('fail', e['Subject'])
- self.assertIn(TEST_TYPE, str(e))
- self.assertIn(TEST_HEX_ID[:5], str(e))
- self.assertIn('test error', str(e))
- self.assertIn('--\x20\n', str(e)) # Well-formated signature
-
- def test_retry_failed_bundle(self):
- self.fail_cook(TEST_TYPE, TEST_OBJ_ID, 'error42')
- info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
- self.assertEqual(info['task_status'], 'failed')
- with self.mock_cooking():
- self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID)
- info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
- self.assertEqual(info['task_status'], 'new')
+TEST_EMAIL = 'ouiche@lorraine.fr'
+
+
+def test_create_task_simple(swh_vault):
+ with mock_cooking(swh_vault) as m:
+ swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID)
+
+ m['get_cooker'].assert_called_once_with(TEST_TYPE)
+
+ args = m['cooker_cls'].call_args[0]
+ assert args[0] == TEST_TYPE
+ assert args[1] == TEST_HEX_ID
+
+ assert m['cooker'].check_exists.call_count == 1
+ assert m['_send_task'].call_count == 1
+
+ args = m['_send_task'].call_args[0]
+ assert args[0] == TEST_TYPE
+ assert args[1] == TEST_HEX_ID
+
+ info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
+ assert info['object_id'] == TEST_OBJ_ID
+ assert info['type'] == TEST_TYPE
+ assert info['task_status'] == 'new'
+ assert info['task_id'] == 42
+
+ assertTimestampAlmostNow(info['ts_created'])
+
+ assert info['ts_done'] is None
+ assert info['progress_msg'] is None
+
+
+def test_create_fail_duplicate_task(swh_vault):
+ with mock_cooking(swh_vault):
+ swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID)
+ with pytest.raises(psycopg2.IntegrityError):
+ swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID)
+
+
+def test_create_fail_nonexisting_object(swh_vault):
+ with mock_cooking(swh_vault) as m:
+ m['cooker'].check_exists.side_effect = ValueError('Nothing here.')
+ with pytest.raises(ValueError):
+ swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID)
+
+
+def test_create_set_progress(swh_vault):
+ with mock_cooking(swh_vault):
+ swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID)
+
+ info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
+ assert info['progress_msg'] is None
+ swh_vault.set_progress(TEST_TYPE, TEST_OBJ_ID,
+ TEST_PROGRESS)
+ info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
+ assert info['progress_msg'] == TEST_PROGRESS
+
+
+def test_create_set_status(swh_vault):
+ with mock_cooking(swh_vault):
+ swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID)
+
+ info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
+ assert info['task_status'] == 'new'
+ assert info['ts_done'] is None
+
+ swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'pending')
+ info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
+ assert info['task_status'] == 'pending'
+ assert info['ts_done'] is None
+
+ swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'done')
+ info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
+ assert info['task_status'] == 'done'
+ assertTimestampAlmostNow(info['ts_done'])
+
+
+def test_create_update_access_ts(swh_vault):
+ with mock_cooking(swh_vault):
+ swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID)
+
+ info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
+ access_ts_1 = info['ts_last_access']
+ assertTimestampAlmostNow(access_ts_1)
+
+ swh_vault.update_access_ts(TEST_TYPE, TEST_OBJ_ID)
+ info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
+ access_ts_2 = info['ts_last_access']
+ assertTimestampAlmostNow(access_ts_2)
+
+ swh_vault.update_access_ts(TEST_TYPE, TEST_OBJ_ID)
+ info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
+ access_ts_3 = info['ts_last_access']
+ assertTimestampAlmostNow(access_ts_3)
+
+ assert access_ts_1 < access_ts_2
+ assert access_ts_2 < access_ts_3
+
+
+def test_cook_request_idempotent(swh_vault):
+ with mock_cooking(swh_vault):
+ info1 = swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID)
+ info2 = swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID)
+ info3 = swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID)
+ assert info1 == info2
+ assert info1 == info3
+
+
+def test_cook_email_pending_done(swh_vault):
+ with mock_cooking(swh_vault), \
+ patch.object(swh_vault, 'add_notif_email') as madd, \
+ patch.object(swh_vault, 'send_notification') as msend:
+
+ swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID)
+ madd.assert_not_called()
+ msend.assert_not_called()
+
+ madd.reset_mock()
+ msend.reset_mock()
+
+ swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID,
+ email=TEST_EMAIL)
+ madd.assert_called_once_with(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL)
+ msend.assert_not_called()
+
+ madd.reset_mock()
+ msend.reset_mock()
+
+ swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'done')
+ swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID,
+ email=TEST_EMAIL)
+ msend.assert_called_once_with(None, TEST_EMAIL,
+ TEST_TYPE, TEST_OBJ_ID, 'done')
+ madd.assert_not_called()
+
+
+def test_send_all_emails(swh_vault):
+ with mock_cooking(swh_vault):
+ emails = ('a@example.com',
+ 'billg@example.com',
+ 'test+42@example.org')
+ for email in emails:
+ swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID,
+ email=email)
+
+ swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'done')
+
+ with patch.object(swh_vault, 'smtp_server') as m:
+ swh_vault.send_all_notifications(TEST_TYPE, TEST_OBJ_ID)
+
+ sent_emails = {k[0][0] for k in m.send_message.call_args_list}
+ assert {k['To'] for k in sent_emails} == set(emails)
+
+ for e in sent_emails:
+ assert 'info@softwareheritage.org' in e['From']
+ assert TEST_TYPE in e['Subject']
+ assert TEST_HEX_ID[:5] in e['Subject']
+ assert TEST_TYPE in str(e)
+ assert 'https://archive.softwareheritage.org/' in str(e)
+ assert TEST_HEX_ID[:5] in str(e)
+            assert '--\x20\n' in str(e)  # Well-formatted signature!!!
+
+ # Check that the entries have been deleted and recalling the
+ # function does not re-send the e-mails
+ m.reset_mock()
+ swh_vault.send_all_notifications(TEST_TYPE, TEST_OBJ_ID)
+ m.assert_not_called()
+
+
+def test_available(swh_vault):
+ assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID)
+
+ with mock_cooking(swh_vault):
+ swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID)
+ assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID)
+
+ swh_vault.cache.add(TEST_TYPE, TEST_OBJ_ID, b'content')
+ assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID)
+
+ swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'done')
+ assert swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID)
+
+
+def test_fetch(swh_vault):
+ assert swh_vault.fetch(TEST_TYPE, TEST_OBJ_ID) is None
+ obj_id, content = fake_cook(swh_vault, TEST_TYPE, b'content')
+
+ info = swh_vault.task_info(TEST_TYPE, obj_id)
+ access_ts_before = info['ts_last_access']
+
+ assert swh_vault.fetch(TEST_TYPE, obj_id) == b'content'
+
+ info = swh_vault.task_info(TEST_TYPE, obj_id)
+ access_ts_after = info['ts_last_access']
+
+ assertTimestampAlmostNow(access_ts_after)
+ assert access_ts_before < access_ts_after
+
+
+def test_cache_expire_oldest(swh_vault):
+ r = range(1, 10)
+ inserted = {}
+ for i in r:
+ sticky = (i == 5)
+ content = b'content%s' % str(i).encode()
+ obj_id, content = fake_cook(swh_vault, TEST_TYPE, content, sticky)
+ inserted[i] = (obj_id, content)
+
+ swh_vault.update_access_ts(TEST_TYPE, inserted[2][0])
+ swh_vault.update_access_ts(TEST_TYPE, inserted[3][0])
+ swh_vault.cache_expire_oldest(n=4)
+
+ should_be_still_here = {2, 3, 5, 8, 9}
+ for i in r:
+ assert swh_vault.is_available(
+ TEST_TYPE, inserted[i][0]) == (i in should_be_still_here)
+
+
+def test_cache_expire_until(swh_vault):
+ r = range(1, 10)
+ inserted = {}
+ for i in r:
+ sticky = (i == 5)
+ content = b'content%s' % str(i).encode()
+ obj_id, content = fake_cook(swh_vault, TEST_TYPE, content, sticky)
+ inserted[i] = (obj_id, content)
+
+ if i == 7:
+ cutoff_date = datetime.datetime.now()
+
+ swh_vault.update_access_ts(TEST_TYPE, inserted[2][0])
+ swh_vault.update_access_ts(TEST_TYPE, inserted[3][0])
+ swh_vault.cache_expire_until(date=cutoff_date)
+
+ should_be_still_here = {2, 3, 5, 8, 9}
+ for i in r:
+ assert swh_vault.is_available(
+ TEST_TYPE, inserted[i][0]) == (i in should_be_still_here)
+
+
+def test_fail_cook_simple(swh_vault):
+ fail_cook(swh_vault, TEST_TYPE, TEST_OBJ_ID, 'error42')
+ assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID)
+ info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
+ assert info['progress_msg'] == 'error42'
+
+
+def test_send_failure_email(swh_vault):
+ with mock_cooking(swh_vault):
+ swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, email='a@example.com')
+
+ swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'failed')
+ swh_vault.set_progress(TEST_TYPE, TEST_OBJ_ID, 'test error')
+
+ with patch.object(swh_vault, 'smtp_server') as m:
+ swh_vault.send_all_notifications(TEST_TYPE, TEST_OBJ_ID)
+
+ e = [k[0][0] for k in m.send_message.call_args_list][0]
+ assert e['To'] == 'a@example.com'
+
+ assert 'info@softwareheritage.org' in e['From']
+ assert TEST_TYPE in e['Subject']
+ assert TEST_HEX_ID[:5] in e['Subject']
+ assert 'fail' in e['Subject']
+ assert TEST_TYPE in str(e)
+ assert TEST_HEX_ID[:5] in str(e)
+ assert 'test error' in str(e)
+    assert '--\x20\n' in str(e)  # Well-formatted signature
+
+
+def test_retry_failed_bundle(swh_vault):
+ fail_cook(swh_vault, TEST_TYPE, TEST_OBJ_ID, 'error42')
+ info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
+ assert info['task_status'] == 'failed'
+ with mock_cooking(swh_vault):
+ swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID)
+ info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID)
+ assert info['task_status'] == 'new'
diff --git a/swh/vault/tests/test_cache.py b/swh/vault/tests/test_cache.py
--- a/swh/vault/tests/test_cache.py
+++ b/swh/vault/tests/test_cache.py
@@ -3,10 +3,8 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import unittest
from swh.model import hashutil
-from swh.vault.tests.vault_testing import VaultTestFixture
TEST_TYPE_1 = 'revision_gitfast'
TEST_TYPE_2 = 'directory'
@@ -21,54 +19,51 @@
TEST_CONTENT_2 = b'test content 2'
-class BaseTestVaultCache(VaultTestFixture):
- def setUp(self):
- super().setUp()
- self.cache = self.vault_backend.cache # little shortcut
-
-
-class TestVaultCache(BaseTestVaultCache, unittest.TestCase):
- # Let's try to avoid replicating edge-cases already tested in
- # swh-objstorage, and instead focus on testing behaviors specific to the
- # Vault cache here.
-
- def test_internal_id(self):
- sid = self.cache._get_internal_id(TEST_TYPE_1, TEST_OBJ_ID_1)
- self.assertEqual(hashutil.hash_to_hex(sid),
- '6829cda55b54c295aa043a611a4e0320239988d9')
-
- def test_simple_add_get(self):
- self.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1)
- self.assertEqual(self.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1),
- TEST_CONTENT_1)
- self.assertTrue(self.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1))
-
- def test_different_type_same_id(self):
- self.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1)
- self.cache.add(TEST_TYPE_2, TEST_OBJ_ID_1, TEST_CONTENT_2)
- self.assertEqual(self.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1),
- TEST_CONTENT_1)
- self.assertEqual(self.cache.get(TEST_TYPE_2, TEST_OBJ_ID_1),
- TEST_CONTENT_2)
- self.assertTrue(self.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1))
- self.assertTrue(self.cache.is_cached(TEST_TYPE_2, TEST_OBJ_ID_1))
-
- def test_different_type_same_content(self):
- self.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1)
- self.cache.add(TEST_TYPE_2, TEST_OBJ_ID_1, TEST_CONTENT_1)
- self.assertEqual(self.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1),
- TEST_CONTENT_1)
- self.assertEqual(self.cache.get(TEST_TYPE_2, TEST_OBJ_ID_1),
- TEST_CONTENT_1)
- self.assertTrue(self.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1))
- self.assertTrue(self.cache.is_cached(TEST_TYPE_2, TEST_OBJ_ID_1))
-
- def test_different_id_same_type(self):
- self.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1)
- self.cache.add(TEST_TYPE_1, TEST_OBJ_ID_2, TEST_CONTENT_2)
- self.assertEqual(self.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1),
- TEST_CONTENT_1)
- self.assertEqual(self.cache.get(TEST_TYPE_1, TEST_OBJ_ID_2),
- TEST_CONTENT_2)
- self.assertTrue(self.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1))
- self.assertTrue(self.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_2))
+# Let's try to avoid replicating edge-cases already tested in
+# swh-objstorage, and instead focus on testing behaviors specific to the
+# Vault cache here.
+
+def test_internal_id(swh_vault):
+ sid = swh_vault.cache._get_internal_id(TEST_TYPE_1, TEST_OBJ_ID_1)
+ assert hashutil.hash_to_hex(sid) == \
+ '6829cda55b54c295aa043a611a4e0320239988d9'
+
+
+def test_simple_add_get(swh_vault):
+ swh_vault.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1)
+ assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1) == \
+ TEST_CONTENT_1
+ assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1)
+
+
+def test_different_type_same_id(swh_vault):
+ swh_vault.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1)
+ swh_vault.cache.add(TEST_TYPE_2, TEST_OBJ_ID_1, TEST_CONTENT_2)
+ assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1) == \
+ TEST_CONTENT_1
+ assert swh_vault.cache.get(TEST_TYPE_2, TEST_OBJ_ID_1) == \
+ TEST_CONTENT_2
+ assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1)
+ assert swh_vault.cache.is_cached(TEST_TYPE_2, TEST_OBJ_ID_1)
+
+
+def test_different_type_same_content(swh_vault):
+ swh_vault.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1)
+ swh_vault.cache.add(TEST_TYPE_2, TEST_OBJ_ID_1, TEST_CONTENT_1)
+ assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1) == \
+ TEST_CONTENT_1
+ assert swh_vault.cache.get(TEST_TYPE_2, TEST_OBJ_ID_1) == \
+ TEST_CONTENT_1
+ assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1)
+ assert swh_vault.cache.is_cached(TEST_TYPE_2, TEST_OBJ_ID_1)
+
+
+def test_different_id_same_type(swh_vault):
+ swh_vault.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1)
+ swh_vault.cache.add(TEST_TYPE_1, TEST_OBJ_ID_2, TEST_CONTENT_2)
+ assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1) == \
+ TEST_CONTENT_1
+ assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_2) == \
+ TEST_CONTENT_2
+ assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1)
+ assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_2)
diff --git a/swh/vault/tests/test_cookers.py b/swh/vault/tests/test_cookers.py
--- a/swh/vault/tests/test_cookers.py
+++ b/swh/vault/tests/test_cookers.py
@@ -26,7 +26,7 @@
from swh.model import hashutil
from swh.model.from_disk import Directory
from swh.vault.cookers import DirectoryCooker, RevisionGitfastCooker
-from swh.vault.tests.vault_testing import VaultTestFixture, hash_content
+from swh.vault.tests.vault_testing import hash_content
from swh.vault.to_disk import SKIPPED_MESSAGE, HIDDEN_MESSAGE
@@ -101,61 +101,60 @@
self.git_shell(*args, stdout=None)
-@pytest.mark.config_issue
-class BaseTestCookers(VaultTestFixture):
- """Base class of cookers unit tests"""
- def setUp(self):
- super().setUp()
- self.loader = GitLoaderFromDisk()
- self.loader.storage = self.storage
-
- def tearDown(self):
- self.loader = None
- super().tearDown()
-
- def load(self, repo_path):
- """Load a repository in the test storage"""
- self.loader.load('fake_origin', repo_path, datetime.datetime.now())
-
- @contextlib.contextmanager
- def cook_extract_directory(self, obj_id):
- """Context manager that cooks a directory and extract it."""
- cooker = DirectoryCooker('directory', obj_id)
- cooker.storage = self.storage
- cooker.backend = unittest.mock.MagicMock()
- cooker.fileobj = io.BytesIO()
- assert cooker.check_exists()
- cooker.prepare_bundle()
- cooker.fileobj.seek(0)
- with tempfile.TemporaryDirectory(prefix='tmp-vault-extract-') as td:
- with tarfile.open(fileobj=cooker.fileobj, mode='r') as tar:
- tar.extractall(td)
- yield pathlib.Path(td) / hashutil.hash_to_hex(obj_id)
- cooker.storage = None
-
- @contextlib.contextmanager
- def cook_stream_revision_gitfast(self, obj_id):
- """Context manager that cooks a revision and stream its fastexport."""
- cooker = RevisionGitfastCooker('revision_gitfast', obj_id)
- cooker.storage = self.storage
- cooker.backend = unittest.mock.MagicMock()
- cooker.fileobj = io.BytesIO()
- assert cooker.check_exists()
- cooker.prepare_bundle()
- cooker.fileobj.seek(0)
- fastexport_stream = gzip.GzipFile(fileobj=cooker.fileobj)
- yield fastexport_stream
- cooker.storage = None
-
- @contextlib.contextmanager
- def cook_extract_revision_gitfast(self, obj_id):
- """Context manager that cooks a revision and extract it."""
- test_repo = TestRepo()
- with self.cook_stream_revision_gitfast(obj_id) as stream, \
- test_repo as p:
- processor = dulwich.fastexport.GitImportProcessor(test_repo.repo)
- processor.import_stream(stream)
- yield test_repo, p
+@pytest.fixture
+def swh_git_loader(swh_vault):
+ loader = GitLoaderFromDisk()
+ loader.storage = swh_vault.storage
+ return loader
+
+
+def load(loader, repo_path):
+ """Load a repository in the test storage"""
+ loader.load('fake_origin', repo_path, datetime.datetime.now())
+
+
+@contextlib.contextmanager
+def cook_extract_directory(storage, obj_id):
+ """Context manager that cooks a directory and extract it."""
+ backend = unittest.mock.MagicMock()
+ backend.storage = storage
+ cooker = DirectoryCooker('directory', obj_id, backend=backend)
+ cooker.fileobj = io.BytesIO()
+ assert cooker.check_exists()
+ cooker.prepare_bundle()
+ cooker.fileobj.seek(0)
+ with tempfile.TemporaryDirectory(prefix='tmp-vault-extract-') as td:
+ with tarfile.open(fileobj=cooker.fileobj, mode='r') as tar:
+ tar.extractall(td)
+ yield pathlib.Path(td) / hashutil.hash_to_hex(obj_id)
+ cooker.storage = None
+
+
+@contextlib.contextmanager
+def cook_stream_revision_gitfast(storage, obj_id):
+ """Context manager that cooks a revision and stream its fastexport."""
+ backend = unittest.mock.MagicMock()
+ backend.storage = storage
+ cooker = RevisionGitfastCooker(
+ 'revision_gitfast', obj_id, backend=backend)
+ cooker.fileobj = io.BytesIO()
+ assert cooker.check_exists()
+ cooker.prepare_bundle()
+ cooker.fileobj.seek(0)
+ fastexport_stream = gzip.GzipFile(fileobj=cooker.fileobj)
+ yield fastexport_stream
+ cooker.storage = None
+
+
+@contextlib.contextmanager
+def cook_extract_revision_gitfast(storage, obj_id):
+ """Context manager that cooks a revision and extract it."""
+ test_repo = TestRepo()
+ with cook_stream_revision_gitfast(storage, obj_id) as stream, \
+ test_repo as p:
+ processor = dulwich.fastexport.GitImportProcessor(test_repo.repo)
+ processor.import_stream(stream)
+ yield test_repo, p
TEST_CONTENT = (" test content\n"
@@ -164,8 +163,8 @@
TEST_EXECUTABLE = b'\x42\x40\x00\x00\x05'
-class TestDirectoryCooker(BaseTestCookers, unittest.TestCase):
- def test_directory_simple(self):
+class TestDirectoryCooker:
+ def test_directory_simple(self, swh_git_loader):
repo = TestRepo()
with repo as rp:
(rp / 'file').write_text(TEST_CONTENT)
@@ -175,25 +174,25 @@
(rp / 'dir1/dir2').mkdir(parents=True)
(rp / 'dir1/dir2/file').write_text(TEST_CONTENT)
c = repo.commit()
- self.load(str(rp))
+ load(swh_git_loader, str(rp))
obj_id_hex = repo.repo[c].tree.decode()
obj_id = hashutil.hash_to_bytes(obj_id_hex)
- with self.cook_extract_directory(obj_id) as p:
- self.assertEqual((p / 'file').stat().st_mode, 0o100644)
- self.assertEqual((p / 'file').read_text(), TEST_CONTENT)
- self.assertEqual((p / 'executable').stat().st_mode, 0o100755)
- self.assertEqual((p / 'executable').read_bytes(), TEST_EXECUTABLE)
- self.assertTrue((p / 'link').is_symlink)
- self.assertEqual(os.readlink(str(p / 'link')), 'file')
- self.assertEqual((p / 'dir1/dir2/file').stat().st_mode, 0o100644)
- self.assertEqual((p / 'dir1/dir2/file').read_text(), TEST_CONTENT)
+ with cook_extract_directory(swh_git_loader.storage, obj_id) as p:
+ assert (p / 'file').stat().st_mode == 0o100644
+ assert (p / 'file').read_text() == TEST_CONTENT
+ assert (p / 'executable').stat().st_mode == 0o100755
+ assert (p / 'executable').read_bytes() == TEST_EXECUTABLE
+ assert (p / 'link').is_symlink()
+ assert os.readlink(str(p / 'link')) == 'file'
+ assert (p / 'dir1/dir2/file').stat().st_mode == 0o100644
+ assert (p / 'dir1/dir2/file').read_text() == TEST_CONTENT
directory = Directory.from_disk(path=bytes(p))
- self.assertEqual(obj_id_hex, hashutil.hash_to_hex(directory.hash))
+ assert obj_id_hex == hashutil.hash_to_hex(directory.hash)
- def test_directory_filtered_objects(self):
+ def test_directory_filtered_objects(self, swh_git_loader):
repo = TestRepo()
with repo as rp:
file_1, id_1 = hash_content(b'test1')
@@ -205,14 +204,14 @@
(rp / 'absent_file').write_bytes(file_3)
c = repo.commit()
- self.load(str(rp))
+ load(swh_git_loader, str(rp))
obj_id_hex = repo.repo[c].tree.decode()
obj_id = hashutil.hash_to_bytes(obj_id_hex)
# FIXME: storage.content_update() should be changed to allow things
# like that
- with self.storage.get_db().transaction() as cur:
+ with swh_git_loader.storage.get_db().transaction() as cur:
cur.execute("""update content set status = 'visible'
where sha1 = %s""", (id_1,))
cur.execute("""update content set status = 'hidden'
@@ -220,12 +219,12 @@
cur.execute("""update content set status = 'absent'
where sha1 = %s""", (id_3,))
- with self.cook_extract_directory(obj_id) as p:
- self.assertEqual((p / 'file').read_bytes(), b'test1')
- self.assertEqual((p / 'hidden_file').read_bytes(), HIDDEN_MESSAGE)
- self.assertEqual((p / 'absent_file').read_bytes(), SKIPPED_MESSAGE)
+ with cook_extract_directory(swh_git_loader.storage, obj_id) as p:
+ assert (p / 'file').read_bytes() == b'test1'
+ assert (p / 'hidden_file').read_bytes() == HIDDEN_MESSAGE
+ assert (p / 'absent_file').read_bytes() == SKIPPED_MESSAGE
- def test_directory_bogus_perms(self):
+ def test_directory_bogus_perms(self, swh_git_loader):
# Some early git repositories have 664/775 permissions... let's check
# if all the weird modes are properly normalized in the directory
# cooker.
@@ -238,17 +237,17 @@
(rp / 'wat').write_text(TEST_CONTENT)
(rp / 'wat').chmod(0o604)
c = repo.commit()
- self.load(str(rp))
+ load(swh_git_loader, str(rp))
obj_id_hex = repo.repo[c].tree.decode()
obj_id = hashutil.hash_to_bytes(obj_id_hex)
- with self.cook_extract_directory(obj_id) as p:
- self.assertEqual((p / 'file').stat().st_mode, 0o100644)
- self.assertEqual((p / 'executable').stat().st_mode, 0o100755)
- self.assertEqual((p / 'wat').stat().st_mode, 0o100644)
+ with cook_extract_directory(swh_git_loader.storage, obj_id) as p:
+ assert (p / 'file').stat().st_mode == 0o100644
+ assert (p / 'executable').stat().st_mode == 0o100755
+ assert (p / 'wat').stat().st_mode == 0o100644
- def test_directory_revision_data(self):
+ def test_directory_revision_data(self, swh_git_loader):
target_rev = '0e8a3ad980ec179856012b7eecf4327e99cd44cd'
d = hashutil.hash_to_bytes('17a3e48bce37be5226490e750202ad3a9a1a3fe9')
@@ -263,18 +262,19 @@
}
],
}
- self.storage.directory_add([dir])
+ swh_git_loader.storage.directory_add([dir])
- with self.cook_extract_directory(d) as p:
- self.assertTrue((p / 'submodule').is_symlink())
- self.assertEqual(os.readlink(str(p / 'submodule')), target_rev)
+ with cook_extract_directory(swh_git_loader.storage, d) as p:
+ assert (p / 'submodule').is_symlink()
+ assert os.readlink(str(p / 'submodule')) == target_rev
-class TestRevisionGitfastCooker(BaseTestCookers, unittest.TestCase):
- def test_revision_simple(self):
+class TestRevisionGitfastCooker:
+ def test_revision_simple(self, swh_git_loader):
#
# 1--2--3--4--5--6--7
#
+ storage = swh_git_loader.storage
repo = TestRepo()
with repo as rp:
(rp / 'file1').write_text(TEST_CONTENT)
@@ -293,28 +293,29 @@
repo.commit('remove file2')
(rp / 'bin1').rename(rp / 'bin')
repo.commit('rename bin1 to bin')
- self.load(str(rp))
+ load(swh_git_loader, str(rp))
obj_id_hex = repo.repo.refs[b'HEAD'].decode()
obj_id = hashutil.hash_to_bytes(obj_id_hex)
- with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
+ with cook_extract_revision_gitfast(storage, obj_id) as (ert, p):
ert.checkout(b'HEAD')
- self.assertEqual((p / 'file1').stat().st_mode, 0o100644)
- self.assertEqual((p / 'file1').read_text(), TEST_CONTENT)
- self.assertTrue((p / 'link1').is_symlink)
- self.assertEqual(os.readlink(str(p / 'link1')), 'file1')
- self.assertEqual((p / 'bin').stat().st_mode, 0o100755)
- self.assertEqual((p / 'bin').read_bytes(), TEST_EXECUTABLE)
- self.assertEqual((p / 'dir1/dir2/file').read_text(), TEST_CONTENT)
- self.assertEqual((p / 'dir1/dir2/file').stat().st_mode, 0o100644)
- self.assertEqual(ert.repo.refs[b'HEAD'].decode(), obj_id_hex)
-
- def test_revision_two_roots(self):
+ assert (p / 'file1').stat().st_mode == 0o100644
+ assert (p / 'file1').read_text() == TEST_CONTENT
+ assert (p / 'link1').is_symlink()
+ assert os.readlink(str(p / 'link1')) == 'file1'
+ assert (p / 'bin').stat().st_mode == 0o100755
+ assert (p / 'bin').read_bytes() == TEST_EXECUTABLE
+ assert (p / 'dir1/dir2/file').read_text() == TEST_CONTENT
+ assert (p / 'dir1/dir2/file').stat().st_mode == 0o100644
+ assert ert.repo.refs[b'HEAD'].decode() == obj_id_hex
+
+ def test_revision_two_roots(self, swh_git_loader):
#
# 1----3---4
# /
# 2----
#
+ storage = swh_git_loader.storage
repo = TestRepo()
with repo as rp:
(rp / 'file1').write_text(TEST_CONTENT)
@@ -327,17 +328,18 @@
repo.commit('add file3')
obj_id_hex = repo.repo.refs[b'HEAD'].decode()
obj_id = hashutil.hash_to_bytes(obj_id_hex)
- self.load(str(rp))
+ load(swh_git_loader, str(rp))
- with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
- self.assertEqual(ert.repo.refs[b'HEAD'].decode(), obj_id_hex)
+ with cook_extract_revision_gitfast(storage, obj_id) as (ert, p):
+ assert ert.repo.refs[b'HEAD'].decode() == obj_id_hex
- def test_revision_two_double_fork_merge(self):
+ def test_revision_two_double_fork_merge(self, swh_git_loader):
#
# 2---4---6
# / / /
# 1---3---5
#
+ storage = swh_git_loader.storage
repo = TestRepo()
with repo as rp:
(rp / 'file1').write_text(TEST_CONTENT)
@@ -360,12 +362,12 @@
obj_id_hex = repo.repo.refs[b'HEAD'].decode()
obj_id = hashutil.hash_to_bytes(obj_id_hex)
- self.load(str(rp))
+ load(swh_git_loader, str(rp))
- with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
- self.assertEqual(ert.repo.refs[b'HEAD'].decode(), obj_id_hex)
+ with cook_extract_revision_gitfast(storage, obj_id) as (ert, p):
+ assert ert.repo.refs[b'HEAD'].decode() == obj_id_hex
- def test_revision_triple_merge(self):
+ def test_revision_triple_merge(self, swh_git_loader):
#
# .---.---5
# / / /
@@ -373,6 +375,7 @@
# / / /
# 1---.---.
#
+ storage = swh_git_loader.storage
repo = TestRepo()
with repo as rp:
(rp / 'file1').write_text(TEST_CONTENT)
@@ -387,12 +390,13 @@
obj_id_hex = repo.repo.refs[b'HEAD'].decode()
obj_id = hashutil.hash_to_bytes(obj_id_hex)
- self.load(str(rp))
+ load(swh_git_loader, str(rp))
- with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
- self.assertEqual(ert.repo.refs[b'HEAD'].decode(), obj_id_hex)
+ with cook_extract_revision_gitfast(storage, obj_id) as (ert, p):
+ assert ert.repo.refs[b'HEAD'].decode() == obj_id_hex
- def test_revision_filtered_objects(self):
+ def test_revision_filtered_objects(self, swh_git_loader):
+ storage = swh_git_loader.storage
repo = TestRepo()
with repo as rp:
file_1, id_1 = hash_content(b'test1')
@@ -406,11 +410,11 @@
repo.commit()
obj_id_hex = repo.repo.refs[b'HEAD'].decode()
obj_id = hashutil.hash_to_bytes(obj_id_hex)
- self.load(str(rp))
+ load(swh_git_loader, str(rp))
# FIXME: storage.content_update() should be changed to allow things
# like that
- with self.storage.get_db().transaction() as cur:
+ with storage.get_db().transaction() as cur:
cur.execute("""update content set status = 'visible'
where sha1 = %s""", (id_1,))
cur.execute("""update content set status = 'hidden'
@@ -418,16 +422,17 @@
cur.execute("""update content set status = 'absent'
where sha1 = %s""", (id_3,))
- with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
+ with cook_extract_revision_gitfast(storage, obj_id) as (ert, p):
ert.checkout(b'HEAD')
- self.assertEqual((p / 'file').read_bytes(), b'test1')
- self.assertEqual((p / 'hidden_file').read_bytes(), HIDDEN_MESSAGE)
- self.assertEqual((p / 'absent_file').read_bytes(), SKIPPED_MESSAGE)
+ assert (p / 'file').read_bytes() == b'test1'
+ assert (p / 'hidden_file').read_bytes() == HIDDEN_MESSAGE
+ assert (p / 'absent_file').read_bytes() == SKIPPED_MESSAGE
- def test_revision_bogus_perms(self):
+ def test_revision_bogus_perms(self, swh_git_loader):
# Some early git repositories have 664/775 permissions... let's check
# if all the weird modes are properly normalized in the revision
# cooker.
+ storage = swh_git_loader.storage
repo = TestRepo()
with repo as rp:
(rp / 'file').write_text(TEST_CONTENT)
@@ -437,24 +442,25 @@
(rp / 'wat').write_text(TEST_CONTENT)
(rp / 'wat').chmod(0o604)
repo.commit('initial commit')
- self.load(str(rp))
+ load(swh_git_loader, str(rp))
obj_id_hex = repo.repo.refs[b'HEAD'].decode()
obj_id = hashutil.hash_to_bytes(obj_id_hex)
- with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
+ with cook_extract_revision_gitfast(storage, obj_id) as (ert, p):
ert.checkout(b'HEAD')
- self.assertEqual((p / 'file').stat().st_mode, 0o100644)
- self.assertEqual((p / 'executable').stat().st_mode, 0o100755)
- self.assertEqual((p / 'wat').stat().st_mode, 0o100644)
+ assert (p / 'file').stat().st_mode == 0o100644
+ assert (p / 'executable').stat().st_mode == 0o100755
+ assert (p / 'wat').stat().st_mode == 0o100644
- def test_revision_null_fields(self):
+ def test_revision_null_fields(self, swh_git_loader):
# Our schema doesn't enforce a lot of non-null revision fields. We need
# to check these cases don't break the cooker.
+ storage = swh_git_loader.storage
repo = TestRepo()
with repo as rp:
(rp / 'file').write_text(TEST_CONTENT)
c = repo.commit('initial commit')
- self.load(str(rp))
+ load(swh_git_loader, str(rp))
repo.repo.refs[b'HEAD'].decode()
dir_id_hex = repo.repo[c].tree.decode()
dir_id = hashutil.hash_to_bytes(dir_id_hex)
@@ -474,13 +480,14 @@
'synthetic': True
}
- self.storage.revision_add([test_revision])
+ storage.revision_add([test_revision])
- with self.cook_extract_revision_gitfast(test_id) as (ert, p):
+ with cook_extract_revision_gitfast(storage, test_id) as (ert, p):
ert.checkout(b'HEAD')
- self.assertEqual((p / 'file').stat().st_mode, 0o100644)
+ assert (p / 'file').stat().st_mode == 0o100644
- def test_revision_revision_data(self):
+ def test_revision_revision_data(self, swh_git_loader):
+ storage = swh_git_loader.storage
target_rev = '0e8a3ad980ec179856012b7eecf4327e99cd44cd'
d = hashutil.hash_to_bytes('17a3e48bce37be5226490e750202ad3a9a1a3fe9')
r = hashutil.hash_to_bytes('1ecc9270c4fc61cfddbc65a774e91ef5c425a6f0')
@@ -496,7 +503,7 @@
}
],
}
- self.storage.directory_add([dir])
+ storage.directory_add([dir])
rev = {
'id': r,
@@ -511,8 +518,8 @@
'metadata': {},
'synthetic': True
}
- self.storage.revision_add([rev])
+ storage.revision_add([rev])
- with self.cook_stream_revision_gitfast(r) as stream:
+ with cook_stream_revision_gitfast(storage, r) as stream:
pattern = 'M 160000 {} submodule'.format(target_rev).encode()
- self.assertIn(pattern, stream.read())
+ assert pattern in stream.read()
diff --git a/swh/vault/tests/test_cookers_base.py b/swh/vault/tests/test_cookers_base.py
--- a/swh/vault/tests/test_cookers_base.py
+++ b/swh/vault/tests/test_cookers_base.py
@@ -3,7 +3,6 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import unittest
from unittest.mock import MagicMock
from swh.model import hashutil
@@ -22,10 +21,15 @@
class BaseVaultCookerMock(BaseVaultCooker):
CACHE_TYPE_KEY = TEST_OBJ_TYPE
- def __init__(self, *args, **kwargs):
- super().__init__(self.CACHE_TYPE_KEY, TEST_OBJ_ID, *args, **kwargs)
+ def __init__(self):
+ # we do not call super() here, in order to bypass building the db
+ # objects from the config, since we mock these db objects anyway
+ self.config = {}
self.storage = MagicMock()
self.backend = MagicMock()
+ self.obj_type = self.CACHE_TYPE_KEY
+ self.obj_id = hashutil.hash_to_bytes(TEST_OBJ_ID)
+ self.max_bundle_size = 1024
def check_exists(self):
return True
@@ -35,44 +39,43 @@
self.write(chunk)
-class TestBaseVaultCooker(unittest.TestCase):
- def test_simple_cook(self):
- cooker = BaseVaultCookerMock()
- cooker.cook()
- cooker.backend.put_bundle.assert_called_once_with(
- TEST_OBJ_TYPE, TEST_OBJ_ID, TEST_BUNDLE_CONTENT)
- cooker.backend.set_status.assert_called_with(
- TEST_OBJ_TYPE, TEST_OBJ_ID, 'done')
- cooker.backend.set_progress.assert_called_with(
- TEST_OBJ_TYPE, TEST_OBJ_ID, None)
- cooker.backend.send_notif.assert_called_with(
- TEST_OBJ_TYPE, TEST_OBJ_ID)
-
- def test_code_exception_cook(self):
- cooker = BaseVaultCookerMock()
- cooker.prepare_bundle = MagicMock()
- cooker.prepare_bundle.side_effect = RuntimeError("Nope")
- cooker.cook()
-
- # Potentially remove this when we have objstorage streaming
- cooker.backend.put_bundle.assert_not_called()
-
- cooker.backend.set_status.assert_called_with(
- TEST_OBJ_TYPE, TEST_OBJ_ID, 'failed')
- self.assertNotIn("Nope", cooker.backend.set_progress.call_args[0][2])
- cooker.backend.send_notif.assert_called_with(
- TEST_OBJ_TYPE, TEST_OBJ_ID)
-
- def test_policy_exception_cook(self):
- cooker = BaseVaultCookerMock()
- cooker.max_bundle_size = 8
- cooker.cook()
-
- # Potentially remove this when we have objstorage streaming
- cooker.backend.put_bundle.assert_not_called()
-
- cooker.backend.set_status.assert_called_with(
- TEST_OBJ_TYPE, TEST_OBJ_ID, 'failed')
- self.assertIn("exceeds", cooker.backend.set_progress.call_args[0][2])
- cooker.backend.send_notif.assert_called_with(
- TEST_OBJ_TYPE, TEST_OBJ_ID)
+def test_simple_cook():
+ cooker = BaseVaultCookerMock()
+ cooker.cook()
+ cooker.backend.put_bundle.assert_called_once_with(
+ TEST_OBJ_TYPE, TEST_OBJ_ID, TEST_BUNDLE_CONTENT)
+ cooker.backend.set_status.assert_called_with(
+ TEST_OBJ_TYPE, TEST_OBJ_ID, 'done')
+ cooker.backend.set_progress.assert_called_with(
+ TEST_OBJ_TYPE, TEST_OBJ_ID, None)
+ cooker.backend.send_notif.assert_called_with(
+ TEST_OBJ_TYPE, TEST_OBJ_ID)
+
+
+def test_code_exception_cook():
+ cooker = BaseVaultCookerMock()
+ cooker.prepare_bundle = MagicMock()
+ cooker.prepare_bundle.side_effect = RuntimeError("Nope")
+ cooker.cook()
+
+ # Potentially remove this when we have objstorage streaming
+ cooker.backend.put_bundle.assert_not_called()
+
+ cooker.backend.set_status.assert_called_with(
+ TEST_OBJ_TYPE, TEST_OBJ_ID, 'failed')
+ assert "Nope" not in cooker.backend.set_progress.call_args[0][2]
+ cooker.backend.send_notif.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID)
+
+
+def test_policy_exception_cook():
+ cooker = BaseVaultCookerMock()
+ cooker.max_bundle_size = 8
+ cooker.cook()
+
+ # Potentially remove this when we have objstorage streaming
+ cooker.backend.put_bundle.assert_not_called()
+
+ cooker.backend.set_status.assert_called_with(
+ TEST_OBJ_TYPE, TEST_OBJ_ID, 'failed')
+ assert "exceeds" in cooker.backend.set_progress.call_args[0][2]
+ cooker.backend.send_notif.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID)
diff --git a/swh/vault/tests/vault_testing.py b/swh/vault/tests/vault_testing.py
--- a/swh/vault/tests/vault_testing.py
+++ b/swh/vault/tests/vault_testing.py
@@ -3,60 +3,7 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import os
-import tempfile
-
from swh.model import hashutil
-from swh.vault.backend import VaultBackend
-
-from swh.storage.tests.storage_testing import StorageTestFixture
-from swh.vault.tests import SQL_DIR
-
-
-class VaultTestFixture(StorageTestFixture):
- """Mix this in a test subject class to get Vault Database testing support.
-
- This fixture requires to come before DbTestFixture and StorageTestFixture
- in the inheritance list as it uses their methods to setup its own internal
- components.
-
- Usage example:
-
- class TestVault(VaultTestFixture, unittest.TestCase):
- ...
- """
- TEST_DB_NAME = 'softwareheritage-test-vault'
- TEST_DB_DUMP = [StorageTestFixture.TEST_DB_DUMP,
- os.path.join(SQL_DIR, '*.sql')]
-
- def setUp(self):
- super().setUp()
- self.cache_root = tempfile.TemporaryDirectory('vault-cache-')
- self.vault_config = {
- 'storage': self.storage_config,
- 'cache': {
- 'cls': 'pathslicing',
- 'args': {
- 'root': self.cache_root.name,
- 'slicing': '0:1/1:5',
- 'allow_delete': True,
- }
- },
- 'db': 'postgresql:///' + self.TEST_DB_NAME,
- 'scheduler': None,
- }
- self.vault_backend = VaultBackend(self.vault_config)
-
- def tearDown(self):
- self.cache_root.cleanup()
- self.vault_backend.close()
- self.reset_storage_tables()
- self.reset_vault_tables()
- super().tearDown()
-
- def reset_vault_tables(self):
- excluded = {'dbversion'}
- self.reset_db_tables(self.TEST_DB_NAME, excluded=excluded)
def hash_content(content):
diff --git a/tox.ini b/tox.ini
--- a/tox.ini
+++ b/tox.ini
@@ -5,9 +5,8 @@
deps =
.[testing]
pytest-cov
- pifpaf
commands =
- pifpaf run postgresql -- pytest --cov=swh --cov-branch {posargs} -m 'not config_issue'
+ pytest --cov=swh --cov-branch {posargs}
[testenv:flake8]
skip_install = true