Page MenuHomeSoftware Heritage

base.py
No OneTemporary

# Copyright (C) 2016-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
import io
import logging
import traceback
from typing import ClassVar, Set
from psycopg2.extensions import QueryCanceledError
from swh.model.swhids import CoreSWHID, ObjectType
from swh.storage.interface import StorageInterface
MAX_BUNDLE_SIZE = 2 ** 29 # 512 MiB
DEFAULT_CONFIG_PATH = "vault/cooker"
DEFAULT_CONFIG = {
"max_bundle_size": ("int", MAX_BUNDLE_SIZE),
}
class PolicyError(Exception):
"""Raised when the bundle violates the cooking policy."""
pass
class BundleTooLargeError(PolicyError):
"""Raised when the bundle is too large to be cooked."""
pass
class BytesIOBundleSizeLimit(io.BytesIO):
def __init__(self, *args, size_limit=None, **kwargs):
super().__init__(*args, **kwargs)
self.size_limit = size_limit
def write(self, chunk):
if (
self.size_limit is not None
and self.getbuffer().nbytes + len(chunk) > self.size_limit
):
raise BundleTooLargeError(
"The requested bundle exceeds the maximum allowed "
"size of {} bytes.".format(self.size_limit)
)
return super().write(chunk)
class BaseVaultCooker(metaclass=abc.ABCMeta):
"""Abstract base class for the vault's bundle creators
This class describes a common API for the cookers.
To define a new cooker, inherit from this class and override:
- CACHE_TYPE_KEY: key to use for the bundle to reference in cache
- def cook(): cook the object into a bundle
"""
SUPPORTED_OBJECT_TYPES: ClassVar[Set[ObjectType]]
BUNDLE_TYPE: ClassVar[str]
def __init__(
self,
swhid: CoreSWHID,
backend,
storage: StorageInterface,
graph=None,
objstorage=None,
max_bundle_size: int = MAX_BUNDLE_SIZE,
thread_pool_size: int = 10,
):
"""Initialize the cooker.
The type of the object represented by the id depends on the
concrete class. Very likely, each type of bundle will have its
own cooker class.
Args:
swhid: id of the object to be cooked into a bundle.
backend: the vault backend (swh.vault.backend.VaultBackend).
"""
self.check_object_type(swhid.object_type)
self.swhid = swhid
self.obj_id = swhid.object_id
self.backend = backend
self.storage = storage
self.objstorage = objstorage
self.graph = graph
self.max_bundle_size = max_bundle_size
self.thread_pool_size = thread_pool_size
@classmethod
def check_object_type(cls, object_type: ObjectType) -> None:
if object_type not in cls.SUPPORTED_OBJECT_TYPES:
raise ValueError(f"{cls.__name__} does not support {object_type} objects.")
@abc.abstractmethod
def check_exists(self):
"""Checks that the requested object exists and can be cooked.
Override this in the cooker implementation.
"""
raise NotImplementedError
@abc.abstractmethod
def prepare_bundle(self):
"""Implementation of the cooker. Yields chunks of the bundle bytes.
Override this with the cooker implementation.
"""
raise NotImplementedError
def cache_type_key(self) -> str:
assert self.BUNDLE_TYPE
return self.BUNDLE_TYPE
def write(self, chunk):
self.fileobj.write(chunk)
def cook(self):
"""Cook the requested object into a bundle
"""
self.backend.set_status(self.BUNDLE_TYPE, self.swhid, "pending")
self.backend.set_progress(self.BUNDLE_TYPE, self.swhid, "Processing...")
self.fileobj = BytesIOBundleSizeLimit(size_limit=self.max_bundle_size)
try:
try:
self.prepare_bundle()
except QueryCanceledError:
raise PolicyError(
"Timeout reached while assembling the requested bundle"
)
bundle = self.fileobj.getvalue()
# TODO: use proper content streaming instead of put_bundle()
self.backend.put_bundle(self.cache_type_key(), self.swhid, bundle)
except PolicyError as e:
logging.info("Bundle cooking violated policy: %s", e)
self.backend.set_status(self.BUNDLE_TYPE, self.swhid, "failed")
self.backend.set_progress(self.BUNDLE_TYPE, self.swhid, str(e))
except Exception:
self.backend.set_status(self.BUNDLE_TYPE, self.swhid, "failed")
tb = traceback.format_exc()
self.backend.set_progress(
self.BUNDLE_TYPE,
self.swhid,
f"Internal Server Error. This incident will be reported.\n"
f"The full error was:\n\n{tb}",
)
logging.exception("Bundle cooking failed.")
else:
self.backend.set_status(self.BUNDLE_TYPE, self.swhid, "done")
self.backend.set_progress(self.BUNDLE_TYPE, self.swhid, None)
finally:
self.backend.send_notif(self.BUNDLE_TYPE, self.swhid)

File Metadata

Mime Type
text/x-python
Expires
Fri, Jul 4, 11:53 AM (3 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3295844

Event Timeline