diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py index 32e6a87..05eca87 100644 --- a/swh/vault/cookers/base.py +++ b/swh/vault/cookers/base.py @@ -1,154 +1,157 @@ # Copyright (C) 2016-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import io import logging +import traceback from typing import ClassVar, Set from psycopg2.extensions import QueryCanceledError from swh.model.identifiers import CoreSWHID, ObjectType from swh.storage.interface import StorageInterface MAX_BUNDLE_SIZE = 2 ** 29 # 512 MiB DEFAULT_CONFIG_PATH = "vault/cooker" DEFAULT_CONFIG = { "max_bundle_size": ("int", MAX_BUNDLE_SIZE), } class PolicyError(Exception): """Raised when the bundle violates the cooking policy.""" pass class BundleTooLargeError(PolicyError): """Raised when the bundle is too large to be cooked.""" pass class BytesIOBundleSizeLimit(io.BytesIO): def __init__(self, *args, size_limit=None, **kwargs): super().__init__(*args, **kwargs) self.size_limit = size_limit def write(self, chunk): if ( self.size_limit is not None and self.getbuffer().nbytes + len(chunk) > self.size_limit ): raise BundleTooLargeError( "The requested bundle exceeds the maximum allowed " "size of {} bytes.".format(self.size_limit) ) return super().write(chunk) class BaseVaultCooker(metaclass=abc.ABCMeta): """Abstract base class for the vault's bundle creators This class describes a common API for the cookers. To define a new cooker, inherit from this class and override: - CACHE_TYPE_KEY: key to use for the bundle to reference in cache - def cook(): cook the object into a bundle """ SUPPORTED_OBJECT_TYPES: ClassVar[Set[ObjectType]] BUNDLE_TYPE: ClassVar[str] def __init__( self, swhid: CoreSWHID, backend, storage: StorageInterface, graph=None, objstorage=None, max_bundle_size: int = MAX_BUNDLE_SIZE, ): """Initialize the cooker. The type of the object represented by the id depends on the concrete class. Very likely, each type of bundle will have its own cooker class. Args: swhid: id of the object to be cooked into a bundle. backend: the vault backend (swh.vault.backend.VaultBackend). """ self.check_object_type(swhid.object_type) self.swhid = swhid self.obj_id = swhid.object_id self.backend = backend self.storage = storage self.objstorage = objstorage self.graph = graph self.max_bundle_size = max_bundle_size @classmethod def check_object_type(cls, object_type: ObjectType) -> None: if object_type not in cls.SUPPORTED_OBJECT_TYPES: raise ValueError(f"{cls.__name__} does not support {object_type} objects.") @abc.abstractmethod def check_exists(self): """Checks that the requested object exists and can be cooked. Override this in the cooker implementation. """ raise NotImplementedError @abc.abstractmethod def prepare_bundle(self): """Implementation of the cooker. Yields chunks of the bundle bytes. Override this with the cooker implementation. """ raise NotImplementedError def cache_type_key(self) -> str: assert self.BUNDLE_TYPE return self.BUNDLE_TYPE def write(self, chunk): self.fileobj.write(chunk) def cook(self): """Cook the requested object into a bundle """ self.backend.set_status(self.BUNDLE_TYPE, self.swhid, "pending") self.backend.set_progress(self.BUNDLE_TYPE, self.swhid, "Processing...") self.fileobj = BytesIOBundleSizeLimit(size_limit=self.max_bundle_size) try: try: self.prepare_bundle() except QueryCanceledError: raise PolicyError( "Timeout reached while assembling the requested bundle" ) bundle = self.fileobj.getvalue() # TODO: use proper content streaming instead of put_bundle() self.backend.put_bundle(self.cache_type_key(), self.swhid, bundle) except PolicyError as e: logging.info("Bundle cooking violated policy: %s", e) self.backend.set_status(self.BUNDLE_TYPE, self.swhid, "failed") self.backend.set_progress(self.BUNDLE_TYPE, self.swhid, str(e)) except Exception: self.backend.set_status(self.BUNDLE_TYPE, self.swhid, "failed") + tb = traceback.format_exc() self.backend.set_progress( self.BUNDLE_TYPE, self.swhid, - "Internal Server Error. This incident will be reported.", + f"Internal Server Error. This incident will be reported.\n" + f"The full error was:\n\n{tb}", ) logging.exception("Bundle cooking failed.") else: self.backend.set_status(self.BUNDLE_TYPE, self.swhid, "done") self.backend.set_progress(self.BUNDLE_TYPE, self.swhid, None) finally: self.backend.send_notif(self.BUNDLE_TYPE, self.swhid) diff --git a/swh/vault/tests/test_cookers_base.py b/swh/vault/tests/test_cookers_base.py index 3b95fae..965c322 100644 --- a/swh/vault/tests/test_cookers_base.py +++ b/swh/vault/tests/test_cookers_base.py @@ -1,73 +1,83 @@ -# Copyright (C) 2018 The Software Heritage developers +# Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import textwrap from unittest.mock import MagicMock from swh.model.identifiers import CoreSWHID from swh.vault.cookers.base import BaseVaultCooker TEST_BUNDLE_CHUNKS = [b"test content 1\n", b"test content 2\n", b"test content 3\n"] TEST_BUNDLE_CONTENT = b"".join(TEST_BUNDLE_CHUNKS) TEST_BUNDLE_TYPE = "test_type" TEST_SWHID = CoreSWHID.from_string("swh:1:cnt:17a3e48bce37be5226490e750202ad3a9a1a3fe9") class BaseVaultCookerMock(BaseVaultCooker): BUNDLE_TYPE = TEST_BUNDLE_TYPE def __init__(self): # we do not call super() here to bypass the building of db objects from # config since we do mock these db objects self.config = {} self.storage = MagicMock() self.backend = MagicMock() self.swhid = TEST_SWHID self.obj_id = TEST_SWHID.object_id self.max_bundle_size = 1024 def check_exists(self): return True def prepare_bundle(self): for chunk in TEST_BUNDLE_CHUNKS: self.write(chunk) def test_simple_cook(): cooker = BaseVaultCookerMock() cooker.cook() cooker.backend.put_bundle.assert_called_once_with( TEST_BUNDLE_TYPE, TEST_SWHID, TEST_BUNDLE_CONTENT ) cooker.backend.set_status.assert_called_with(TEST_BUNDLE_TYPE, TEST_SWHID, "done") cooker.backend.set_progress.assert_called_with(TEST_BUNDLE_TYPE, TEST_SWHID, None) cooker.backend.send_notif.assert_called_with(TEST_BUNDLE_TYPE, TEST_SWHID) def test_code_exception_cook(): cooker = BaseVaultCookerMock() cooker.prepare_bundle = MagicMock() cooker.prepare_bundle.side_effect = RuntimeError("Nope") cooker.cook() # Potentially remove this when we have objstorage streaming cooker.backend.put_bundle.assert_not_called() cooker.backend.set_status.assert_called_with(TEST_BUNDLE_TYPE, TEST_SWHID, "failed") - assert "Nope" not in cooker.backend.set_progress.call_args[0][2] + assert cooker.backend.set_progress.call_args[0][2].startswith( + textwrap.dedent( + """\ + Internal Server Error. This incident will be reported. + The full error was: + + Traceback (most recent call last): + """ + ) + ) cooker.backend.send_notif.assert_called_with(TEST_BUNDLE_TYPE, TEST_SWHID) def test_policy_exception_cook(): cooker = BaseVaultCookerMock() cooker.max_bundle_size = 8 cooker.cook() # Potentially remove this when we have objstorage streaming cooker.backend.put_bundle.assert_not_called() cooker.backend.set_status.assert_called_with(TEST_BUNDLE_TYPE, TEST_SWHID, "failed") assert "exceeds" in cooker.backend.set_progress.call_args[0][2] cooker.backend.send_notif.assert_called_with(TEST_BUNDLE_TYPE, TEST_SWHID)