diff --git a/PKG-INFO b/PKG-INFO
index 92b3b0b..a3be86d 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,31 +1,31 @@
 Metadata-Version: 2.1
 Name: swh.vault
-Version: 0.2.0
+Version: 0.3.0
 Summary: Software Heritage vault
 Home-page: https://forge.softwareheritage.org/diffusion/DVAU/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
 Project-URL: Funding, https://www.softwareheritage.org/donate
 Project-URL: Source, https://forge.softwareheritage.org/source/swh-vault
 Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-vault/
 Description: swh-vault
         =========
        
         User-facing service that allows to retrieve parts of the archive as
         self-contained bundles.
        
         See the
         [documentation](https://docs.softwareheritage.org/devel/swh-vault/index.html)
         for more details.
        
 Platform: UNKNOWN
 Classifier: Programming Language :: Python :: 3
 Classifier: Intended Audience :: Developers
 Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
 Classifier: Operating System :: OS Independent
 Classifier: Development Status :: 5 - Production/Stable
 Requires-Python: >=3.7
 Description-Content-Type: text/markdown
 Provides-Extra: testing
diff --git a/requirements-swh.txt b/requirements-swh.txt
index d9966ae..6a1f292 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,5 +1,5 @@
-swh.core[db,http] >= 0.3
+swh.core[db,http] >= 0.5
 swh.model >= 0.3
 swh.objstorage >= 0.0.17
 swh.scheduler >= 0.7.0
 swh.storage >= 0.0.106
diff --git a/requirements-test.txt b/requirements-test.txt
index 078a4e3..66b4544 100644
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,7 +1,8 @@
 pytest
 pytest-aiohttp
 pytest-postgresql
 dulwich >= 0.18.7
 swh.loader.core
 swh.loader.git >= 0.0.52
 swh.storage[testing]
+pytest-mock
diff --git a/requirements.txt b/requirements.txt
index 328c1d5..bbc6f49 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,6 @@
 click
 flask
 psycopg2
 python-dateutil
 fastimport
-
+typing-extensions
diff --git a/swh.vault.egg-info/PKG-INFO b/swh.vault.egg-info/PKG-INFO
index 92b3b0b..a3be86d 100644
--- a/swh.vault.egg-info/PKG-INFO
+++ b/swh.vault.egg-info/PKG-INFO
@@ -1,31 +1,31 @@
 Metadata-Version: 2.1
 Name: swh.vault
-Version: 0.2.0
+Version: 0.3.0
 Summary: Software Heritage vault
 Home-page: https://forge.softwareheritage.org/diffusion/DVAU/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
 Project-URL: Funding, https://www.softwareheritage.org/donate
 Project-URL: Source, https://forge.softwareheritage.org/source/swh-vault
 Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-vault/
 Description: swh-vault
         =========
        
         User-facing service that allows to retrieve parts of the archive as
         self-contained bundles.
        
         See the
         [documentation](https://docs.softwareheritage.org/devel/swh-vault/index.html)
         for more details.
        
 Platform: UNKNOWN
 Classifier: Programming Language :: Python :: 3
 Classifier: Intended Audience :: Developers
 Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
 Classifier: Operating System :: OS Independent
 Classifier: Development Status :: 5 - Production/Stable
 Requires-Python: >=3.7
 Description-Content-Type: text/markdown
 Provides-Extra: testing
diff --git a/swh.vault.egg-info/SOURCES.txt b/swh.vault.egg-info/SOURCES.txt
index 858be50..cda1c23 100644
--- a/swh.vault.egg-info/SOURCES.txt
+++ b/swh.vault.egg-info/SOURCES.txt
@@ -1,64 +1,67 @@
 .gitignore
 .pre-commit-config.yaml
 AUTHORS
 CODE_OF_CONDUCT.md
 CONTRIBUTORS
 LICENSE
 MANIFEST.in
 Makefile
 README.md
 conftest.py
 mypy.ini
 pyproject.toml
 pytest.ini
 requirements-swh.txt
 requirements-test.txt
 requirements.txt
 setup.cfg
 setup.py
 tox.ini
 docs/.gitignore
 docs/Makefile
 docs/api.rst
 docs/conf.py
 docs/getting-started.rst
 docs/index.rst
 docs/_static/.placeholder
 docs/_templates/.placeholder
 sql/upgrades/002.sql
 sql/upgrades/003.sql
 swh/__init__.py
 swh.vault.egg-info/PKG-INFO
 swh.vault.egg-info/SOURCES.txt
 swh.vault.egg-info/dependency_links.txt
 swh.vault.egg-info/entry_points.txt
 swh.vault.egg-info/not-zip-safe
 swh.vault.egg-info/requires.txt
 swh.vault.egg-info/top_level.txt
 swh/vault/__init__.py
 swh/vault/backend.py
 swh/vault/cache.py
 swh/vault/cli.py
 swh/vault/cooking_tasks.py
 swh/vault/exc.py
+swh/vault/interface.py
 swh/vault/py.typed
 swh/vault/to_disk.py
 swh/vault/api/__init__.py
 swh/vault/api/client.py
 swh/vault/api/server.py
 swh/vault/cookers/__init__.py
 swh/vault/cookers/base.py
 swh/vault/cookers/directory.py
 swh/vault/cookers/revision_flat.py
 swh/vault/cookers/revision_gitfast.py
 swh/vault/cookers/utils.py
-swh/vault/sql/30-swh-schema.sql
+swh/vault/sql/30-schema.sql
 swh/vault/tests/__init__.py
 swh/vault/tests/conftest.py
 swh/vault/tests/test_backend.py
 swh/vault/tests/test_cache.py
 swh/vault/tests/test_cookers.py
 swh/vault/tests/test_cookers_base.py
+swh/vault/tests/test_init.py
+swh/vault/tests/test_init_cookers.py
 swh/vault/tests/test_server.py
 swh/vault/tests/test_to_disk.py
 swh/vault/tests/vault_testing.py
\ No newline at end of file
diff --git a/swh.vault.egg-info/requires.txt b/swh.vault.egg-info/requires.txt
index 7ce4bfe..20aa219 100644
--- a/swh.vault.egg-info/requires.txt
+++ b/swh.vault.egg-info/requires.txt
@@ -1,19 +1,21 @@
 click
 flask
 psycopg2
 python-dateutil
 fastimport
-swh.core[db,http]>=0.3
+typing-extensions
+swh.core[db,http]>=0.5
 swh.model>=0.3
 swh.objstorage>=0.0.17
 swh.scheduler>=0.7.0
 swh.storage>=0.0.106
 
 [testing]
 pytest
 pytest-aiohttp
 pytest-postgresql
 dulwich>=0.18.7
 swh.loader.core
 swh.loader.git>=0.0.52
 swh.storage[testing]
+pytest-mock
diff --git a/swh/vault/__init__.py b/swh/vault/__init__.py
index a39a171..db16ff9 100644
--- a/swh/vault/__init__.py
+++ b/swh/vault/__init__.py
@@ -1,41 +1,54 @@
-# Copyright (C) 2018 The Software Heritage developers
+# Copyright (C) 2018-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from __future__ import annotations
+
+import importlib
 import logging
+from typing import Dict
+import warnings
 
 logger = logging.getLogger(__name__)
 
 
-def get_vault(cls="remote", args={}):
+BACKEND_TYPES: Dict[str, str] = {
+    "remote": ".api.client.RemoteVaultClient",
+    "local": ".backend.VaultBackend",
+}
+
+
+def get_vault(cls: str = "remote", **kwargs):
     """
     Get a vault object of class `vault_class` with arguments `vault_args`.
 
     Args:
-        vault (dict): dictionary with keys:
-        - cls (str): vault's class, either 'remote'
-        - args (dict): dictionary with keys
+        cls: vault's class, either 'remote' or 'local'
+        kwargs: arguments to pass to the class' constructor
 
     Returns:
         an instance of VaultBackend (either local or remote)
 
     Raises:
         ValueError if passed an unknown storage class.
 
     """
-    if cls == "remote":
-        from .api.client import RemoteVaultClient as Vault
-    elif cls == "local":
-        from swh.scheduler import get_scheduler
-        from swh.storage import get_storage
-        from swh.vault.backend import VaultBackend as Vault
-        from swh.vault.cache import VaultCache
-
-        args["cache"] = VaultCache(**args["cache"])
-        args["storage"] = get_storage(**args["storage"])
-        args["scheduler"] = get_scheduler(**args["scheduler"])
-    else:
-        raise ValueError("Unknown storage class `%s`" % cls)
-    logger.debug("Instantiating %s with %s" % (Vault, args))
-    return Vault(**args)
+    if "args" in kwargs:
+        warnings.warn(
+            'Explicit "args" key is deprecated, use keys directly instead.',
+            DeprecationWarning,
+        )
+        kwargs = kwargs["args"]
+
+    class_path = BACKEND_TYPES.get(cls)
+    if class_path is None:
+        raise ValueError(
+            f"Unknown Vault class `{cls}`. " f"Supported: {', '.join(BACKEND_TYPES)}"
+        )
+
+    (module_path, class_name) = class_path.rsplit(".", 1)
+    module = importlib.import_module(module_path, package=__package__)
+    Vault = getattr(module, class_name)
+    return Vault(**kwargs)
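
A short usage sketch of the reworked factory may help reviewers; the URL is illustrative and the `args` form shown second is exactly the one this diff soft-deprecates:

```python
from swh.vault import get_vault

# New style: constructor arguments are passed directly as keyword arguments.
vault = get_vault(cls="remote", url="http://localhost:5005/")

# Old style still resolves, but now emits a DeprecationWarning before
# unwrapping the "args" dict into keyword arguments.
vault = get_vault(cls="remote", args={"url": "http://localhost:5005/"})
```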
self.post("batch_cook", data=batch) - - def batch_progress(self, batch_id): - return self.get("batch_progress/{}".format(batch_id)) diff --git a/swh/vault/api/server.py b/swh/vault/api/server.py index 6c178e0..881a390 100644 --- a/swh/vault/api/server.py +++ b/swh/vault/api/server.py @@ -1,233 +1,132 @@ # Copyright (C) 2016-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from __future__ import annotations + import asyncio -import collections import os +from typing import Any, Dict, Optional import aiohttp.web -from swh.core import config -from swh.core.api.asynchronous import RPCServerApp, decode_request -from swh.core.api.asynchronous import encode_data_server as encode_data -from swh.model import hashutil -from swh.vault import get_vault +from swh.core.api.asynchronous import RPCServerApp +from swh.core.config import config_basepath, merge_configs, read_raw_config +from swh.vault import get_vault as get_swhvault from swh.vault.backend import NotFoundExc -from swh.vault.cookers import COOKER_TYPES +from swh.vault.interface import VaultInterface -DEFAULT_CONFIG_PATH = "vault/server" DEFAULT_CONFIG = { - "storage": ("dict", {"cls": "remote", "args": {"url": "http://localhost:5002/",},}), - "cache": ( - "dict", - { - "cls": "pathslicing", - "args": {"root": "/srv/softwareheritage/vault", "slicing": "0:1/1:5",}, - }, - ), - "client_max_size": ("int", 1024 ** 3), - "vault": ( - "dict", - {"cls": "local", "args": {"db": "dbname=softwareheritage-vault-dev",},}, - ), - "scheduler": ("dict", {"cls": "remote", "url": "http://localhost:5008/",},), + "storage": {"cls": "remote", "url": "http://localhost:5002/"}, + "cache": { + "cls": "pathslicing", + "args": {"root": "/srv/softwareheritage/vault", "slicing": "0:1/1:5"}, + }, + "client_max_size": 1024 ** 3, + "vault": {"cls": "local", "args": {"db": "dbname=softwareheritage-vault-dev",}}, + "scheduler": {"cls": "remote", "url": "http://localhost:5008/"}, } -@asyncio.coroutine -def index(request): - return aiohttp.web.Response(body="SWH Vault API server") - - -# Web API endpoints - - -@asyncio.coroutine -def vault_fetch(request): - obj_type = request.match_info["type"] - obj_id = request.match_info["id"] - - if not request.app["backend"].is_available(obj_type, obj_id): - raise NotFoundExc(f"{obj_type} {obj_id} is not available.") - - return encode_data(request.app["backend"].fetch(obj_type, obj_id)) - +vault = None +app = None -def user_info(task_info): - return { - "id": task_info["id"], - "status": task_info["task_status"], - "progress_message": task_info["progress_msg"], - "obj_type": task_info["type"], - "obj_id": hashutil.hash_to_hex(task_info["object_id"]), - } +def get_vault(config: Optional[Dict[str, Any]] = None) -> VaultInterface: + global vault + if not vault: + assert config is not None + vault = get_swhvault(**config) + return vault -@asyncio.coroutine -def vault_cook(request): - obj_type = request.match_info["type"] - obj_id = request.match_info["id"] - email = request.query.get("email") - sticky = request.query.get("sticky") in ("true", "1") - - if obj_type not in COOKER_TYPES: - raise NotFoundExc(f"{obj_type} is an unknown type.") - - info = request.app["backend"].cook_request( - obj_type, obj_id, email=email, sticky=sticky - ) - # TODO: return 201 status (Created) once the api supports it - return encode_data(user_info(info)) +class 
diff --git a/swh/vault/api/server.py b/swh/vault/api/server.py
index 6c178e0..881a390 100644
--- a/swh/vault/api/server.py
+++ b/swh/vault/api/server.py
@@ -1,233 +1,132 @@
 # Copyright (C) 2016-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from __future__ import annotations
+
 import asyncio
-import collections
 import os
+from typing import Any, Dict, Optional
 
 import aiohttp.web
 
-from swh.core import config
-from swh.core.api.asynchronous import RPCServerApp, decode_request
-from swh.core.api.asynchronous import encode_data_server as encode_data
-from swh.model import hashutil
-from swh.vault import get_vault
+from swh.core.api.asynchronous import RPCServerApp
+from swh.core.config import config_basepath, merge_configs, read_raw_config
+from swh.vault import get_vault as get_swhvault
 from swh.vault.backend import NotFoundExc
-from swh.vault.cookers import COOKER_TYPES
+from swh.vault.interface import VaultInterface
 
-DEFAULT_CONFIG_PATH = "vault/server"
 DEFAULT_CONFIG = {
-    "storage": ("dict", {"cls": "remote", "args": {"url": "http://localhost:5002/",},}),
-    "cache": (
-        "dict",
-        {
-            "cls": "pathslicing",
-            "args": {"root": "/srv/softwareheritage/vault", "slicing": "0:1/1:5",},
-        },
-    ),
-    "client_max_size": ("int", 1024 ** 3),
-    "vault": (
-        "dict",
-        {"cls": "local", "args": {"db": "dbname=softwareheritage-vault-dev",},},
-    ),
-    "scheduler": ("dict", {"cls": "remote", "url": "http://localhost:5008/",},),
+    "storage": {"cls": "remote", "url": "http://localhost:5002/"},
+    "cache": {
+        "cls": "pathslicing",
+        "args": {"root": "/srv/softwareheritage/vault", "slicing": "0:1/1:5"},
+    },
+    "client_max_size": 1024 ** 3,
+    "vault": {"cls": "local", "args": {"db": "dbname=softwareheritage-vault-dev",}},
+    "scheduler": {"cls": "remote", "url": "http://localhost:5008/"},
 }
 
 
-@asyncio.coroutine
-def index(request):
-    return aiohttp.web.Response(body="SWH Vault API server")
-
-
-# Web API endpoints
-
-
-@asyncio.coroutine
-def vault_fetch(request):
-    obj_type = request.match_info["type"]
-    obj_id = request.match_info["id"]
-
-    if not request.app["backend"].is_available(obj_type, obj_id):
-        raise NotFoundExc(f"{obj_type} {obj_id} is not available.")
-
-    return encode_data(request.app["backend"].fetch(obj_type, obj_id))
-
+vault = None
+app = None
 
-def user_info(task_info):
-    return {
-        "id": task_info["id"],
-        "status": task_info["task_status"],
-        "progress_message": task_info["progress_msg"],
-        "obj_type": task_info["type"],
-        "obj_id": hashutil.hash_to_hex(task_info["object_id"]),
-    }
 
+def get_vault(config: Optional[Dict[str, Any]] = None) -> VaultInterface:
+    global vault
+    if not vault:
+        assert config is not None
+        vault = get_swhvault(**config)
+    return vault
 
-@asyncio.coroutine
-def vault_cook(request):
-    obj_type = request.match_info["type"]
-    obj_id = request.match_info["id"]
-    email = request.query.get("email")
-    sticky = request.query.get("sticky") in ("true", "1")
-
-    if obj_type not in COOKER_TYPES:
-        raise NotFoundExc(f"{obj_type} is an unknown type.")
-
-    info = request.app["backend"].cook_request(
-        obj_type, obj_id, email=email, sticky=sticky
-    )
-    # TODO: return 201 status (Created) once the api supports it
-    return encode_data(user_info(info))
 
+class VaultServerApp(RPCServerApp):
+    client_exception_classes = (NotFoundExc,)
 
 
 @asyncio.coroutine
-def vault_progress(request):
-    obj_type = request.match_info["type"]
-    obj_id = request.match_info["id"]
-
-    info = request.app["backend"].task_info(obj_type, obj_id)
-    if not info:
-        raise NotFoundExc(f"{obj_type} {obj_id} was not found.")
-
-    return encode_data(user_info(info))
-
-
-# Cookers endpoints
-
+def index(request):
+    return aiohttp.web.Response(body="SWH Vault API server")
 
-@asyncio.coroutine
-def set_progress(request):
-    obj_type = request.match_info["type"]
-    obj_id = request.match_info["id"]
-    progress = yield from decode_request(request)
-    request.app["backend"].set_progress(obj_type, obj_id, progress)
-    return encode_data(True)  # FIXME: success value?
 
+def check_config(cfg: Dict[str, Any]) -> Dict[str, Any]:
+    """Ensure the configuration is ok to run a local vault server, and propagate
+    defaults.
 
-@asyncio.coroutine
-def set_status(request):
-    obj_type = request.match_info["type"]
-    obj_id = request.match_info["id"]
-    status = yield from decode_request(request)
-    request.app["backend"].set_status(obj_type, obj_id, status)
-    return encode_data(True)  # FIXME: success value?
+    Raises:
+        EnvironmentError if the configuration is not for a local instance
+        ValueError if one of the following keys is missing: vault, cache, storage,
+        scheduler
 
+    Returns:
+        New configuration dict to instantiate a local vault server instance.
 
-@asyncio.coroutine
-def put_bundle(request):
-    obj_type = request.match_info["type"]
-    obj_id = request.match_info["id"]
+    """
+    cfg = cfg.copy()
 
-    # TODO: handle streaming properly
-    content = yield from decode_request(request)
-    request.app["backend"].cache.add(obj_type, obj_id, content)
-    return encode_data(True)  # FIXME: success value?
+    if "vault" not in cfg:
+        raise ValueError("missing 'vault' configuration")
 
+    vcfg = cfg["vault"]
+    if vcfg["cls"] != "local":
+        raise EnvironmentError(
+            "The vault backend can only be started with a 'local' configuration",
+        )
 
-@asyncio.coroutine
-def send_notif(request):
-    obj_type = request.match_info["type"]
-    obj_id = request.match_info["id"]
-    request.app["backend"].send_all_notifications(obj_type, obj_id)
-    return encode_data(True)  # FIXME: success value?
+    # TODO: Soft-deprecation of args key. Remove when ready.
+    vcfg.update(vcfg.get("args", {}))
 
+    # Default to top-level value if any
+    if "cache" not in vcfg:
+        vcfg["cache"] = cfg.get("cache")
+    if "storage" not in vcfg:
+        vcfg["storage"] = cfg.get("storage")
+    if "scheduler" not in vcfg:
+        vcfg["scheduler"] = cfg.get("scheduler")
 
-# Batch endpoints
+    for key in ("cache", "storage", "scheduler"):
+        if not vcfg.get(key):
+            raise ValueError(f"invalid configuration: missing {key} config entry.")
 
+    return cfg
 
-@asyncio.coroutine
-def batch_cook(request):
-    batch = yield from decode_request(request)
-    for obj_type, obj_id in batch:
-        if obj_type not in COOKER_TYPES:
-            raise NotFoundExc(f"{obj_type} is an unknown type.")
-    batch_id = request.app["backend"].batch_cook(batch)
-    return encode_data({"id": batch_id})
 
+def make_app(config: Dict[str, Any]) -> VaultServerApp:
+    """Ensure the configuration is ok, then instantiate the server application
 
-@asyncio.coroutine
-def batch_progress(request):
-    batch_id = request.match_info["batch_id"]
-    bundles = request.app["backend"].batch_info(batch_id)
-    if not bundles:
-        raise NotFoundExc(f"Batch {batch_id} does not exist.")
-    bundles = [user_info(bundle) for bundle in bundles]
-    counter = collections.Counter(b["status"] for b in bundles)
-    res = {
-        "bundles": bundles,
-        "total": len(bundles),
-        **{k: 0 for k in ("new", "pending", "done", "failed")},
-        **dict(counter),
-    }
-    return encode_data(res)
-
-
-# Web server
-
-
-def make_app(backend, **kwargs):
-    app = RPCServerApp(**kwargs)
+    """
+    config = check_config(config)
+    app = VaultServerApp(
+        __name__,
+        backend_class=VaultInterface,
+        backend_factory=lambda: get_vault(config["vault"]),
+        client_max_size=config["client_max_size"],
+    )
     app.router.add_route("GET", "/", index)
-    app.client_exception_classes = (NotFoundExc,)
-
-    # Endpoints used by the web API
-    app.router.add_route("GET", "/fetch/{type}/{id}", vault_fetch)
-    app.router.add_route("POST", "/cook/{type}/{id}", vault_cook)
-    app.router.add_route("GET", "/progress/{type}/{id}", vault_progress)
-
-    # Endpoints used by the Cookers
-    app.router.add_route("POST", "/set_progress/{type}/{id}", set_progress)
-    app.router.add_route("POST", "/set_status/{type}/{id}", set_status)
-    app.router.add_route("POST", "/put_bundle/{type}/{id}", put_bundle)
-    app.router.add_route("POST", "/send_notif/{type}/{id}", send_notif)
-
-    # Endpoints for batch requests
-    app.router.add_route("POST", "/batch_cook", batch_cook)
-    app.router.add_route("GET", "/batch_progress/{batch_id}", batch_progress)
-
-    app["backend"] = backend
     return app
 
 
-def get_local_backend(cfg):
-    if "vault" not in cfg:
-        raise ValueError("missing '%vault' configuration")
+def make_app_from_configfile(
+    config_path: Optional[str] = None, **kwargs
+) -> VaultServerApp:
+    """Load and check the configuration, then instantiate (once) a vault server
+    application.
 
-    vcfg = cfg["vault"]
-    if vcfg["cls"] != "local":
-        raise EnvironmentError(
-            "The vault backend can only be started with a 'local' " "configuration",
-            err=True,
-        )
-    args = vcfg["args"]
-    if "cache" not in args:
-        args["cache"] = cfg.get("cache")
-    if "storage" not in args:
-        args["storage"] = cfg.get("storage")
-    if "scheduler" not in args:
-        args["scheduler"] = cfg.get("scheduler")
+    """
+    global app
+    if not app:
+        config_path = os.environ.get("SWH_CONFIG_FILENAME", config_path)
+        if not config_path:
+            raise ValueError("Missing configuration path.")
+        if not os.path.isfile(config_path):
+            raise ValueError(f"Configuration path {config_path} should exist.")
 
-    for key in ("cache", "storage", "scheduler"):
-        if not args.get(key):
-            raise ValueError("invalid configuration; missing %s config entry." % key)
-
-    return get_vault("local", args)
-
-
-def make_app_from_configfile(config_file=None, **kwargs):
-    if config_file is None:
-        config_file = DEFAULT_CONFIG_PATH
-    config_file = os.environ.get("SWH_CONFIG_FILENAME", config_file)
-    if os.path.isfile(config_file):
-        cfg = config.read(config_file, DEFAULT_CONFIG)
-    else:
-        cfg = config.load_named_config(config_file, DEFAULT_CONFIG)
-    vault = get_local_backend(cfg)
-    return make_app(backend=vault, client_max_size=cfg["client_max_size"], **kwargs)
+        app_config = read_raw_config(config_basepath(config_path))
+        app_config = merge_configs(DEFAULT_CONFIG, app_config)
+        app = make_app(app_config)
+
+    return app
 
 
 if __name__ == "__main__":
     print("Deprecated. Use swh-vault ")
diff --git a/swh/vault/backend.py b/swh/vault/backend.py
index 1974e9e..9355b06 100644
--- a/swh/vault/backend.py
+++ b/swh/vault/backend.py
@@ -1,487 +1,551 @@
-# Copyright (C) 2017-2018 The Software Heritage developers
+# Copyright (C) 2017-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import collections
 from email.mime.text import MIMEText
 import smtplib
+from typing import Any, Dict, List, Optional, Tuple
 
 import psycopg2.extras
 import psycopg2.pool
 
 from swh.core.db import BaseDb
 from swh.core.db.common import db_transaction
 from swh.model import hashutil
+from swh.scheduler import get_scheduler
 from swh.scheduler.utils import create_oneshot_task_dict
-from swh.vault.cookers import get_cooker_cls
+from swh.storage import get_storage
+from swh.vault.cache import VaultCache
+from swh.vault.cookers import COOKER_TYPES, get_cooker_cls
 from swh.vault.exc import NotFoundExc
+from swh.vault.interface import ObjectId
 
 cooking_task_name = "swh.vault.cooking_tasks.SWHCookingTask"
 
 NOTIF_EMAIL_FROM = '"Software Heritage Vault" ' ""
 NOTIF_EMAIL_SUBJECT_SUCCESS = "Bundle ready: {obj_type} {short_id}"
 NOTIF_EMAIL_SUBJECT_FAILURE = "Bundle failed: {obj_type} {short_id}"
 
 NOTIF_EMAIL_BODY_SUCCESS = """
 You have requested the following bundle from the Software Heritage
 Vault:
 
 Object Type: {obj_type}
 Object ID: {hex_id}
 
 This bundle is now available for download at the following address:
 
 {url}
 
 Please keep in mind that this link might expire at some point, in which
 case you will need to request the bundle again.
 
 --\x20
 The Software Heritage Developers
 """
 
 NOTIF_EMAIL_BODY_FAILURE = """
 You have requested the following bundle from the Software Heritage
 Vault:
 
 Object Type: {obj_type}
 Object ID: {hex_id}
 
 This bundle could not be cooked for the following reason:
 
 {progress_msg}
 
 We apologize for the inconvenience.
 
 --\x20
 The Software Heritage Developers
 """
 
 
-def batch_to_bytes(batch):
-    return [(obj_type, hashutil.hash_to_bytes(obj_id)) for obj_type, obj_id in batch]
+def batch_to_bytes(batch: List[Tuple[str, str]]) -> List[Tuple[str, bytes]]:
+    return [(obj_type, hashutil.hash_to_bytes(hex_id)) for obj_type, hex_id in batch]
 
 
 class VaultBackend:
     """
     Backend for the Software Heritage vault.
     """
 
-    def __init__(self, db, cache, scheduler, storage=None, **config):
+    def __init__(self, db, **config):
         self.config = config
-        self.cache = cache
-        self.scheduler = scheduler
-        self.storage = storage
+        self.cache = VaultCache(**config["cache"])
+        self.scheduler = get_scheduler(**config["scheduler"])
+        self.storage = get_storage(**config["storage"])
         self.smtp_server = smtplib.SMTP()
 
         self._pool = psycopg2.pool.ThreadedConnectionPool(
             config.get("min_pool_conns", 1),
             config.get("max_pool_conns", 10),
             db,
             cursor_factory=psycopg2.extras.RealDictCursor,
         )
         self._db = None
 
     def get_db(self):
         if self._db:
             return self._db
         return BaseDb.from_pool(self._pool)
 
     def put_db(self, db):
         if db is not self._db:
             db.put_conn()
 
+    def _compute_ids(self, obj_id: ObjectId) -> Tuple[str, bytes]:
+        """Internal method to reconcile the multiple possible id formats"""
+        if isinstance(obj_id, str):
+            return obj_id, hashutil.hash_to_bytes(obj_id)
+        return hashutil.hash_to_hex(obj_id), obj_id
+
     @db_transaction()
-    def task_info(self, obj_type, obj_id, db=None, cur=None):
-        """Fetch information from a bundle"""
-        obj_id = hashutil.hash_to_bytes(obj_id)
+    def progress(
+        self,
+        obj_type: str,
+        obj_id: ObjectId,
+        raise_notfound: bool = True,
+        db=None,
+        cur=None,
+    ) -> Optional[Dict[str, Any]]:
+        hex_id, obj_id = self._compute_ids(obj_id)
         cur.execute(
             """
             SELECT id, type, object_id, task_id, task_status, sticky,
                    ts_created, ts_done, ts_last_access, progress_msg
             FROM vault_bundle
             WHERE type = %s AND object_id = %s""",
             (obj_type, obj_id),
         )
         res = cur.fetchone()
 
-        if res:
-            res["object_id"] = bytes(res["object_id"])
+        if not res:
+            if raise_notfound:
+                raise NotFoundExc(f"{obj_type} {hex_id} was not found.")
+            return None
+
+        res["object_id"] = hashutil.hash_to_hex(res["object_id"])
         return res
 
-    def _send_task(self, *args):
+    def _send_task(self, obj_type: str, hex_id: str):
         """Send a cooking task to the celery scheduler"""
-        task = create_oneshot_task_dict("cook-vault-bundle", *args)
+        task = create_oneshot_task_dict("cook-vault-bundle", obj_type, hex_id)
         added_tasks = self.scheduler.create_tasks([task])
         return added_tasks[0]["id"]
 
     @db_transaction()
-    def create_task(self, obj_type, obj_id, sticky=False, db=None, cur=None):
+    def create_task(
+        self, obj_type: str, obj_id: ObjectId, sticky: bool = False, db=None, cur=None
+    ):
         """Create and send a cooking task"""
-        obj_id = hashutil.hash_to_bytes(obj_id)
-        hex_id = hashutil.hash_to_hex(obj_id)
+        hex_id, obj_id = self._compute_ids(obj_id)
 
         cooker_class = get_cooker_cls(obj_type)
         cooker = cooker_class(obj_type, hex_id, backend=self, storage=self.storage)
+
         if not cooker.check_exists():
-            raise NotFoundExc("Object {} was not found.".format(hex_id))
+            raise NotFoundExc(f"{obj_type} {hex_id} was not found.")
 
         cur.execute(
             """
             INSERT INTO vault_bundle (type, object_id, sticky)
             VALUES (%s, %s, %s)""",
             (obj_type, obj_id, sticky),
         )
         db.conn.commit()
         task_id = self._send_task(obj_type, hex_id)
         cur.execute(
             """
             UPDATE vault_bundle
             SET task_id = %s
             WHERE type = %s AND object_id = %s""",
             (task_id, obj_type, obj_id),
         )
 
     @db_transaction()
-    def add_notif_email(self, obj_type, obj_id, email, db=None, cur=None):
+    def add_notif_email(
+        self, obj_type: str, obj_id: bytes, email: str, db=None, cur=None
+    ):
         """Add an e-mail address to notify when a given bundle is ready"""
-        obj_id = hashutil.hash_to_bytes(obj_id)
         cur.execute(
             """
             INSERT INTO vault_notif_email (email, bundle_id)
             VALUES (%s, (SELECT id FROM vault_bundle
                          WHERE type = %s AND object_id = %s))""",
             (email, obj_type, obj_id),
         )
 
+    def put_bundle(self, obj_type: str, obj_id: ObjectId, bundle) -> bool:
+        _, obj_id = self._compute_ids(obj_id)
+        self.cache.add(obj_type, obj_id, bundle)
+        return True
+
     @db_transaction()
-    def cook_request(
-        self, obj_type, obj_id, *, sticky=False, email=None, db=None, cur=None
-    ):
-        """Main entry point for cooking requests. This starts a cooking task if
-        needed, and add the given e-mail to the notify list"""
-        obj_id = hashutil.hash_to_bytes(obj_id)
-        info = self.task_info(obj_type, obj_id)
+    def cook(
+        self,
+        obj_type: str,
+        obj_id: ObjectId,
+        *,
+        sticky: bool = False,
+        email: Optional[str] = None,
+        db=None,
+        cur=None,
+    ) -> Dict[str, Any]:
+        hex_id, obj_id = self._compute_ids(obj_id)
+        info = self.progress(obj_type, obj_id, raise_notfound=False)
+
+        if obj_type not in COOKER_TYPES:
+            raise NotFoundExc(f"{obj_type} is an unknown type.")
 
         # If there's a failed bundle entry, delete it first.
         if info is not None and info["task_status"] == "failed":
+            obj_id = hashutil.hash_to_bytes(obj_id)
             cur.execute(
-                """DELETE FROM vault_bundle
-                   WHERE type = %s AND object_id = %s""",
+                "DELETE FROM vault_bundle WHERE type = %s AND object_id = %s",
                 (obj_type, obj_id),
             )
             db.conn.commit()
             info = None
 
         # If there's no bundle entry, create the task.
         if info is None:
             self.create_task(obj_type, obj_id, sticky)
 
         if email is not None:
             # If the task is already done, send the email directly
             if info is not None and info["task_status"] == "done":
                 self.send_notification(
-                    None, email, obj_type, obj_id, info["task_status"]
+                    None, email, obj_type, hex_id, info["task_status"]
                 )
             # Else, add it to the notification queue
             else:
                 self.add_notif_email(obj_type, obj_id, email)
 
-        info = self.task_info(obj_type, obj_id)
-        return info
+        return self.progress(obj_type, obj_id)
 
     @db_transaction()
-    def batch_cook(self, batch, db=None, cur=None):
-        """Cook a batch of bundles and returns the cooking id."""
+    def batch_cook(
+        self, batch: List[Tuple[str, str]], db=None, cur=None
+    ) -> Dict[str, int]:
         # Import execute_values at runtime only, because it requires
         # psycopg2 >= 2.7 (only available on postgresql servers)
         from psycopg2.extras import execute_values
 
+        for obj_type, _ in batch:
+            if obj_type not in COOKER_TYPES:
+                raise NotFoundExc(f"{obj_type} is an unknown type.")
+
         cur.execute(
             """
             INSERT INTO vault_batch (id)
             VALUES (DEFAULT)
             RETURNING id"""
         )
         batch_id = cur.fetchone()["id"]
-        batch = batch_to_bytes(batch)
+        batch_bytes = batch_to_bytes(batch)
 
         # Delete all failed bundles from the batch
         cur.execute(
             """
             DELETE FROM vault_bundle
             WHERE task_status = 'failed'
              AND (type, object_id) IN %s""",
-            (tuple(batch),),
+            (tuple(batch_bytes),),
         )
 
         # Insert all the bundles, return the new ones
         execute_values(
             cur,
             """
             INSERT INTO vault_bundle (type, object_id)
             VALUES %s ON CONFLICT DO NOTHING""",
-            batch,
+            batch_bytes,
         )
 
         # Get the bundle ids and task status
        cur.execute(
            """
            SELECT id, type, object_id, task_id FROM vault_bundle
            WHERE (type, object_id) IN %s""",
-            (tuple(batch),),
+            (tuple(batch_bytes),),
        )
        bundles = cur.fetchall()

        # Insert the batch-bundle entries
        batch_id_bundle_ids = [(batch_id, row["id"]) for row in bundles]
        execute_values(
            cur,
            """
            INSERT INTO vault_batch_bundle (batch_id, bundle_id)
            VALUES %s ON CONFLICT DO NOTHING""",
            batch_id_bundle_ids,
        )
        db.conn.commit()

        # Get the tasks to fetch
        batch_new = [
            (row["type"], bytes(row["object_id"]))
            for row in bundles
            if row["task_id"] is None
        ]

        # Send the tasks
        args_batch = [
            (obj_type, hashutil.hash_to_hex(obj_id)) for obj_type, obj_id in batch_new
        ]
        # TODO: change once the scheduler handles priority tasks
        tasks = [
            create_oneshot_task_dict("swh-vault-batch-cooking", *args)
            for args in args_batch
        ]

        added_tasks = self.scheduler.create_tasks(tasks)
-        tasks_ids_bundle_ids = zip([task["id"] for task in added_tasks], batch_new)
        tasks_ids_bundle_ids = [
            (task_id, obj_type, obj_id)
-            for task_id, (obj_type, obj_id) in tasks_ids_bundle_ids
+            for task_id, (obj_type, obj_id) in zip(
+                [task["id"] for task in added_tasks], batch_new
+            )
        ]

        # Update the task ids
        execute_values(
            cur,
            """
            UPDATE vault_bundle
            SET task_id = s_task_id
            FROM (VALUES %s) AS sub (s_task_id, s_type, s_object_id)
            WHERE type = s_type::cook_type AND object_id = s_object_id """,
            tasks_ids_bundle_ids,
        )
-        return batch_id
+        return {"id": batch_id}

    @db_transaction()
-    def batch_info(self, batch_id, db=None, cur=None):
-        """Fetch information from a batch of bundles"""
+    def batch_progress(self, batch_id: int, db=None, cur=None) -> Dict[str, Any]:
        cur.execute(
            """
            SELECT vault_bundle.id as id, type, object_id,
                   task_id, task_status, sticky,
                   ts_created, ts_done, ts_last_access, progress_msg
            FROM vault_batch_bundle
            LEFT JOIN vault_bundle ON vault_bundle.id = bundle_id
            WHERE batch_id = %s""",
            (batch_id,),
        )
-        res = cur.fetchall()
-        if res:
-            for d in res:
-                d["object_id"] = bytes(d["object_id"])
+        bundles = cur.fetchall()
+        if not bundles:
+            raise NotFoundExc(f"Batch {batch_id} does not exist.")
+
+        for bundle in bundles:
+            bundle["object_id"] = hashutil.hash_to_hex(bundle["object_id"])
+
+        counter = collections.Counter(b["task_status"] for b in bundles)
+        res = {
+            "bundles": bundles,
+            "total": len(bundles),
+            **{k: 0 for k in ("new", "pending", "done", "failed")},
+            **dict(counter),
+        }
        return res

    @db_transaction()
-    def is_available(self, obj_type, obj_id, db=None, cur=None):
+    def is_available(self, obj_type: str, obj_id: ObjectId, db=None, cur=None):
        """Check whether a bundle is available for retrieval"""
-        info = self.task_info(obj_type, obj_id, cur=cur)
+        info = self.progress(obj_type, obj_id, raise_notfound=False, cur=cur)
+        obj_id = hashutil.hash_to_bytes(obj_id)
        return (
            info is not None
            and info["task_status"] == "done"
            and self.cache.is_cached(obj_type, obj_id)
        )

    @db_transaction()
-    def fetch(self, obj_type, obj_id, db=None, cur=None):
+    def fetch(
+        self, obj_type: str, obj_id: ObjectId, raise_notfound=True, db=None, cur=None
+    ):
        """Retrieve a bundle from the cache"""
-        if not self.is_available(obj_type, obj_id, cur=cur):
+        hex_id, obj_id = self._compute_ids(obj_id)
+        available = self.is_available(obj_type, obj_id, cur=cur)
+        if not available:
+            if raise_notfound:
+                raise NotFoundExc(f"{obj_type} {hex_id} is not available.")
            return None
+
        self.update_access_ts(obj_type, obj_id, cur=cur)
        return self.cache.get(obj_type, obj_id)

    @db_transaction()
-    def update_access_ts(self, obj_type, obj_id, db=None, cur=None):
+    def update_access_ts(self, obj_type: str, obj_id: bytes, db=None, cur=None):
        """Update the last access timestamp of a bundle"""
-        obj_id = hashutil.hash_to_bytes(obj_id)
        cur.execute(
            """
            UPDATE vault_bundle
            SET ts_last_access = NOW()
            WHERE type = %s AND object_id = %s""",
            (obj_type, obj_id),
        )

    @db_transaction()
-    def set_status(self, obj_type, obj_id, status, db=None, cur=None):
-        """Set the cooking status of a bundle"""
+    def set_status(
+        self, obj_type: str, obj_id: ObjectId, status: str, db=None, cur=None
+    ) -> bool:
        obj_id = hashutil.hash_to_bytes(obj_id)
        req = (
            """
            UPDATE vault_bundle
            SET task_status = %s """
            + (""", ts_done = NOW() """ if status == "done" else "")
            + """WHERE type = %s AND object_id = %s"""
        )
        cur.execute(req, (status, obj_type, obj_id))
+        return True

    @db_transaction()
-    def set_progress(self, obj_type, obj_id, progress, db=None, cur=None):
-        """Set the cooking progress of a bundle"""
+    def set_progress(
+        self, obj_type: str, obj_id: ObjectId, progress: str, db=None, cur=None
+    ) -> bool:
        obj_id = hashutil.hash_to_bytes(obj_id)
        cur.execute(
            """
            UPDATE vault_bundle
            SET progress_msg = %s
            WHERE type = %s AND object_id = %s""",
            (progress, obj_type, obj_id),
        )
+        return True

    @db_transaction()
-    def send_all_notifications(self, obj_type, obj_id, db=None, cur=None):
-        """Send all the e-mails in the notification list of a bundle"""
-        obj_id = hashutil.hash_to_bytes(obj_id)
+    def send_notif(self, obj_type: str, obj_id: ObjectId, db=None, cur=None) -> bool:
+        hex_id, obj_id = self._compute_ids(obj_id)
        cur.execute(
            """
            SELECT vault_notif_email.id AS id, email, task_status, progress_msg
            FROM vault_notif_email
            INNER JOIN vault_bundle ON bundle_id = vault_bundle.id
            WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s""",
            (obj_type, obj_id),
        )
        for d in cur:
            self.send_notification(
                d["id"],
                d["email"],
                obj_type,
-                obj_id,
+                hex_id,
                status=d["task_status"],
                progress_msg=d["progress_msg"],
            )
+        return True

    @db_transaction()
    def send_notification(
        self,
-        n_id,
-        email,
-        obj_type,
-        obj_id,
-        status,
-        progress_msg=None,
+        n_id: Optional[int],
+        email: str,
+        obj_type: str,
+        hex_id: str,
+        status: str,
+        progress_msg: Optional[str] = None,
        db=None,
        cur=None,
-    ):
+    ) -> None:
        """Send the notification of a bundle to a specific e-mail"""
-        hex_id = hashutil.hash_to_hex(obj_id)
        short_id = hex_id[:7]

        # TODO: instead of hardcoding this, we should probably:
        # * add a "fetch_url" field in the vault_notif_email table
        # * generate the url with flask.url_for() on the web-ui side
        # * send this url as part of the cook request and store it in
        #   the table
        # * use this url for the notification e-mail
        url = "https://archive.softwareheritage.org/api/1/vault/{}/{}/" "raw".format(
            obj_type, hex_id
        )

        if status == "done":
            text = NOTIF_EMAIL_BODY_SUCCESS.strip()
            text = text.format(obj_type=obj_type, hex_id=hex_id, url=url)
            msg = MIMEText(text)
            msg["Subject"] = NOTIF_EMAIL_SUBJECT_SUCCESS.format(
                obj_type=obj_type, short_id=short_id
            )
        elif status == "failed":
            text = NOTIF_EMAIL_BODY_FAILURE.strip()
            text = text.format(
                obj_type=obj_type, hex_id=hex_id, progress_msg=progress_msg
            )
            msg = MIMEText(text)
            msg["Subject"] = NOTIF_EMAIL_SUBJECT_FAILURE.format(
                obj_type=obj_type, short_id=short_id
            )
        else:
            raise RuntimeError(
                "send_notification called on a '{}' bundle".format(status)
            )

        msg["From"] = NOTIF_EMAIL_FROM
        msg["To"] = email

        self._smtp_send(msg)

        if n_id is not None:
            cur.execute(
                """
                DELETE FROM vault_notif_email
                WHERE id = %s""",
                (n_id,),
            )

-    def _smtp_send(self, msg):
+    def _smtp_send(self, msg: MIMEText):
        # Reconnect if needed
        try:
            status = self.smtp_server.noop()[0]
        except smtplib.SMTPException:
            status = -1
        if status != 250:
            self.smtp_server.connect("localhost", 25)

        # Send the message
        self.smtp_server.send_message(msg)

    @db_transaction()
-    def _cache_expire(self, cond, *args, db=None, cur=None):
+    def _cache_expire(self, cond, *args, db=None, cur=None) -> None:
        """Low-level expiration method, used by cache_expire_* methods"""
        # Embedded SELECT query to be able to use ORDER BY and LIMIT
        cur.execute(
            """
            DELETE FROM vault_bundle
            WHERE ctid IN (
                SELECT ctid
                FROM vault_bundle
                WHERE sticky = false
                {}
            )
            RETURNING type, object_id
            """.format(
                cond
            ),
            args,
        )

        for d in cur:
            self.cache.delete(d["type"], bytes(d["object_id"]))

    @db_transaction()
-    def cache_expire_oldest(self, n=1, by="last_access", db=None, cur=None):
+    def cache_expire_oldest(self, n=1, by="last_access", db=None, cur=None) -> None:
        """Expire the `n` oldest bundles"""
        assert by in ("created", "done", "last_access")
        filter = """ORDER BY ts_{} LIMIT {}""".format(by, n)

        return self._cache_expire(filter)

    @db_transaction()
-    def cache_expire_until(self, date, by="last_access", db=None, cur=None):
+    def cache_expire_until(self, date, by="last_access", db=None, cur=None) -> None:
        """Expire all the bundles until a certain date"""
        assert by in ("created", "done", "last_access")
        filter = """AND ts_{} <= %s""".format(by)

        return self._cache_expire(filter, date)
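
A hedged sketch of the new backend ergonomics (connection values are placeholders, and the `memory` objstorage class for the cache is an assumption): both hex strings and raw bytes are accepted as ids, normalized by `_compute_ids()`, and `progress()` now raises `NotFoundExc` by default instead of returning `None`:

```python
from swh.vault import get_vault
from swh.vault.exc import NotFoundExc

backend = get_vault(
    "local",
    db="dbname=softwareheritage-vault-dev",
    cache={"cls": "memory", "args": {}},  # assumed objstorage settings
    storage={"cls": "remote", "url": "http://localhost:5002/"},
    scheduler={"cls": "remote", "url": "http://localhost:5008/"},
)

hex_id = "4a4b9771542143cf070386f86b4b92d42966bdbc"
info = backend.cook("revision_gitfast", hex_id, email="user@example.org")

try:
    backend.progress("directory", hex_id)  # unknown bundle -> NotFoundExc
except NotFoundExc:
    pass
```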
diff --git a/swh/vault/cookers/__init__.py b/swh/vault/cookers/__init__.py
index 474eb62..0a9df00 100644
--- a/swh/vault/cookers/__init__.py
+++ b/swh/vault/cookers/__init__.py
@@ -1,57 +1,96 @@
-# Copyright (C) 2017 The Software Heritage developers
+# Copyright (C) 2017-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from __future__ import annotations
+
 import os
+from typing import Any, Dict
 
 from swh.core.config import load_named_config
 from swh.core.config import read as read_config
 from swh.storage import get_storage
 from swh.vault import get_vault
 from swh.vault.cookers.base import DEFAULT_CONFIG, DEFAULT_CONFIG_PATH
 from swh.vault.cookers.directory import DirectoryCooker
 from swh.vault.cookers.revision_flat import RevisionFlatCooker
 from swh.vault.cookers.revision_gitfast import RevisionGitfastCooker
 
 COOKER_TYPES = {
     "directory": DirectoryCooker,
     "revision_flat": RevisionFlatCooker,
     "revision_gitfast": RevisionGitfastCooker,
 }
 
 
 def get_cooker_cls(obj_type):
     return COOKER_TYPES[obj_type]
 
 
-def get_cooker(obj_type, obj_id):
-    if "SWH_CONFIG_FILENAME" in os.environ:
-        cfg = read_config(os.environ["SWH_CONFIG_FILENAME"], DEFAULT_CONFIG)
-    else:
-        cfg = load_named_config(DEFAULT_CONFIG_PATH, DEFAULT_CONFIG)
-    cooker_cls = get_cooker_cls(obj_type)
+def check_config(cfg: Dict[str, Any]) -> Dict[str, Any]:
+    """Ensure the configuration is ok to run a vault worker, and propagate defaults
+
+    Raises:
+        EnvironmentError if the configuration is not for a remote instance
+        ValueError if one of the following keys is missing: vault, storage
+
+    Returns:
+        New configuration dict to instantiate a vault worker instance
+
+    """
+    cfg = cfg.copy()
+
     if "vault" not in cfg:
-        raise ValueError("missing '%vault' configuration")
+        raise ValueError("missing 'vault' configuration")
 
     vcfg = cfg["vault"]
     if vcfg["cls"] != "remote":
         raise EnvironmentError(
-            "This vault backend can only be a 'remote' " "configuration", err=True
+            "This vault backend can only be a 'remote' configuration"
         )
-    args = vcfg["args"]
-    if "storage" not in args:
-        args["storage"] = cfg.get("storage")
-    if not args.get("storage"):
-        raise ValueError("invalid configuration; missing 'storage' config entry.")
+    # TODO: Soft-deprecation of args key. Remove when ready.
+    vcfg.update(vcfg.get("args", {}))
+
+    # Default to top-level value if any
+    if "storage" not in vcfg:
+        vcfg["storage"] = cfg.get("storage")
+
+    if not vcfg.get("storage"):
+        raise ValueError("invalid configuration: missing 'storage' config entry.")
+
+    return cfg
+
+
+def get_cooker(obj_type: str, obj_id: str):
+    """Instantiate a cooker of type obj_type.
+
+    Returns:
+        Cooker instance in charge of cooking the obj_type with id obj_id.
+
+    Raises:
+        ValueError in case of a missing top-level vault key configuration or a
+        missing storage key.
+        EnvironmentError in case the vault configuration references a non-remote
+        class.
+
+    """
+    if "SWH_CONFIG_FILENAME" in os.environ:
+        cfg = read_config(os.environ["SWH_CONFIG_FILENAME"], DEFAULT_CONFIG)
+    else:
+        cfg = load_named_config(DEFAULT_CONFIG_PATH, DEFAULT_CONFIG)
+    cooker_cls = get_cooker_cls(obj_type)
+
+    cfg = check_config(cfg)
+    vcfg = cfg["vault"]
 
-    storage = get_storage(**args.pop("storage"))
+    storage = get_storage(**vcfg.pop("storage"))
     backend = get_vault(**vcfg)
 
     return cooker_cls(
         obj_type,
         obj_id,
         backend=backend,
         storage=storage,
         max_bundle_size=cfg["max_bundle_size"],
     )
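
The worker-side `check_config()` mirrors the server-side one but requires a *remote* vault. A minimal sketch of a valid worker configuration (URLs illustrative):

```python
from swh.vault.cookers import check_config

worker_cfg = {
    "max_bundle_size": 2 ** 29,
    "storage": {"cls": "remote", "url": "http://localhost:5002/"},
    "vault": {"cls": "remote", "url": "http://localhost:5005/"},
}

# Propagates the top-level "storage" entry into cfg["vault"] and validates it.
cfg = check_config(worker_cfg)
assert cfg["vault"]["storage"] == worker_cfg["storage"]
```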
diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py
index 657ddba..a9c32f5 100644
--- a/swh/vault/cookers/base.py
+++ b/swh/vault/cookers/base.py
@@ -1,138 +1,136 @@
 # Copyright (C) 2016-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import abc
 import io
 import logging
 from typing import Optional
 
 from psycopg2.extensions import QueryCanceledError
 
 from swh.model import hashutil
 
 MAX_BUNDLE_SIZE = 2 ** 29  # 512 MiB
 DEFAULT_CONFIG_PATH = "vault/cooker"
 DEFAULT_CONFIG = {
-    "storage": ("dict", {"cls": "remote", "args": {"url": "http://localhost:5002/",},}),
-    "vault": ("dict", {"cls": "remote", "args": {"url": "http://localhost:5005/",},}),
     "max_bundle_size": ("int", MAX_BUNDLE_SIZE),
 }
 
 
 class PolicyError(Exception):
     """Raised when the bundle violates the cooking policy."""
 
     pass
 
 
 class BundleTooLargeError(PolicyError):
     """Raised when the bundle is too large to be cooked."""
 
     pass
 
 
 class BytesIOBundleSizeLimit(io.BytesIO):
     def __init__(self, *args, size_limit=None, **kwargs):
         super().__init__(*args, **kwargs)
         self.size_limit = size_limit
 
     def write(self, chunk):
         if (
             self.size_limit is not None
             and self.getbuffer().nbytes + len(chunk) > self.size_limit
         ):
             raise BundleTooLargeError(
                 "The requested bundle exceeds the maximum allowed "
                 "size of {} bytes.".format(self.size_limit)
             )
         return super().write(chunk)
 
 
 class BaseVaultCooker(metaclass=abc.ABCMeta):
     """Abstract base class for the vault's bundle creators
 
     This class describes a common API for the cookers.
 
     To define a new cooker, inherit from this class and override:
 
     - CACHE_TYPE_KEY: key to use for the bundle to reference in cache
     - def cook(): cook the object into a bundle
     """
 
     CACHE_TYPE_KEY = None  # type: Optional[str]
 
     def __init__(
         self, obj_type, obj_id, backend, storage, max_bundle_size=MAX_BUNDLE_SIZE
     ):
         """Initialize the cooker.
 
         The type of the object represented by the id depends on the
         concrete class. Very likely, each type of bundle will have its
         own cooker class.
 
         Args:
             obj_type: type of the object to be cooked into a bundle (directory,
                       revision_flat or revision_gitfast; see
                       swh.vault.cooker.COOKER_TYPES).
             obj_id: id of the object to be cooked into a bundle.
             backend: the vault backend (swh.vault.backend.VaultBackend).
         """
         self.obj_type = obj_type
         self.obj_id = hashutil.hash_to_bytes(obj_id)
         self.backend = backend
         self.storage = storage
         self.max_bundle_size = max_bundle_size
 
     @abc.abstractmethod
     def check_exists(self):
         """Checks that the requested object exists and can be cooked.
 
         Override this in the cooker implementation.
         """
         raise NotImplementedError
 
     @abc.abstractmethod
     def prepare_bundle(self):
         """Implementation of the cooker. Yields chunks of the bundle bytes.
 
         Override this with the cooker implementation.
         """
         raise NotImplementedError
 
     def write(self, chunk):
         self.fileobj.write(chunk)
 
     def cook(self):
         """Cook the requested object into a bundle
         """
         self.backend.set_status(self.obj_type, self.obj_id, "pending")
         self.backend.set_progress(self.obj_type, self.obj_id, "Processing...")
 
         self.fileobj = BytesIOBundleSizeLimit(size_limit=self.max_bundle_size)
         try:
             try:
                 self.prepare_bundle()
             except QueryCanceledError:
                 raise PolicyError(
                     "Timeout reached while assembling the requested bundle"
                 )
             bundle = self.fileobj.getvalue()
             # TODO: use proper content streaming instead of put_bundle()
             self.backend.put_bundle(self.CACHE_TYPE_KEY, self.obj_id, bundle)
         except PolicyError as e:
             self.backend.set_status(self.obj_type, self.obj_id, "failed")
             self.backend.set_progress(self.obj_type, self.obj_id, str(e))
         except Exception:
             self.backend.set_status(self.obj_type, self.obj_id, "failed")
             self.backend.set_progress(
                 self.obj_type,
                 self.obj_id,
                 "Internal Server Error. This incident will be reported.",
             )
             logging.exception("Bundle cooking failed.")
         else:
             self.backend.set_status(self.obj_type, self.obj_id, "done")
             self.backend.set_progress(self.obj_type, self.obj_id, None)
         finally:
             self.backend.send_notif(self.obj_type, self.obj_id)
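
Since `base.py` defines the cooker contract, an illustrative-only subclass (hypothetical, not part of this diff) shows the two abstract hooks a concrete cooker must override:

```python
from swh.vault.cookers.base import BaseVaultCooker


class TrivialCooker(BaseVaultCooker):
    CACHE_TYPE_KEY = "trivial"  # hypothetical cache key

    def check_exists(self):
        # A real cooker queries self.storage for self.obj_id here.
        return True

    def prepare_bundle(self):
        # write() appends to the size-limited buffer; exceeding
        # max_bundle_size raises BundleTooLargeError and fails the task.
        self.write(b"hello")
```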
""" self.rev_by_id = {r["id"]: r for r in self.log} self.obj_done = set() self.obj_to_mark = {} self.next_available_mark = 1 last_progress_report = None for i, rev in enumerate(self.log, 1): # Update progress if needed ct = time.time() if last_progress_report is None or last_progress_report + 2 <= ct: last_progress_report = ct pg = "Computing revision {}/{}".format(i, len(self.log)) self.backend.set_progress(self.obj_type, self.obj_id, pg) # Compute the current commit self._compute_commit_command(rev) def mark(self, obj_id): """Get the mark ID as bytes of a git object. If the object has not yet been marked, assign a new ID and add it to the mark dictionary. """ if obj_id not in self.obj_to_mark: self.obj_to_mark[obj_id] = self.next_available_mark self.next_available_mark += 1 return str(self.obj_to_mark[obj_id]).encode() def _compute_blob_command_content(self, file_data): """Compute the blob command of a file entry if it has not been computed yet. """ obj_id = file_data["sha1"] if obj_id in self.obj_done: return contents = list(get_filtered_files_content(self.storage, [file_data])) content = contents[0]["content"] self.write_cmd(BlobCommand(mark=self.mark(obj_id), data=content)) self.obj_done.add(obj_id) def _author_tuple_format(self, author, date): # We never want to have None values here so we replace null entries # by ''. if author is not None: author_tuple = (author.get("name") or b"", author.get("email") or b"") else: author_tuple = (b"", b"") if date is not None: date_tuple = ( date.get("timestamp", {}).get("seconds") or 0, (date.get("offset") or 0) * 60, ) else: date_tuple = (0, 0) return author_tuple + date_tuple def _compute_commit_command(self, rev): """Compute a commit command from a specific revision. """ if "parents" in rev and rev["parents"]: from_ = b":" + self.mark(rev["parents"][0]) merges = [b":" + self.mark(r) for r in rev["parents"][1:]] parent = self.rev_by_id[rev["parents"][0]] else: # We issue a reset command before all the new roots so that they # are not automatically added as children of the current branch. self.write_cmd(ResetCommand(b"refs/heads/master", None)) from_ = None merges = None parent = None # Retrieve the file commands while yielding new blob commands if # needed. files = list(self._compute_file_commands(rev, parent)) # Construct and write the commit command author = self._author_tuple_format(rev["author"], rev["date"]) committer = self._author_tuple_format(rev["committer"], rev["committer_date"]) self.write_cmd( CommitCommand( ref=b"refs/heads/master", mark=self.mark(rev["id"]), author=author, committer=committer, message=rev["message"] or b"", from_=from_, merges=merges, file_iter=files, ) ) @functools.lru_cache(maxsize=4096) def _get_dir_ents(self, dir_id=None): """Get the entities of a directory as a dictionary (name -> entity). This function has a cache to avoid doing multiple requests to retrieve the same entities, as doing a directory_ls() is expensive. """ data = self.storage.directory_ls(dir_id) if dir_id is not None else [] return {f["name"]: f for f in data} def _compute_file_commands(self, rev, parent=None): """Compute all the file commands of a revision. Generate a diff of the files between the revision and its main parent to find the necessary file commands to apply. """ # Initialize the stack with the root of the tree. 
cur_dir = rev["directory"] parent_dir = parent["directory"] if parent else None stack = [(b"", cur_dir, parent_dir)] while stack: # Retrieve the current directory and the directory of the parent # commit in order to compute the diff of the trees. root, cur_dir_id, prev_dir_id = stack.pop() cur_dir = self._get_dir_ents(cur_dir_id) prev_dir = self._get_dir_ents(prev_dir_id) # Find subtrees to delete: # - Subtrees that are not in the new tree (file or directory # deleted). # - Subtrees that do not have the same type in the new tree # (file -> directory or directory -> file) # After this step, every node remaining in the previous directory # has the same type than the one in the current directory. for fname, f in prev_dir.items(): if fname not in cur_dir or f["type"] != cur_dir[fname]["type"]: yield FileDeleteCommand(path=os.path.join(root, fname)) # Find subtrees to modify: # - Leaves (files) will be added or modified using `filemodify` # - Other subtrees (directories) will be added to the stack and # processed in the next iteration. for fname, f in cur_dir.items(): # A file is added or modified if it was not in the tree, if its # permissions changed or if its content changed. if f["type"] == "file" and ( fname not in prev_dir or f["sha1"] != prev_dir[fname]["sha1"] or f["perms"] != prev_dir[fname]["perms"] ): # Issue a blob command for the new blobs if needed. self._compute_blob_command_content(f) yield FileModifyCommand( path=os.path.join(root, fname), mode=mode_to_perms(f["perms"]).value, dataref=(b":" + self.mark(f["sha1"])), data=None, ) # A revision is added or modified if it was not in the tree or # if its target changed elif f["type"] == "rev" and ( fname not in prev_dir or f["target"] != prev_dir[fname]["target"] ): yield FileModifyCommand( path=os.path.join(root, fname), - mode=0o160000, + mode=DentryPerms.revision, dataref=hashutil.hash_to_hex(f["target"]).encode(), data=None, ) # A directory is added or modified if it was not in the tree or # if its target changed. elif f["type"] == "dir": f_prev_target = None if fname in prev_dir and prev_dir[fname]["type"] == "dir": f_prev_target = prev_dir[fname]["target"] if f_prev_target is None or f["target"] != f_prev_target: stack.append( (os.path.join(root, fname), f["target"], f_prev_target) ) diff --git a/swh/vault/interface.py b/swh/vault/interface.py new file mode 100644 index 0000000..6657c2a --- /dev/null +++ b/swh/vault/interface.py @@ -0,0 +1,70 @@ +# Copyright (C) 2017-2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Any, Dict, List, Optional, Tuple, Union + +from typing_extensions import Protocol, runtime_checkable + +from swh.core.api import remote_api_endpoint + +ObjectId = Union[str, bytes] + + +@runtime_checkable +class VaultInterface(Protocol): + """ + Backend Interface for the Software Heritage vault. + """ + + @remote_api_endpoint("fetch") + def fetch(self, obj_type: str, obj_id: ObjectId) -> Dict[str, Any]: + """Fetch information from a bundle""" + ... + + @remote_api_endpoint("cook") + def cook( + self, obj_type: str, obj_id: ObjectId, email: Optional[str] = None + ) -> Dict[str, Any]: + """Main entry point for cooking requests. This starts a cooking task if + needed, and add the given e-mail to the notify list""" + ... + + @remote_api_endpoint("progress") + def progress(self, obj_type: str, obj_id: ObjectId): + ... 
diff --git a/swh/vault/interface.py b/swh/vault/interface.py
new file mode 100644
index 0000000..6657c2a
--- /dev/null
+++ b/swh/vault/interface.py
@@ -0,0 +1,70 @@
+# Copyright (C) 2017-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Any, Dict, List, Optional, Tuple, Union
+
+from typing_extensions import Protocol, runtime_checkable
+
+from swh.core.api import remote_api_endpoint
+
+ObjectId = Union[str, bytes]
+
+
+@runtime_checkable
+class VaultInterface(Protocol):
+    """
+    Backend Interface for the Software Heritage vault.
+    """
+
+    @remote_api_endpoint("fetch")
+    def fetch(self, obj_type: str, obj_id: ObjectId) -> Dict[str, Any]:
+        """Fetch a cooked bundle from the cache"""
+        ...
+
+    @remote_api_endpoint("cook")
+    def cook(
+        self, obj_type: str, obj_id: ObjectId, email: Optional[str] = None
+    ) -> Dict[str, Any]:
+        """Main entry point for cooking requests. This starts a cooking task if
+        needed, and adds the given e-mail to the notify list"""
+        ...
+
+    @remote_api_endpoint("progress")
+    def progress(self, obj_type: str, obj_id: ObjectId):
+        ...
+
+    # Cookers endpoints
+
+    @remote_api_endpoint("set_progress")
+    def set_progress(self, obj_type: str, obj_id: ObjectId, progress: str) -> None:
+        """Set the cooking progress of a bundle"""
+        ...
+
+    @remote_api_endpoint("set_status")
+    def set_status(self, obj_type: str, obj_id: ObjectId, status: str) -> None:
+        """Set the cooking status of a bundle"""
+        ...
+
+    @remote_api_endpoint("put_bundle")
+    def put_bundle(self, obj_type: str, obj_id: ObjectId, bundle):
+        """Store bundle in vault cache"""
+        ...
+
+    @remote_api_endpoint("send_notif")
+    def send_notif(self, obj_type: str, obj_id: ObjectId):
+        """Send all the e-mails in the notification list of a bundle"""
+        ...
+
+    # Batch endpoints
+
+    @remote_api_endpoint("batch_cook")
+    def batch_cook(self, batch: List[Tuple[str, str]]) -> Dict[str, int]:
+        """Cook a batch of bundles and return the cooking batch id."""
+        ...
+
+    @remote_api_endpoint("batch_progress")
+    def batch_progress(self, batch_id: int) -> Dict[str, Any]:
+        """Fetch information from a batch of bundles"""
+        ...
diff --git a/swh/vault/sql/30-swh-schema.sql b/swh/vault/sql/30-schema.sql
similarity index 100%
rename from swh/vault/sql/30-swh-schema.sql
rename to swh/vault/sql/30-schema.sql
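
Because `VaultInterface` is a `runtime_checkable` `Protocol`, both backends can be validated structurally without sharing a base class; a sketch (URL illustrative):

```python
from swh.vault import get_vault
from swh.vault.interface import VaultInterface

vault = get_vault("remote", url="http://localhost:5005/")
# Structural check: passes as long as all protocol methods are present.
assert isinstance(vault, VaultInterface)
```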
Dict[str, Any]: + tmp_path = str(tmp_path) + return { + "db": postgres_vault.dsn, "storage": { "cls": "local", - "db": db_url("tests2", postgresql_proc), + "db": postgres_storage.dsn, "objstorage": { "cls": "pathslicing", - "args": {"root": str(tmp_path), "slicing": "0:1/1:5",}, + "args": {"root": tmp_path, "slicing": "0:1/1:5",}, }, }, "cache": { "cls": "pathslicing", - "args": { - "root": str(tmp_path), - "slicing": "0:1/1:5", - "allow_delete": True, - }, + "args": {"root": tmp_path, "slicing": "0:1/1:5", "allow_delete": True}, }, "scheduler": {"cls": "remote", "url": "http://swh-scheduler:5008",}, } - return get_vault("local", vault_config) + +@pytest.fixture +def swh_local_vault_config(swh_vault_config: Dict[str, Any]) -> Dict[str, Any]: + return { + "vault": {"cls": "local", **swh_vault_config}, + "client_max_size": 1024 ** 3, + } + + +@pytest.fixture +def swh_vault_config_file(swh_local_vault_config, monkeypatch, tmp_path): + conf_path = os.path.join(str(tmp_path), "vault-server.yml") + with open(conf_path, "w") as f: + f.write(yaml.dump(swh_local_vault_config)) + monkeypatch.setenv("SWH_CONFIG_FILENAME", conf_path) + return conf_path + + +@pytest.fixture +def swh_vault(swh_vault_config): + return get_vault("local", **swh_vault_config) @pytest.fixture def swh_storage(swh_vault): return swh_vault.storage diff --git a/swh/vault/tests/test_backend.py b/swh/vault/tests/test_backend.py index c699e55..2ec68e5 100644 --- a/swh/vault/tests/test_backend.py +++ b/swh/vault/tests/test_backend.py @@ -1,336 +1,353 @@ -# Copyright (C) 2017 The Software Heritage developers +# Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import contextlib import datetime from unittest.mock import MagicMock, patch +import attr import psycopg2 import pytest from swh.model import hashutil +from swh.vault.exc import NotFoundExc from swh.vault.tests.vault_testing import hash_content @contextlib.contextmanager def mock_cooking(vault_backend): with patch.object(vault_backend, "_send_task") as mt: mt.return_value = 42 with patch("swh.vault.backend.get_cooker_cls") as mg: mcc = MagicMock() mc = MagicMock() mg.return_value = mcc mcc.return_value = mc mc.check_exists.return_value = True yield { "_send_task": mt, "get_cooker_cls": mg, "cooker_cls": mcc, "cooker": mc, } def assertTimestampAlmostNow(ts, tolerance_secs=1.0): # noqa now = datetime.datetime.now(datetime.timezone.utc) creation_delta_secs = (ts - now).total_seconds() assert creation_delta_secs < tolerance_secs def fake_cook(backend, obj_type, result_content, sticky=False): content, obj_id = hash_content(result_content) with mock_cooking(backend): backend.create_task(obj_type, obj_id, sticky) backend.cache.add(obj_type, obj_id, b"content") backend.set_status(obj_type, obj_id, "done") return obj_id, content def fail_cook(backend, obj_type, obj_id, failure_reason): with mock_cooking(backend): backend.create_task(obj_type, obj_id) backend.set_status(obj_type, obj_id, "failed") backend.set_progress(obj_type, obj_id, failure_reason) TEST_TYPE = "revision_gitfast" TEST_HEX_ID = "4a4b9771542143cf070386f86b4b92d42966bdbc" TEST_OBJ_ID = hashutil.hash_to_bytes(TEST_HEX_ID) TEST_PROGRESS = ( "Mr. White, You're telling me you're cooking again?" 
" \N{ASTONISHED FACE} " ) TEST_EMAIL = "ouiche@lorraine.fr" +@pytest.fixture +def swh_vault(swh_vault, sample_data): + # make the vault's storage consistent with test data + revision = attr.evolve(sample_data.revision, id=TEST_OBJ_ID) + swh_vault.storage.revision_add([revision]) + return swh_vault + + def test_create_task_simple(swh_vault): with mock_cooking(swh_vault) as m: - swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) + swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) m["get_cooker_cls"].assert_called_once_with(TEST_TYPE) args = m["cooker_cls"].call_args[0] assert args[0] == TEST_TYPE assert args[1] == TEST_HEX_ID assert m["cooker"].check_exists.call_count == 1 assert m["_send_task"].call_count == 1 args = m["_send_task"].call_args[0] assert args[0] == TEST_TYPE assert args[1] == TEST_HEX_ID - info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) - assert info["object_id"] == TEST_OBJ_ID + info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) + assert info["object_id"] == TEST_HEX_ID assert info["type"] == TEST_TYPE assert info["task_status"] == "new" assert info["task_id"] == 42 assertTimestampAlmostNow(info["ts_created"]) assert info["ts_done"] is None assert info["progress_msg"] is None def test_create_fail_duplicate_task(swh_vault): with mock_cooking(swh_vault): - swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) + swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) with pytest.raises(psycopg2.IntegrityError): - swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) + swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) def test_create_fail_nonexisting_object(swh_vault): with mock_cooking(swh_vault) as m: m["cooker"].check_exists.side_effect = ValueError("Nothing here.") with pytest.raises(ValueError): - swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) + swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) def test_create_set_progress(swh_vault): with mock_cooking(swh_vault): - swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) + swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) - info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) assert info["progress_msg"] is None - swh_vault.set_progress(TEST_TYPE, TEST_OBJ_ID, TEST_PROGRESS) - info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + swh_vault.set_progress(TEST_TYPE, TEST_HEX_ID, TEST_PROGRESS) + info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) assert info["progress_msg"] == TEST_PROGRESS def test_create_set_status(swh_vault): with mock_cooking(swh_vault): - swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) + swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) - info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) assert info["task_status"] == "new" assert info["ts_done"] is None - swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, "pending") - info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "pending") + info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) assert info["task_status"] == "pending" assert info["ts_done"] is None - swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, "done") - info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "done") + info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) assert info["task_status"] == "done" assertTimestampAlmostNow(info["ts_done"]) def test_create_update_access_ts(swh_vault): with mock_cooking(swh_vault): swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) - info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + info = swh_vault.progress(TEST_TYPE, TEST_OBJ_ID) access_ts_1 = 
info["ts_last_access"] assertTimestampAlmostNow(access_ts_1) swh_vault.update_access_ts(TEST_TYPE, TEST_OBJ_ID) - info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + info = swh_vault.progress(TEST_TYPE, TEST_OBJ_ID) access_ts_2 = info["ts_last_access"] assertTimestampAlmostNow(access_ts_2) swh_vault.update_access_ts(TEST_TYPE, TEST_OBJ_ID) - info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + info = swh_vault.progress(TEST_TYPE, TEST_OBJ_ID) + access_ts_3 = info["ts_last_access"] assertTimestampAlmostNow(access_ts_3) assert access_ts_1 < access_ts_2 assert access_ts_2 < access_ts_3 -def test_cook_request_idempotent(swh_vault): +def test_cook_idempotent(swh_vault, sample_data): with mock_cooking(swh_vault): - info1 = swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) - info2 = swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) - info3 = swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) + info1 = swh_vault.cook(TEST_TYPE, TEST_HEX_ID) + info2 = swh_vault.cook(TEST_TYPE, TEST_HEX_ID) + info3 = swh_vault.cook(TEST_TYPE, TEST_HEX_ID) assert info1 == info2 assert info1 == info3 def test_cook_email_pending_done(swh_vault): with mock_cooking(swh_vault), patch.object( swh_vault, "add_notif_email" ) as madd, patch.object(swh_vault, "send_notification") as msend: - swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) + swh_vault.cook(TEST_TYPE, TEST_HEX_ID) madd.assert_not_called() msend.assert_not_called() madd.reset_mock() msend.reset_mock() - swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, email=TEST_EMAIL) + swh_vault.cook(TEST_TYPE, TEST_HEX_ID, email=TEST_EMAIL) madd.assert_called_once_with(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL) msend.assert_not_called() madd.reset_mock() msend.reset_mock() - swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, "done") - swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, email=TEST_EMAIL) - msend.assert_called_once_with(None, TEST_EMAIL, TEST_TYPE, TEST_OBJ_ID, "done") + swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "done") + swh_vault.cook(TEST_TYPE, TEST_HEX_ID, email=TEST_EMAIL) + msend.assert_called_once_with(None, TEST_EMAIL, TEST_TYPE, TEST_HEX_ID, "done") madd.assert_not_called() def test_send_all_emails(swh_vault): with mock_cooking(swh_vault): emails = ("a@example.com", "billg@example.com", "test+42@example.org") for email in emails: - swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, email=email) + swh_vault.cook(TEST_TYPE, TEST_HEX_ID, email=email) - swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, "done") + swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "done") with patch.object(swh_vault, "smtp_server") as m: - swh_vault.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) + swh_vault.send_notif(TEST_TYPE, TEST_HEX_ID) sent_emails = {k[0][0] for k in m.send_message.call_args_list} assert {k["To"] for k in sent_emails} == set(emails) for e in sent_emails: assert "bot@softwareheritage.org" in e["From"] assert TEST_TYPE in e["Subject"] assert TEST_HEX_ID[:5] in e["Subject"] assert TEST_TYPE in str(e) assert "https://archive.softwareheritage.org/" in str(e) assert TEST_HEX_ID[:5] in str(e) assert "--\x20\n" in str(e) # Well-formated signature!!! 
# Check that the entries have been deleted and that calling the # function again does not re-send the e-mails m.reset_mock() - swh_vault.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) + swh_vault.send_notif(TEST_TYPE, TEST_HEX_ID) m.assert_not_called() def test_available(swh_vault): - assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID) + assert not swh_vault.is_available(TEST_TYPE, TEST_HEX_ID) with mock_cooking(swh_vault): - swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) - assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID) + swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) + assert not swh_vault.is_available(TEST_TYPE, TEST_HEX_ID) - swh_vault.cache.add(TEST_TYPE, TEST_OBJ_ID, b"content") - assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID) + swh_vault.cache.add(TEST_TYPE, TEST_HEX_ID, b"content") + assert not swh_vault.is_available(TEST_TYPE, TEST_HEX_ID) - swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, "done") - assert swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID) + swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "done") + assert swh_vault.is_available(TEST_TYPE, TEST_HEX_ID) def test_fetch(swh_vault): - assert swh_vault.fetch(TEST_TYPE, TEST_OBJ_ID) is None + assert swh_vault.fetch(TEST_TYPE, TEST_HEX_ID, raise_notfound=False) is None + + with pytest.raises( + NotFoundExc, match=f"{TEST_TYPE} {TEST_HEX_ID} is not available." + ): + swh_vault.fetch(TEST_TYPE, TEST_HEX_ID) + obj_id, content = fake_cook(swh_vault, TEST_TYPE, b"content") - info = swh_vault.task_info(TEST_TYPE, obj_id) + info = swh_vault.progress(TEST_TYPE, obj_id) access_ts_before = info["ts_last_access"] assert swh_vault.fetch(TEST_TYPE, obj_id) == b"content" - info = swh_vault.task_info(TEST_TYPE, obj_id) + info = swh_vault.progress(TEST_TYPE, obj_id) access_ts_after = info["ts_last_access"] assertTimestampAlmostNow(access_ts_after) assert access_ts_before < access_ts_after def test_cache_expire_oldest(swh_vault): r = range(1, 10) inserted = {} for i in r: sticky = i == 5 content = b"content%s" % str(i).encode() obj_id, content = fake_cook(swh_vault, TEST_TYPE, content, sticky) inserted[i] = (obj_id, content) swh_vault.update_access_ts(TEST_TYPE, inserted[2][0]) swh_vault.update_access_ts(TEST_TYPE, inserted[3][0]) swh_vault.cache_expire_oldest(n=4) should_be_still_here = {2, 3, 5, 8, 9} for i in r: assert swh_vault.is_available(TEST_TYPE, inserted[i][0]) == ( i in should_be_still_here ) def test_cache_expire_until(swh_vault): r = range(1, 10) inserted = {} for i in r: sticky = i == 5 content = b"content%s" % str(i).encode() obj_id, content = fake_cook(swh_vault, TEST_TYPE, content, sticky) inserted[i] = (obj_id, content) if i == 7: cutoff_date = datetime.datetime.now() swh_vault.update_access_ts(TEST_TYPE, inserted[2][0]) swh_vault.update_access_ts(TEST_TYPE, inserted[3][0]) swh_vault.cache_expire_until(date=cutoff_date) should_be_still_here = {2, 3, 5, 8, 9} for i in r: assert swh_vault.is_available(TEST_TYPE, inserted[i][0]) == ( i in should_be_still_here ) def test_fail_cook_simple(swh_vault): - fail_cook(swh_vault, TEST_TYPE, TEST_OBJ_ID, "error42") - assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID) - info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + fail_cook(swh_vault, TEST_TYPE, TEST_HEX_ID, "error42") + assert not swh_vault.is_available(TEST_TYPE, TEST_HEX_ID) + info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) assert info["progress_msg"] == "error42" def test_send_failure_email(swh_vault): with mock_cooking(swh_vault): - swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, email="a@example.com") + 
swh_vault.cook(TEST_TYPE, TEST_HEX_ID, email="a@example.com") - swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, "failed") - swh_vault.set_progress(TEST_TYPE, TEST_OBJ_ID, "test error") + swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "failed") + swh_vault.set_progress(TEST_TYPE, TEST_HEX_ID, "test error") with patch.object(swh_vault, "smtp_server") as m: - swh_vault.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) + swh_vault.send_notif(TEST_TYPE, TEST_HEX_ID) e = [k[0][0] for k in m.send_message.call_args_list][0] assert e["To"] == "a@example.com" assert "bot@softwareheritage.org" in e["From"] assert TEST_TYPE in e["Subject"] assert TEST_HEX_ID[:5] in e["Subject"] assert "fail" in e["Subject"] assert TEST_TYPE in str(e) assert TEST_HEX_ID[:5] in str(e) assert "test error" in str(e) assert "--\x20\n" in str(e) # Well-formatted signature def test_retry_failed_bundle(swh_vault): - fail_cook(swh_vault, TEST_TYPE, TEST_OBJ_ID, "error42") - info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + fail_cook(swh_vault, TEST_TYPE, TEST_HEX_ID, "error42") + info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) assert info["task_status"] == "failed" with mock_cooking(swh_vault): - swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) - info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + swh_vault.cook(TEST_TYPE, TEST_HEX_ID) + info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) assert info["task_status"] == "new" diff --git a/swh/vault/tests/test_init.py b/swh/vault/tests/test_init.py new file mode 100644 index 0000000..7f402d6 --- /dev/null +++ b/swh/vault/tests/test_init.py @@ -0,0 +1,55 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import pytest + +from swh.vault import get_vault +from swh.vault.api.client import RemoteVaultClient +from swh.vault.backend import VaultBackend + +SERVER_IMPLEMENTATIONS = [ + ("remote", RemoteVaultClient, {"url": "localhost"}), + ( + "local", + VaultBackend, + { + "db": "something", + "cache": {"cls": "memory", "args": {}}, + "storage": {"cls": "remote", "url": "mock://storage-url"}, + "scheduler": {"cls": "remote", "url": "mock://scheduler-url"}, + }, + ), +] + + +@pytest.fixture +def mock_psycopg2(mocker): + mocker.patch("swh.vault.backend.psycopg2.pool") + mocker.patch("swh.vault.backend.psycopg2.extras") + + +def test_init_get_vault_failure(): + with pytest.raises(ValueError, match="Unknown Vault class"): + get_vault("unknown-vault-storage") + + +@pytest.mark.parametrize("class_name,expected_class,kwargs", SERVER_IMPLEMENTATIONS) +def test_init_get_vault(class_name, expected_class, kwargs, mock_psycopg2): + concrete_vault = get_vault(class_name, **kwargs) + assert isinstance(concrete_vault, expected_class) + + +@pytest.mark.parametrize("class_name,expected_class,kwargs", SERVER_IMPLEMENTATIONS) +def test_init_get_vault_deprecation_warning( + class_name, expected_class, kwargs, mock_psycopg2 +): + with pytest.warns(DeprecationWarning): + concrete_vault = get_vault(class_name, args=kwargs) + assert isinstance(concrete_vault, expected_class) + + +def test_init_get_vault_ok(swh_vault_config): + concrete_vault = get_vault("local", **swh_vault_config) + assert isinstance(concrete_vault, VaultBackend) diff --git a/swh/vault/tests/test_init_cookers.py b/swh/vault/tests/test_init_cookers.py new file mode 100644 index 0000000..57a9f28 --- /dev/null +++ b/swh/vault/tests/test_init_cookers.py 
@@ -0,0 +1,112 @@ +# Copyright (C) 2017-2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import os +from typing import Dict + +import pytest +import yaml + +from swh.vault.cookers import COOKER_TYPES, get_cooker +from swh.vault.tests.test_backend import TEST_HEX_ID + + +@pytest.fixture +def swh_cooker_config(): + return { + "vault": { + "cls": "remote", + "args": { + "url": "mock://vault-backend", + "storage": {"cls": "remote", "url": "mock://storage-url"}, + }, + } + } + + +def write_config_to_env(config: Dict, tmp_path, monkeypatch) -> str: + """Write the configuration dict into a temporary file, then point the + SWH_CONFIG_FILENAME environment variable at that path. + + """ + conf_path = os.path.join(str(tmp_path), "cooker.yml") + with open(conf_path, "w") as f: + f.write(yaml.dump(config)) + monkeypatch.setenv("SWH_CONFIG_FILENAME", conf_path) + return conf_path + + +def test_write_to_env(swh_cooker_config, tmp_path, monkeypatch): + actual_path = write_config_to_env(swh_cooker_config, tmp_path, monkeypatch) + + assert os.path.exists(actual_path) is True + assert os.environ["SWH_CONFIG_FILENAME"] == actual_path + + with open(actual_path, "r") as f: + actual_config = yaml.safe_load(f.read()) + assert actual_config == swh_cooker_config + + +@pytest.mark.parametrize( + "config_ko,exception_class,exception_msg", + [ + ({}, ValueError, "missing 'vault' configuration"), + ( + {"vault": {"cls": "local"}}, + EnvironmentError, + "This vault backend can only be a 'remote' configuration", + ), + ( + {"vault": {"cls": "remote", "args": {"missing-storage-key": ""}}}, + ValueError, + "invalid configuration: missing 'storage' config entry", + ), + ], +) +def test_get_cooker_config_ko( + config_ko, exception_class, exception_msg, monkeypatch, tmp_path +): + """A misconfigured cooker should fail instantiation with the expected exception message + + """ + write_config_to_env(config_ko, tmp_path, monkeypatch) + + with pytest.raises(exception_class, match=exception_msg): + get_cooker("directory", TEST_HEX_ID) + + +@pytest.mark.parametrize( + "config_ok", + [ + { + "vault": { + "cls": "remote", + "args": { + "url": "mock://vault-backend", + "storage": {"cls": "remote", "url": "mock://storage-url"}, + }, + } + }, + { + "vault": {"cls": "remote", "args": {"url": "mock://vault-backend",},}, + "storage": {"cls": "remote", "url": "mock://storage-url"}, + }, + { + "vault": {"cls": "remote", "url": "mock://vault-backend",}, + "storage": {"cls": "remote", "url": "mock://storage-url"}, + }, + ], +) +def test_get_cooker_nominal(config_ok, tmp_path, monkeypatch): + """A correct configuration should allow the instantiation of the cookers + + """ + for cooker_type in COOKER_TYPES.keys(): + write_config_to_env(config_ok, tmp_path, monkeypatch) + + cooker = get_cooker(cooker_type, TEST_HEX_ID) + + assert cooker is not None + assert isinstance(cooker, COOKER_TYPES[cooker_type]) diff --git a/swh/vault/tests/test_server.py b/swh/vault/tests/test_server.py index 5bbbf14..0164cb6 100644 --- a/swh/vault/tests/test_server.py +++ b/swh/vault/tests/test_server.py @@ -1,55 +1,161 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import copy +import os +from typing 
import Any, Dict + import pytest +import yaml + +from swh.core.api.serializers import json_dumps, msgpack_dumps, msgpack_loads +from swh.vault.api.server import ( + VaultServerApp, + check_config, + make_app, + make_app_from_configfile, +) +from swh.vault.tests.test_backend import TEST_HEX_ID + + +def test_make_app_from_file_missing(): + with pytest.raises(ValueError, match="Missing configuration path."): + make_app_from_configfile() + + +def test_make_app_from_file_does_not_exist(tmp_path): + conf_path = os.path.join(str(tmp_path), "vault-server.yml") + assert os.path.exists(conf_path) is False + + with pytest.raises( + ValueError, match=f"Configuration path {conf_path} should exist." + ): + make_app_from_configfile(conf_path) + -from swh.core.api.serializers import msgpack_dumps, msgpack_loads -from swh.vault.api.server import make_app +def test_make_app_from_env_variable(swh_vault_config_file): + """Instantiation of the server should happen once (through environment variable) + + """ + app0 = make_app_from_configfile() + assert app0 is not None + app1 = make_app_from_configfile() + assert app1 == app0 + + +def test_make_app_from_file(swh_local_vault_config, tmp_path): + """Instantiation of the server should happen once (through the given configuration file) + + """ + conf_path = os.path.join(str(tmp_path), "vault-server.yml") + with open(conf_path, "w") as f: + f.write(yaml.dump(swh_local_vault_config)) + + app0 = make_app_from_configfile(conf_path) + assert app0 is not None + app1 = make_app_from_configfile(conf_path) + assert app1 == app0 @pytest.fixture -def client(swh_vault, loop, aiohttp_client): - app = make_app(backend=swh_vault) - return loop.run_until_complete(aiohttp_client(app)) +def async_app(swh_local_vault_config: Dict[str, Any],) -> VaultServerApp: + """Instantiate the vault server application. 
+ + Note: This requires the db setup to run (the swh_vault fixture is in charge of this) + """ + return make_app(swh_local_vault_config) -async def test_index(client): - resp = await client.get("/") + +@pytest.fixture +def cli(async_app, aiohttp_client, loop): + return loop.run_until_complete(aiohttp_client(async_app)) + + +async def test_client_index(cli): + resp = await cli.get("/") assert resp.status == 200 -async def test_cook_notfound(client): - resp = await client.post("/cook/directory/000000") +async def test_client_cook_notfound(cli): + resp = await cli.post( + "/cook", + data=json_dumps({"obj_type": "directory", "obj_id": TEST_HEX_ID}), + headers=[("Content-Type", "application/json")], + ) assert resp.status == 400 content = msgpack_loads(await resp.content.read()) assert content["exception"]["type"] == "NotFoundExc" - assert content["exception"]["args"] == ["Object 000000 was not found."] + assert content["exception"]["args"] == [f"directory {TEST_HEX_ID} was not found."] -async def test_progress_notfound(client): - resp = await client.get("/progress/directory/000000") +async def test_client_progress_notfound(cli): + resp = await cli.post( + "/progress", + data=json_dumps({"obj_type": "directory", "obj_id": TEST_HEX_ID}), + headers=[("Content-Type", "application/json")], + ) assert resp.status == 400 content = msgpack_loads(await resp.content.read()) assert content["exception"]["type"] == "NotFoundExc" - assert content["exception"]["args"] == ["directory 000000 was not found."] + assert content["exception"]["args"] == [f"directory {TEST_HEX_ID} was not found."] -async def test_batch_cook_invalid_type(client): - data = msgpack_dumps([("foobar", [])]) - resp = await client.post( - "/batch_cook", data=data, headers={"Content-Type": "application/x-msgpack"} +async def test_client_batch_cook_invalid_type(cli): + resp = await cli.post( + "/batch_cook", + data=msgpack_dumps({"batch": [("foobar", [])]}), + headers={"Content-Type": "application/x-msgpack"}, ) assert resp.status == 400 content = msgpack_loads(await resp.content.read()) assert content["exception"]["type"] == "NotFoundExc" assert content["exception"]["args"] == ["foobar is an unknown type."] -async def test_batch_progress_notfound(client): - resp = await client.get("/batch_progress/1") +async def test_client_batch_progress_notfound(cli): + resp = await cli.post( + "/batch_progress", + data=msgpack_dumps({"batch_id": 1}), + headers={"Content-Type": "application/x-msgpack"}, + ) assert resp.status == 400 content = msgpack_loads(await resp.content.read()) assert content["exception"]["type"] == "NotFoundExc" assert content["exception"]["args"] == ["Batch 1 does not exist."] + + +def test_check_config_missing_vault_configuration() -> None: + """A configuration missing the 'vault' section raises""" + with pytest.raises(ValueError, match="missing 'vault' configuration"): + check_config({}) + + +def test_check_config_not_local() -> None: + """A non-'local' vault configuration raises""" + expected_error = ( + "The vault backend can only be started with a 'local' configuration" + ) + with pytest.raises(EnvironmentError, match=expected_error): + check_config({"vault": {"cls": "remote"}}) + + +@pytest.mark.parametrize("missing_key", ["storage", "cache", "scheduler"]) +def test_check_config_missing_key(missing_key, swh_vault_config) -> None: + """A 'local' configuration missing a required config entry is rejected""" + config_ok = {"vault": {"cls": "local", **swh_vault_config}} + config_ko = copy.deepcopy(config_ok) + config_ko["vault"].pop(missing_key, None) + + 
expected_error = f"invalid configuration: missing {missing_key} config entry" + with pytest.raises(ValueError, match=expected_error): + check_config(config_ko) + + +@pytest.mark.parametrize("missing_key", ["storage", "cache", "scheduler"]) +def test_check_config_ok(missing_key, swh_vault_config) -> None: + """Any other configuration than 'local' (the default) is rejected""" + config_ok = {"vault": {"cls": "local", **swh_vault_config}} + assert check_config(config_ok) is not None diff --git a/swh/vault/to_disk.py b/swh/vault/to_disk.py index 152040d..797bb31 100644 --- a/swh/vault/to_disk.py +++ b/swh/vault/to_disk.py @@ -1,139 +1,138 @@ # Copyright (C) 2016-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import collections import functools import os from typing import Any, Dict, Iterator, List from swh.model import hashutil from swh.model.from_disk import DentryPerms, mode_to_perms from swh.storage.algos.dir_iterators import dir_iterator from swh.storage.interface import StorageInterface SKIPPED_MESSAGE = ( b"This content has not been retrieved in the " b"Software Heritage archive due to its size." ) HIDDEN_MESSAGE = b"This content is hidden." def get_filtered_files_content( storage: StorageInterface, files_data: List[Dict] ) -> Iterator[Dict[str, Any]]: """Retrieve the files specified by files_data and apply filters for skipped and missing contents. Args: storage: the storage from which to retrieve the objects files_data: list of file entries as returned by directory_ls() Yields: The entries given in files_data with a new 'content' key that points to the file content in bytes. The contents can be replaced by a specific message to indicate that they could not be retrieved (either due to privacy policy or because their sizes were too big for us to archive it). """ for file_data in files_data: status = file_data["status"] if status == "absent": content = SKIPPED_MESSAGE elif status == "hidden": content = HIDDEN_MESSAGE elif status == "visible": sha1 = file_data["sha1"] data = storage.content_get_data(sha1) if data is None: content = SKIPPED_MESSAGE else: content = data else: assert False, ( f"unexpected status {status!r} " f"for content {hashutil.hash_to_hex(file_data['target'])}" ) yield {"content": content, **file_data} def apply_chunked(func, input_list, chunk_size): """Apply func on input_list divided in chunks of size chunk_size""" for i in range(0, len(input_list), chunk_size): yield from func(input_list[i : i + chunk_size]) class DirectoryBuilder: """Reconstructs the on-disk representation of a directory in the storage. """ def __init__(self, storage, root, dir_id): """Initialize the directory builder. Args: storage: the storage object root: the path where the directory should be reconstructed dir_id: the identifier of the directory in the storage """ self.storage = storage self.root = root self.dir_id = dir_id def build(self): """Perform the reconstruction of the directory in the given root.""" # Retrieve data from the database. # Split into files, revisions and directory data. entries = collections.defaultdict(list) for entry in dir_iterator(self.storage, self.dir_id): entries[entry["type"]].append(entry) # Recreate the directory's subtree and then the files into it. 
self._create_tree(entries["dir"]) self._create_files(entries["file"]) self._create_revisions(entries["rev"]) def _create_tree(self, directories): """Create a directory tree from the given paths The tree is created from `root` and each given directory in `directories` will be created. """ # Directories are sorted by depth so they are created in the # right order bsep = os.path.sep.encode() directories = sorted(directories, key=lambda x: len(x["path"].split(bsep))) for dir in directories: os.makedirs(os.path.join(self.root, dir["path"])) def _create_files(self, files_data): """Create the files in the tree and fetch their contents.""" f = functools.partial(get_filtered_files_content, self.storage) files_data = apply_chunked(f, files_data, 1000) for file_data in files_data: path = os.path.join(self.root, file_data["path"]) self._create_file(path, file_data["content"], file_data["perms"]) def _create_revisions(self, revs_data): """Create the revisions in the tree as broken symlinks to the target identifier.""" for file_data in revs_data: path = os.path.join(self.root, file_data["path"]) - self._create_file( - path, hashutil.hash_to_hex(file_data["target"]), mode=0o120000 - ) + target = hashutil.hash_to_hex(file_data["target"]) + self._create_file(path, target, mode=DentryPerms.symlink) - def _create_file(self, path, content, mode=0o100644): + def _create_file(self, path, content, mode=DentryPerms.content): """Create the given file and fill it with content.""" perms = mode_to_perms(mode) if perms == DentryPerms.symlink: os.symlink(content, path) else: with open(path, "wb") as f: f.write(content) os.chmod(path, perms.value)
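For context, a minimal sketch of how the patched DirectoryBuilder is meant to be driven; the restore_directory helper, the storage handle, and the dir_id value are illustrative assumptions, not part of this diff:

import tempfile

from swh.vault.to_disk import DirectoryBuilder

def restore_directory(storage, dir_id: bytes) -> bytes:
    # DirectoryBuilder joins its root with the bytes paths yielded by
    # dir_iterator(), so the root itself must be passed as bytes.
    root = tempfile.mkdtemp(prefix="vault-").encode()
    # build() recreates the subtree, writes file contents (or the
    # SKIPPED_MESSAGE/HIDDEN_MESSAGE placeholders), and renders "rev"
    # entries as broken symlinks using DentryPerms.symlink, per the
    # change above.
    DirectoryBuilder(storage, root, dir_id).build()
    return root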