Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/cookers/base.py
Show First 20 Lines • Show All 92 Lines • ▼ Show 20 Lines | class BaseVaultCooker(metaclass=abc.ABCMeta): | ||||
@abc.abstractmethod | @abc.abstractmethod | ||||
def prepare_bundle(self): | def prepare_bundle(self): | ||||
"""Implementation of the cooker. Yields chunks of the bundle bytes. | """Implementation of the cooker. Yields chunks of the bundle bytes. | ||||
Override this with the cooker implementation. | Override this with the cooker implementation. | ||||
""" | """ | ||||
raise NotImplementedError | raise NotImplementedError | ||||
def cache_type_key(self) -> str: | |||||
assert self.CACHE_TYPE_KEY | |||||
return self.CACHE_TYPE_KEY | |||||
def write(self, chunk): | def write(self, chunk): | ||||
self.fileobj.write(chunk) | self.fileobj.write(chunk) | ||||
def cook(self): | def cook(self): | ||||
"""Cook the requested object into a bundle | """Cook the requested object into a bundle | ||||
""" | """ | ||||
self.backend.set_status(self.obj_type, self.obj_id, "pending") | self.backend.set_status(self.obj_type, self.obj_id, "pending") | ||||
self.backend.set_progress(self.obj_type, self.obj_id, "Processing...") | self.backend.set_progress(self.obj_type, self.obj_id, "Processing...") | ||||
self.fileobj = BytesIOBundleSizeLimit(size_limit=self.max_bundle_size) | self.fileobj = BytesIOBundleSizeLimit(size_limit=self.max_bundle_size) | ||||
try: | try: | ||||
try: | try: | ||||
self.prepare_bundle() | self.prepare_bundle() | ||||
except QueryCanceledError: | except QueryCanceledError: | ||||
raise PolicyError( | raise PolicyError( | ||||
"Timeout reached while assembling the requested bundle" | "Timeout reached while assembling the requested bundle" | ||||
) | ) | ||||
bundle = self.fileobj.getvalue() | bundle = self.fileobj.getvalue() | ||||
# TODO: use proper content streaming instead of put_bundle() | # TODO: use proper content streaming instead of put_bundle() | ||||
self.backend.put_bundle(self.CACHE_TYPE_KEY, self.obj_id, bundle) | self.backend.put_bundle(self.cache_type_key(), self.obj_id, bundle) | ||||
except PolicyError as e: | except PolicyError as e: | ||||
self.backend.set_status(self.obj_type, self.obj_id, "failed") | self.backend.set_status(self.obj_type, self.obj_id, "failed") | ||||
self.backend.set_progress(self.obj_type, self.obj_id, str(e)) | self.backend.set_progress(self.obj_type, self.obj_id, str(e)) | ||||
except Exception: | except Exception: | ||||
self.backend.set_status(self.obj_type, self.obj_id, "failed") | self.backend.set_status(self.obj_type, self.obj_id, "failed") | ||||
self.backend.set_progress( | self.backend.set_progress( | ||||
self.obj_type, | self.obj_type, | ||||
self.obj_id, | self.obj_id, | ||||
"Internal Server Error. This incident will be reported.", | "Internal Server Error. This incident will be reported.", | ||||
) | ) | ||||
logging.exception("Bundle cooking failed.") | logging.exception("Bundle cooking failed.") | ||||
else: | else: | ||||
self.backend.set_status(self.obj_type, self.obj_id, "done") | self.backend.set_status(self.obj_type, self.obj_id, "done") | ||||
self.backend.set_progress(self.obj_type, self.obj_id, None) | self.backend.set_progress(self.obj_type, self.obj_id, None) | ||||
finally: | finally: | ||||
self.backend.send_notif(self.obj_type, self.obj_id) | self.backend.send_notif(self.obj_type, self.obj_id) |