diff --git a/swh/vault/backend.py b/swh/vault/backend.py --- a/swh/vault/backend.py +++ b/swh/vault/backend.py @@ -25,14 +25,14 @@ cooking_task_name = "swh.vault.cooking_tasks.SWHCookingTask" NOTIF_EMAIL_FROM = '"Software Heritage Vault" ' "" -NOTIF_EMAIL_SUBJECT_SUCCESS = "Bundle ready: {obj_type} {short_id}" -NOTIF_EMAIL_SUBJECT_FAILURE = "Bundle failed: {obj_type} {short_id}" +NOTIF_EMAIL_SUBJECT_SUCCESS = "Bundle ready: {bundle_type} {short_id}" +NOTIF_EMAIL_SUBJECT_FAILURE = "Bundle failed: {bundle_type} {short_id}" NOTIF_EMAIL_BODY_SUCCESS = """ You have requested the following bundle from the Software Heritage Vault: -Object Type: {obj_type} +Bundle Type: {bundle_type} Object ID: {hex_id} This bundle is now available for download at the following address: @@ -50,7 +50,7 @@ You have requested the following bundle from the Software Heritage Vault: -Object Type: {obj_type} +Bundle Type: {bundle_type} Object ID: {hex_id} This bundle could not be cooked for the following reason: @@ -65,7 +65,9 @@ def batch_to_bytes(batch: List[Tuple[str, str]]) -> List[Tuple[str, bytes]]: - return [(obj_type, hashutil.hash_to_bytes(hex_id)) for obj_type, hex_id in batch] + return [ + (bundle_type, hashutil.hash_to_bytes(hex_id)) for bundle_type, hex_id in batch + ] class VaultBackend: @@ -109,7 +111,7 @@ @db_transaction() def progress( self, - obj_type: str, + bundle_type: str, obj_id: ObjectId, raise_notfound: bool = True, db=None, @@ -122,57 +124,57 @@ ts_created, ts_done, ts_last_access, progress_msg FROM vault_bundle WHERE type = %s AND object_id = %s""", - (obj_type, obj_id), + (bundle_type, obj_id), ) res = cur.fetchone() if not res: if raise_notfound: - raise NotFoundExc(f"{obj_type} {hex_id} was not found.") + raise NotFoundExc(f"{bundle_type} {hex_id} was not found.") return None res["object_id"] = hashutil.hash_to_hex(res["object_id"]) return res - def _send_task(self, obj_type: str, hex_id: ObjectId): + def _send_task(self, bundle_type: str, hex_id: ObjectId): """Send 
a cooking task to the celery scheduler""" - task = create_oneshot_task_dict("cook-vault-bundle", obj_type, hex_id) + task = create_oneshot_task_dict("cook-vault-bundle", bundle_type, hex_id) added_tasks = self.scheduler.create_tasks([task]) return added_tasks[0]["id"] @db_transaction() def create_task( - self, obj_type: str, obj_id: bytes, sticky: bool = False, db=None, cur=None + self, bundle_type: str, obj_id: bytes, sticky: bool = False, db=None, cur=None ): """Create and send a cooking task""" hex_id, obj_id = self._compute_ids(obj_id) - cooker_class = get_cooker_cls(obj_type) - cooker = cooker_class(obj_type, hex_id, backend=self, storage=self.storage) + cooker_class = get_cooker_cls(bundle_type) + cooker = cooker_class(bundle_type, hex_id, backend=self, storage=self.storage) if not cooker.check_exists(): - raise NotFoundExc(f"{obj_type} {hex_id} was not found.") + raise NotFoundExc(f"{bundle_type} {hex_id} was not found.") cur.execute( """ INSERT INTO vault_bundle (type, object_id, sticky) VALUES (%s, %s, %s)""", - (obj_type, obj_id, sticky), + (bundle_type, obj_id, sticky), ) db.conn.commit() - task_id = self._send_task(obj_type, hex_id) + task_id = self._send_task(bundle_type, hex_id) cur.execute( """ UPDATE vault_bundle SET task_id = %s WHERE type = %s AND object_id = %s""", - (task_id, obj_type, obj_id), + (task_id, bundle_type, obj_id), ) @db_transaction() def add_notif_email( - self, obj_type: str, obj_id: bytes, email: str, db=None, cur=None + self, bundle_type: str, obj_id: bytes, email: str, db=None, cur=None ): """Add an e-mail address to notify when a given bundle is ready""" cur.execute( @@ -180,18 +182,18 @@ INSERT INTO vault_notif_email (email, bundle_id) VALUES (%s, (SELECT id FROM vault_bundle WHERE type = %s AND object_id = %s))""", - (email, obj_type, obj_id), + (email, bundle_type, obj_id), ) - def put_bundle(self, obj_type: str, obj_id: ObjectId, bundle) -> bool: + def put_bundle(self, bundle_type: str, obj_id: ObjectId, bundle) -> bool: 
_, obj_id = self._compute_ids(obj_id) - self.cache.add(obj_type, obj_id, bundle) + self.cache.add(bundle_type, obj_id, bundle) return True @db_transaction() def cook( self, - obj_type: str, + bundle_type: str, obj_id: ObjectId, *, sticky: bool = False, @@ -200,36 +202,36 @@ cur=None, ) -> Dict[str, Any]: hex_id, obj_id = self._compute_ids(obj_id) - info = self.progress(obj_type, obj_id, raise_notfound=False) + info = self.progress(bundle_type, obj_id, raise_notfound=False) - if obj_type not in COOKER_TYPES: - raise NotFoundExc(f"{obj_type} is an unknown type.") + if bundle_type not in COOKER_TYPES: + raise NotFoundExc(f"{bundle_type} is an unknown type.") # If there's a failed bundle entry, delete it first. if info is not None and info["task_status"] == "failed": obj_id = hashutil.hash_to_bytes(obj_id) cur.execute( "DELETE FROM vault_bundle WHERE type = %s AND object_id = %s", - (obj_type, obj_id), + (bundle_type, obj_id), ) db.conn.commit() info = None # If there's no bundle entry, create the task. 
if info is None: - self.create_task(obj_type, obj_id, sticky) + self.create_task(bundle_type, obj_id, sticky) if email is not None: # If the task is already done, send the email directly if info is not None and info["task_status"] == "done": self.send_notification( - None, email, obj_type, hex_id, info["task_status"] + None, email, bundle_type, hex_id, info["task_status"] ) # Else, add it to the notification queue else: - self.add_notif_email(obj_type, obj_id, email) + self.add_notif_email(bundle_type, obj_id, email) - return self.progress(obj_type, obj_id) + return self.progress(bundle_type, obj_id) @db_transaction() def batch_cook( @@ -239,9 +241,9 @@ # psycopg2 >= 2.7 (only available on postgresql servers) from psycopg2.extras import execute_values - for obj_type, _ in batch: - if obj_type not in COOKER_TYPES: - raise NotFoundExc(f"{obj_type} is an unknown type.") + for bundle_type, _ in batch: + if bundle_type not in COOKER_TYPES: + raise NotFoundExc(f"{bundle_type} is an unknown type.") cur.execute( """ @@ -299,7 +301,8 @@ # Send the tasks args_batch = [ - (obj_type, hashutil.hash_to_hex(obj_id)) for obj_type, obj_id in batch_new + (bundle_type, hashutil.hash_to_hex(obj_id)) + for bundle_type, obj_id in batch_new ] # TODO: change once the scheduler handles priority tasks tasks = [ @@ -309,8 +312,8 @@ added_tasks = self.scheduler.create_tasks(tasks) tasks_ids_bundle_ids = [ - (task_id, obj_type, obj_id) - for task_id, (obj_type, obj_id) in zip( + (task_id, bundle_type, obj_id) + for task_id, (bundle_type, obj_id) in zip( [task["id"] for task in added_tasks], batch_new ) ] @@ -357,44 +360,44 @@ return res @db_transaction() - def is_available(self, obj_type: str, obj_id: ObjectId, db=None, cur=None): + def is_available(self, bundle_type: str, obj_id: ObjectId, db=None, cur=None): """Check whether a bundle is available for retrieval""" - info = self.progress(obj_type, obj_id, raise_notfound=False, cur=cur) + info = self.progress(bundle_type, obj_id, 
raise_notfound=False, cur=cur) obj_id = hashutil.hash_to_bytes(obj_id) return ( info is not None and info["task_status"] == "done" - and self.cache.is_cached(obj_type, obj_id) + and self.cache.is_cached(bundle_type, obj_id) ) @db_transaction() def fetch( - self, obj_type: str, obj_id: ObjectId, raise_notfound=True, db=None, cur=None + self, bundle_type: str, obj_id: ObjectId, raise_notfound=True, db=None, cur=None ) -> Optional[bytes]: """Retrieve a bundle from the cache""" hex_id, obj_id = self._compute_ids(obj_id) - available = self.is_available(obj_type, obj_id, cur=cur) + available = self.is_available(bundle_type, obj_id, cur=cur) if not available: if raise_notfound: - raise NotFoundExc(f"{obj_type} {hex_id} is not available.") + raise NotFoundExc(f"{bundle_type} {hex_id} is not available.") return None - self.update_access_ts(obj_type, obj_id, cur=cur) - return self.cache.get(obj_type, obj_id) + self.update_access_ts(bundle_type, obj_id, cur=cur) + return self.cache.get(bundle_type, obj_id) @db_transaction() - def update_access_ts(self, obj_type: str, obj_id: bytes, db=None, cur=None): + def update_access_ts(self, bundle_type: str, obj_id: bytes, db=None, cur=None): """Update the last access timestamp of a bundle""" cur.execute( """ UPDATE vault_bundle SET ts_last_access = NOW() WHERE type = %s AND object_id = %s""", - (obj_type, obj_id), + (bundle_type, obj_id), ) @db_transaction() def set_status( - self, obj_type: str, obj_id: ObjectId, status: str, db=None, cur=None + self, bundle_type: str, obj_id: ObjectId, status: str, db=None, cur=None ) -> bool: obj_id = hashutil.hash_to_bytes(obj_id) req = ( @@ -404,12 +407,12 @@ + (""", ts_done = NOW() """ if status == "done" else "") + """WHERE type = %s AND object_id = %s""" ) - cur.execute(req, (status, obj_type, obj_id)) + cur.execute(req, (status, bundle_type, obj_id)) return True @db_transaction() def set_progress( - self, obj_type: str, obj_id: ObjectId, progress: str, db=None, cur=None + self, bundle_type: 
str, obj_id: ObjectId, progress: str, db=None, cur=None ) -> bool: obj_id = hashutil.hash_to_bytes(obj_id) cur.execute( @@ -417,12 +420,12 @@ UPDATE vault_bundle SET progress_msg = %s WHERE type = %s AND object_id = %s""", - (progress, obj_type, obj_id), + (progress, bundle_type, obj_id), ) return True @db_transaction() - def send_notif(self, obj_type: str, obj_id: ObjectId, db=None, cur=None) -> bool: + def send_notif(self, bundle_type: str, obj_id: ObjectId, db=None, cur=None) -> bool: hex_id, obj_id = self._compute_ids(obj_id) cur.execute( """ @@ -430,13 +433,13 @@ FROM vault_notif_email INNER JOIN vault_bundle ON bundle_id = vault_bundle.id WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s""", - (obj_type, obj_id), + (bundle_type, obj_id), ) for d in cur: self.send_notification( d["id"], d["email"], - obj_type, + bundle_type, hex_id, status=d["task_status"], progress_msg=d["progress_msg"], @@ -448,7 +451,7 @@ self, n_id: Optional[int], email: str, - obj_type: str, + bundle_type: str, hex_id: str, status: str, progress_msg: Optional[str] = None, @@ -465,24 +468,24 @@ # the table # * use this url for the notification e-mail url = "https://archive.softwareheritage.org/api/1/vault/{}/{}/" "raw".format( - obj_type, hex_id + bundle_type, hex_id ) if status == "done": text = NOTIF_EMAIL_BODY_SUCCESS.strip() - text = text.format(obj_type=obj_type, hex_id=hex_id, url=url) + text = text.format(bundle_type=bundle_type, hex_id=hex_id, url=url) msg = MIMEText(text) msg["Subject"] = NOTIF_EMAIL_SUBJECT_SUCCESS.format( - obj_type=obj_type, short_id=short_id + bundle_type=bundle_type, short_id=short_id ) elif status == "failed": text = NOTIF_EMAIL_BODY_FAILURE.strip() text = text.format( - obj_type=obj_type, hex_id=hex_id, progress_msg=progress_msg + bundle_type=bundle_type, hex_id=hex_id, progress_msg=progress_msg ) msg = MIMEText(text) msg["Subject"] = NOTIF_EMAIL_SUBJECT_FAILURE.format( - obj_type=obj_type, short_id=short_id + bundle_type=bundle_type, 
short_id=short_id ) else: raise RuntimeError( diff --git a/swh/vault/cache.py b/swh/vault/cache.py --- a/swh/vault/cache.py +++ b/swh/vault/cache.py @@ -18,30 +18,30 @@ def __init__(self, **objstorage): self.objstorage = get_objstorage(**objstorage) - def add(self, obj_type, obj_id, content): - sid = self._get_internal_id(obj_type, obj_id) + def add(self, bundle_type, obj_id, content): + sid = self._get_internal_id(bundle_type, obj_id) return self.objstorage.add(content, sid) - def get(self, obj_type, obj_id): - sid = self._get_internal_id(obj_type, obj_id) + def get(self, bundle_type, obj_id): + sid = self._get_internal_id(bundle_type, obj_id) return self.objstorage.get(hashutil.hash_to_bytes(sid)) - def delete(self, obj_type, obj_id): - sid = self._get_internal_id(obj_type, obj_id) + def delete(self, bundle_type, obj_id): + sid = self._get_internal_id(bundle_type, obj_id) return self.objstorage.delete(hashutil.hash_to_bytes(sid)) - def add_stream(self, obj_type, obj_id, content_iter): - sid = self._get_internal_id(obj_type, obj_id) + def add_stream(self, bundle_type, obj_id, content_iter): + sid = self._get_internal_id(bundle_type, obj_id) return self.objstorage.add_stream(content_iter, sid) - def get_stream(self, obj_type, obj_id): - sid = self._get_internal_id(obj_type, obj_id) + def get_stream(self, bundle_type, obj_id): + sid = self._get_internal_id(bundle_type, obj_id) return self.objstorage.get_stream(hashutil.hash_to_bytes(sid)) - def is_cached(self, obj_type, obj_id): - sid = self._get_internal_id(obj_type, obj_id) + def is_cached(self, bundle_type, obj_id): + sid = self._get_internal_id(bundle_type, obj_id) return hashutil.hash_to_bytes(sid) in self.objstorage - def _get_internal_id(self, obj_type, obj_id): + def _get_internal_id(self, bundle_type, obj_id): obj_id = hashutil.hash_to_hex(obj_id) - return compute_hash("{}:{}".format(obj_type, obj_id).encode()) + return compute_hash("{}:{}".format(bundle_type, obj_id).encode()) diff --git a/swh/vault/cli.py 
b/swh/vault/cli.py --- a/swh/vault/cli.py +++ b/swh/vault/cli.py @@ -118,7 +118,7 @@ objstorage = get_objstorage(**conf["objstorage"]) if "objstorage" in conf else None cooker_cls = get_cooker_cls(cooker_name) cooker = cooker_cls( - obj_type=cooker_name, + bundle_type=cooker_name, obj_id=swhid.object_id, backend=backend, storage=storage, diff --git a/swh/vault/cookers/__init__.py b/swh/vault/cookers/__init__.py --- a/swh/vault/cookers/__init__.py +++ b/swh/vault/cookers/__init__.py @@ -28,8 +28,8 @@ } -def get_cooker_cls(obj_type): - return COOKER_TYPES[obj_type] +def get_cooker_cls(bundle_type): + return COOKER_TYPES[bundle_type] def check_config(cfg: Dict[str, Any]) -> Dict[str, Any]: @@ -67,11 +67,11 @@ return cfg -def get_cooker(obj_type: str, obj_id: str): - """Instantiate a cooker class of type obj_type. +def get_cooker(bundle_type: str, obj_id: str): + """Instantiate a cooker class of type bundle_type. Returns: - Cooker class in charge of cooking the obj_type with id obj_id. + Cooker class in charge of cooking the bundle_type with id obj_id. 
Raises: ValueError in case of a missing top-level vault key configuration or a storage @@ -83,7 +83,7 @@ cfg = read_config(os.environ["SWH_CONFIG_FILENAME"], DEFAULT_CONFIG) else: cfg = load_named_config(DEFAULT_CONFIG_PATH, DEFAULT_CONFIG) - cooker_cls = get_cooker_cls(obj_type) + cooker_cls = get_cooker_cls(bundle_type) cfg = check_config(cfg) vcfg = cfg["vault"] @@ -104,7 +104,7 @@ graph = None return cooker_cls( - obj_type, + bundle_type, obj_id, backend=backend, storage=storage, diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py --- a/swh/vault/cookers/base.py +++ b/swh/vault/cookers/base.py @@ -64,7 +64,7 @@ def __init__( self, - obj_type: str, + bundle_type: str, obj_id: Sha1Git, backend, storage: StorageInterface, @@ -79,13 +79,13 @@ own cooker class. Args: - obj_type: type of the object to be cooked into a bundle (directory, + bundle_type: type of the object to be cooked into a bundle (directory, revision_flat or revision_gitfast; see swh.vault.cooker.COOKER_TYPES). obj_id: id of the object to be cooked into a bundle. backend: the vault backend (swh.vault.backend.VaultBackend). 
""" - self.obj_type = obj_type + self.bundle_type = bundle_type self.obj_id = hashutil.hash_to_bytes(obj_id) self.backend = backend self.storage = storage @@ -119,8 +119,8 @@ def cook(self): """Cook the requested object into a bundle """ - self.backend.set_status(self.obj_type, self.obj_id, "pending") - self.backend.set_progress(self.obj_type, self.obj_id, "Processing...") + self.backend.set_status(self.bundle_type, self.obj_id, "pending") + self.backend.set_progress(self.bundle_type, self.obj_id, "Processing...") self.fileobj = BytesIOBundleSizeLimit(size_limit=self.max_bundle_size) try: @@ -134,18 +134,19 @@ # TODO: use proper content streaming instead of put_bundle() self.backend.put_bundle(self.cache_type_key(), self.obj_id, bundle) except PolicyError as e: - self.backend.set_status(self.obj_type, self.obj_id, "failed") - self.backend.set_progress(self.obj_type, self.obj_id, str(e)) + logging.info("Bundle cooking violated policy: %s", e) + self.backend.set_status(self.bundle_type, self.obj_id, "failed") + self.backend.set_progress(self.bundle_type, self.obj_id, str(e)) except Exception: - self.backend.set_status(self.obj_type, self.obj_id, "failed") + self.backend.set_status(self.bundle_type, self.obj_id, "failed") self.backend.set_progress( - self.obj_type, + self.bundle_type, self.obj_id, "Internal Server Error. 
This incident will be reported.", ) logging.exception("Bundle cooking failed.") else: - self.backend.set_status(self.obj_type, self.obj_id, "done") - self.backend.set_progress(self.obj_type, self.obj_id, None) + self.backend.set_status(self.bundle_type, self.obj_id, "done") + self.backend.set_progress(self.bundle_type, self.obj_id, None) finally: - self.backend.send_notif(self.obj_type, self.obj_id) + self.backend.send_notif(self.bundle_type, self.obj_id) diff --git a/swh/vault/cookers/git_bare.py b/swh/vault/cookers/git_bare.py --- a/swh/vault/cookers/git_bare.py +++ b/swh/vault/cookers/git_bare.py @@ -60,10 +60,10 @@ use_fsck = True def cache_type_key(self) -> str: - return self.obj_type + return self.bundle_type def check_exists(self): - obj_type = self.obj_type.split("_")[0] + obj_type = self.bundle_type.split("_")[0] if obj_type == "revision": return not list(self.storage.revision_missing([self.obj_id])) elif obj_type == "directory": @@ -74,7 +74,7 @@ raise NotImplementedError(f"GitBareCooker for {obj_type}") def obj_swhid(self) -> identifiers.CoreSWHID: - obj_type = self.obj_type.split("_")[0] + obj_type = self.bundle_type.split("_")[0] return identifiers.CoreSWHID( object_type=identifiers.ObjectType[obj_type.upper()], object_id=self.obj_id, ) @@ -112,7 +112,7 @@ self.init_git() # Add the root object to the stack of objects to visit - self.push_subgraph(self.obj_type.split("_")[0], self.obj_id) + self.push_subgraph(self.bundle_type.split("_")[0], self.obj_id) # Load and write all the objects to disk self.load_objects() @@ -173,7 +173,7 @@ def write_refs(self, snapshot=None): refs: Dict[bytes, bytes] # ref name -> target - obj_type = self.obj_type.split("_")[0] + obj_type = self.bundle_type.split("_")[0] if obj_type == "directory": # We need a synthetic revision pointing to the directory author = Person.from_fullname( diff --git a/swh/vault/cookers/revision_gitfast.py b/swh/vault/cookers/revision_gitfast.py --- a/swh/vault/cookers/revision_gitfast.py +++ 
b/swh/vault/cookers/revision_gitfast.py @@ -58,7 +58,7 @@ if last_progress_report is None or last_progress_report + 2 <= ct: last_progress_report = ct pg = "Computing revision {}/{}".format(i, len(self.log)) - self.backend.set_progress(self.obj_type, self.obj_id, pg) + self.backend.set_progress(self.bundle_type, self.obj_id, pg) # Compute the current commit self._compute_commit_command(rev) diff --git a/swh/vault/cooking_tasks.py b/swh/vault/cooking_tasks.py --- a/swh/vault/cooking_tasks.py +++ b/swh/vault/cooking_tasks.py @@ -9,13 +9,13 @@ @app.task(name=__name__ + ".SWHCookingTask") -def cook_bundle(obj_type, obj_id): +def cook_bundle(bundle_type, obj_id): """Main task to cook a bundle.""" - get_cooker(obj_type, obj_id).cook() + get_cooker(bundle_type, obj_id).cook() # TODO: remove once the scheduler handles priority tasks @app.task(name=__name__ + ".SWHBatchCookingTask") -def batch_cook_bundle(obj_type, obj_id): +def batch_cook_bundle(bundle_type, obj_id): """Temporary task for the batch queue.""" - get_cooker(obj_type, obj_id).cook() + get_cooker(bundle_type, obj_id).cook() diff --git a/swh/vault/in_memory_backend.py b/swh/vault/in_memory_backend.py --- a/swh/vault/in_memory_backend.py +++ b/swh/vault/in_memory_backend.py @@ -18,30 +18,30 @@ def __init__(self): self._cache = VaultCache(cls="memory") - def fetch(self, obj_type: str, obj_id: ObjectId) -> Optional[bytes]: - return self._cache.get(obj_type, hash_to_bytes(obj_id)) + def fetch(self, bundle_type: str, obj_id: ObjectId) -> Optional[bytes]: + return self._cache.get(bundle_type, hash_to_bytes(obj_id)) def cook( - self, obj_type: str, obj_id: ObjectId, email: Optional[str] = None + self, bundle_type: str, obj_id: ObjectId, email: Optional[str] = None ) -> Dict[str, Any]: raise NotImplementedError("InMemoryVaultBackend.cook()") - def progress(self, obj_type: str, obj_id: ObjectId): + def progress(self, bundle_type: str, obj_id: ObjectId): raise NotImplementedError("InMemoryVaultBackend.progress()") # 
Cookers endpoints - def set_progress(self, obj_type: str, obj_id: ObjectId, progress: str) -> None: + def set_progress(self, bundle_type: str, obj_id: ObjectId, progress: str) -> None: pass - def set_status(self, obj_type: str, obj_id: ObjectId, status: str) -> None: + def set_status(self, bundle_type: str, obj_id: ObjectId, status: str) -> None: pass - def put_bundle(self, obj_type: str, obj_id: ObjectId, bundle) -> bool: - self._cache.add(obj_type, hash_to_bytes(obj_id), bundle) + def put_bundle(self, bundle_type: str, obj_id: ObjectId, bundle) -> bool: + self._cache.add(bundle_type, hash_to_bytes(obj_id), bundle) return True - def send_notif(self, obj_type: str, obj_id: ObjectId): + def send_notif(self, bundle_type: str, obj_id: ObjectId): pass # Batch endpoints diff --git a/swh/vault/interface.py b/swh/vault/interface.py --- a/swh/vault/interface.py +++ b/swh/vault/interface.py @@ -19,41 +19,41 @@ """ @remote_api_endpoint("fetch") - def fetch(self, obj_type: str, obj_id: ObjectId) -> Optional[bytes]: + def fetch(self, bundle_type: str, obj_id: ObjectId) -> Optional[bytes]: """Fetch information from a bundle""" ... @remote_api_endpoint("cook") def cook( - self, obj_type: str, obj_id: ObjectId, email: Optional[str] = None + self, bundle_type: str, obj_id: ObjectId, email: Optional[str] = None ) -> Dict[str, Any]: """Main entry point for cooking requests. This starts a cooking task if needed, and add the given e-mail to the notify list""" ... @remote_api_endpoint("progress") - def progress(self, obj_type: str, obj_id: ObjectId): + def progress(self, bundle_type: str, obj_id: ObjectId): ... # Cookers endpoints @remote_api_endpoint("set_progress") - def set_progress(self, obj_type: str, obj_id: ObjectId, progress: str) -> None: + def set_progress(self, bundle_type: str, obj_id: ObjectId, progress: str) -> None: """Set the cooking progress of a bundle""" ... 
@remote_api_endpoint("set_status") - def set_status(self, obj_type: str, obj_id: ObjectId, status: str) -> bool: + def set_status(self, bundle_type: str, obj_id: ObjectId, status: str) -> bool: """Set the cooking status of a bundle""" ... @remote_api_endpoint("put_bundle") - def put_bundle(self, obj_type: str, obj_id: ObjectId, bundle): + def put_bundle(self, bundle_type: str, obj_id: ObjectId, bundle): """Store bundle in vault cache""" ... @remote_api_endpoint("send_notif") - def send_notif(self, obj_type: str, obj_id: ObjectId): + def send_notif(self, bundle_type: str, obj_id: ObjectId): """Send all the e-mails in the notification list of a bundle""" ... diff --git a/swh/vault/tests/test_backend.py b/swh/vault/tests/test_backend.py --- a/swh/vault/tests/test_backend.py +++ b/swh/vault/tests/test_backend.py @@ -41,20 +41,20 @@ assert creation_delta_secs < tolerance_secs -def fake_cook(backend, obj_type, result_content, sticky=False): +def fake_cook(backend, bundle_type, result_content, sticky=False): content, obj_id = hash_content(result_content) with mock_cooking(backend): - backend.create_task(obj_type, obj_id, sticky) - backend.cache.add(obj_type, obj_id, b"content") - backend.set_status(obj_type, obj_id, "done") + backend.create_task(bundle_type, obj_id, sticky) + backend.cache.add(bundle_type, obj_id, b"content") + backend.set_status(bundle_type, obj_id, "done") return obj_id, content -def fail_cook(backend, obj_type, obj_id, failure_reason): +def fail_cook(backend, bundle_type, obj_id, failure_reason): with mock_cooking(backend): - backend.create_task(obj_type, obj_id) - backend.set_status(obj_type, obj_id, "failed") - backend.set_progress(obj_type, obj_id, failure_reason) + backend.create_task(bundle_type, obj_id) + backend.set_status(bundle_type, obj_id, "failed") + backend.set_progress(bundle_type, obj_id, failure_reason) TEST_TYPE = "revision_gitfast" diff --git a/swh/vault/tests/test_cli.py b/swh/vault/tests/test_cli.py --- a/swh/vault/tests/test_cli.py 
+++ b/swh/vault/tests/test_cli.py @@ -47,10 +47,10 @@ @pytest.mark.parametrize( - "obj_type,cooker_name_suffix,swhid_type", + "bundle_type,cooker_name_suffix,swhid_type", [("directory", "", "dir"), ("revision", "gitfast", "rev"),], ) -def test_cook_directory(obj_type, cooker_name_suffix, swhid_type, mocker): +def test_cook_directory(bundle_type, cooker_name_suffix, swhid_type, mocker): storage = object() mocker.patch("swh.storage.get_storage", return_value=storage) @@ -92,7 +92,9 @@ raise result.exception cooker_cls.assert_called_once_with( - obj_type=f"{obj_type}_{cooker_name_suffix}" if cooker_name_suffix else obj_type, + bundle_type=f"{bundle_type}_{cooker_name_suffix}" + if cooker_name_suffix + else bundle_type, obj_id=b"\x00" * 20, backend=backend, storage=storage, diff --git a/swh/vault/tests/test_cookers_base.py b/swh/vault/tests/test_cookers_base.py --- a/swh/vault/tests/test_cookers_base.py +++ b/swh/vault/tests/test_cookers_base.py @@ -24,7 +24,7 @@ self.config = {} self.storage = MagicMock() self.backend = MagicMock() - self.obj_type = self.CACHE_TYPE_KEY + self.bundle_type = self.CACHE_TYPE_KEY self.obj_id = hashutil.hash_to_bytes(TEST_OBJ_ID) self.max_bundle_size = 1024 diff --git a/swh/vault/tests/test_server.py b/swh/vault/tests/test_server.py --- a/swh/vault/tests/test_server.py +++ b/swh/vault/tests/test_server.py @@ -78,7 +78,7 @@ async def test_client_cook_notfound(cli): resp = await cli.post( "/cook", - data=json_dumps({"obj_type": "directory", "obj_id": TEST_HEX_ID}), + data=json_dumps({"bundle_type": "directory", "obj_id": TEST_HEX_ID}), headers=[("Content-Type", "application/json")], ) assert resp.status == 400 @@ -90,7 +90,7 @@ async def test_client_progress_notfound(cli): resp = await cli.post( "/progress", - data=json_dumps({"obj_type": "directory", "obj_id": TEST_HEX_ID}), + data=json_dumps({"bundle_type": "directory", "obj_id": TEST_HEX_ID}), headers=[("Content-Type", "application/json")], ) assert resp.status == 400