Changeset View
Standalone View
swh/vault/cookers/base.py
# Copyright (C) 2016-2018 The Software Heritage developers | # Copyright (C) 2016-2018 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import abc | import abc | ||||
import io | import io | ||||
import logging | import logging | ||||
from typing import Optional | from typing import ClassVar, Set | ||||
from psycopg2.extensions import QueryCanceledError | from psycopg2.extensions import QueryCanceledError | ||||
from swh.model import hashutil | from swh.model.identifiers import CoreSWHID, ObjectType | ||||
from swh.model.model import Sha1Git | |||||
from swh.storage.interface import StorageInterface | from swh.storage.interface import StorageInterface | ||||
MAX_BUNDLE_SIZE = 2 ** 29 # 512 MiB | MAX_BUNDLE_SIZE = 2 ** 29 # 512 MiB | ||||
DEFAULT_CONFIG_PATH = "vault/cooker" | DEFAULT_CONFIG_PATH = "vault/cooker" | ||||
DEFAULT_CONFIG = { | DEFAULT_CONFIG = { | ||||
"max_bundle_size": ("int", MAX_BUNDLE_SIZE), | "max_bundle_size": ("int", MAX_BUNDLE_SIZE), | ||||
} | } | ||||
Show All 32 Lines | class BaseVaultCooker(metaclass=abc.ABCMeta): | ||||
This class describes a common API for the cookers. | This class describes a common API for the cookers. | ||||
To define a new cooker, inherit from this class and override: | To define a new cooker, inherit from this class and override: | ||||
- CACHE_TYPE_KEY: key to use for the bundle to reference in cache | - CACHE_TYPE_KEY: key to use for the bundle to reference in cache | ||||
- def cook(): cook the object into a bundle | - def cook(): cook the object into a bundle | ||||
""" | """ | ||||
CACHE_TYPE_KEY = None # type: Optional[str] | SUPPORTED_OBJECT_TYPES: ClassVar[Set[ObjectType]] | ||||
BUNDLE_TYPE: ClassVar[str] | |||||
def __init__( | def __init__( | ||||
self, | self, | ||||
bundle_type: str, | swhid: CoreSWHID, | ||||
obj_id: Sha1Git, | |||||
backend, | backend, | ||||
storage: StorageInterface, | storage: StorageInterface, | ||||
graph=None, | graph=None, | ||||
objstorage=None, | objstorage=None, | ||||
max_bundle_size: int = MAX_BUNDLE_SIZE, | max_bundle_size: int = MAX_BUNDLE_SIZE, | ||||
): | ): | ||||
"""Initialize the cooker. | """Initialize the cooker. | ||||
The type of the object represented by the id depends on the | The type of the object represented by the id depends on the | ||||
concrete class. Very likely, each type of bundle will have its | concrete class. Very likely, each type of bundle will have its | ||||
own cooker class. | own cooker class. | ||||
Args: | Args: | ||||
bundle_type: type of the object to be cooked into a bundle (directory, | swhid: id of the object to be cooked into a bundle. | ||||
revision_flat or revision_gitfast; see | |||||
swh.vault.cooker.COOKER_TYPES). | |||||
obj_id: id of the object to be cooked into a bundle. | |||||
backend: the vault backend (swh.vault.backend.VaultBackend). | backend: the vault backend (swh.vault.backend.VaultBackend). | ||||
""" | """ | ||||
self.bundle_type = bundle_type | self.check_object_type(swhid.object_type) | ||||
self.obj_id = hashutil.hash_to_bytes(obj_id) | self.swhid = swhid | ||||
self.obj_id = swhid.object_id | |||||
self.backend = backend | self.backend = backend | ||||
self.storage = storage | self.storage = storage | ||||
self.objstorage = objstorage | self.objstorage = objstorage | ||||
self.graph = graph | self.graph = graph | ||||
self.max_bundle_size = max_bundle_size | self.max_bundle_size = max_bundle_size | ||||
@classmethod | |||||
def check_object_type(cls, object_type: ObjectType) -> None: | |||||
if object_type not in cls.SUPPORTED_OBJECT_TYPES: | |||||
raise ValueError(f"{cls.__name__} does not support {object_type} objects.") | |||||
@abc.abstractmethod | @abc.abstractmethod | ||||
def check_exists(self): | def check_exists(self): | ||||
"""Checks that the requested object exists and can be cooked. | """Checks that the requested object exists and can be cooked. | ||||
Override this in the cooker implementation. | Override this in the cooker implementation. | ||||
""" | """ | ||||
raise NotImplementedError | raise NotImplementedError | ||||
@abc.abstractmethod | @abc.abstractmethod | ||||
def prepare_bundle(self): | def prepare_bundle(self): | ||||
"""Implementation of the cooker. Yields chunks of the bundle bytes. | """Implementation of the cooker. Yields chunks of the bundle bytes. | ||||
Override this with the cooker implementation. | Override this with the cooker implementation. | ||||
""" | """ | ||||
raise NotImplementedError | raise NotImplementedError | ||||
def cache_type_key(self) -> str: | def cache_type_key(self) -> str: | ||||
assert self.CACHE_TYPE_KEY | assert self.BUNDLE_TYPE | ||||
return self.CACHE_TYPE_KEY | return self.BUNDLE_TYPE | ||||
def write(self, chunk): | def write(self, chunk): | ||||
self.fileobj.write(chunk) | self.fileobj.write(chunk) | ||||
def cook(self): | def cook(self): | ||||
"""Cook the requested object into a bundle | """Cook the requested object into a bundle | ||||
""" | """ | ||||
self.backend.set_status(self.bundle_type, self.obj_id, "pending") | self.backend.set_status(self.BUNDLE_TYPE, self.swhid, "pending") | ||||
self.backend.set_progress(self.bundle_type, self.obj_id, "Processing...") | self.backend.set_progress(self.BUNDLE_TYPE, self.swhid, "Processing...") | ||||
self.fileobj = BytesIOBundleSizeLimit(size_limit=self.max_bundle_size) | self.fileobj = BytesIOBundleSizeLimit(size_limit=self.max_bundle_size) | ||||
try: | try: | ||||
try: | try: | ||||
self.prepare_bundle() | self.prepare_bundle() | ||||
except QueryCanceledError: | except QueryCanceledError: | ||||
raise PolicyError( | raise PolicyError( | ||||
"Timeout reached while assembling the requested bundle" | "Timeout reached while assembling the requested bundle" | ||||
) | ) | ||||
bundle = self.fileobj.getvalue() | bundle = self.fileobj.getvalue() | ||||
# TODO: use proper content streaming instead of put_bundle() | # TODO: use proper content streaming instead of put_bundle() | ||||
self.backend.put_bundle(self.cache_type_key(), self.obj_id, bundle) | self.backend.put_bundle(self.cache_type_key(), self.swhid, bundle) | ||||
except PolicyError as e: | except PolicyError as e: | ||||
logging.info("Bundle cooking violated policy: %s", e) | logging.info("Bundle cooking violated policy: %s", e) | ||||
self.backend.set_status(self.bundle_type, self.obj_id, "failed") | self.backend.set_status(self.BUNDLE_TYPE, self.swhid, "failed") | ||||
self.backend.set_progress(self.bundle_type, self.obj_id, str(e)) | self.backend.set_progress(self.BUNDLE_TYPE, self.swhid, str(e)) | ||||
except Exception: | except Exception: | ||||
self.backend.set_status(self.bundle_type, self.obj_id, "failed") | self.backend.set_status(self.BUNDLE_TYPE, self.swhid, "failed") | ||||
self.backend.set_progress( | self.backend.set_progress( | ||||
self.bundle_type, | self.BUNDLE_TYPE, | ||||
self.obj_id, | self.swhid, | ||||
"Internal Server Error. This incident will be reported.", | "Internal Server Error. This incident will be reported.", | ||||
) | ) | ||||
logging.exception("Bundle cooking failed.") | logging.exception("Bundle cooking failed.") | ||||
else: | else: | ||||
self.backend.set_status(self.bundle_type, self.obj_id, "done") | self.backend.set_status(self.BUNDLE_TYPE, self.swhid, "done") | ||||
self.backend.set_progress(self.bundle_type, self.obj_id, None) | self.backend.set_progress(self.BUNDLE_TYPE, self.swhid, None) | ||||
finally: | finally: | ||||
self.backend.send_notif(self.bundle_type, self.obj_id) | self.backend.send_notif(self.BUNDLE_TYPE, self.swhid) |