Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/cookers/base.py
Show First 20 Lines • Show All 55 Lines • ▼ Show 20 Lines | class BaseVaultCooker(metaclass=abc.ABCMeta): | ||||
To define a new cooker, inherit from this class and override: | To define a new cooker, inherit from this class and override: | ||||
- CACHE_TYPE_KEY: key to use for the bundle to reference in cache | - CACHE_TYPE_KEY: key to use for the bundle to reference in cache | ||||
- def cook(): cook the object into a bundle | - def cook(): cook the object into a bundle | ||||
""" | """ | ||||
CACHE_TYPE_KEY = None # type: Optional[str] | CACHE_TYPE_KEY = None # type: Optional[str] | ||||
def __init__(
    self,
    obj_type,
    obj_id,
    backend,
    storage,
    graph,
    max_bundle_size=MAX_BUNDLE_SIZE,
):
    """Set up a cooker for one object.

    What kind of object ``obj_id`` designates is decided by the concrete
    subclass; in practice each bundle type is expected to come with its
    own cooker class.

    Args:
        obj_type: type of the object to be cooked into a bundle (directory,
            revision_flat or revision_gitfast; see
            swh.vault.cooker.COOKER_TYPES).
        obj_id: id of the object to be cooked into a bundle. It is passed
            through ``hashutil.hash_to_bytes`` before being stored.
        backend: the vault backend (swh.vault.backend.VaultBackend).
        storage: kept as ``self.storage`` for subclasses (presumably the
            archive storage client -- confirm against callers).
        graph: kept as ``self.graph`` for subclasses (presumably a graph
            service client -- confirm against callers).
        max_bundle_size: size limit given to the in-memory bundle buffer
            created by ``cook()``.
    """
    # Normalize the identifier up front, then record everything on self.
    raw_obj_id = hashutil.hash_to_bytes(obj_id)

    self.obj_type = obj_type
    self.obj_id = raw_obj_id

    self.backend = backend
    self.storage = storage
    self.graph = graph

    self.max_bundle_size = max_bundle_size
@abc.abstractmethod
def check_exists(self):
    """Tell whether the requested object exists and can be cooked.

    Abstract hook: every concrete cooker has to supply its own version.
    This base implementation only raises ``NotImplementedError``.
    """
    raise NotImplementedError()
@abc.abstractmethod
def prepare_bundle(self):
    """Implementation of the cooker. Writes the bundle bytes.

    Override this with the cooker implementation.

    ``cook()`` calls this method and ignores its return value, then reads
    the finished bundle back from ``self.fileobj``. Implementations must
    therefore emit every chunk through ``self.write()``; a generator that
    merely yields chunks would never be iterated and would produce an
    empty bundle.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError
def cache_type_key(self) -> str:
    """Return the cache key under which this cooker's bundles are stored.

    Returns:
        The class-level ``CACHE_TYPE_KEY``, guaranteed to be a non-empty
        value.

    Raises:
        AssertionError: if the concrete class left ``CACHE_TYPE_KEY``
            unset (``None``) or empty.
    """
    key = self.CACHE_TYPE_KEY
    if not key:
        # Raised explicitly instead of using ``assert`` so the check still
        # runs under ``python -O``; the exception type is unchanged.
        raise AssertionError(
            "%s does not define CACHE_TYPE_KEY" % type(self).__name__
        )
    return key
def write(self, chunk):
    """Append ``chunk`` to the bundle buffer held in ``self.fileobj``.

    ``self.fileobj`` is created by ``cook()``, so this is only usable
    once cooking has started.
    """
    sink = self.fileobj
    sink.write(chunk)
def cook(self):
    """Cook the requested object into a bundle

    Marks the object as "pending" on the backend, lets
    ``prepare_bundle()`` fill a size-limited in-memory buffer, then hands
    the buffer content to ``backend.put_bundle()``. The final status is
    "done" on success and "failed" otherwise. A ``PolicyError`` message is
    shown to the requester as-is; any other exception is logged and
    replaced by a generic message. A notification is sent in every case.
    """

    def report(status, progress):
        # Status is always updated before the progress message.
        self.backend.set_status(self.obj_type, self.obj_id, status)
        self.backend.set_progress(self.obj_type, self.obj_id, progress)

    report("pending", "Processing...")

    self.fileobj = BytesIOBundleSizeLimit(size_limit=self.max_bundle_size)
    try:
        try:
            self.prepare_bundle()
        except QueryCanceledError:
            # A cancelled query is reported as a policy failure so that the
            # handler below relays the message to the requester.
            raise PolicyError(
                "Timeout reached while assembling the requested bundle"
            )
        payload = self.fileobj.getvalue()
        # TODO: use proper content streaming instead of put_bundle()
        self.backend.put_bundle(self.cache_type_key(), self.obj_id, payload)
    except PolicyError as policy_err:
        report("failed", str(policy_err))
    except Exception:
        report("failed", "Internal Server Error. This incident will be reported.")
        logging.exception("Bundle cooking failed.")
    else:
        report("done", None)
    finally:
        self.backend.send_notif(self.obj_type, self.obj_id)