Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/cookers/base.py
Show First 20 Lines • Show All 58 Lines • ▼ Show 20 Lines | class BaseVaultCooker(metaclass=abc.ABCMeta): | ||||
- CACHE_TYPE_KEY: key to use for the bundle to reference in cache | - CACHE_TYPE_KEY: key to use for the bundle to reference in cache | ||||
- def cook(): cook the object into a bundle | - def cook(): cook the object into a bundle | ||||
""" | """ | ||||
CACHE_TYPE_KEY = None # type: Optional[str] | CACHE_TYPE_KEY = None # type: Optional[str] | ||||
def __init__( | def __init__( | ||||
self, | self, | ||||
obj_type: str, | bundle_type: str, | ||||
obj_id: Sha1Git, | obj_id: Sha1Git, | ||||
backend, | backend, | ||||
storage: StorageInterface, | storage: StorageInterface, | ||||
graph=None, | graph=None, | ||||
objstorage=None, | objstorage=None, | ||||
max_bundle_size: int = MAX_BUNDLE_SIZE, | max_bundle_size: int = MAX_BUNDLE_SIZE, | ||||
): | ): | ||||
"""Initialize the cooker. | """Initialize the cooker. | ||||
The type of the object represented by the id depends on the | The type of the object represented by the id depends on the | ||||
concrete class. Very likely, each type of bundle will have its | concrete class. Very likely, each type of bundle will have its | ||||
own cooker class. | own cooker class. | ||||
Args: | Args: | ||||
obj_type: type of the object to be cooked into a bundle (directory, | bundle_type: type of the object to be cooked into a bundle (directory, | ||||
revision_flat or revision_gitfast; see | revision_flat or revision_gitfast; see | ||||
swh.vault.cooker.COOKER_TYPES). | swh.vault.cooker.COOKER_TYPES). | ||||
obj_id: id of the object to be cooked into a bundle. | obj_id: id of the object to be cooked into a bundle. | ||||
backend: the vault backend (swh.vault.backend.VaultBackend). | backend: the vault backend (swh.vault.backend.VaultBackend). | ||||
""" | """ | ||||
self.obj_type = obj_type | self.bundle_type = bundle_type | ||||
self.obj_id = hashutil.hash_to_bytes(obj_id) | self.obj_id = hashutil.hash_to_bytes(obj_id) | ||||
self.backend = backend | self.backend = backend | ||||
self.storage = storage | self.storage = storage | ||||
self.objstorage = objstorage | self.objstorage = objstorage | ||||
self.graph = graph | self.graph = graph | ||||
self.max_bundle_size = max_bundle_size | self.max_bundle_size = max_bundle_size | ||||
@abc.abstractmethod | @abc.abstractmethod | ||||
Show All 17 Lines | def cache_type_key(self) -> str: | ||||
return self.CACHE_TYPE_KEY | return self.CACHE_TYPE_KEY | ||||
def write(self, chunk): | def write(self, chunk): | ||||
self.fileobj.write(chunk) | self.fileobj.write(chunk) | ||||
def cook(self): | def cook(self): | ||||
"""Cook the requested object into a bundle | """Cook the requested object into a bundle | ||||
""" | """ | ||||
self.backend.set_status(self.obj_type, self.obj_id, "pending") | self.backend.set_status(self.bundle_type, self.obj_id, "pending") | ||||
self.backend.set_progress(self.obj_type, self.obj_id, "Processing...") | self.backend.set_progress(self.bundle_type, self.obj_id, "Processing...") | ||||
self.fileobj = BytesIOBundleSizeLimit(size_limit=self.max_bundle_size) | self.fileobj = BytesIOBundleSizeLimit(size_limit=self.max_bundle_size) | ||||
try: | try: | ||||
try: | try: | ||||
self.prepare_bundle() | self.prepare_bundle() | ||||
except QueryCanceledError: | except QueryCanceledError: | ||||
raise PolicyError( | raise PolicyError( | ||||
"Timeout reached while assembling the requested bundle" | "Timeout reached while assembling the requested bundle" | ||||
) | ) | ||||
bundle = self.fileobj.getvalue() | bundle = self.fileobj.getvalue() | ||||
# TODO: use proper content streaming instead of put_bundle() | # TODO: use proper content streaming instead of put_bundle() | ||||
self.backend.put_bundle(self.cache_type_key(), self.obj_id, bundle) | self.backend.put_bundle(self.cache_type_key(), self.obj_id, bundle) | ||||
except PolicyError as e: | except PolicyError as e: | ||||
self.backend.set_status(self.obj_type, self.obj_id, "failed") | logging.info("Bundle cooking violated policy: %s", e) | ||||
self.backend.set_progress(self.obj_type, self.obj_id, str(e)) | self.backend.set_status(self.bundle_type, self.obj_id, "failed") | ||||
self.backend.set_progress(self.bundle_type, self.obj_id, str(e)) | |||||
except Exception: | except Exception: | ||||
self.backend.set_status(self.obj_type, self.obj_id, "failed") | self.backend.set_status(self.bundle_type, self.obj_id, "failed") | ||||
self.backend.set_progress( | self.backend.set_progress( | ||||
self.obj_type, | self.bundle_type, | ||||
self.obj_id, | self.obj_id, | ||||
"Internal Server Error. This incident will be reported.", | "Internal Server Error. This incident will be reported.", | ||||
) | ) | ||||
logging.exception("Bundle cooking failed.") | logging.exception("Bundle cooking failed.") | ||||
else: | else: | ||||
self.backend.set_status(self.obj_type, self.obj_id, "done") | self.backend.set_status(self.bundle_type, self.obj_id, "done") | ||||
self.backend.set_progress(self.obj_type, self.obj_id, None) | self.backend.set_progress(self.bundle_type, self.obj_id, None) | ||||
finally: | finally: | ||||
self.backend.send_notif(self.obj_type, self.obj_id) | self.backend.send_notif(self.bundle_type, self.obj_id) |