diff --git a/swh/vault/cookers/__init__.py b/swh/vault/cookers/__init__.py index 474eb62..04ca9e5 100644 --- a/swh/vault/cookers/__init__.py +++ b/swh/vault/cookers/__init__.py @@ -1,57 +1,68 @@ -# Copyright (C) 2017 The Software Heritage developers +# Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os from swh.core.config import load_named_config from swh.core.config import read as read_config from swh.storage import get_storage from swh.vault import get_vault from swh.vault.cookers.base import DEFAULT_CONFIG, DEFAULT_CONFIG_PATH from swh.vault.cookers.directory import DirectoryCooker from swh.vault.cookers.revision_flat import RevisionFlatCooker from swh.vault.cookers.revision_gitfast import RevisionGitfastCooker COOKER_TYPES = { "directory": DirectoryCooker, "revision_flat": RevisionFlatCooker, "revision_gitfast": RevisionGitfastCooker, } def get_cooker_cls(obj_type): return COOKER_TYPES[obj_type] -def get_cooker(obj_type, obj_id): +def get_cooker(obj_type: str, obj_id: str): + """Instantiate a cooker class of type obj_type. + + Returns: + Cooker class in charge of cooking the obj_type with id obj_id. + + Raises: + ValueError in case of a missing top-level vault key configuration or a storage + key. + EnvironmentError in case the vault configuration reference a non remote class. + + """ if "SWH_CONFIG_FILENAME" in os.environ: cfg = read_config(os.environ["SWH_CONFIG_FILENAME"], DEFAULT_CONFIG) else: cfg = load_named_config(DEFAULT_CONFIG_PATH, DEFAULT_CONFIG) cooker_cls = get_cooker_cls(obj_type) if "vault" not in cfg: - raise ValueError("missing '%vault' configuration") + raise ValueError("missing 'vault' configuration") vcfg = cfg["vault"] if vcfg["cls"] != "remote": raise EnvironmentError( - "This vault backend can only be a 'remote' " "configuration", err=True + "This vault backend can only be a 'remote' configuration" ) args = vcfg["args"] if "storage" not in args: args["storage"] = cfg.get("storage") if not args.get("storage"): - raise ValueError("invalid configuration; missing 'storage' config entry.") + raise ValueError("invalid configuration: missing 'storage' config entry.") storage = get_storage(**args.pop("storage")) backend = get_vault(**vcfg) return cooker_cls( obj_type, obj_id, backend=backend, storage=storage, max_bundle_size=cfg["max_bundle_size"], ) diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py index 657ddba..a9c32f5 100644 --- a/swh/vault/cookers/base.py +++ b/swh/vault/cookers/base.py @@ -1,138 +1,136 @@ # Copyright (C) 2016-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import io import logging from typing import Optional from psycopg2.extensions import QueryCanceledError from swh.model import hashutil MAX_BUNDLE_SIZE = 2 ** 29 # 512 MiB DEFAULT_CONFIG_PATH = "vault/cooker" DEFAULT_CONFIG = { - "storage": ("dict", {"cls": "remote", "args": {"url": "http://localhost:5002/",},}), - "vault": ("dict", {"cls": "remote", "args": {"url": "http://localhost:5005/",},}), "max_bundle_size": ("int", MAX_BUNDLE_SIZE), } class PolicyError(Exception): """Raised when the bundle violates the cooking policy.""" pass class BundleTooLargeError(PolicyError): """Raised when the bundle is too large to be cooked.""" pass class BytesIOBundleSizeLimit(io.BytesIO): def __init__(self, *args, size_limit=None, **kwargs): super().__init__(*args, **kwargs) self.size_limit = size_limit def write(self, chunk): if ( self.size_limit is not None and self.getbuffer().nbytes + len(chunk) > self.size_limit ): raise BundleTooLargeError( "The requested bundle exceeds the maximum allowed " "size of {} bytes.".format(self.size_limit) ) return super().write(chunk) class BaseVaultCooker(metaclass=abc.ABCMeta): """Abstract base class for the vault's bundle creators This class describes a common API for the cookers. To define a new cooker, inherit from this class and override: - CACHE_TYPE_KEY: key to use for the bundle to reference in cache - def cook(): cook the object into a bundle """ CACHE_TYPE_KEY = None # type: Optional[str] def __init__( self, obj_type, obj_id, backend, storage, max_bundle_size=MAX_BUNDLE_SIZE ): """Initialize the cooker. The type of the object represented by the id depends on the concrete class. Very likely, each type of bundle will have its own cooker class. Args: obj_type: type of the object to be cooked into a bundle (directory, revision_flat or revision_gitfast; see swh.vault.cooker.COOKER_TYPES). obj_id: id of the object to be cooked into a bundle. backend: the vault backend (swh.vault.backend.VaultBackend). """ self.obj_type = obj_type self.obj_id = hashutil.hash_to_bytes(obj_id) self.backend = backend self.storage = storage self.max_bundle_size = max_bundle_size @abc.abstractmethod def check_exists(self): """Checks that the requested object exists and can be cooked. Override this in the cooker implementation. """ raise NotImplementedError @abc.abstractmethod def prepare_bundle(self): """Implementation of the cooker. Yields chunks of the bundle bytes. Override this with the cooker implementation. """ raise NotImplementedError def write(self, chunk): self.fileobj.write(chunk) def cook(self): """Cook the requested object into a bundle """ self.backend.set_status(self.obj_type, self.obj_id, "pending") self.backend.set_progress(self.obj_type, self.obj_id, "Processing...") self.fileobj = BytesIOBundleSizeLimit(size_limit=self.max_bundle_size) try: try: self.prepare_bundle() except QueryCanceledError: raise PolicyError( "Timeout reached while assembling the requested bundle" ) bundle = self.fileobj.getvalue() # TODO: use proper content streaming instead of put_bundle() self.backend.put_bundle(self.CACHE_TYPE_KEY, self.obj_id, bundle) except PolicyError as e: self.backend.set_status(self.obj_type, self.obj_id, "failed") self.backend.set_progress(self.obj_type, self.obj_id, str(e)) except Exception: self.backend.set_status(self.obj_type, self.obj_id, "failed") self.backend.set_progress( self.obj_type, self.obj_id, "Internal Server Error. This incident will be reported.", ) logging.exception("Bundle cooking failed.") else: self.backend.set_status(self.obj_type, self.obj_id, "done") self.backend.set_progress(self.obj_type, self.obj_id, None) finally: self.backend.send_notif(self.obj_type, self.obj_id) diff --git a/swh/vault/tests/test_init_cookers.py b/swh/vault/tests/test_init_cookers.py new file mode 100644 index 0000000..51d6bb1 --- /dev/null +++ b/swh/vault/tests/test_init_cookers.py @@ -0,0 +1,109 @@ +# Copyright (C) 2017-2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import os +from typing import Dict + +import pytest +import yaml + +from swh.vault.cookers import COOKER_TYPES, get_cooker +from swh.vault.tests.test_backend import TEST_HEX_ID + + +@pytest.fixture +def swh_cooker_config(): + return { + "vault": { + "cls": "remote", + "args": { + "url": "mock://vault-backend", + "storage": {"cls": "remote", "url": "mock://storage-url"}, + }, + } + } + + +def write_config_to_env(config: Dict, tmp_path, monkeypatch) -> str: + """Write the configuration dict into a temporary file, then reference that path to + SWH_CONFIG_FILENAME environment variable. + + """ + conf_path = os.path.join(str(tmp_path), "cooker.yml") + with open(conf_path, "w") as f: + f.write(yaml.dump(config)) + monkeypatch.setenv("SWH_CONFIG_FILENAME", conf_path) + return conf_path + + +def test_write_to_env(swh_cooker_config, tmp_path, monkeypatch): + actual_path = write_config_to_env(swh_cooker_config, tmp_path, monkeypatch) + + assert os.path.exists(actual_path) is True + assert os.environ["SWH_CONFIG_FILENAME"] == actual_path + + with open(actual_path, "r") as f: + actual_config = yaml.safe_load(f.read()) + assert actual_config == swh_cooker_config + + +@pytest.mark.parametrize( + "config_ko,exception_class,exception_msg", + [ + ({}, ValueError, "missing 'vault' configuration"), + ( + {"vault": {"cls": "local"}}, + EnvironmentError, + "This vault backend can only be a 'remote' configuration", + ), + ( + {"vault": {"cls": "remote", "args": {"missing-storage-key": ""}}}, + ValueError, + "invalid configuration: missing 'storage' config entry", + ), + ({"vault": {"cls": "remote"}}, KeyError, "args",), + ], +) +def test_get_cooker_config_ko( + config_ko, exception_class, exception_msg, monkeypatch, tmp_path +): + """Misconfigured cooker should fail the instantiation with exception message + + """ + write_config_to_env(config_ko, tmp_path, monkeypatch) + + with pytest.raises(exception_class, match=exception_msg): + get_cooker("directory", TEST_HEX_ID) + + +@pytest.mark.parametrize( + "config_ok", + [ + { + "vault": { + "cls": "remote", + "args": { + "url": "mock://vault-backend", + "storage": {"cls": "remote", "url": "mock://storage-url"}, + }, + } + }, + { + "vault": {"cls": "remote", "args": {"url": "mock://vault-backend",},}, + "storage": {"cls": "remote", "url": "mock://storage-url"}, + }, + ], +) +def test_get_cooker_nominal(config_ok, tmp_path, monkeypatch): + """Correct configuration should allow the instantiation of the cookers + + """ + for cooker_type in COOKER_TYPES.keys(): + write_config_to_env(config_ok, tmp_path, monkeypatch) + + cooker = get_cooker(cooker_type, TEST_HEX_ID) + + assert cooker is not None + assert isinstance(cooker, COOKER_TYPES[cooker_type])