diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 380c658..69b3349 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,46 +1,40 @@ repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: v2.4.0 hooks: - id: trailing-whitespace - id: flake8 - id: check-json - id: check-yaml - repo: https://github.com/codespell-project/codespell rev: v1.16.0 hooks: - id: codespell - repo: local hooks: - id: mypy name: mypy entry: mypy args: [swh] pass_filenames: false language: system types: [python] +- repo: https://github.com/python/black + rev: 19.10b0 + hooks: + - id: black + # unfortunately, we are far from being able to enable this... # - repo: https://github.com/PyCQA/pydocstyle.git # rev: 4.0.0 # hooks: # - id: pydocstyle # name: pydocstyle # description: pydocstyle is a static analysis tool for checking compliance with Python docstring conventions. # entry: pydocstyle --convention=google # language: python # types: [python] -# black requires py3.6+ -#- repo: https://github.com/python/black -# rev: 19.3b0 -# hooks: -# - id: black -# language_version: python3 -#- repo: https://github.com/asottile/blacken-docs -# rev: v1.0.0-1 -# hooks: -# - id: blacken-docs -# additional_dependencies: [black==19.3b0] diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..8d79b7e --- /dev/null +++ b/setup.cfg @@ -0,0 +1,6 @@ +[flake8] +# E203: whitespaces before ':' +# E231: missing whitespace after ',' +# W503: line break before binary operator +ignore = E203,E231,W503 +max-line-length = 88 diff --git a/setup.py b/setup.py index c0dcb71..5dbcf3a 100644 --- a/setup.py +++ b/setup.py @@ -1,71 +1,71 @@ #!/usr/bin/env python3 # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from setuptools import setup, find_packages from os import path from io import open here = path.abspath(path.dirname(__file__)) # Get the long description from the README file -with open(path.join(here, 'README.md'), encoding='utf-8') as f: +with open(path.join(here, "README.md"), encoding="utf-8") as f: long_description = f.read() def parse_requirements(name=None): if name: - reqf = 'requirements-%s.txt' % name + reqf = "requirements-%s.txt" % name else: - reqf = 'requirements.txt' + reqf = "requirements.txt" requirements = [] if not path.exists(reqf): return requirements with open(reqf) as f: for line in f.readlines(): line = line.strip() - if not line or line.startswith('#'): + if not line or line.startswith("#"): continue requirements.append(line) return requirements setup( - name='swh.vault', - description='Software Heritage vault', + name="swh.vault", + description="Software Heritage vault", long_description=long_description, - long_description_content_type='text/markdown', - author='Software Heritage developers', - author_email='swh-devel@inria.fr', - url='https://forge.softwareheritage.org/diffusion/DVAU/', + long_description_content_type="text/markdown", + author="Software Heritage developers", + author_email="swh-devel@inria.fr", + url="https://forge.softwareheritage.org/diffusion/DVAU/", packages=find_packages(), - install_requires=parse_requirements() + parse_requirements('swh'), - setup_requires=['vcversioner'], - extras_require={'testing': parse_requirements('test')}, + install_requires=parse_requirements() + parse_requirements("swh"), + setup_requires=["vcversioner"], + 
extras_require={"testing": parse_requirements("test")}, vcversioner={}, include_package_data=True, zip_safe=False, - entry_points=''' + entry_points=""" [console_scripts] swh-vault=swh.vault.cli:vault [swh.cli.subcommands] vault=swh.vault.cli:vault - ''', + """, classifiers=[ "Programming Language :: Python :: 3", "Intended Audience :: Developers", "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", "Operating System :: OS Independent", "Development Status :: 5 - Production/Stable", ], project_urls={ - 'Bug Reports': 'https://forge.softwareheritage.org/maniphest', - 'Funding': 'https://www.softwareheritage.org/donate', - 'Source': 'https://forge.softwareheritage.org/source/swh-vault', + "Bug Reports": "https://forge.softwareheritage.org/maniphest", + "Funding": "https://www.softwareheritage.org/donate", + "Source": "https://forge.softwareheritage.org/source/swh-vault", }, ) diff --git a/swh/vault/__init__.py b/swh/vault/__init__.py index f9bcd02..032e997 100644 --- a/swh/vault/__init__.py +++ b/swh/vault/__init__.py @@ -1,40 +1,41 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import logging logger = logging.getLogger(__name__) -def get_vault(cls='remote', args={}): +def get_vault(cls="remote", args={}): """ Get a vault object of class `vault_class` with arguments `vault_args`. Args: vault (dict): dictionary with keys: - cls (str): vault's class, either 'remote' - args (dict): dictionary with keys Returns: an instance of VaultBackend (either local or remote) Raises: ValueError if passed an unknown storage class. """ - if cls == 'remote': + if cls == "remote": from .api.client import RemoteVaultClient as Vault - elif cls == 'local': + elif cls == "local": from swh.scheduler import get_scheduler from swh.storage import get_storage from swh.vault.cache import VaultCache from swh.vault.backend import VaultBackend as Vault - args['cache'] = VaultCache(**args['cache']) - args['storage'] = get_storage(**args['storage']) - args['scheduler'] = get_scheduler(**args['scheduler']) + + args["cache"] = VaultCache(**args["cache"]) + args["storage"] = get_storage(**args["storage"]) + args["scheduler"] = get_scheduler(**args["scheduler"]) else: - raise ValueError('Unknown storage class `%s`' % cls) - logger.debug('Instantiating %s with %s' % (Vault, args)) + raise ValueError("Unknown storage class `%s`" % cls) + logger.debug("Instantiating %s with %s" % (Vault, args)) return Vault(**args) diff --git a/swh/vault/api/client.py b/swh/vault/api/client.py index 2d26fd2..101a0a0 100644 --- a/swh/vault/api/client.py +++ b/swh/vault/api/client.py @@ -1,58 +1,56 @@ # Copyright (C) 2016-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.model import hashutil from swh.core.api import RPCClient class RemoteVaultClient(RPCClient): """Client to the Software Heritage vault cache.""" # Web API endpoints def fetch(self, obj_type, obj_id): hex_id = hashutil.hash_to_hex(obj_id) - return self.get('fetch/{}/{}'.format(obj_type, hex_id)) + return self.get("fetch/{}/{}".format(obj_type, hex_id)) def cook(self, obj_type, obj_id, email=None): hex_id = hashutil.hash_to_hex(obj_id) - return self.post('cook/{}/{}'.format(obj_type, 
hex_id), - data={}, - params=({'email': email} if email else None)) + return self.post( + "cook/{}/{}".format(obj_type, hex_id), + data={}, + params=({"email": email} if email else None), + ) def progress(self, obj_type, obj_id): hex_id = hashutil.hash_to_hex(obj_id) - return self.get('progress/{}/{}'.format(obj_type, hex_id)) + return self.get("progress/{}/{}".format(obj_type, hex_id)) # Cookers endpoints def set_progress(self, obj_type, obj_id, progress): hex_id = hashutil.hash_to_hex(obj_id) - return self.post('set_progress/{}/{}'.format(obj_type, hex_id), - data=progress) + return self.post("set_progress/{}/{}".format(obj_type, hex_id), data=progress) def set_status(self, obj_type, obj_id, status): hex_id = hashutil.hash_to_hex(obj_id) - return self.post('set_status/{}/{}' .format(obj_type, hex_id), - data=status) + return self.post("set_status/{}/{}".format(obj_type, hex_id), data=status) # TODO: handle streaming properly def put_bundle(self, obj_type, obj_id, bundle): hex_id = hashutil.hash_to_hex(obj_id) - return self.post('put_bundle/{}/{}' .format(obj_type, hex_id), - data=bundle) + return self.post("put_bundle/{}/{}".format(obj_type, hex_id), data=bundle) def send_notif(self, obj_type, obj_id): hex_id = hashutil.hash_to_hex(obj_id) - return self.post('send_notif/{}/{}' .format(obj_type, hex_id), - data=None) + return self.post("send_notif/{}/{}".format(obj_type, hex_id), data=None) # Batch endpoints def batch_cook(self, batch): - return self.post('batch_cook', data=batch) + return self.post("batch_cook", data=batch) def batch_progress(self, batch_id): - return self.get('batch_progress/{}'.format(batch_id)) + return self.get("batch_progress/{}".format(batch_id)) diff --git a/swh/vault/api/server.py b/swh/vault/api/server.py index 5d242eb..ee2620e 100644 --- a/swh/vault/api/server.py +++ b/swh/vault/api/server.py @@ -1,241 +1,241 @@ # Copyright (C) 2016-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import aiohttp.web import asyncio import collections from swh.core import config from swh.core.api.asynchronous import ( RPCServerApp, encode_data_server as encode_data, decode_request, ) from swh.model import hashutil from swh.vault import get_vault from swh.vault.cookers import COOKER_TYPES from swh.vault.backend import NotFoundExc -DEFAULT_CONFIG_PATH = 'vault/server' +DEFAULT_CONFIG_PATH = "vault/server" DEFAULT_CONFIG = { - 'storage': ('dict', { - 'cls': 'remote', - 'args': { - 'url': 'http://localhost:5002/', + "storage": ("dict", {"cls": "remote", "args": {"url": "http://localhost:5002/",},}), + "cache": ( + "dict", + { + "cls": "pathslicing", + "args": {"root": "/srv/softwareheritage/vault", "slicing": "0:1/1:5",}, }, - }), - 'cache': ('dict', { - 'cls': 'pathslicing', - 'args': { - 'root': '/srv/softwareheritage/vault', - 'slicing': '0:1/1:5', - }, - }), - 'client_max_size': ('int', 1024 ** 3), - 'vault': ('dict', { - 'cls': 'local', - 'args': { - 'db': 'dbname=softwareheritage-vault-dev', - }, - }), - 'scheduler': ('dict', { - 'cls': 'remote', - 'args': { - 'url': 'http://localhost:5008/', - } - }), + ), + "client_max_size": ("int", 1024 ** 3), + "vault": ( + "dict", + {"cls": "local", "args": {"db": "dbname=softwareheritage-vault-dev",},}, + ), + "scheduler": ( + "dict", + {"cls": "remote", "args": {"url": "http://localhost:5008/",}}, + ), } @asyncio.coroutine def index(request): return 
aiohttp.web.Response(body="SWH Vault API server") # Web API endpoints + @asyncio.coroutine def vault_fetch(request): - obj_type = request.match_info['type'] - obj_id = request.match_info['id'] + obj_type = request.match_info["type"] + obj_id = request.match_info["id"] - if not request.app['backend'].is_available(obj_type, obj_id): + if not request.app["backend"].is_available(obj_type, obj_id): raise aiohttp.web.HTTPNotFound - return encode_data(request.app['backend'].fetch(obj_type, obj_id)) + return encode_data(request.app["backend"].fetch(obj_type, obj_id)) def user_info(task_info): - return {'id': task_info['id'], - 'status': task_info['task_status'], - 'progress_message': task_info['progress_msg'], - 'obj_type': task_info['type'], - 'obj_id': hashutil.hash_to_hex(task_info['object_id'])} + return { + "id": task_info["id"], + "status": task_info["task_status"], + "progress_message": task_info["progress_msg"], + "obj_type": task_info["type"], + "obj_id": hashutil.hash_to_hex(task_info["object_id"]), + } @asyncio.coroutine def vault_cook(request): - obj_type = request.match_info['type'] - obj_id = request.match_info['id'] - email = request.query.get('email') - sticky = request.query.get('sticky') in ('true', '1') + obj_type = request.match_info["type"] + obj_id = request.match_info["id"] + email = request.query.get("email") + sticky = request.query.get("sticky") in ("true", "1") if obj_type not in COOKER_TYPES: raise aiohttp.web.HTTPNotFound try: - info = request.app['backend'].cook_request(obj_type, obj_id, - email=email, sticky=sticky) + info = request.app["backend"].cook_request( + obj_type, obj_id, email=email, sticky=sticky + ) except NotFoundExc: raise aiohttp.web.HTTPNotFound # TODO: return 201 status (Created) once the api supports it return encode_data(user_info(info)) @asyncio.coroutine def vault_progress(request): - obj_type = request.match_info['type'] - obj_id = request.match_info['id'] + obj_type = request.match_info["type"] + obj_id = request.match_info["id"] - info = request.app['backend'].task_info(obj_type, obj_id) + info = request.app["backend"].task_info(obj_type, obj_id) if not info: raise aiohttp.web.HTTPNotFound return encode_data(user_info(info)) # Cookers endpoints + @asyncio.coroutine def set_progress(request): - obj_type = request.match_info['type'] - obj_id = request.match_info['id'] + obj_type = request.match_info["type"] + obj_id = request.match_info["id"] progress = yield from decode_request(request) - request.app['backend'].set_progress(obj_type, obj_id, progress) + request.app["backend"].set_progress(obj_type, obj_id, progress) return encode_data(True) # FIXME: success value? @asyncio.coroutine def set_status(request): - obj_type = request.match_info['type'] - obj_id = request.match_info['id'] + obj_type = request.match_info["type"] + obj_id = request.match_info["id"] status = yield from decode_request(request) - request.app['backend'].set_status(obj_type, obj_id, status) + request.app["backend"].set_status(obj_type, obj_id, status) return encode_data(True) # FIXME: success value? @asyncio.coroutine def put_bundle(request): - obj_type = request.match_info['type'] - obj_id = request.match_info['id'] + obj_type = request.match_info["type"] + obj_id = request.match_info["id"] # TODO: handle streaming properly content = yield from decode_request(request) - request.app['backend'].cache.add(obj_type, obj_id, content) + request.app["backend"].cache.add(obj_type, obj_id, content) return encode_data(True) # FIXME: success value? 
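Not part of the patch itself: for orientation, the cooker-facing endpoints in this server module (set_progress, set_status, put_bundle above, and send_notif just below) are normally exercised through the matching RemoteVaultClient methods reformatted earlier in this diff. A minimal usage sketch follows, assuming a vault RPC server reachable at a hypothetical local URL and a placeholder object id; both are illustrative, not taken from the patch.

# Illustrative only, not part of the patch.
from swh.model import hashutil
from swh.vault.api.client import RemoteVaultClient

# Hypothetical endpoint and object id.
client = RemoteVaultClient(url="http://localhost:5005/")
obj_type = "directory"
obj_id = hashutil.hash_to_bytes("17505c7ecae8dab4e87f29a86b58438eb7bf9522")

# Each call maps onto one of the POST routes registered in make_app():
# /set_progress/{type}/{id}, /put_bundle/{type}/{id},
# /set_status/{type}/{id} and /send_notif/{type}/{id}.
client.set_progress(obj_type, obj_id, "Processing...")
client.put_bundle(obj_type, obj_id, b"<bundle bytes>")
client.set_status(obj_type, obj_id, "done")
client.send_notif(obj_type, obj_id)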
@asyncio.coroutine def send_notif(request): - obj_type = request.match_info['type'] - obj_id = request.match_info['id'] - request.app['backend'].send_all_notifications(obj_type, obj_id) + obj_type = request.match_info["type"] + obj_id = request.match_info["id"] + request.app["backend"].send_all_notifications(obj_type, obj_id) return encode_data(True) # FIXME: success value? # Batch endpoints + @asyncio.coroutine def batch_cook(request): batch = yield from decode_request(request) for obj_type, obj_id in batch: if obj_type not in COOKER_TYPES: raise aiohttp.web.HTTPNotFound - batch_id = request.app['backend'].batch_cook(batch) - return encode_data({'id': batch_id}) + batch_id = request.app["backend"].batch_cook(batch) + return encode_data({"id": batch_id}) @asyncio.coroutine def batch_progress(request): - batch_id = request.match_info['batch_id'] - bundles = request.app['backend'].batch_info(batch_id) + batch_id = request.match_info["batch_id"] + bundles = request.app["backend"].batch_info(batch_id) if not bundles: raise aiohttp.web.HTTPNotFound bundles = [user_info(bundle) for bundle in bundles] - counter = collections.Counter(b['status'] for b in bundles) - res = {'bundles': bundles, 'total': len(bundles), - **{k: 0 for k in ('new', 'pending', 'done', 'failed')}, - **dict(counter)} + counter = collections.Counter(b["status"] for b in bundles) + res = { + "bundles": bundles, + "total": len(bundles), + **{k: 0 for k in ("new", "pending", "done", "failed")}, + **dict(counter), + } return encode_data(res) # Web server + def make_app(backend, **kwargs): app = RPCServerApp(**kwargs) - app.router.add_route('GET', '/', index) + app.router.add_route("GET", "/", index) # Endpoints used by the web API - app.router.add_route('GET', '/fetch/{type}/{id}', vault_fetch) - app.router.add_route('POST', '/cook/{type}/{id}', vault_cook) - app.router.add_route('GET', '/progress/{type}/{id}', vault_progress) + app.router.add_route("GET", "/fetch/{type}/{id}", vault_fetch) + app.router.add_route("POST", "/cook/{type}/{id}", vault_cook) + app.router.add_route("GET", "/progress/{type}/{id}", vault_progress) # Endpoints used by the Cookers - app.router.add_route('POST', '/set_progress/{type}/{id}', set_progress) - app.router.add_route('POST', '/set_status/{type}/{id}', set_status) - app.router.add_route('POST', '/put_bundle/{type}/{id}', put_bundle) - app.router.add_route('POST', '/send_notif/{type}/{id}', send_notif) + app.router.add_route("POST", "/set_progress/{type}/{id}", set_progress) + app.router.add_route("POST", "/set_status/{type}/{id}", set_status) + app.router.add_route("POST", "/put_bundle/{type}/{id}", put_bundle) + app.router.add_route("POST", "/send_notif/{type}/{id}", send_notif) # Endpoints for batch requests - app.router.add_route('POST', '/batch_cook', batch_cook) - app.router.add_route('GET', '/batch_progress/{batch_id}', batch_progress) + app.router.add_route("POST", "/batch_cook", batch_cook) + app.router.add_route("GET", "/batch_progress/{batch_id}", batch_progress) - app['backend'] = backend + app["backend"] = backend return app def get_local_backend(cfg): - if 'vault' not in cfg: + if "vault" not in cfg: raise ValueError("missing '%vault' configuration") - vcfg = cfg['vault'] - if vcfg['cls'] != 'local': + vcfg = cfg["vault"] + if vcfg["cls"] != "local": raise EnvironmentError( - "The vault backend can only be started with a 'local' " - "configuration", err=True) - args = vcfg['args'] - if 'cache' not in args: - args['cache'] = cfg.get('cache') - if 'storage' not in args: - args['storage'] 
= cfg.get('storage') - if 'scheduler' not in args: - args['scheduler'] = cfg.get('scheduler') - - for key in ('cache', 'storage', 'scheduler'): + "The vault backend can only be started with a 'local' " "configuration", + err=True, + ) + args = vcfg["args"] + if "cache" not in args: + args["cache"] = cfg.get("cache") + if "storage" not in args: + args["storage"] = cfg.get("storage") + if "scheduler" not in args: + args["scheduler"] = cfg.get("scheduler") + + for key in ("cache", "storage", "scheduler"): if not args.get(key): - raise ValueError( - "invalid configuration; missing %s config entry." % key) + raise ValueError("invalid configuration; missing %s config entry." % key) - return get_vault('local', args) + return get_vault("local", args) def make_app_from_configfile(config_file=None, **kwargs): if config_file is None: config_file = DEFAULT_CONFIG_PATH - config_file = os.environ.get('SWH_CONFIG_FILENAME', config_file) + config_file = os.environ.get("SWH_CONFIG_FILENAME", config_file) if os.path.isfile(config_file): cfg = config.read(config_file, DEFAULT_CONFIG) else: cfg = config.load_named_config(config_file, DEFAULT_CONFIG) vault = get_local_backend(cfg) - return make_app(backend=vault, client_max_size=cfg['client_max_size'], - **kwargs) + return make_app(backend=vault, client_max_size=cfg["client_max_size"], **kwargs) -if __name__ == '__main__': - print('Deprecated. Use swh-vault ') +if __name__ == "__main__": + print("Deprecated. Use swh-vault ") diff --git a/swh/vault/backend.py b/swh/vault/backend.py index fa18550..c9e67a8 100644 --- a/swh/vault/backend.py +++ b/swh/vault/backend.py @@ -1,414 +1,492 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import smtplib import psycopg2.extras import psycopg2.pool from email.mime.text import MIMEText from swh.core.db import BaseDb from swh.core.db.common import db_transaction from swh.model import hashutil from swh.scheduler.utils import create_oneshot_task_dict from swh.vault.cookers import get_cooker_cls -cooking_task_name = 'swh.vault.cooking_tasks.SWHCookingTask' +cooking_task_name = "swh.vault.cooking_tasks.SWHCookingTask" -NOTIF_EMAIL_FROM = ('"Software Heritage Vault" ' - '') -NOTIF_EMAIL_SUBJECT_SUCCESS = ("Bundle ready: {obj_type} {short_id}") -NOTIF_EMAIL_SUBJECT_FAILURE = ("Bundle failed: {obj_type} {short_id}") +NOTIF_EMAIL_FROM = '"Software Heritage Vault" ' "" +NOTIF_EMAIL_SUBJECT_SUCCESS = "Bundle ready: {obj_type} {short_id}" +NOTIF_EMAIL_SUBJECT_FAILURE = "Bundle failed: {obj_type} {short_id}" NOTIF_EMAIL_BODY_SUCCESS = """ You have requested the following bundle from the Software Heritage Vault: Object Type: {obj_type} Object ID: {hex_id} This bundle is now available for download at the following address: {url} Please keep in mind that this link might expire at some point, in which case you will need to request the bundle again. --\x20 The Software Heritage Developers """ NOTIF_EMAIL_BODY_FAILURE = """ You have requested the following bundle from the Software Heritage Vault: Object Type: {obj_type} Object ID: {hex_id} This bundle could not be cooked for the following reason: {progress_msg} We apologize for the inconvenience. 
--\x20 The Software Heritage Developers """ class NotFoundExc(Exception): """Bundle was not found.""" + pass def batch_to_bytes(batch): - return [(obj_type, hashutil.hash_to_bytes(obj_id)) - for obj_type, obj_id in batch] + return [(obj_type, hashutil.hash_to_bytes(obj_id)) for obj_type, obj_id in batch] class VaultBackend: """ Backend for the Software Heritage vault. """ + def __init__(self, db, cache, scheduler, storage=None, **config): self.config = config self.cache = cache self.scheduler = scheduler self.storage = storage self.smtp_server = smtplib.SMTP() self._pool = psycopg2.pool.ThreadedConnectionPool( - config.get('min_pool_conns', 1), - config.get('max_pool_conns', 10), + config.get("min_pool_conns", 1), + config.get("max_pool_conns", 10), db, cursor_factory=psycopg2.extras.RealDictCursor, ) self._db = None def get_db(self): if self._db: return self._db return BaseDb.from_pool(self._pool) def put_db(self, db): if db is not self._db: db.put_conn() @db_transaction() def task_info(self, obj_type, obj_id, db=None, cur=None): """Fetch information from a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) - cur.execute(''' + cur.execute( + """ SELECT id, type, object_id, task_id, task_status, sticky, ts_created, ts_done, ts_last_access, progress_msg FROM vault_bundle - WHERE type = %s AND object_id = %s''', (obj_type, obj_id)) + WHERE type = %s AND object_id = %s""", + (obj_type, obj_id), + ) res = cur.fetchone() if res: - res['object_id'] = bytes(res['object_id']) + res["object_id"] = bytes(res["object_id"]) return res def _send_task(self, *args): """Send a cooking task to the celery scheduler""" - task = create_oneshot_task_dict('cook-vault-bundle', *args) + task = create_oneshot_task_dict("cook-vault-bundle", *args) added_tasks = self.scheduler.create_tasks([task]) - return added_tasks[0]['id'] + return added_tasks[0]["id"] @db_transaction() def create_task(self, obj_type, obj_id, sticky=False, db=None, cur=None): """Create and send a cooking task""" obj_id = hashutil.hash_to_bytes(obj_id) hex_id = hashutil.hash_to_hex(obj_id) cooker_class = get_cooker_cls(obj_type) - cooker = cooker_class(obj_type, hex_id, - backend=self, storage=self.storage) + cooker = cooker_class(obj_type, hex_id, backend=self, storage=self.storage) if not cooker.check_exists(): raise NotFoundExc("Object {} was not found.".format(hex_id)) - cur.execute(''' + cur.execute( + """ INSERT INTO vault_bundle (type, object_id, sticky) - VALUES (%s, %s, %s)''', (obj_type, obj_id, sticky)) + VALUES (%s, %s, %s)""", + (obj_type, obj_id, sticky), + ) db.conn.commit() task_id = self._send_task(obj_type, hex_id) - cur.execute(''' + cur.execute( + """ UPDATE vault_bundle SET task_id = %s - WHERE type = %s AND object_id = %s''', (task_id, obj_type, obj_id)) + WHERE type = %s AND object_id = %s""", + (task_id, obj_type, obj_id), + ) @db_transaction() def add_notif_email(self, obj_type, obj_id, email, db=None, cur=None): """Add an e-mail address to notify when a given bundle is ready""" obj_id = hashutil.hash_to_bytes(obj_id) - cur.execute(''' + cur.execute( + """ INSERT INTO vault_notif_email (email, bundle_id) VALUES (%s, (SELECT id FROM vault_bundle - WHERE type = %s AND object_id = %s))''', - (email, obj_type, obj_id)) + WHERE type = %s AND object_id = %s))""", + (email, obj_type, obj_id), + ) @db_transaction() - def cook_request(self, obj_type, obj_id, *, sticky=False, - email=None, db=None, cur=None): + def cook_request( + self, obj_type, obj_id, *, sticky=False, email=None, db=None, cur=None + ): """Main entry point for 
cooking requests. This starts a cooking task if needed, and add the given e-mail to the notify list""" obj_id = hashutil.hash_to_bytes(obj_id) info = self.task_info(obj_type, obj_id) # If there's a failed bundle entry, delete it first. - if info is not None and info['task_status'] == 'failed': - cur.execute('''DELETE FROM vault_bundle - WHERE type = %s AND object_id = %s''', - (obj_type, obj_id)) + if info is not None and info["task_status"] == "failed": + cur.execute( + """DELETE FROM vault_bundle + WHERE type = %s AND object_id = %s""", + (obj_type, obj_id), + ) db.conn.commit() info = None # If there's no bundle entry, create the task. if info is None: self.create_task(obj_type, obj_id, sticky) if email is not None: # If the task is already done, send the email directly - if info is not None and info['task_status'] == 'done': - self.send_notification(None, email, obj_type, obj_id, - info['task_status']) + if info is not None and info["task_status"] == "done": + self.send_notification( + None, email, obj_type, obj_id, info["task_status"] + ) # Else, add it to the notification queue else: self.add_notif_email(obj_type, obj_id, email) info = self.task_info(obj_type, obj_id) return info @db_transaction() def batch_cook(self, batch, db=None, cur=None): """Cook a batch of bundles and returns the cooking id.""" # Import execute_values at runtime only, because it requires # psycopg2 >= 2.7 (only available on postgresql servers) from psycopg2.extras import execute_values - cur.execute(''' + cur.execute( + """ INSERT INTO vault_batch (id) VALUES (DEFAULT) - RETURNING id''') - batch_id = cur.fetchone()['id'] + RETURNING id""" + ) + batch_id = cur.fetchone()["id"] batch = batch_to_bytes(batch) # Delete all failed bundles from the batch - cur.execute(''' + cur.execute( + """ DELETE FROM vault_bundle WHERE task_status = 'failed' - AND (type, object_id) IN %s''', (tuple(batch),)) + AND (type, object_id) IN %s""", + (tuple(batch),), + ) # Insert all the bundles, return the new ones - execute_values(cur, ''' + execute_values( + cur, + """ INSERT INTO vault_bundle (type, object_id) - VALUES %s ON CONFLICT DO NOTHING''', batch) + VALUES %s ON CONFLICT DO NOTHING""", + batch, + ) # Get the bundle ids and task status - cur.execute(''' + cur.execute( + """ SELECT id, type, object_id, task_id FROM vault_bundle - WHERE (type, object_id) IN %s''', (tuple(batch),)) + WHERE (type, object_id) IN %s""", + (tuple(batch),), + ) bundles = cur.fetchall() # Insert the batch-bundle entries - batch_id_bundle_ids = [(batch_id, row['id']) for row in bundles] - execute_values(cur, ''' + batch_id_bundle_ids = [(batch_id, row["id"]) for row in bundles] + execute_values( + cur, + """ INSERT INTO vault_batch_bundle (batch_id, bundle_id) - VALUES %s ON CONFLICT DO NOTHING''', - batch_id_bundle_ids) + VALUES %s ON CONFLICT DO NOTHING""", + batch_id_bundle_ids, + ) db.conn.commit() # Get the tasks to fetch - batch_new = [(row['type'], bytes(row['object_id'])) - for row in bundles if row['task_id'] is None] + batch_new = [ + (row["type"], bytes(row["object_id"])) + for row in bundles + if row["task_id"] is None + ] # Send the tasks - args_batch = [(obj_type, hashutil.hash_to_hex(obj_id)) - for obj_type, obj_id in batch_new] + args_batch = [ + (obj_type, hashutil.hash_to_hex(obj_id)) for obj_type, obj_id in batch_new + ] # TODO: change once the scheduler handles priority tasks - tasks = [create_oneshot_task_dict('swh-vault-batch-cooking', *args) - for args in args_batch] + tasks = [ + 
create_oneshot_task_dict("swh-vault-batch-cooking", *args) + for args in args_batch + ] added_tasks = self.scheduler.create_tasks(tasks) - tasks_ids_bundle_ids = zip([task['id'] for task in added_tasks], - batch_new) - tasks_ids_bundle_ids = [(task_id, obj_type, obj_id) - for task_id, (obj_type, obj_id) - in tasks_ids_bundle_ids] + tasks_ids_bundle_ids = zip([task["id"] for task in added_tasks], batch_new) + tasks_ids_bundle_ids = [ + (task_id, obj_type, obj_id) + for task_id, (obj_type, obj_id) in tasks_ids_bundle_ids + ] # Update the task ids - execute_values(cur, ''' + execute_values( + cur, + """ UPDATE vault_bundle SET task_id = s_task_id FROM (VALUES %s) AS sub (s_task_id, s_type, s_object_id) - WHERE type = s_type::cook_type AND object_id = s_object_id ''', - tasks_ids_bundle_ids) + WHERE type = s_type::cook_type AND object_id = s_object_id """, + tasks_ids_bundle_ids, + ) return batch_id @db_transaction() def batch_info(self, batch_id, db=None, cur=None): """Fetch information from a batch of bundles""" - cur.execute(''' + cur.execute( + """ SELECT vault_bundle.id as id, type, object_id, task_id, task_status, sticky, ts_created, ts_done, ts_last_access, progress_msg FROM vault_batch_bundle LEFT JOIN vault_bundle ON vault_bundle.id = bundle_id - WHERE batch_id = %s''', (batch_id,)) + WHERE batch_id = %s""", + (batch_id,), + ) res = cur.fetchall() if res: for d in res: - d['object_id'] = bytes(d['object_id']) + d["object_id"] = bytes(d["object_id"]) return res @db_transaction() def is_available(self, obj_type, obj_id, db=None, cur=None): """Check whether a bundle is available for retrieval""" info = self.task_info(obj_type, obj_id, cur=cur) - return (info is not None - and info['task_status'] == 'done' - and self.cache.is_cached(obj_type, obj_id)) + return ( + info is not None + and info["task_status"] == "done" + and self.cache.is_cached(obj_type, obj_id) + ) @db_transaction() def fetch(self, obj_type, obj_id, db=None, cur=None): """Retrieve a bundle from the cache""" if not self.is_available(obj_type, obj_id, cur=cur): return None self.update_access_ts(obj_type, obj_id, cur=cur) return self.cache.get(obj_type, obj_id) @db_transaction() def update_access_ts(self, obj_type, obj_id, db=None, cur=None): """Update the last access timestamp of a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) - cur.execute(''' + cur.execute( + """ UPDATE vault_bundle SET ts_last_access = NOW() - WHERE type = %s AND object_id = %s''', - (obj_type, obj_id)) + WHERE type = %s AND object_id = %s""", + (obj_type, obj_id), + ) @db_transaction() def set_status(self, obj_type, obj_id, status, db=None, cur=None): """Set the cooking status of a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) - req = (''' + req = ( + """ UPDATE vault_bundle - SET task_status = %s ''' - + (''', ts_done = NOW() ''' if status == 'done' else '') - + '''WHERE type = %s AND object_id = %s''') + SET task_status = %s """ + + (""", ts_done = NOW() """ if status == "done" else "") + + """WHERE type = %s AND object_id = %s""" + ) cur.execute(req, (status, obj_type, obj_id)) @db_transaction() def set_progress(self, obj_type, obj_id, progress, db=None, cur=None): """Set the cooking progress of a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) - cur.execute(''' + cur.execute( + """ UPDATE vault_bundle SET progress_msg = %s - WHERE type = %s AND object_id = %s''', - (progress, obj_type, obj_id)) + WHERE type = %s AND object_id = %s""", + (progress, obj_type, obj_id), + ) @db_transaction() def send_all_notifications(self, obj_type, obj_id, 
db=None, cur=None): """Send all the e-mails in the notification list of a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) - cur.execute(''' + cur.execute( + """ SELECT vault_notif_email.id AS id, email, task_status, progress_msg FROM vault_notif_email INNER JOIN vault_bundle ON bundle_id = vault_bundle.id - WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s''', - (obj_type, obj_id)) + WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s""", + (obj_type, obj_id), + ) for d in cur: - self.send_notification(d['id'], d['email'], obj_type, obj_id, - status=d['task_status'], - progress_msg=d['progress_msg']) + self.send_notification( + d["id"], + d["email"], + obj_type, + obj_id, + status=d["task_status"], + progress_msg=d["progress_msg"], + ) @db_transaction() - def send_notification(self, n_id, email, obj_type, obj_id, status, - progress_msg=None, db=None, cur=None): + def send_notification( + self, + n_id, + email, + obj_type, + obj_id, + status, + progress_msg=None, + db=None, + cur=None, + ): """Send the notification of a bundle to a specific e-mail""" hex_id = hashutil.hash_to_hex(obj_id) short_id = hex_id[:7] # TODO: instead of hardcoding this, we should probably: # * add a "fetch_url" field in the vault_notif_email table # * generate the url with flask.url_for() on the web-ui side # * send this url as part of the cook request and store it in # the table # * use this url for the notification e-mail - url = ('https://archive.softwareheritage.org/api/1/vault/{}/{}/' - 'raw'.format(obj_type, hex_id)) + url = "https://archive.softwareheritage.org/api/1/vault/{}/{}/" "raw".format( + obj_type, hex_id + ) - if status == 'done': + if status == "done": text = NOTIF_EMAIL_BODY_SUCCESS.strip() text = text.format(obj_type=obj_type, hex_id=hex_id, url=url) msg = MIMEText(text) - msg['Subject'] = (NOTIF_EMAIL_SUBJECT_SUCCESS - .format(obj_type=obj_type, short_id=short_id)) - elif status == 'failed': + msg["Subject"] = NOTIF_EMAIL_SUBJECT_SUCCESS.format( + obj_type=obj_type, short_id=short_id + ) + elif status == "failed": text = NOTIF_EMAIL_BODY_FAILURE.strip() - text = text.format(obj_type=obj_type, hex_id=hex_id, - progress_msg=progress_msg) + text = text.format( + obj_type=obj_type, hex_id=hex_id, progress_msg=progress_msg + ) msg = MIMEText(text) - msg['Subject'] = (NOTIF_EMAIL_SUBJECT_FAILURE - .format(obj_type=obj_type, short_id=short_id)) + msg["Subject"] = NOTIF_EMAIL_SUBJECT_FAILURE.format( + obj_type=obj_type, short_id=short_id + ) else: - raise RuntimeError("send_notification called on a '{}' bundle" - .format(status)) + raise RuntimeError( + "send_notification called on a '{}' bundle".format(status) + ) - msg['From'] = NOTIF_EMAIL_FROM - msg['To'] = email + msg["From"] = NOTIF_EMAIL_FROM + msg["To"] = email self._smtp_send(msg) if n_id is not None: - cur.execute(''' + cur.execute( + """ DELETE FROM vault_notif_email - WHERE id = %s''', (n_id,)) + WHERE id = %s""", + (n_id,), + ) def _smtp_send(self, msg): # Reconnect if needed try: status = self.smtp_server.noop()[0] except smtplib.SMTPException: status = -1 if status != 250: - self.smtp_server.connect('localhost', 25) + self.smtp_server.connect("localhost", 25) # Send the message self.smtp_server.send_message(msg) @db_transaction() def _cache_expire(self, cond, *args, db=None, cur=None): """Low-level expiration method, used by cache_expire_* methods""" # Embedded SELECT query to be able to use ORDER BY and LIMIT - cur.execute(''' + cur.execute( + """ DELETE FROM vault_bundle WHERE ctid IN ( SELECT ctid FROM vault_bundle 
WHERE sticky = false {} ) RETURNING type, object_id - '''.format(cond), args) + """.format( + cond + ), + args, + ) for d in cur: - self.cache.delete(d['type'], bytes(d['object_id'])) + self.cache.delete(d["type"], bytes(d["object_id"])) @db_transaction() - def cache_expire_oldest(self, n=1, by='last_access', db=None, cur=None): + def cache_expire_oldest(self, n=1, by="last_access", db=None, cur=None): """Expire the `n` oldest bundles""" - assert by in ('created', 'done', 'last_access') - filter = '''ORDER BY ts_{} LIMIT {}'''.format(by, n) + assert by in ("created", "done", "last_access") + filter = """ORDER BY ts_{} LIMIT {}""".format(by, n) return self._cache_expire(filter) @db_transaction() - def cache_expire_until(self, date, by='last_access', db=None, cur=None): + def cache_expire_until(self, date, by="last_access", db=None, cur=None): """Expire all the bundles until a certain date""" - assert by in ('created', 'done', 'last_access') - filter = '''AND ts_{} <= %s'''.format(by) + assert by in ("created", "done", "last_access") + filter = """AND ts_{} <= %s""".format(by) return self._cache_expire(filter, date) diff --git a/swh/vault/cache.py b/swh/vault/cache.py index 2a6b231..c2f67f3 100644 --- a/swh/vault/cache.py +++ b/swh/vault/cache.py @@ -1,47 +1,47 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.model import hashutil from swh.objstorage import get_objstorage from swh.objstorage.objstorage import compute_hash class VaultCache: """The Vault cache is an object storage that stores Vault bundles. This implementation computes sha1(':') as the internal identifiers used in the underlying objstorage. """ def __init__(self, **objstorage): self.objstorage = get_objstorage(**objstorage) def add(self, obj_type, obj_id, content): sid = self._get_internal_id(obj_type, obj_id) return self.objstorage.add(content, sid) def get(self, obj_type, obj_id): sid = self._get_internal_id(obj_type, obj_id) return self.objstorage.get(hashutil.hash_to_bytes(sid)) def delete(self, obj_type, obj_id): sid = self._get_internal_id(obj_type, obj_id) return self.objstorage.delete(hashutil.hash_to_bytes(sid)) def add_stream(self, obj_type, obj_id, content_iter): sid = self._get_internal_id(obj_type, obj_id) return self.objstorage.add_stream(content_iter, sid) def get_stream(self, obj_type, obj_id): sid = self._get_internal_id(obj_type, obj_id) return self.objstorage.get_stream(hashutil.hash_to_bytes(sid)) def is_cached(self, obj_type, obj_id): sid = self._get_internal_id(obj_type, obj_id) return hashutil.hash_to_bytes(sid) in self.objstorage def _get_internal_id(self, obj_type, obj_id): obj_id = hashutil.hash_to_hex(obj_id) - return compute_hash('{}:{}'.format(obj_type, obj_id).encode()) + return compute_hash("{}:{}".format(obj_type, obj_id).encode()) diff --git a/swh/vault/cli.py b/swh/vault/cli.py index fd0133e..631ba0e 100644 --- a/swh/vault/cli.py +++ b/swh/vault/cli.py @@ -1,71 +1,92 @@ import os import logging import click from swh.core.config import SWH_CONFIG_DIRECTORIES, SWH_CONFIG_EXTENSIONS from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup from swh.vault.api.server import make_app_from_configfile, DEFAULT_CONFIG_PATH CFG_HELP = """Software Heritage Vault RPC server. 
If the CONFIGFILE option is not set, the default config file search will be used; first the SWH_CONFIG_FILENAME environment variable will be checked, then the config file will be searched in: -%s""" % ('\n\n'.join('- %s(%s)' % ( - os.path.join(d, DEFAULT_CONFIG_PATH), '|'.join(SWH_CONFIG_EXTENSIONS)) - for d in SWH_CONFIG_DIRECTORIES)) +%s""" % ( + "\n\n".join( + "- %s(%s)" + % (os.path.join(d, DEFAULT_CONFIG_PATH), "|".join(SWH_CONFIG_EXTENSIONS)) + for d in SWH_CONFIG_DIRECTORIES + ) +) -@click.group(name='vault', context_settings=CONTEXT_SETTINGS, - cls=AliasedGroup) +@click.group(name="vault", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup) @click.pass_context def vault(ctx): - '''Software Heritage Vault tools.''' + """Software Heritage Vault tools.""" pass -@vault.command(name='rpc-serve', help=CFG_HELP) -@click.option('--config-file', '-C', default=None, - metavar='CONFIGFILE', - type=click.Path(exists=True, dir_okay=False,), - help="Configuration file.") -@click.option('--no-stdout', is_flag=True, default=False, - help="Do NOT output logs on the console") -@click.option('--host', default='0.0.0.0', - metavar='IP', show_default=True, - help="Host ip address to bind the server on") -@click.option('--port', default=5005, type=click.INT, - metavar='PORT', - help="Binding port of the server") -@click.option('--debug/--no-debug', default=True, - help="Indicates if the server should run in debug mode") +@vault.command(name="rpc-serve", help=CFG_HELP) +@click.option( + "--config-file", + "-C", + default=None, + metavar="CONFIGFILE", + type=click.Path(exists=True, dir_okay=False,), + help="Configuration file.", +) +@click.option( + "--no-stdout", is_flag=True, default=False, help="Do NOT output logs on the console" +) +@click.option( + "--host", + default="0.0.0.0", + metavar="IP", + show_default=True, + help="Host ip address to bind the server on", +) +@click.option( + "--port", + default=5005, + type=click.INT, + metavar="PORT", + help="Binding port of the server", +) +@click.option( + "--debug/--no-debug", + default=True, + help="Indicates if the server should run in debug mode", +) @click.pass_context def serve(ctx, config_file, no_stdout, host, port, debug): import aiohttp from swh.scheduler.celery_backend.config import setup_log_handler ctx.ensure_object(dict) setup_log_handler( - loglevel=ctx.obj.get('log_level', logging.INFO), colorize=False, - format='[%(levelname)s] %(name)s -- %(message)s', - log_console=not no_stdout) + loglevel=ctx.obj.get("log_level", logging.INFO), + colorize=False, + format="[%(levelname)s] %(name)s -- %(message)s", + log_console=not no_stdout, + ) try: app = make_app_from_configfile(config_file, debug=debug) except EnvironmentError as e: click.echo(e.msg, err=True) ctx.exit(1) aiohttp.web.run_app(app, host=host, port=int(port)) def main(): logging.basicConfig() - return serve(auto_envvar_prefix='SWH_VAULT') + return serve(auto_envvar_prefix="SWH_VAULT") -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/swh/vault/cookers/__init__.py b/swh/vault/cookers/__init__.py index f4124da..717478e 100644 --- a/swh/vault/cookers/__init__.py +++ b/swh/vault/cookers/__init__.py @@ -1,53 +1,56 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os from swh.core.config import load_named_config, read as read_config from swh.storage import 
get_storage from swh.vault import get_vault from swh.vault.cookers.base import DEFAULT_CONFIG_PATH, DEFAULT_CONFIG from swh.vault.cookers.directory import DirectoryCooker from swh.vault.cookers.revision_flat import RevisionFlatCooker from swh.vault.cookers.revision_gitfast import RevisionGitfastCooker COOKER_TYPES = { - 'directory': DirectoryCooker, - 'revision_flat': RevisionFlatCooker, - 'revision_gitfast': RevisionGitfastCooker, + "directory": DirectoryCooker, + "revision_flat": RevisionFlatCooker, + "revision_gitfast": RevisionGitfastCooker, } def get_cooker_cls(obj_type): return COOKER_TYPES[obj_type] def get_cooker(obj_type, obj_id): - if 'SWH_CONFIG_FILENAME' in os.environ: - cfg = read_config(os.environ['SWH_CONFIG_FILENAME'], DEFAULT_CONFIG) + if "SWH_CONFIG_FILENAME" in os.environ: + cfg = read_config(os.environ["SWH_CONFIG_FILENAME"], DEFAULT_CONFIG) else: cfg = load_named_config(DEFAULT_CONFIG_PATH, DEFAULT_CONFIG) cooker_cls = get_cooker_cls(obj_type) - if 'vault' not in cfg: + if "vault" not in cfg: raise ValueError("missing '%vault' configuration") - vcfg = cfg['vault'] - if vcfg['cls'] != 'remote': + vcfg = cfg["vault"] + if vcfg["cls"] != "remote": raise EnvironmentError( - "This vault backend can only be a 'remote' " - "configuration", err=True) - args = vcfg['args'] - if 'storage' not in args: - args['storage'] = cfg.get('storage') + "This vault backend can only be a 'remote' " "configuration", err=True + ) + args = vcfg["args"] + if "storage" not in args: + args["storage"] = cfg.get("storage") - if not args.get('storage'): - raise ValueError( - "invalid configuration; missing 'storage' config entry.") + if not args.get("storage"): + raise ValueError("invalid configuration; missing 'storage' config entry.") - storage = get_storage(**args.pop('storage')) + storage = get_storage(**args.pop("storage")) backend = get_vault(**vcfg) - return cooker_cls(obj_type, obj_id, - backend=backend, storage=storage, - max_bundle_size=cfg['max_bundle_size']) + return cooker_cls( + obj_type, + obj_id, + backend=backend, + storage=storage, + max_bundle_size=cfg["max_bundle_size"], + ) diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py index f355586..5357735 100644 --- a/swh/vault/cookers/base.py +++ b/swh/vault/cookers/base.py @@ -1,138 +1,138 @@ # Copyright (C) 2016-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import io import logging from psycopg2.extensions import QueryCanceledError from typing import Optional from swh.model import hashutil MAX_BUNDLE_SIZE = 2 ** 29 # 512 MiB -DEFAULT_CONFIG_PATH = 'vault/cooker' +DEFAULT_CONFIG_PATH = "vault/cooker" DEFAULT_CONFIG = { - 'storage': ('dict', { - 'cls': 'remote', - 'args': { - 'url': 'http://localhost:5002/', - }, - }), - 'vault': ('dict', { - 'cls': 'remote', - 'args': { - 'url': 'http://localhost:5005/', - }, - }), - 'max_bundle_size': ('int', MAX_BUNDLE_SIZE), + "storage": ("dict", {"cls": "remote", "args": {"url": "http://localhost:5002/",},}), + "vault": ("dict", {"cls": "remote", "args": {"url": "http://localhost:5005/",},}), + "max_bundle_size": ("int", MAX_BUNDLE_SIZE), } class PolicyError(Exception): """Raised when the bundle violates the cooking policy.""" + pass class BundleTooLargeError(PolicyError): """Raised when the bundle is too large to be cooked.""" + pass class BytesIOBundleSizeLimit(io.BytesIO): def 
__init__(self, *args, size_limit=None, **kwargs): super().__init__(*args, **kwargs) self.size_limit = size_limit def write(self, chunk): - if ((self.size_limit is not None - and self.getbuffer().nbytes + len(chunk) > self.size_limit)): + if ( + self.size_limit is not None + and self.getbuffer().nbytes + len(chunk) > self.size_limit + ): raise BundleTooLargeError( "The requested bundle exceeds the maximum allowed " - "size of {} bytes.".format(self.size_limit)) + "size of {} bytes.".format(self.size_limit) + ) return super().write(chunk) class BaseVaultCooker(metaclass=abc.ABCMeta): """Abstract base class for the vault's bundle creators This class describes a common API for the cookers. To define a new cooker, inherit from this class and override: - CACHE_TYPE_KEY: key to use for the bundle to reference in cache - def cook(): cook the object into a bundle """ + CACHE_TYPE_KEY = None # type: Optional[str] - def __init__(self, obj_type, obj_id, backend, storage, - max_bundle_size=MAX_BUNDLE_SIZE): + def __init__( + self, obj_type, obj_id, backend, storage, max_bundle_size=MAX_BUNDLE_SIZE + ): """Initialize the cooker. The type of the object represented by the id depends on the concrete class. Very likely, each type of bundle will have its own cooker class. Args: obj_type: type of the object to be cooked into a bundle (directory, revision_flat or revision_gitfast; see swh.vault.cooker.COOKER_TYPES). obj_id: id of the object to be cooked into a bundle. backend: the vault backend (swh.vault.backend.VaultBackend). """ self.obj_type = obj_type self.obj_id = hashutil.hash_to_bytes(obj_id) self.backend = backend self.storage = storage self.max_bundle_size = max_bundle_size @abc.abstractmethod def check_exists(self): """Checks that the requested object exists and can be cooked. Override this in the cooker implementation. """ raise NotImplementedError @abc.abstractmethod def prepare_bundle(self): """Implementation of the cooker. Yields chunks of the bundle bytes. Override this with the cooker implementation. """ raise NotImplementedError def write(self, chunk): self.fileobj.write(chunk) def cook(self): """Cook the requested object into a bundle """ - self.backend.set_status(self.obj_type, self.obj_id, 'pending') - self.backend.set_progress(self.obj_type, self.obj_id, 'Processing...') + self.backend.set_status(self.obj_type, self.obj_id, "pending") + self.backend.set_progress(self.obj_type, self.obj_id, "Processing...") self.fileobj = BytesIOBundleSizeLimit(size_limit=self.max_bundle_size) try: try: self.prepare_bundle() except QueryCanceledError: raise PolicyError( - "Timeout reached while assembling the requested bundle") + "Timeout reached while assembling the requested bundle" + ) bundle = self.fileobj.getvalue() # TODO: use proper content streaming instead of put_bundle() self.backend.put_bundle(self.CACHE_TYPE_KEY, self.obj_id, bundle) except PolicyError as e: - self.backend.set_status(self.obj_type, self.obj_id, 'failed') + self.backend.set_status(self.obj_type, self.obj_id, "failed") self.backend.set_progress(self.obj_type, self.obj_id, str(e)) except Exception: - self.backend.set_status(self.obj_type, self.obj_id, 'failed') + self.backend.set_status(self.obj_type, self.obj_id, "failed") self.backend.set_progress( - self.obj_type, self.obj_id, - "Internal Server Error. This incident will be reported.") + self.obj_type, + self.obj_id, + "Internal Server Error. 
This incident will be reported.", + ) logging.exception("Bundle cooking failed.") else: - self.backend.set_status(self.obj_type, self.obj_id, 'done') + self.backend.set_status(self.obj_type, self.obj_id, "done") self.backend.set_progress(self.obj_type, self.obj_id, None) finally: self.backend.send_notif(self.obj_type, self.obj_id) diff --git a/swh/vault/cookers/directory.py b/swh/vault/cookers/directory.py index ebcd346..4c66ba9 100644 --- a/swh/vault/cookers/directory.py +++ b/swh/vault/cookers/directory.py @@ -1,27 +1,27 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import tarfile import tempfile from swh.model import hashutil from swh.vault.cookers.base import BaseVaultCooker from swh.vault.to_disk import DirectoryBuilder class DirectoryCooker(BaseVaultCooker): """Cooker to create a directory bundle """ - CACHE_TYPE_KEY = 'directory' + + CACHE_TYPE_KEY = "directory" def check_exists(self): return not list(self.storage.directory_missing([self.obj_id])) def prepare_bundle(self): - with tempfile.TemporaryDirectory(prefix='tmp-vault-directory-') as td: - directory_builder = DirectoryBuilder( - self.storage, td.encode(), self.obj_id) + with tempfile.TemporaryDirectory(prefix="tmp-vault-directory-") as td: + directory_builder = DirectoryBuilder(self.storage, td.encode(), self.obj_id) directory_builder.build() - with tarfile.open(fileobj=self.fileobj, mode='w:gz') as tar: + with tarfile.open(fileobj=self.fileobj, mode="w:gz") as tar: tar.add(td, arcname=hashutil.hash_to_hex(self.obj_id)) diff --git a/swh/vault/cookers/revision_flat.py b/swh/vault/cookers/revision_flat.py index 74289fa..f310404 100644 --- a/swh/vault/cookers/revision_flat.py +++ b/swh/vault/cookers/revision_flat.py @@ -1,33 +1,35 @@ # Copyright (C) 2016-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import tarfile import tempfile from pathlib import Path from swh.model import hashutil from swh.vault.cookers.base import BaseVaultCooker from swh.vault.cookers.utils import revision_log from swh.vault.to_disk import DirectoryBuilder class RevisionFlatCooker(BaseVaultCooker): """Cooker to create a revision_flat bundle """ - CACHE_TYPE_KEY = 'revision_flat' + + CACHE_TYPE_KEY = "revision_flat" def check_exists(self): return not list(self.storage.revision_missing([self.obj_id])) def prepare_bundle(self): - with tempfile.TemporaryDirectory(prefix='tmp-vault-revision-') as td: + with tempfile.TemporaryDirectory(prefix="tmp-vault-revision-") as td: root = Path(td) for revision in revision_log(self.storage, self.obj_id): - revdir = root / hashutil.hash_to_hex(revision['id']) + revdir = root / hashutil.hash_to_hex(revision["id"]) revdir.mkdir() directory_builder = DirectoryBuilder( - self.storage, str(revdir).encode(), revision['directory']) + self.storage, str(revdir).encode(), revision["directory"] + ) directory_builder.build() - with tarfile.open(fileobj=self.fileobj, mode='w:gz') as tar: + with tarfile.open(fileobj=self.fileobj, mode="w:gz") as tar: tar.add(td, arcname=hashutil.hash_to_hex(self.obj_id)) diff --git a/swh/vault/cookers/revision_gitfast.py b/swh/vault/cookers/revision_gitfast.py index 9bb042c..22e2b92 100644 --- 
a/swh/vault/cookers/revision_gitfast.py +++ b/swh/vault/cookers/revision_gitfast.py @@ -1,209 +1,219 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools import os import time import zlib -from fastimport.commands import (CommitCommand, ResetCommand, BlobCommand, - FileDeleteCommand, FileModifyCommand) +from fastimport.commands import ( + CommitCommand, + ResetCommand, + BlobCommand, + FileDeleteCommand, + FileModifyCommand, +) from swh.model import hashutil from swh.model.toposort import toposort from swh.model.from_disk import mode_to_perms from swh.vault.cookers.base import BaseVaultCooker from swh.vault.cookers.utils import revision_log from swh.vault.to_disk import get_filtered_files_content class RevisionGitfastCooker(BaseVaultCooker): """Cooker to create a git fast-import bundle """ - CACHE_TYPE_KEY = 'revision_gitfast' + + CACHE_TYPE_KEY = "revision_gitfast" def check_exists(self): return not list(self.storage.revision_missing([self.obj_id])) def prepare_bundle(self): self.log = list(toposort(revision_log(self.storage, self.obj_id))) self.gzobj = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16) self.fastexport() self.write(self.gzobj.flush()) def write_cmd(self, cmd): - chunk = bytes(cmd) + b'\n' + chunk = bytes(cmd) + b"\n" super().write(self.gzobj.compress(chunk)) def fastexport(self): """Generate all the git fast-import commands from a given log. """ - self.rev_by_id = {r['id']: r for r in self.log} + self.rev_by_id = {r["id"]: r for r in self.log} self.obj_done = set() self.obj_to_mark = {} self.next_available_mark = 1 last_progress_report = None for i, rev in enumerate(self.log, 1): # Update progress if needed ct = time.time() - if (last_progress_report is None - or last_progress_report + 2 <= ct): + if last_progress_report is None or last_progress_report + 2 <= ct: last_progress_report = ct - pg = ('Computing revision {}/{}'.format(i, len(self.log))) + pg = "Computing revision {}/{}".format(i, len(self.log)) self.backend.set_progress(self.obj_type, self.obj_id, pg) # Compute the current commit self._compute_commit_command(rev) def mark(self, obj_id): """Get the mark ID as bytes of a git object. If the object has not yet been marked, assign a new ID and add it to the mark dictionary. """ if obj_id not in self.obj_to_mark: self.obj_to_mark[obj_id] = self.next_available_mark self.next_available_mark += 1 return str(self.obj_to_mark[obj_id]).encode() def _compute_blob_command_content(self, file_data): """Compute the blob command of a file entry if it has not been computed yet. """ - obj_id = file_data['sha1'] + obj_id = file_data["sha1"] if obj_id in self.obj_done: return contents = list(get_filtered_files_content(self.storage, [file_data])) - content = contents[0]['content'] + content = contents[0]["content"] self.write_cmd(BlobCommand(mark=self.mark(obj_id), data=content)) self.obj_done.add(obj_id) def _author_tuple_format(self, author, date): # We never want to have None values here so we replace null entries # by ''. 
if author is not None: - author_tuple = (author.get('name') or b'', - author.get('email') or b'') + author_tuple = (author.get("name") or b"", author.get("email") or b"") else: - author_tuple = (b'', b'') + author_tuple = (b"", b"") if date is not None: - date_tuple = (date.get('timestamp', {}).get('seconds') or 0, - (date.get('offset') or 0) * 60) + date_tuple = ( + date.get("timestamp", {}).get("seconds") or 0, + (date.get("offset") or 0) * 60, + ) else: date_tuple = (0, 0) return author_tuple + date_tuple def _compute_commit_command(self, rev): """Compute a commit command from a specific revision. """ - if 'parents' in rev and rev['parents']: - from_ = b':' + self.mark(rev['parents'][0]) - merges = [b':' + self.mark(r) for r in rev['parents'][1:]] - parent = self.rev_by_id[rev['parents'][0]] + if "parents" in rev and rev["parents"]: + from_ = b":" + self.mark(rev["parents"][0]) + merges = [b":" + self.mark(r) for r in rev["parents"][1:]] + parent = self.rev_by_id[rev["parents"][0]] else: # We issue a reset command before all the new roots so that they # are not automatically added as children of the current branch. - self.write_cmd(ResetCommand(b'refs/heads/master', None)) + self.write_cmd(ResetCommand(b"refs/heads/master", None)) from_ = None merges = None parent = None # Retrieve the file commands while yielding new blob commands if # needed. files = list(self._compute_file_commands(rev, parent)) # Construct and write the commit command - author = self._author_tuple_format(rev['author'], rev['date']) - committer = self._author_tuple_format(rev['committer'], - rev['committer_date']) - self.write_cmd(CommitCommand( - ref=b'refs/heads/master', - mark=self.mark(rev['id']), - author=author, - committer=committer, - message=rev['message'] or b'', - from_=from_, - merges=merges, - file_iter=files)) + author = self._author_tuple_format(rev["author"], rev["date"]) + committer = self._author_tuple_format(rev["committer"], rev["committer_date"]) + self.write_cmd( + CommitCommand( + ref=b"refs/heads/master", + mark=self.mark(rev["id"]), + author=author, + committer=committer, + message=rev["message"] or b"", + from_=from_, + merges=merges, + file_iter=files, + ) + ) @functools.lru_cache(maxsize=4096) def _get_dir_ents(self, dir_id=None): """Get the entities of a directory as a dictionary (name -> entity). This function has a cache to avoid doing multiple requests to retrieve the same entities, as doing a directory_ls() is expensive. """ - data = (self.storage.directory_ls(dir_id) - if dir_id is not None else []) - return {f['name']: f for f in data} + data = self.storage.directory_ls(dir_id) if dir_id is not None else [] + return {f["name"]: f for f in data} def _compute_file_commands(self, rev, parent=None): """Compute all the file commands of a revision. Generate a diff of the files between the revision and its main parent to find the necessary file commands to apply. """ # Initialize the stack with the root of the tree. - cur_dir = rev['directory'] - parent_dir = parent['directory'] if parent else None - stack = [(b'', cur_dir, parent_dir)] + cur_dir = rev["directory"] + parent_dir = parent["directory"] if parent else None + stack = [(b"", cur_dir, parent_dir)] while stack: # Retrieve the current directory and the directory of the parent # commit in order to compute the diff of the trees. 
root, cur_dir_id, prev_dir_id = stack.pop() cur_dir = self._get_dir_ents(cur_dir_id) prev_dir = self._get_dir_ents(prev_dir_id) # Find subtrees to delete: # - Subtrees that are not in the new tree (file or directory # deleted). # - Subtrees that do not have the same type in the new tree # (file -> directory or directory -> file) # After this step, every node remaining in the previous directory # has the same type than the one in the current directory. for fname, f in prev_dir.items(): - if ((fname not in cur_dir - or f['type'] != cur_dir[fname]['type'])): + if fname not in cur_dir or f["type"] != cur_dir[fname]["type"]: yield FileDeleteCommand(path=os.path.join(root, fname)) # Find subtrees to modify: # - Leaves (files) will be added or modified using `filemodify` # - Other subtrees (directories) will be added to the stack and # processed in the next iteration. for fname, f in cur_dir.items(): # A file is added or modified if it was not in the tree, if its # permissions changed or if its content changed. - if (f['type'] == 'file' - and (fname not in prev_dir - or f['sha1'] != prev_dir[fname]['sha1'] - or f['perms'] != prev_dir[fname]['perms'])): + if f["type"] == "file" and ( + fname not in prev_dir + or f["sha1"] != prev_dir[fname]["sha1"] + or f["perms"] != prev_dir[fname]["perms"] + ): # Issue a blob command for the new blobs if needed. self._compute_blob_command_content(f) yield FileModifyCommand( path=os.path.join(root, fname), - mode=mode_to_perms(f['perms']).value, - dataref=(b':' + self.mark(f['sha1'])), - data=None) + mode=mode_to_perms(f["perms"]).value, + dataref=(b":" + self.mark(f["sha1"])), + data=None, + ) # A revision is added or modified if it was not in the tree or # if its target changed - elif (f['type'] == 'rev' - and (fname not in prev_dir - or f['target'] != prev_dir[fname]['target'])): + elif f["type"] == "rev" and ( + fname not in prev_dir or f["target"] != prev_dir[fname]["target"] + ): yield FileModifyCommand( path=os.path.join(root, fname), mode=0o160000, - dataref=hashutil.hash_to_hex(f['target']).encode(), - data=None) + dataref=hashutil.hash_to_hex(f["target"]).encode(), + data=None, + ) # A directory is added or modified if it was not in the tree or # if its target changed. - elif f['type'] == 'dir': + elif f["type"] == "dir": f_prev_target = None - if fname in prev_dir and prev_dir[fname]['type'] == 'dir': - f_prev_target = prev_dir[fname]['target'] - if f_prev_target is None or f['target'] != f_prev_target: - stack.append((os.path.join(root, fname), - f['target'], f_prev_target)) + if fname in prev_dir and prev_dir[fname]["type"] == "dir": + f_prev_target = prev_dir[fname]["target"] + if f_prev_target is None or f["target"] != f_prev_target: + stack.append( + (os.path.join(root, fname), f["target"], f_prev_target) + ) diff --git a/swh/vault/cookers/utils.py b/swh/vault/cookers/utils.py index 8a16699..5b940cb 100644 --- a/swh/vault/cookers/utils.py +++ b/swh/vault/cookers/utils.py @@ -1,47 +1,47 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.storage.algos.revisions_walker import get_revisions_walker def revision_log(storage, rev_id, per_page=1000): """Retrieve a revision log in a paginated way in order to avoid storage timeouts when the total number of revisions to fetch is large. 
Args: storage (swh.storage.storage.Storage): instance of swh storage (either local or remote) rev_id (bytes): a revision identifier per_page (Optional[int]): the maximum number of revisions to return in each page Yields: dict: Revision information as a dictionary """ rw_state = {} nb_revs = 0 max_revs = per_page while True: # Get an iterator returning the commits log from rev_id. # At most max_revs visited revisions from rev_id in the commits graph # will be returned. - revs_walker = get_revisions_walker('bfs', storage, rev_id, - max_revs=max_revs, - state=rw_state) + revs_walker = get_revisions_walker( + "bfs", storage, rev_id, max_revs=max_revs, state=rw_state + ) # Iterate on at most per_page revisions in the commits log. for rev in revs_walker: nb_revs += 1 yield rev # If the total number of iterated revisions is lesser than the # maximum requested one, it means that we hit the initial revision # in the log. if nb_revs < max_revs: break # Backup iterator state to continue the revisions iteration # from where we left it. rw_state = revs_walker.export_state() # Increment the maximum of revisions to iterate from rev_id # to get next revisions in the log. max_revs += per_page diff --git a/swh/vault/cooking_tasks.py b/swh/vault/cooking_tasks.py index 4cf9ced..cb41261 100644 --- a/swh/vault/cooking_tasks.py +++ b/swh/vault/cooking_tasks.py @@ -1,21 +1,21 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from celery import current_app as app from swh.vault.cookers import get_cooker -@app.task(name=__name__ + '.SWHCookingTask') +@app.task(name=__name__ + ".SWHCookingTask") def cook_bundle(obj_type, obj_id): """Main task to cook a bundle.""" get_cooker(obj_type, obj_id).cook() # TODO: remove once the scheduler handles priority tasks -@app.task(name=__name__ + '.SWHBatchCookingTask') +@app.task(name=__name__ + ".SWHBatchCookingTask") def batch_cook_bundle(obj_type, obj_id): """Temporary task for the batch queue.""" get_cooker(obj_type, obj_id).cook() diff --git a/swh/vault/tests/__init__.py b/swh/vault/tests/__init__.py index d3d2d8a..cd10fe1 100644 --- a/swh/vault/tests/__init__.py +++ b/swh/vault/tests/__init__.py @@ -1,5 +1,5 @@ from os import path import swh.vault -SQL_DIR = path.join(path.dirname(swh.vault.__file__), 'sql') +SQL_DIR = path.join(path.dirname(swh.vault.__file__), "sql") diff --git a/swh/vault/tests/conftest.py b/swh/vault/tests/conftest.py index f7c4d55..f68956a 100644 --- a/swh/vault/tests/conftest.py +++ b/swh/vault/tests/conftest.py @@ -1,85 +1,80 @@ import pytest import glob import os import pkg_resources.extern.packaging.version from swh.core.utils import numfile_sortkey as sortkey from swh.vault import get_vault from swh.vault.tests import SQL_DIR from swh.storage.tests import SQL_DIR as STORAGE_SQL_DIR from pytest_postgresql import factories -os.environ['LC_ALL'] = 'C.UTF-8' +os.environ["LC_ALL"] = "C.UTF-8" pytest_v = pkg_resources.get_distribution("pytest").parsed_version -if pytest_v < pkg_resources.extern.packaging.version.parse('3.9'): +if pytest_v < pkg_resources.extern.packaging.version.parse("3.9"): + @pytest.fixture def tmp_path(request): import tempfile import pathlib + with tempfile.TemporaryDirectory() as tmpdir: yield pathlib.Path(tmpdir) def db_url(name, postgresql_proc): - return 'postgresql://{user}@{host}:{port}/{dbname}'.format( + return 
"postgresql://{user}@{host}:{port}/{dbname}".format( host=postgresql_proc.host, port=postgresql_proc.port, - user='postgres', - dbname=name) + user="postgres", + dbname=name, + ) -postgresql2 = factories.postgresql('postgresql_proc', 'tests2') +postgresql2 = factories.postgresql("postgresql_proc", "tests2") @pytest.fixture def swh_vault(request, postgresql_proc, postgresql, postgresql2, tmp_path): for sql_dir, pg in ((SQL_DIR, postgresql), (STORAGE_SQL_DIR, postgresql2)): - dump_files = os.path.join(sql_dir, '*.sql') + dump_files = os.path.join(sql_dir, "*.sql") all_dump_files = sorted(glob.glob(dump_files), key=sortkey) cursor = pg.cursor() for fname in all_dump_files: with open(fname) as fobj: # disable concurrent index creation since we run in a # transaction - cursor.execute(fobj.read().replace('concurrently', '')) + cursor.execute(fobj.read().replace("concurrently", "")) pg.commit() vault_config = { - 'db': db_url('tests', postgresql_proc), - 'storage': { - 'cls': 'local', - 'db': db_url('tests2', postgresql_proc), - 'objstorage': { - 'cls': 'pathslicing', - 'args': { - 'root': str(tmp_path), - 'slicing': '0:1/1:5', - }, + "db": db_url("tests", postgresql_proc), + "storage": { + "cls": "local", + "db": db_url("tests2", postgresql_proc), + "objstorage": { + "cls": "pathslicing", + "args": {"root": str(tmp_path), "slicing": "0:1/1:5",}, }, }, - 'cache': { - 'cls': 'pathslicing', - 'args': { - 'root': str(tmp_path), - 'slicing': '0:1/1:5', - 'allow_delete': True, - } - }, - 'scheduler': { - 'cls': 'remote', - 'args': { - 'url': 'http://swh-scheduler:5008', + "cache": { + "cls": "pathslicing", + "args": { + "root": str(tmp_path), + "slicing": "0:1/1:5", + "allow_delete": True, }, }, + "scheduler": {"cls": "remote", "args": {"url": "http://swh-scheduler:5008",},}, } - return get_vault('local', vault_config) + return get_vault("local", vault_config) @pytest.fixture def swh_storage(swh_vault): return swh_vault.storage diff --git a/swh/vault/tests/test_backend.py b/swh/vault/tests/test_backend.py index fed6097..05d0499 100644 --- a/swh/vault/tests/test_backend.py +++ b/swh/vault/tests/test_backend.py @@ -1,337 +1,336 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import contextlib import datetime import psycopg2 from unittest.mock import patch, MagicMock import pytest from swh.model import hashutil from swh.vault.tests.vault_testing import hash_content @contextlib.contextmanager def mock_cooking(vault_backend): - with patch.object(vault_backend, '_send_task') as mt: + with patch.object(vault_backend, "_send_task") as mt: mt.return_value = 42 - with patch('swh.vault.backend.get_cooker_cls') as mg: + with patch("swh.vault.backend.get_cooker_cls") as mg: mcc = MagicMock() mc = MagicMock() mg.return_value = mcc mcc.return_value = mc mc.check_exists.return_value = True - yield {'_send_task': mt, - 'get_cooker_cls': mg, - 'cooker_cls': mcc, - 'cooker': mc} + yield { + "_send_task": mt, + "get_cooker_cls": mg, + "cooker_cls": mcc, + "cooker": mc, + } + def assertTimestampAlmostNow(ts, tolerance_secs=1.0): # noqa now = datetime.datetime.now(datetime.timezone.utc) creation_delta_secs = (ts - now).total_seconds() assert creation_delta_secs < tolerance_secs def fake_cook(backend, obj_type, result_content, sticky=False): content, obj_id = hash_content(result_content) with mock_cooking(backend): 
backend.create_task(obj_type, obj_id, sticky) - backend.cache.add(obj_type, obj_id, b'content') - backend.set_status(obj_type, obj_id, 'done') + backend.cache.add(obj_type, obj_id, b"content") + backend.set_status(obj_type, obj_id, "done") return obj_id, content def fail_cook(backend, obj_type, obj_id, failure_reason): with mock_cooking(backend): backend.create_task(obj_type, obj_id) - backend.set_status(obj_type, obj_id, 'failed') + backend.set_status(obj_type, obj_id, "failed") backend.set_progress(obj_type, obj_id, failure_reason) -TEST_TYPE = 'revision_gitfast' -TEST_HEX_ID = '4a4b9771542143cf070386f86b4b92d42966bdbc' +TEST_TYPE = "revision_gitfast" +TEST_HEX_ID = "4a4b9771542143cf070386f86b4b92d42966bdbc" TEST_OBJ_ID = hashutil.hash_to_bytes(TEST_HEX_ID) -TEST_PROGRESS = ("Mr. White, You're telling me you're cooking again?" - " \N{ASTONISHED FACE} ") -TEST_EMAIL = 'ouiche@lorraine.fr' +TEST_PROGRESS = ( + "Mr. White, You're telling me you're cooking again?" " \N{ASTONISHED FACE} " +) +TEST_EMAIL = "ouiche@lorraine.fr" def test_create_task_simple(swh_vault): with mock_cooking(swh_vault) as m: swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) - m['get_cooker_cls'].assert_called_once_with(TEST_TYPE) + m["get_cooker_cls"].assert_called_once_with(TEST_TYPE) - args = m['cooker_cls'].call_args[0] + args = m["cooker_cls"].call_args[0] assert args[0] == TEST_TYPE assert args[1] == TEST_HEX_ID - assert m['cooker'].check_exists.call_count == 1 - assert m['_send_task'].call_count == 1 + assert m["cooker"].check_exists.call_count == 1 + assert m["_send_task"].call_count == 1 - args = m['_send_task'].call_args[0] + args = m["_send_task"].call_args[0] assert args[0] == TEST_TYPE assert args[1] == TEST_HEX_ID info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) - assert info['object_id'] == TEST_OBJ_ID - assert info['type'] == TEST_TYPE - assert info['task_status'] == 'new' - assert info['task_id'] == 42 + assert info["object_id"] == TEST_OBJ_ID + assert info["type"] == TEST_TYPE + assert info["task_status"] == "new" + assert info["task_id"] == 42 - assertTimestampAlmostNow(info['ts_created']) + assertTimestampAlmostNow(info["ts_created"]) - assert info['ts_done'] is None - assert info['progress_msg'] is None + assert info["ts_done"] is None + assert info["progress_msg"] is None def test_create_fail_duplicate_task(swh_vault): with mock_cooking(swh_vault): swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) with pytest.raises(psycopg2.IntegrityError): swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) def test_create_fail_nonexisting_object(swh_vault): with mock_cooking(swh_vault) as m: - m['cooker'].check_exists.side_effect = ValueError('Nothing here.') + m["cooker"].check_exists.side_effect = ValueError("Nothing here.") with pytest.raises(ValueError): swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) def test_create_set_progress(swh_vault): with mock_cooking(swh_vault): swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) - assert info['progress_msg'] is None - swh_vault.set_progress(TEST_TYPE, TEST_OBJ_ID, - TEST_PROGRESS) + assert info["progress_msg"] is None + swh_vault.set_progress(TEST_TYPE, TEST_OBJ_ID, TEST_PROGRESS) info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) - assert info['progress_msg'] == TEST_PROGRESS + assert info["progress_msg"] == TEST_PROGRESS def test_create_set_status(swh_vault): with mock_cooking(swh_vault): swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) - assert info['task_status'] == 'new' - assert 
info['ts_done'] is None + assert info["task_status"] == "new" + assert info["ts_done"] is None - swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'pending') + swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, "pending") info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) - assert info['task_status'] == 'pending' - assert info['ts_done'] is None + assert info["task_status"] == "pending" + assert info["ts_done"] is None - swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') + swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, "done") info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) - assert info['task_status'] == 'done' - assertTimestampAlmostNow(info['ts_done']) + assert info["task_status"] == "done" + assertTimestampAlmostNow(info["ts_done"]) def test_create_update_access_ts(swh_vault): with mock_cooking(swh_vault): swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) - access_ts_1 = info['ts_last_access'] + access_ts_1 = info["ts_last_access"] assertTimestampAlmostNow(access_ts_1) swh_vault.update_access_ts(TEST_TYPE, TEST_OBJ_ID) info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) - access_ts_2 = info['ts_last_access'] + access_ts_2 = info["ts_last_access"] assertTimestampAlmostNow(access_ts_2) swh_vault.update_access_ts(TEST_TYPE, TEST_OBJ_ID) info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) - access_ts_3 = info['ts_last_access'] + access_ts_3 = info["ts_last_access"] assertTimestampAlmostNow(access_ts_3) assert access_ts_1 < access_ts_2 assert access_ts_2 < access_ts_3 def test_cook_request_idempotent(swh_vault): with mock_cooking(swh_vault): info1 = swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) info2 = swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) info3 = swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) assert info1 == info2 assert info1 == info3 def test_cook_email_pending_done(swh_vault): - with mock_cooking(swh_vault), \ - patch.object(swh_vault, 'add_notif_email') as madd, \ - patch.object(swh_vault, 'send_notification') as msend: + with mock_cooking(swh_vault), patch.object( + swh_vault, "add_notif_email" + ) as madd, patch.object(swh_vault, "send_notification") as msend: swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) madd.assert_not_called() msend.assert_not_called() madd.reset_mock() msend.reset_mock() - swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, - email=TEST_EMAIL) + swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, email=TEST_EMAIL) madd.assert_called_once_with(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL) msend.assert_not_called() madd.reset_mock() msend.reset_mock() - swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') - swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, - email=TEST_EMAIL) - msend.assert_called_once_with(None, TEST_EMAIL, - TEST_TYPE, TEST_OBJ_ID, 'done') + swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, "done") + swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, email=TEST_EMAIL) + msend.assert_called_once_with(None, TEST_EMAIL, TEST_TYPE, TEST_OBJ_ID, "done") madd.assert_not_called() def test_send_all_emails(swh_vault): with mock_cooking(swh_vault): - emails = ('a@example.com', - 'billg@example.com', - 'test+42@example.org') + emails = ("a@example.com", "billg@example.com", "test+42@example.org") for email in emails: - swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, - email=email) + swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, email=email) - swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') + swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, "done") - with patch.object(swh_vault, 'smtp_server') as m: + with patch.object(swh_vault, 
"smtp_server") as m: swh_vault.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) sent_emails = {k[0][0] for k in m.send_message.call_args_list} - assert {k['To'] for k in sent_emails} == set(emails) + assert {k["To"] for k in sent_emails} == set(emails) for e in sent_emails: - assert 'bot@softwareheritage.org' in e['From'] - assert TEST_TYPE in e['Subject'] - assert TEST_HEX_ID[:5] in e['Subject'] + assert "bot@softwareheritage.org" in e["From"] + assert TEST_TYPE in e["Subject"] + assert TEST_HEX_ID[:5] in e["Subject"] assert TEST_TYPE in str(e) - assert 'https://archive.softwareheritage.org/' in str(e) + assert "https://archive.softwareheritage.org/" in str(e) assert TEST_HEX_ID[:5] in str(e) - assert '--\x20\n' in str(e) # Well-formated signature!!! + assert "--\x20\n" in str(e) # Well-formated signature!!! # Check that the entries have been deleted and recalling the # function does not re-send the e-mails m.reset_mock() swh_vault.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) m.assert_not_called() def test_available(swh_vault): assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID) with mock_cooking(swh_vault): swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID) - swh_vault.cache.add(TEST_TYPE, TEST_OBJ_ID, b'content') + swh_vault.cache.add(TEST_TYPE, TEST_OBJ_ID, b"content") assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID) - swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') + swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, "done") assert swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID) def test_fetch(swh_vault): assert swh_vault.fetch(TEST_TYPE, TEST_OBJ_ID) is None - obj_id, content = fake_cook(swh_vault, TEST_TYPE, b'content') + obj_id, content = fake_cook(swh_vault, TEST_TYPE, b"content") info = swh_vault.task_info(TEST_TYPE, obj_id) - access_ts_before = info['ts_last_access'] + access_ts_before = info["ts_last_access"] - assert swh_vault.fetch(TEST_TYPE, obj_id) == b'content' + assert swh_vault.fetch(TEST_TYPE, obj_id) == b"content" info = swh_vault.task_info(TEST_TYPE, obj_id) - access_ts_after = info['ts_last_access'] + access_ts_after = info["ts_last_access"] assertTimestampAlmostNow(access_ts_after) assert access_ts_before < access_ts_after def test_cache_expire_oldest(swh_vault): r = range(1, 10) inserted = {} for i in r: - sticky = (i == 5) - content = b'content%s' % str(i).encode() + sticky = i == 5 + content = b"content%s" % str(i).encode() obj_id, content = fake_cook(swh_vault, TEST_TYPE, content, sticky) inserted[i] = (obj_id, content) swh_vault.update_access_ts(TEST_TYPE, inserted[2][0]) swh_vault.update_access_ts(TEST_TYPE, inserted[3][0]) swh_vault.cache_expire_oldest(n=4) should_be_still_here = {2, 3, 5, 8, 9} for i in r: - assert swh_vault.is_available( - TEST_TYPE, inserted[i][0]) == (i in should_be_still_here) + assert swh_vault.is_available(TEST_TYPE, inserted[i][0]) == ( + i in should_be_still_here + ) def test_cache_expire_until(swh_vault): r = range(1, 10) inserted = {} for i in r: - sticky = (i == 5) - content = b'content%s' % str(i).encode() + sticky = i == 5 + content = b"content%s" % str(i).encode() obj_id, content = fake_cook(swh_vault, TEST_TYPE, content, sticky) inserted[i] = (obj_id, content) if i == 7: cutoff_date = datetime.datetime.now() swh_vault.update_access_ts(TEST_TYPE, inserted[2][0]) swh_vault.update_access_ts(TEST_TYPE, inserted[3][0]) swh_vault.cache_expire_until(date=cutoff_date) should_be_still_here = {2, 3, 5, 8, 9} for i in r: - assert swh_vault.is_available( - TEST_TYPE, 
inserted[i][0]) == (i in should_be_still_here) + assert swh_vault.is_available(TEST_TYPE, inserted[i][0]) == ( + i in should_be_still_here + ) def test_fail_cook_simple(swh_vault): - fail_cook(swh_vault, TEST_TYPE, TEST_OBJ_ID, 'error42') + fail_cook(swh_vault, TEST_TYPE, TEST_OBJ_ID, "error42") assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID) info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) - assert info['progress_msg'] == 'error42' + assert info["progress_msg"] == "error42" def test_send_failure_email(swh_vault): with mock_cooking(swh_vault): - swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, email='a@example.com') + swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, email="a@example.com") - swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, 'failed') - swh_vault.set_progress(TEST_TYPE, TEST_OBJ_ID, 'test error') + swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, "failed") + swh_vault.set_progress(TEST_TYPE, TEST_OBJ_ID, "test error") - with patch.object(swh_vault, 'smtp_server') as m: + with patch.object(swh_vault, "smtp_server") as m: swh_vault.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) e = [k[0][0] for k in m.send_message.call_args_list][0] - assert e['To'] == 'a@example.com' + assert e["To"] == "a@example.com" - assert 'bot@softwareheritage.org' in e['From'] - assert TEST_TYPE in e['Subject'] - assert TEST_HEX_ID[:5] in e['Subject'] - assert 'fail' in e['Subject'] + assert "bot@softwareheritage.org" in e["From"] + assert TEST_TYPE in e["Subject"] + assert TEST_HEX_ID[:5] in e["Subject"] + assert "fail" in e["Subject"] assert TEST_TYPE in str(e) assert TEST_HEX_ID[:5] in str(e) - assert 'test error' in str(e) - assert '--\x20\n' in str(e) # Well-formated signature + assert "test error" in str(e) + assert "--\x20\n" in str(e) # Well-formated signature def test_retry_failed_bundle(swh_vault): - fail_cook(swh_vault, TEST_TYPE, TEST_OBJ_ID, 'error42') + fail_cook(swh_vault, TEST_TYPE, TEST_OBJ_ID, "error42") info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) - assert info['task_status'] == 'failed' + assert info["task_status"] == "failed" with mock_cooking(swh_vault): swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) - assert info['task_status'] == 'new' + assert info["task_status"] == "new" diff --git a/swh/vault/tests/test_cache.py b/swh/vault/tests/test_cache.py index 0084b52..0e73c9e 100644 --- a/swh/vault/tests/test_cache.py +++ b/swh/vault/tests/test_cache.py @@ -1,69 +1,62 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.model import hashutil -TEST_TYPE_1 = 'revision_gitfast' -TEST_TYPE_2 = 'directory' +TEST_TYPE_1 = "revision_gitfast" +TEST_TYPE_2 = "directory" -TEST_HEX_ID_1 = '4a4b9771542143cf070386f86b4b92d42966bdbc' -TEST_HEX_ID_2 = '17a3e48bce37be5226490e750202ad3a9a1a3fe9' +TEST_HEX_ID_1 = "4a4b9771542143cf070386f86b4b92d42966bdbc" +TEST_HEX_ID_2 = "17a3e48bce37be5226490e750202ad3a9a1a3fe9" TEST_OBJ_ID_1 = hashutil.hash_to_bytes(TEST_HEX_ID_1) TEST_OBJ_ID_2 = hashutil.hash_to_bytes(TEST_HEX_ID_2) -TEST_CONTENT_1 = b'test content 1' -TEST_CONTENT_2 = b'test content 2' +TEST_CONTENT_1 = b"test content 1" +TEST_CONTENT_2 = b"test content 2" # Let's try to avoid replicating edge-cases already tested in # swh-objstorage, and instead focus on testing behaviors specific to the # Vault cache here. 
+ def test_internal_id(swh_vault): sid = swh_vault.cache._get_internal_id(TEST_TYPE_1, TEST_OBJ_ID_1) - assert hashutil.hash_to_hex(sid) == \ - '6829cda55b54c295aa043a611a4e0320239988d9' + assert hashutil.hash_to_hex(sid) == "6829cda55b54c295aa043a611a4e0320239988d9" def test_simple_add_get(swh_vault): swh_vault.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1) - assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1) == \ - TEST_CONTENT_1 + assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1) == TEST_CONTENT_1 assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1) def test_different_type_same_id(swh_vault): swh_vault.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1) swh_vault.cache.add(TEST_TYPE_2, TEST_OBJ_ID_1, TEST_CONTENT_2) - assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1) == \ - TEST_CONTENT_1 - assert swh_vault.cache.get(TEST_TYPE_2, TEST_OBJ_ID_1) == \ - TEST_CONTENT_2 + assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1) == TEST_CONTENT_1 + assert swh_vault.cache.get(TEST_TYPE_2, TEST_OBJ_ID_1) == TEST_CONTENT_2 assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1) assert swh_vault.cache.is_cached(TEST_TYPE_2, TEST_OBJ_ID_1) def test_different_type_same_content(swh_vault): swh_vault.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1) swh_vault.cache.add(TEST_TYPE_2, TEST_OBJ_ID_1, TEST_CONTENT_1) - assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1) == \ - TEST_CONTENT_1 - assert swh_vault.cache.get(TEST_TYPE_2, TEST_OBJ_ID_1) == \ - TEST_CONTENT_1 + assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1) == TEST_CONTENT_1 + assert swh_vault.cache.get(TEST_TYPE_2, TEST_OBJ_ID_1) == TEST_CONTENT_1 assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1) assert swh_vault.cache.is_cached(TEST_TYPE_2, TEST_OBJ_ID_1) def test_different_id_same_type(swh_vault): swh_vault.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1) swh_vault.cache.add(TEST_TYPE_1, TEST_OBJ_ID_2, TEST_CONTENT_2) - assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1) == \ - TEST_CONTENT_1 - assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_2) == \ - TEST_CONTENT_2 + assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1) == TEST_CONTENT_1 + assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_2) == TEST_CONTENT_2 assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1) assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_2) diff --git a/swh/vault/tests/test_cookers.py b/swh/vault/tests/test_cookers.py index 17088a6..c3b60e6 100644 --- a/swh/vault/tests/test_cookers.py +++ b/swh/vault/tests/test_cookers.py @@ -1,531 +1,558 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import contextlib import datetime import gzip import io import os import pathlib import subprocess import tarfile import tempfile import unittest import unittest.mock import dulwich.fastexport import dulwich.index import dulwich.objects import dulwich.porcelain import dulwich.repo from swh.loader.git.from_disk import GitLoaderFromDisk from swh.model import hashutil from swh.model import from_disk -from swh.model.model import ( - Directory, DirectoryEntry, Person, Revision, RevisionType -) +from swh.model.model import Directory, DirectoryEntry, Person, Revision, RevisionType from swh.vault.cookers import DirectoryCooker, RevisionGitfastCooker from swh.vault.tests.vault_testing import hash_content 
from swh.vault.to_disk import SKIPPED_MESSAGE, HIDDEN_MESSAGE class TestRepo: """A tiny context manager for a test git repository, with some utility functions to perform basic git stuff. """ + def __enter__(self): - self.tmp_dir = tempfile.TemporaryDirectory(prefix='tmp-vault-repo-') + self.tmp_dir = tempfile.TemporaryDirectory(prefix="tmp-vault-repo-") self.repo_dir = self.tmp_dir.__enter__() self.repo = dulwich.repo.Repo.init(self.repo_dir) - self.author_name = b'Test Author' - self.author_email = b'test@softwareheritage.org' - self.author = b'%s <%s>' % (self.author_name, self.author_email) + self.author_name = b"Test Author" + self.author_email = b"test@softwareheritage.org" + self.author = b"%s <%s>" % (self.author_name, self.author_email) self.base_date = 258244200 self.counter = 0 return pathlib.Path(self.repo_dir) def __exit__(self, exc, value, tb): self.tmp_dir.__exit__(exc, value, tb) def checkout(self, rev_sha): rev = self.repo[rev_sha] - dulwich.index.build_index_from_tree(self.repo_dir, - self.repo.index_path(), - self.repo.object_store, - rev.tree) + dulwich.index.build_index_from_tree( + self.repo_dir, self.repo.index_path(), self.repo.object_store, rev.tree + ) def git_shell(self, *cmd, stdout=subprocess.DEVNULL, **kwargs): name = self.author_name email = self.author_email - date = '%d +0000' % (self.base_date + self.counter) + date = "%d +0000" % (self.base_date + self.counter) env = { # Set git commit format - 'GIT_AUTHOR_NAME': name, - 'GIT_AUTHOR_EMAIL': email, - 'GIT_AUTHOR_DATE': date, - 'GIT_COMMITTER_NAME': name, - 'GIT_COMMITTER_EMAIL': email, - 'GIT_COMMITTER_DATE': date, + "GIT_AUTHOR_NAME": name, + "GIT_AUTHOR_EMAIL": email, + "GIT_AUTHOR_DATE": date, + "GIT_COMMITTER_NAME": name, + "GIT_COMMITTER_EMAIL": email, + "GIT_COMMITTER_DATE": date, # Ignore all the system-wide and user configurations - 'GIT_CONFIG_NOSYSTEM': '1', - 'HOME': str(self.tmp_dir), - 'XDG_CONFIG_HOME': str(self.tmp_dir), + "GIT_CONFIG_NOSYSTEM": "1", + "HOME": str(self.tmp_dir), + "XDG_CONFIG_HOME": str(self.tmp_dir), } - kwargs.setdefault('env', {}).update(env) + kwargs.setdefault("env", {}).update(env) - subprocess.check_call(('git', '-C', self.repo_dir) + cmd, - stdout=stdout, **kwargs) + subprocess.check_call( + ("git", "-C", self.repo_dir) + cmd, stdout=stdout, **kwargs + ) - def commit(self, message='Commit test\n', ref=b'HEAD'): + def commit(self, message="Commit test\n", ref=b"HEAD"): """Commit the current working tree in a new commit with message on the branch 'ref'. At the end of the commit, the reference should stay the same and the index should be clean. 
""" - self.git_shell('add', '.') - message = message.encode() + b'\n' + self.git_shell("add", ".") + message = message.encode() + b"\n" ret = self.repo.do_commit( - message=message, committer=self.author, + message=message, + committer=self.author, commit_timestamp=self.base_date + self.counter, commit_timezone=0, - ref=ref) + ref=ref, + ) self.counter += 1 # committing on another branch leaves # dangling files in index - if ref != b'HEAD': + if ref != b"HEAD": # XXX this should work (but does not) # dulwich.porcelain.reset(self.repo, 'hard') - self.git_shell('reset', '--hard', 'HEAD') + self.git_shell("reset", "--hard", "HEAD") return ret - def merge(self, parent_sha_list, message='Merge branches.'): - self.git_shell('merge', '--allow-unrelated-histories', - '-m', message, *[p.decode() for p in parent_sha_list]) + def merge(self, parent_sha_list, message="Merge branches."): + self.git_shell( + "merge", + "--allow-unrelated-histories", + "-m", + message, + *[p.decode() for p in parent_sha_list] + ) self.counter += 1 - return self.repo.refs[b'HEAD'] + return self.repo.refs[b"HEAD"] def print_debug_graph(self, reflog=False): - args = ['log', '--all', '--graph', '--decorate'] + args = ["log", "--all", "--graph", "--decorate"] if reflog: - args.append('--reflog') + args.append("--reflog") self.git_shell(*args, stdout=None) def git_loader(storage, repo_path, visit_date=datetime.datetime.now()): """Instantiate a Git Loader using the storage instance as storage. """ loader = GitLoaderFromDisk( - 'fake_origin', directory=repo_path, visit_date=visit_date) + "fake_origin", directory=repo_path, visit_date=visit_date + ) loader.storage = storage return loader @contextlib.contextmanager def cook_extract_directory(storage, obj_id): """Context manager that cooks a directory and extract it.""" backend = unittest.mock.MagicMock() backend.storage = storage - cooker = DirectoryCooker( - 'directory', obj_id, backend=backend, storage=storage) + cooker = DirectoryCooker("directory", obj_id, backend=backend, storage=storage) cooker.fileobj = io.BytesIO() assert cooker.check_exists() cooker.prepare_bundle() cooker.fileobj.seek(0) - with tempfile.TemporaryDirectory(prefix='tmp-vault-extract-') as td: - with tarfile.open(fileobj=cooker.fileobj, mode='r') as tar: + with tempfile.TemporaryDirectory(prefix="tmp-vault-extract-") as td: + with tarfile.open(fileobj=cooker.fileobj, mode="r") as tar: tar.extractall(td) yield pathlib.Path(td) / hashutil.hash_to_hex(obj_id) cooker.storage = None @contextlib.contextmanager def cook_stream_revision_gitfast(storage, obj_id): """Context manager that cooks a revision and stream its fastexport.""" backend = unittest.mock.MagicMock() backend.storage = storage cooker = RevisionGitfastCooker( - 'revision_gitfast', obj_id, backend=backend, storage=storage) + "revision_gitfast", obj_id, backend=backend, storage=storage + ) cooker.fileobj = io.BytesIO() assert cooker.check_exists() cooker.prepare_bundle() cooker.fileobj.seek(0) fastexport_stream = gzip.GzipFile(fileobj=cooker.fileobj) yield fastexport_stream cooker.storage = None @contextlib.contextmanager def cook_extract_revision_gitfast(storage, obj_id): """Context manager that cooks a revision and extract it.""" test_repo = TestRepo() - with cook_stream_revision_gitfast(storage, obj_id) as stream, \ - test_repo as p: + with cook_stream_revision_gitfast(storage, obj_id) as stream, test_repo as p: processor = dulwich.fastexport.GitImportProcessor(test_repo.repo) processor.import_stream(stream) yield test_repo, p -TEST_CONTENT = (" 
test content\n" - "and unicode \N{BLACK HEART SUIT}\n" - " and trailing spaces ") -TEST_EXECUTABLE = b'\x42\x40\x00\x00\x05' +TEST_CONTENT = ( + " test content\n" "and unicode \N{BLACK HEART SUIT}\n" " and trailing spaces " +) +TEST_EXECUTABLE = b"\x42\x40\x00\x00\x05" class TestDirectoryCooker: def test_directory_simple(self, swh_storage): repo = TestRepo() with repo as rp: - (rp / 'file').write_text(TEST_CONTENT) - (rp / 'executable').write_bytes(TEST_EXECUTABLE) - (rp / 'executable').chmod(0o755) - (rp / 'link').symlink_to('file') - (rp / 'dir1/dir2').mkdir(parents=True) - (rp / 'dir1/dir2/file').write_text(TEST_CONTENT) + (rp / "file").write_text(TEST_CONTENT) + (rp / "executable").write_bytes(TEST_EXECUTABLE) + (rp / "executable").chmod(0o755) + (rp / "link").symlink_to("file") + (rp / "dir1/dir2").mkdir(parents=True) + (rp / "dir1/dir2/file").write_text(TEST_CONTENT) c = repo.commit() loader = git_loader(swh_storage, str(rp)) loader.load() obj_id_hex = repo.repo[c].tree.decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) with cook_extract_directory(swh_storage, obj_id) as p: - assert (p / 'file').stat().st_mode == 0o100644 - assert (p / 'file').read_text() == TEST_CONTENT - assert (p / 'executable').stat().st_mode == 0o100755 - assert (p / 'executable').read_bytes() == TEST_EXECUTABLE - assert (p / 'link').is_symlink - assert os.readlink(str(p / 'link')) == 'file' - assert (p / 'dir1/dir2/file').stat().st_mode == 0o100644 - assert (p / 'dir1/dir2/file').read_text() == TEST_CONTENT + assert (p / "file").stat().st_mode == 0o100644 + assert (p / "file").read_text() == TEST_CONTENT + assert (p / "executable").stat().st_mode == 0o100755 + assert (p / "executable").read_bytes() == TEST_EXECUTABLE + assert (p / "link").is_symlink + assert os.readlink(str(p / "link")) == "file" + assert (p / "dir1/dir2/file").stat().st_mode == 0o100644 + assert (p / "dir1/dir2/file").read_text() == TEST_CONTENT directory = from_disk.Directory.from_disk(path=bytes(p)) assert obj_id_hex == hashutil.hash_to_hex(directory.hash) def test_directory_filtered_objects(self, swh_storage): repo = TestRepo() with repo as rp: - file_1, id_1 = hash_content(b'test1') - file_2, id_2 = hash_content(b'test2') - file_3, id_3 = hash_content(b'test3') + file_1, id_1 = hash_content(b"test1") + file_2, id_2 = hash_content(b"test2") + file_3, id_3 = hash_content(b"test3") - (rp / 'file').write_bytes(file_1) - (rp / 'hidden_file').write_bytes(file_2) - (rp / 'absent_file').write_bytes(file_3) + (rp / "file").write_bytes(file_1) + (rp / "hidden_file").write_bytes(file_2) + (rp / "absent_file").write_bytes(file_3) c = repo.commit() loader = git_loader(swh_storage, str(rp)) loader.load() obj_id_hex = repo.repo[c].tree.decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) # FIXME: storage.content_update() should be changed to allow things # like that with swh_storage.get_db().transaction() as cur: - cur.execute("""update content set status = 'visible' - where sha1 = %s""", (id_1,)) - cur.execute("""update content set status = 'hidden' - where sha1 = %s""", (id_2,)) - cur.execute("""update content set status = 'absent' - where sha1 = %s""", (id_3,)) + cur.execute( + """update content set status = 'visible' + where sha1 = %s""", + (id_1,), + ) + cur.execute( + """update content set status = 'hidden' + where sha1 = %s""", + (id_2,), + ) + cur.execute( + """update content set status = 'absent' + where sha1 = %s""", + (id_3,), + ) with cook_extract_directory(swh_storage, obj_id) as p: - assert (p / 'file').read_bytes() == b'test1' - assert 
(p / 'hidden_file').read_bytes() == HIDDEN_MESSAGE - assert (p / 'absent_file').read_bytes() == SKIPPED_MESSAGE + assert (p / "file").read_bytes() == b"test1" + assert (p / "hidden_file").read_bytes() == HIDDEN_MESSAGE + assert (p / "absent_file").read_bytes() == SKIPPED_MESSAGE def test_directory_bogus_perms(self, swh_storage): # Some early git repositories have 664/775 permissions... let's check # if all the weird modes are properly normalized in the directory # cooker. repo = TestRepo() with repo as rp: - (rp / 'file').write_text(TEST_CONTENT) - (rp / 'file').chmod(0o664) - (rp / 'executable').write_bytes(TEST_EXECUTABLE) - (rp / 'executable').chmod(0o775) - (rp / 'wat').write_text(TEST_CONTENT) - (rp / 'wat').chmod(0o604) + (rp / "file").write_text(TEST_CONTENT) + (rp / "file").chmod(0o664) + (rp / "executable").write_bytes(TEST_EXECUTABLE) + (rp / "executable").chmod(0o775) + (rp / "wat").write_text(TEST_CONTENT) + (rp / "wat").chmod(0o604) c = repo.commit() loader = git_loader(swh_storage, str(rp)) loader.load() obj_id_hex = repo.repo[c].tree.decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) with cook_extract_directory(swh_storage, obj_id) as p: - assert (p / 'file').stat().st_mode == 0o100644 - assert (p / 'executable').stat().st_mode == 0o100755 - assert (p / 'wat').stat().st_mode == 0o100644 + assert (p / "file").stat().st_mode == 0o100644 + assert (p / "executable").stat().st_mode == 0o100755 + assert (p / "wat").stat().st_mode == 0o100644 def test_directory_revision_data(self, swh_storage): - target_rev = '0e8a3ad980ec179856012b7eecf4327e99cd44cd' + target_rev = "0e8a3ad980ec179856012b7eecf4327e99cd44cd" dir = Directory( entries=[ - DirectoryEntry(name=b'submodule', - type='rev', - target=hashutil.hash_to_bytes(target_rev), - perms=0o100644) + DirectoryEntry( + name=b"submodule", + type="rev", + target=hashutil.hash_to_bytes(target_rev), + perms=0o100644, + ) ], ) swh_storage.directory_add([dir]) with cook_extract_directory(swh_storage, dir.id) as p: - assert (p / 'submodule').is_symlink() - assert os.readlink(str(p / 'submodule')) == target_rev + assert (p / "submodule").is_symlink() + assert os.readlink(str(p / "submodule")) == target_rev class TestRevisionGitfastCooker: def test_revision_simple(self, swh_storage): # # 1--2--3--4--5--6--7 # repo = TestRepo() with repo as rp: - (rp / 'file1').write_text(TEST_CONTENT) - repo.commit('add file1') - (rp / 'file2').write_text(TEST_CONTENT) - repo.commit('add file2') - (rp / 'dir1/dir2').mkdir(parents=True) - (rp / 'dir1/dir2/file').write_text(TEST_CONTENT) - repo.commit('add dir1/dir2/file') - (rp / 'bin1').write_bytes(TEST_EXECUTABLE) - (rp / 'bin1').chmod(0o755) - repo.commit('add bin1') - (rp / 'link1').symlink_to('file1') - repo.commit('link link1 to file1') - (rp / 'file2').unlink() - repo.commit('remove file2') - (rp / 'bin1').rename(rp / 'bin') - repo.commit('rename bin1 to bin') + (rp / "file1").write_text(TEST_CONTENT) + repo.commit("add file1") + (rp / "file2").write_text(TEST_CONTENT) + repo.commit("add file2") + (rp / "dir1/dir2").mkdir(parents=True) + (rp / "dir1/dir2/file").write_text(TEST_CONTENT) + repo.commit("add dir1/dir2/file") + (rp / "bin1").write_bytes(TEST_EXECUTABLE) + (rp / "bin1").chmod(0o755) + repo.commit("add bin1") + (rp / "link1").symlink_to("file1") + repo.commit("link link1 to file1") + (rp / "file2").unlink() + repo.commit("remove file2") + (rp / "bin1").rename(rp / "bin") + repo.commit("rename bin1 to bin") loader = git_loader(swh_storage, str(rp)) loader.load() - obj_id_hex = 
repo.repo.refs[b'HEAD'].decode() + obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) with cook_extract_revision_gitfast(swh_storage, obj_id) as (ert, p): - ert.checkout(b'HEAD') - assert (p / 'file1').stat().st_mode == 0o100644 - assert (p / 'file1').read_text() == TEST_CONTENT - assert (p / 'link1').is_symlink - assert os.readlink(str(p / 'link1')) == 'file1' - assert (p / 'bin').stat().st_mode == 0o100755 - assert (p / 'bin').read_bytes() == TEST_EXECUTABLE - assert (p / 'dir1/dir2/file').read_text() == TEST_CONTENT - assert (p / 'dir1/dir2/file').stat().st_mode == 0o100644 - assert ert.repo.refs[b'HEAD'].decode() == obj_id_hex + ert.checkout(b"HEAD") + assert (p / "file1").stat().st_mode == 0o100644 + assert (p / "file1").read_text() == TEST_CONTENT + assert (p / "link1").is_symlink + assert os.readlink(str(p / "link1")) == "file1" + assert (p / "bin").stat().st_mode == 0o100755 + assert (p / "bin").read_bytes() == TEST_EXECUTABLE + assert (p / "dir1/dir2/file").read_text() == TEST_CONTENT + assert (p / "dir1/dir2/file").stat().st_mode == 0o100644 + assert ert.repo.refs[b"HEAD"].decode() == obj_id_hex def test_revision_two_roots(self, swh_storage): # # 1----3---4 # / # 2---- # repo = TestRepo() with repo as rp: - (rp / 'file1').write_text(TEST_CONTENT) - c1 = repo.commit('Add file1') - del repo.repo.refs[b'refs/heads/master'] # git update-ref -d HEAD - (rp / 'file2').write_text(TEST_CONTENT) - repo.commit('Add file2') + (rp / "file1").write_text(TEST_CONTENT) + c1 = repo.commit("Add file1") + del repo.repo.refs[b"refs/heads/master"] # git update-ref -d HEAD + (rp / "file2").write_text(TEST_CONTENT) + repo.commit("Add file2") repo.merge([c1]) - (rp / 'file3').write_text(TEST_CONTENT) - repo.commit('add file3') - obj_id_hex = repo.repo.refs[b'HEAD'].decode() + (rp / "file3").write_text(TEST_CONTENT) + repo.commit("add file3") + obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) loader = git_loader(swh_storage, str(rp)) loader.load() with cook_extract_revision_gitfast(swh_storage, obj_id) as (ert, p): - assert ert.repo.refs[b'HEAD'].decode() == obj_id_hex + assert ert.repo.refs[b"HEAD"].decode() == obj_id_hex def test_revision_two_double_fork_merge(self, swh_storage): # # 2---4---6 # / / / # 1---3---5 # repo = TestRepo() with repo as rp: - (rp / 'file1').write_text(TEST_CONTENT) - c1 = repo.commit('Add file1') - repo.repo.refs[b'refs/heads/c1'] = c1 + (rp / "file1").write_text(TEST_CONTENT) + c1 = repo.commit("Add file1") + repo.repo.refs[b"refs/heads/c1"] = c1 - (rp / 'file2').write_text(TEST_CONTENT) - repo.commit('Add file2') + (rp / "file2").write_text(TEST_CONTENT) + repo.commit("Add file2") - (rp / 'file3').write_text(TEST_CONTENT) - c3 = repo.commit('Add file3', ref=b'refs/heads/c1') - repo.repo.refs[b'refs/heads/c3'] = c3 + (rp / "file3").write_text(TEST_CONTENT) + c3 = repo.commit("Add file3", ref=b"refs/heads/c1") + repo.repo.refs[b"refs/heads/c3"] = c3 repo.merge([c3]) - (rp / 'file5').write_text(TEST_CONTENT) - c5 = repo.commit('Add file3', ref=b'refs/heads/c3') + (rp / "file5").write_text(TEST_CONTENT) + c5 = repo.commit("Add file3", ref=b"refs/heads/c3") repo.merge([c5]) - obj_id_hex = repo.repo.refs[b'HEAD'].decode() + obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) loader = git_loader(swh_storage, str(rp)) loader.load() with cook_extract_revision_gitfast(swh_storage, obj_id) as (ert, p): - assert ert.repo.refs[b'HEAD'].decode() == obj_id_hex + assert 
ert.repo.refs[b"HEAD"].decode() == obj_id_hex def test_revision_triple_merge(self, swh_storage): # # .---.---5 # / / / # 2 3 4 # / / / # 1---.---. # repo = TestRepo() with repo as rp: - (rp / 'file1').write_text(TEST_CONTENT) - c1 = repo.commit('Commit 1') - repo.repo.refs[b'refs/heads/b1'] = c1 - repo.repo.refs[b'refs/heads/b2'] = c1 - - repo.commit('Commit 2') - c3 = repo.commit('Commit 3', ref=b'refs/heads/b1') - c4 = repo.commit('Commit 4', ref=b'refs/heads/b2') + (rp / "file1").write_text(TEST_CONTENT) + c1 = repo.commit("Commit 1") + repo.repo.refs[b"refs/heads/b1"] = c1 + repo.repo.refs[b"refs/heads/b2"] = c1 + + repo.commit("Commit 2") + c3 = repo.commit("Commit 3", ref=b"refs/heads/b1") + c4 = repo.commit("Commit 4", ref=b"refs/heads/b2") repo.merge([c3, c4]) - obj_id_hex = repo.repo.refs[b'HEAD'].decode() + obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) loader = git_loader(swh_storage, str(rp)) loader.load() with cook_extract_revision_gitfast(swh_storage, obj_id) as (ert, p): - assert ert.repo.refs[b'HEAD'].decode() == obj_id_hex + assert ert.repo.refs[b"HEAD"].decode() == obj_id_hex def test_revision_filtered_objects(self, swh_storage): repo = TestRepo() with repo as rp: - file_1, id_1 = hash_content(b'test1') - file_2, id_2 = hash_content(b'test2') - file_3, id_3 = hash_content(b'test3') + file_1, id_1 = hash_content(b"test1") + file_2, id_2 = hash_content(b"test2") + file_3, id_3 = hash_content(b"test3") - (rp / 'file').write_bytes(file_1) - (rp / 'hidden_file').write_bytes(file_2) - (rp / 'absent_file').write_bytes(file_3) + (rp / "file").write_bytes(file_1) + (rp / "hidden_file").write_bytes(file_2) + (rp / "absent_file").write_bytes(file_3) repo.commit() - obj_id_hex = repo.repo.refs[b'HEAD'].decode() + obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) loader = git_loader(swh_storage, str(rp)) loader.load() # FIXME: storage.content_update() should be changed to allow things # like that with swh_storage.get_db().transaction() as cur: - cur.execute("""update content set status = 'visible' - where sha1 = %s""", (id_1,)) - cur.execute("""update content set status = 'hidden' - where sha1 = %s""", (id_2,)) - cur.execute("""update content set status = 'absent' - where sha1 = %s""", (id_3,)) + cur.execute( + """update content set status = 'visible' + where sha1 = %s""", + (id_1,), + ) + cur.execute( + """update content set status = 'hidden' + where sha1 = %s""", + (id_2,), + ) + cur.execute( + """update content set status = 'absent' + where sha1 = %s""", + (id_3,), + ) with cook_extract_revision_gitfast(swh_storage, obj_id) as (ert, p): - ert.checkout(b'HEAD') - assert (p / 'file').read_bytes() == b'test1' - assert (p / 'hidden_file').read_bytes() == HIDDEN_MESSAGE - assert (p / 'absent_file').read_bytes() == SKIPPED_MESSAGE + ert.checkout(b"HEAD") + assert (p / "file").read_bytes() == b"test1" + assert (p / "hidden_file").read_bytes() == HIDDEN_MESSAGE + assert (p / "absent_file").read_bytes() == SKIPPED_MESSAGE def test_revision_bogus_perms(self, swh_storage): # Some early git repositories have 664/775 permissions... let's check # if all the weird modes are properly normalized in the revision # cooker. 
repo = TestRepo() with repo as rp: - (rp / 'file').write_text(TEST_CONTENT) - (rp / 'file').chmod(0o664) - (rp / 'executable').write_bytes(TEST_EXECUTABLE) - (rp / 'executable').chmod(0o775) - (rp / 'wat').write_text(TEST_CONTENT) - (rp / 'wat').chmod(0o604) - repo.commit('initial commit') + (rp / "file").write_text(TEST_CONTENT) + (rp / "file").chmod(0o664) + (rp / "executable").write_bytes(TEST_EXECUTABLE) + (rp / "executable").chmod(0o775) + (rp / "wat").write_text(TEST_CONTENT) + (rp / "wat").chmod(0o604) + repo.commit("initial commit") loader = git_loader(swh_storage, str(rp)) loader.load() - obj_id_hex = repo.repo.refs[b'HEAD'].decode() + obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) with cook_extract_revision_gitfast(swh_storage, obj_id) as (ert, p): - ert.checkout(b'HEAD') - assert (p / 'file').stat().st_mode == 0o100644 - assert (p / 'executable').stat().st_mode == 0o100755 - assert (p / 'wat').stat().st_mode == 0o100644 + ert.checkout(b"HEAD") + assert (p / "file").stat().st_mode == 0o100644 + assert (p / "executable").stat().st_mode == 0o100755 + assert (p / "wat").stat().st_mode == 0o100644 def test_revision_null_fields(self, swh_storage): # Our schema doesn't enforce a lot of non-null revision fields. We need # to check these cases don't break the cooker. repo = TestRepo() with repo as rp: - (rp / 'file').write_text(TEST_CONTENT) - c = repo.commit('initial commit') + (rp / "file").write_text(TEST_CONTENT) + c = repo.commit("initial commit") loader = git_loader(swh_storage, str(rp)) loader.load() - repo.repo.refs[b'HEAD'].decode() + repo.repo.refs[b"HEAD"].decode() dir_id_hex = repo.repo[c].tree.decode() dir_id = hashutil.hash_to_bytes(dir_id_hex) test_revision = Revision( - message=b'', - author=Person(name=None, email=None, fullname=b''), + message=b"", + author=Person(name=None, email=None, fullname=b""), date=None, - committer=Person(name=None, email=None, fullname=b''), + committer=Person(name=None, email=None, fullname=b""), committer_date=None, parents=[], type=RevisionType.GIT, directory=dir_id, metadata={}, - synthetic=True + synthetic=True, ) swh_storage.revision_add([test_revision]) - with cook_extract_revision_gitfast(swh_storage, - test_revision.id) as (ert, p): - ert.checkout(b'HEAD') - assert (p / 'file').stat().st_mode == 0o100644 + with cook_extract_revision_gitfast(swh_storage, test_revision.id) as (ert, p): + ert.checkout(b"HEAD") + assert (p / "file").stat().st_mode == 0o100644 def test_revision_revision_data(self, swh_storage): - target_rev = '0e8a3ad980ec179856012b7eecf4327e99cd44cd' + target_rev = "0e8a3ad980ec179856012b7eecf4327e99cd44cd" dir = Directory( entries=[ - DirectoryEntry(name=b'submodule', - type='rev', - target=hashutil.hash_to_bytes(target_rev), - perms=0o100644) + DirectoryEntry( + name=b"submodule", + type="rev", + target=hashutil.hash_to_bytes(target_rev), + perms=0o100644, + ) ], ) swh_storage.directory_add([dir]) rev = Revision( - message=b'', - author=Person(name=None, email=None, fullname=b''), + message=b"", + author=Person(name=None, email=None, fullname=b""), date=None, - committer=Person(name=None, email=None, fullname=b''), + committer=Person(name=None, email=None, fullname=b""), committer_date=None, parents=[], type=RevisionType.GIT, directory=dir.id, metadata={}, - synthetic=True + synthetic=True, ) swh_storage.revision_add([rev]) with cook_stream_revision_gitfast(swh_storage, rev.id) as stream: - pattern = 'M 160000 {} submodule'.format(target_rev).encode() + pattern = "M 160000 {} 
submodule".format(target_rev).encode() assert pattern in stream.read() diff --git a/swh/vault/tests/test_cookers_base.py b/swh/vault/tests/test_cookers_base.py index 35b28f1..5486d75 100644 --- a/swh/vault/tests/test_cookers_base.py +++ b/swh/vault/tests/test_cookers_base.py @@ -1,81 +1,75 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from unittest.mock import MagicMock from swh.model import hashutil from swh.vault.cookers.base import BaseVaultCooker -TEST_BUNDLE_CHUNKS = [b"test content 1\n", - b"test content 2\n", - b"test content 3\n"] -TEST_BUNDLE_CONTENT = b''.join(TEST_BUNDLE_CHUNKS) -TEST_OBJ_TYPE = 'test_type' -TEST_HEX_ID = '17a3e48bce37be5226490e750202ad3a9a1a3fe9' +TEST_BUNDLE_CHUNKS = [b"test content 1\n", b"test content 2\n", b"test content 3\n"] +TEST_BUNDLE_CONTENT = b"".join(TEST_BUNDLE_CHUNKS) +TEST_OBJ_TYPE = "test_type" +TEST_HEX_ID = "17a3e48bce37be5226490e750202ad3a9a1a3fe9" TEST_OBJ_ID = hashutil.hash_to_bytes(TEST_HEX_ID) class BaseVaultCookerMock(BaseVaultCooker): CACHE_TYPE_KEY = TEST_OBJ_TYPE def __init__(self): # we do not call super() here to bypass the building of db objects from # config since we do mock these db objects self.config = {} self.storage = MagicMock() self.backend = MagicMock() self.obj_type = self.CACHE_TYPE_KEY self.obj_id = hashutil.hash_to_bytes(TEST_OBJ_ID) self.max_bundle_size = 1024 def check_exists(self): return True def prepare_bundle(self): for chunk in TEST_BUNDLE_CHUNKS: self.write(chunk) def test_simple_cook(): cooker = BaseVaultCookerMock() cooker.cook() cooker.backend.put_bundle.assert_called_once_with( - TEST_OBJ_TYPE, TEST_OBJ_ID, TEST_BUNDLE_CONTENT) - cooker.backend.set_status.assert_called_with( - TEST_OBJ_TYPE, TEST_OBJ_ID, 'done') - cooker.backend.set_progress.assert_called_with( - TEST_OBJ_TYPE, TEST_OBJ_ID, None) - cooker.backend.send_notif.assert_called_with( - TEST_OBJ_TYPE, TEST_OBJ_ID) + TEST_OBJ_TYPE, TEST_OBJ_ID, TEST_BUNDLE_CONTENT + ) + cooker.backend.set_status.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID, "done") + cooker.backend.set_progress.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID, None) + cooker.backend.send_notif.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID) def test_code_exception_cook(): cooker = BaseVaultCookerMock() cooker.prepare_bundle = MagicMock() cooker.prepare_bundle.side_effect = RuntimeError("Nope") cooker.cook() # Potentially remove this when we have objstorage streaming cooker.backend.put_bundle.assert_not_called() - cooker.backend.set_status.assert_called_with( - TEST_OBJ_TYPE, TEST_OBJ_ID, 'failed') + cooker.backend.set_status.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID, "failed") assert "Nope" not in cooker.backend.set_progress.call_args[0][2] cooker.backend.send_notif.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID) def test_policy_exception_cook(): cooker = BaseVaultCookerMock() cooker.max_bundle_size = 8 cooker.cook() # Potentially remove this when we have objstorage streaming cooker.backend.put_bundle.assert_not_called() - cooker.backend.set_status.assert_called_with( - TEST_OBJ_TYPE, TEST_OBJ_ID, 'failed') + cooker.backend.set_status.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID, "failed") assert "exceeds" in cooker.backend.set_progress.call_args[0][2] cooker.backend.send_notif.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID) diff --git a/swh/vault/tests/vault_testing.py 
b/swh/vault/tests/vault_testing.py index c15c8f9..bdd1475 100644 --- a/swh/vault/tests/vault_testing.py +++ b/swh/vault/tests/vault_testing.py @@ -1,21 +1,20 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.model import hashutil def hash_content(content): """Hash the content's id (sha1). Args: content (bytes): Content to hash Returns: The tuple (content, content's id as bytes) """ - hashes = hashutil.MultiHash.from_data( - content, hash_names=['sha1']).digest() - return content, hashes['sha1'] + hashes = hashutil.MultiHash.from_data(content, hash_names=["sha1"]).digest() + return content, hashes["sha1"] diff --git a/swh/vault/to_disk.py b/swh/vault/to_disk.py index 85d70f7..7257f65 100644 --- a/swh/vault/to_disk.py +++ b/swh/vault/to_disk.py @@ -1,126 +1,127 @@ # Copyright (C) 2016-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools import collections import os from swh.model import hashutil from swh.model.from_disk import mode_to_perms, DentryPerms from swh.storage.algos.dir_iterators import dir_iterator -SKIPPED_MESSAGE = (b'This content has not been retrieved in the ' - b'Software Heritage archive due to its size.') +SKIPPED_MESSAGE = ( + b"This content has not been retrieved in the " + b"Software Heritage archive due to its size." +) -HIDDEN_MESSAGE = (b'This content is hidden.') +HIDDEN_MESSAGE = b"This content is hidden." def get_filtered_files_content(storage, files_data): """Retrieve the files specified by files_data and apply filters for skipped and missing contents. Args: storage: the storage from which to retrieve the objects files_data: list of file entries as returned by directory_ls() Yields: The entries given in files_data with a new 'content' key that points to the file content in bytes. The contents can be replaced by a specific message to indicate that they could not be retrieved (either due to privacy policy or because their sizes were too big for us to archive it). """ - contents_to_fetch = [f['sha1'] for f in files_data - if f['status'] == 'visible'] + contents_to_fetch = [f["sha1"] for f in files_data if f["status"] == "visible"] contents_fetched = storage.content_get(contents_to_fetch) - contents = {c['sha1']: c['data'] for c in contents_fetched} + contents = {c["sha1"]: c["data"] for c in contents_fetched} for file_data in files_data: - if file_data['status'] == 'visible': - content = contents[file_data['sha1']] - elif file_data['status'] == 'absent': + if file_data["status"] == "visible": + content = contents[file_data["sha1"]] + elif file_data["status"] == "absent": content = SKIPPED_MESSAGE - elif file_data['status'] == 'hidden': + elif file_data["status"] == "hidden": content = HIDDEN_MESSAGE - yield {'content': content, **file_data} + yield {"content": content, **file_data} def apply_chunked(func, input_list, chunk_size): """Apply func on input_list divided in chunks of size chunk_size""" for i in range(0, len(input_list), chunk_size): - yield from func(input_list[i:i + chunk_size]) + yield from func(input_list[i : i + chunk_size]) class DirectoryBuilder: """Reconstructs the on-disk representation of a directory in the storage. 
""" def __init__(self, storage, root, dir_id): """Initialize the directory builder. Args: storage: the storage object root: the path where the directory should be reconstructed dir_id: the identifier of the directory in the storage """ self.storage = storage self.root = root self.dir_id = dir_id def build(self): """Perform the reconstruction of the directory in the given root.""" # Retrieve data from the database. # Split into files, revisions and directory data. entries = collections.defaultdict(list) for entry in dir_iterator(self.storage, self.dir_id): - entries[entry['type']].append(entry) + entries[entry["type"]].append(entry) # Recreate the directory's subtree and then the files into it. - self._create_tree(entries['dir']) - self._create_files(entries['file']) - self._create_revisions(entries['rev']) + self._create_tree(entries["dir"]) + self._create_files(entries["file"]) + self._create_revisions(entries["rev"]) def _create_tree(self, directories): """Create a directory tree from the given paths The tree is created from `root` and each given directory in `directories` will be created. """ # Directories are sorted by depth so they are created in the # right order bsep = os.path.sep.encode() - directories = sorted(directories, - key=lambda x: len(x['path'].split(bsep))) + directories = sorted(directories, key=lambda x: len(x["path"].split(bsep))) for dir in directories: - os.makedirs(os.path.join(self.root, dir['path'])) + os.makedirs(os.path.join(self.root, dir["path"])) def _create_files(self, files_data): """Create the files in the tree and fetch their contents.""" f = functools.partial(get_filtered_files_content, self.storage) files_data = apply_chunked(f, files_data, 1000) for file_data in files_data: - path = os.path.join(self.root, file_data['path']) - self._create_file(path, file_data['content'], file_data['perms']) + path = os.path.join(self.root, file_data["path"]) + self._create_file(path, file_data["content"], file_data["perms"]) def _create_revisions(self, revs_data): """Create the revisions in the tree as broken symlinks to the target identifier.""" for file_data in revs_data: - path = os.path.join(self.root, file_data['path']) - self._create_file(path, hashutil.hash_to_hex(file_data['target']), - mode=0o120000) + path = os.path.join(self.root, file_data["path"]) + self._create_file( + path, hashutil.hash_to_hex(file_data["target"]), mode=0o120000 + ) def _create_file(self, path, content, mode=0o100644): """Create the given file and fill it with content.""" perms = mode_to_perms(mode) if perms == DentryPerms.symlink: os.symlink(content, path) else: - with open(path, 'wb') as f: + with open(path, "wb") as f: f.write(content) os.chmod(path, perms.value) diff --git a/tox.ini b/tox.ini index 93f2ff2..04524da 100644 --- a/tox.ini +++ b/tox.ini @@ -1,27 +1,34 @@ [tox] -envlist=flake8,mypy,py3 +envlist=black,flake8,mypy,py3 [testenv] extras = testing deps = pytest-cov commands = pytest --cov={envsitepackagesdir}/swh/vault \ {envsitepackagesdir}/swh/vault \ --cov-branch {posargs} +[testenv:black] +skip_install = true +deps = + black +commands = + {envpython} -m black --check swh + [testenv:flake8] skip_install = true deps = flake8 commands = {envpython} -m flake8 [testenv:mypy] extras = testing deps = mypy commands = mypy swh