diff --git a/swh/vault/api/client.py b/swh/vault/api/client.py --- a/swh/vault/api/client.py +++ b/swh/vault/api/client.py @@ -7,9 +7,13 @@ from swh.vault.exc import NotFoundExc from swh.vault.interface import VaultInterface +from .serializers import DECODERS, ENCODERS + class RemoteVaultClient(RPCClient): """Client to the Software Heritage vault cache.""" backend_class = VaultInterface reraise_exceptions = [NotFoundExc] + extra_type_decoders = DECODERS + extra_type_encoders = ENCODERS diff --git a/swh/vault/api/serializers.py b/swh/vault/api/serializers.py new file mode 100644 --- /dev/null +++ b/swh/vault/api/serializers.py @@ -0,0 +1,17 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Callable, Dict, List, Tuple + +from swh.model.identifiers import CoreSWHID + +ENCODERS: List[Tuple[type, str, Callable]] = [ + (CoreSWHID, "core_swhid", str), +] + + +DECODERS: Dict[str, Callable] = { + "core_swhid": CoreSWHID.from_string, +} diff --git a/swh/vault/api/server.py b/swh/vault/api/server.py --- a/swh/vault/api/server.py +++ b/swh/vault/api/server.py @@ -17,12 +17,13 @@ from swh.vault.backend import NotFoundExc from swh.vault.interface import VaultInterface +from .serializers import DECODERS, ENCODERS + # do not define default services here DEFAULT_CONFIG = { "client_max_size": 1024 ** 3, } - vault = None app = None @@ -37,6 +38,8 @@ class VaultServerApp(RPCServerApp): client_exception_classes = (NotFoundExc,) + extra_type_decoders = DECODERS + extra_type_encoders = ENCODERS @asyncio.coroutine diff --git a/swh/vault/backend.py b/swh/vault/backend.py --- a/swh/vault/backend.py +++ b/swh/vault/backend.py @@ -13,14 +13,13 @@ from swh.core.db import BaseDb from swh.core.db.common import db_transaction -from swh.model import hashutil +from 
swh.model.identifiers import CoreSWHID from swh.scheduler import get_scheduler from swh.scheduler.utils import create_oneshot_task_dict from swh.storage import get_storage from swh.vault.cache import VaultCache from swh.vault.cookers import COOKER_TYPES, get_cooker_cls from swh.vault.exc import NotFoundExc -from swh.vault.interface import ObjectId cooking_task_name = "swh.vault.cooking_tasks.SWHCookingTask" @@ -33,7 +32,7 @@ Vault: Bundle Type: {bundle_type} -Object ID: {hex_id} +Object SWHID: {swhid} This bundle is now available for download at the following address: @@ -51,7 +50,7 @@ Vault: Bundle Type: {bundle_type} -Object ID: {hex_id} +Object SWHID: {swhid} This bundle could not be cooked for the following reason: @@ -64,12 +63,6 @@ """ -def batch_to_bytes(batch: List[Tuple[str, str]]) -> List[Tuple[str, bytes]]: - return [ - (bundle_type, hashutil.hash_to_bytes(hex_id)) for bundle_type, hex_id in batch - ] - - class VaultBackend: """ Backend for the Software Heritage Vault. @@ -100,138 +93,128 @@ if db is not self._db: db.put_conn() - def _compute_ids(self, obj_id: ObjectId) -> Tuple[str, bytes]: - """Internal method to reconcile multiple possible inputs - - """ - if isinstance(obj_id, str): - return obj_id, hashutil.hash_to_bytes(obj_id) - return hashutil.hash_to_hex(obj_id), obj_id - @db_transaction() def progress( self, bundle_type: str, - obj_id: ObjectId, + swhid: CoreSWHID, raise_notfound: bool = True, db=None, cur=None, ) -> Optional[Dict[str, Any]]: - hex_id, obj_id = self._compute_ids(obj_id) cur.execute( """ - SELECT id, type, object_id, task_id, task_status, sticky, + SELECT id, type, swhid, task_id, task_status, sticky, ts_created, ts_done, ts_last_access, progress_msg FROM vault_bundle - WHERE type = %s AND object_id = %s""", - (bundle_type, obj_id), + WHERE type = %s AND swhid = %s""", + (bundle_type, str(swhid)), ) res = cur.fetchone() if not res: if raise_notfound: - raise NotFoundExc(f"{bundle_type} {hex_id} was not found.") + raise 
NotFoundExc(f"{bundle_type} {swhid} was not found.") return None - - res["object_id"] = hashutil.hash_to_hex(res["object_id"]) + res["swhid"] = CoreSWHID.from_string(res["swhid"]) return res - def _send_task(self, bundle_type: str, hex_id: ObjectId): + def _send_task(self, bundle_type: str, swhid: CoreSWHID): """Send a cooking task to the celery scheduler""" - task = create_oneshot_task_dict("cook-vault-bundle", bundle_type, hex_id) + task = create_oneshot_task_dict("cook-vault-bundle", bundle_type, str(swhid)) added_tasks = self.scheduler.create_tasks([task]) return added_tasks[0]["id"] @db_transaction() def create_task( - self, bundle_type: str, obj_id: bytes, sticky: bool = False, db=None, cur=None + self, + bundle_type: str, + swhid: CoreSWHID, + sticky: bool = False, + db=None, + cur=None, ): """Create and send a cooking task""" - hex_id, obj_id = self._compute_ids(obj_id) - - cooker_class = get_cooker_cls(bundle_type) - cooker = cooker_class(bundle_type, hex_id, backend=self, storage=self.storage) + cooker_class = get_cooker_cls(bundle_type, swhid.object_type) + cooker = cooker_class(swhid, backend=self, storage=self.storage) if not cooker.check_exists(): - raise NotFoundExc(f"{bundle_type} {hex_id} was not found.") + raise NotFoundExc(f"{bundle_type} {swhid} was not found.") cur.execute( """ - INSERT INTO vault_bundle (type, object_id, sticky) + INSERT INTO vault_bundle (type, swhid, sticky) VALUES (%s, %s, %s)""", - (bundle_type, obj_id, sticky), + (bundle_type, str(swhid), sticky), ) db.conn.commit() - task_id = self._send_task(bundle_type, hex_id) + task_id = self._send_task(bundle_type, swhid) cur.execute( """ UPDATE vault_bundle SET task_id = %s - WHERE type = %s AND object_id = %s""", - (task_id, bundle_type, obj_id), + WHERE type = %s AND swhid = %s""", + (task_id, bundle_type, str(swhid)), ) @db_transaction() def add_notif_email( - self, bundle_type: str, obj_id: bytes, email: str, db=None, cur=None + self, bundle_type: str, swhid: CoreSWHID, email: 
str, db=None, cur=None ): """Add an e-mail address to notify when a given bundle is ready""" cur.execute( """ INSERT INTO vault_notif_email (email, bundle_id) VALUES (%s, (SELECT id FROM vault_bundle - WHERE type = %s AND object_id = %s))""", - (email, bundle_type, obj_id), + WHERE type = %s AND swhid = %s))""", + (email, bundle_type, str(swhid)), ) - def put_bundle(self, bundle_type: str, obj_id: ObjectId, bundle) -> bool: - _, obj_id = self._compute_ids(obj_id) - self.cache.add(bundle_type, obj_id, bundle) + def put_bundle(self, bundle_type: str, swhid: CoreSWHID, bundle) -> bool: + self.cache.add(bundle_type, swhid, bundle) return True @db_transaction() def cook( self, bundle_type: str, - obj_id: ObjectId, + swhid: CoreSWHID, *, sticky: bool = False, email: Optional[str] = None, db=None, cur=None, ) -> Dict[str, Any]: - hex_id, obj_id = self._compute_ids(obj_id) - info = self.progress(bundle_type, obj_id, raise_notfound=False) + info = self.progress(bundle_type, swhid, raise_notfound=False) if bundle_type not in COOKER_TYPES: raise NotFoundExc(f"{bundle_type} is an unknown type.") # If there's a failed bundle entry, delete it first. if info is not None and info["task_status"] == "failed": - obj_id = hashutil.hash_to_bytes(obj_id) cur.execute( - "DELETE FROM vault_bundle WHERE type = %s AND object_id = %s", - (bundle_type, obj_id), + "DELETE FROM vault_bundle WHERE type = %s AND swhid = %s", + (bundle_type, str(swhid)), ) db.conn.commit() info = None # If there's no bundle entry, create the task. 
if info is None: - self.create_task(bundle_type, obj_id, sticky) + self.create_task(bundle_type, swhid, sticky) if email is not None: # If the task is already done, send the email directly if info is not None and info["task_status"] == "done": self.send_notification( - None, email, bundle_type, hex_id, info["task_status"] + None, email, bundle_type, swhid, info["task_status"] ) # Else, add it to the notification queue else: - self.add_notif_email(bundle_type, obj_id, email) + self.add_notif_email(bundle_type, swhid, email) - return self.progress(bundle_type, obj_id) + return self.progress(bundle_type, swhid) @db_transaction() def batch_cook( @@ -252,32 +235,31 @@ RETURNING id""" ) batch_id = cur.fetchone()["id"] - batch_bytes = batch_to_bytes(batch) # Delete all failed bundles from the batch cur.execute( """ DELETE FROM vault_bundle WHERE task_status = 'failed' - AND (type, object_id) IN %s""", - (tuple(batch_bytes),), + AND (type, swhid) IN %s""", + (tuple(batch),), ) # Insert all the bundles, return the new ones execute_values( cur, """ - INSERT INTO vault_bundle (type, object_id) + INSERT INTO vault_bundle (type, swhid) VALUES %s ON CONFLICT DO NOTHING""", - batch_bytes, + batch, ) # Get the bundle ids and task status cur.execute( """ - SELECT id, type, object_id, task_id FROM vault_bundle - WHERE (type, object_id) IN %s""", - (tuple(batch_bytes),), + SELECT id, type, swhid, task_id FROM vault_bundle + WHERE (type, swhid) IN %s""", + (tuple(batch),), ) bundles = cur.fetchall() @@ -294,16 +276,13 @@ # Get the tasks to fetch batch_new = [ - (row["type"], bytes(row["object_id"])) + (row["type"], row["swhid"])  # str: used as task args and SQL params for row in bundles if row["task_id"] is None ] # Send the tasks - args_batch = [ - (bundle_type, hashutil.hash_to_hex(obj_id)) - for bundle_type, obj_id in batch_new - ] + args_batch = [(bundle_type, swhid) for bundle_type, swhid in batch_new] # TODO: change once the scheduler handles priority tasks tasks = [
create_oneshot_task_dict("swh-vault-batch-cooking", *args) @@ -312,8 +291,8 @@ added_tasks = self.scheduler.create_tasks(tasks) tasks_ids_bundle_ids = [ - (task_id, bundle_type, obj_id) - for task_id, (bundle_type, obj_id) in zip( + (task_id, bundle_type, swhid) + for task_id, (bundle_type, swhid) in zip( [task["id"] for task in added_tasks], batch_new ) ] @@ -324,8 +303,8 @@ """ UPDATE vault_bundle SET task_id = s_task_id - FROM (VALUES %s) AS sub (s_task_id, s_type, s_object_id) - WHERE type = s_type::cook_type AND object_id = s_object_id """, + FROM (VALUES %s) AS sub (s_task_id, s_type, s_swhid) + WHERE type = s_type::cook_type AND swhid = s_swhid """, tasks_ids_bundle_ids, ) return {"id": batch_id} @@ -335,7 +314,7 @@ cur.execute( """ SELECT vault_bundle.id as id, - type, object_id, task_id, task_status, sticky, + type, swhid, task_id, task_status, sticky, ts_created, ts_done, ts_last_access, progress_msg FROM vault_batch_bundle LEFT JOIN vault_bundle ON vault_bundle.id = bundle_id @@ -347,7 +326,7 @@ raise NotFoundExc(f"Batch {batch_id} does not exist.") for bundle in bundles: - bundle["object_id"] = hashutil.hash_to_hex(bundle["object_id"]) + bundle["swhid"] = CoreSWHID.from_string(bundle["swhid"]) counter = collections.Counter(b["status"] for b in bundles) res = { @@ -360,87 +339,82 @@ return res @db_transaction() - def is_available(self, bundle_type: str, obj_id: ObjectId, db=None, cur=None): + def is_available(self, bundle_type: str, swhid: CoreSWHID, db=None, cur=None): """Check whether a bundle is available for retrieval""" - info = self.progress(bundle_type, obj_id, raise_notfound=False, cur=cur) - obj_id = hashutil.hash_to_bytes(obj_id) + info = self.progress(bundle_type, swhid, raise_notfound=False, cur=cur) return ( info is not None and info["task_status"] == "done" - and self.cache.is_cached(bundle_type, obj_id) + and self.cache.is_cached(bundle_type, swhid) ) @db_transaction() def fetch( - self, bundle_type: str, obj_id: ObjectId, 
raise_notfound=True, db=None, cur=None + self, bundle_type: str, swhid: CoreSWHID, raise_notfound=True, db=None, cur=None ) -> Optional[bytes]: """Retrieve a bundle from the cache""" - hex_id, obj_id = self._compute_ids(obj_id) - available = self.is_available(bundle_type, obj_id, cur=cur) + available = self.is_available(bundle_type, swhid, cur=cur) if not available: if raise_notfound: - raise NotFoundExc(f"{bundle_type} {hex_id} is not available.") + raise NotFoundExc(f"{bundle_type} {swhid} is not available.") return None - self.update_access_ts(bundle_type, obj_id, cur=cur) - return self.cache.get(bundle_type, obj_id) + self.update_access_ts(bundle_type, swhid, cur=cur) + return self.cache.get(bundle_type, swhid) @db_transaction() - def update_access_ts(self, bundle_type: str, obj_id: bytes, db=None, cur=None): + def update_access_ts(self, bundle_type: str, swhid: CoreSWHID, db=None, cur=None): """Update the last access timestamp of a bundle""" cur.execute( """ UPDATE vault_bundle SET ts_last_access = NOW() - WHERE type = %s AND object_id = %s""", - (bundle_type, obj_id), + WHERE type = %s AND swhid = %s""", + (bundle_type, str(swhid)), ) @db_transaction() def set_status( - self, bundle_type: str, obj_id: ObjectId, status: str, db=None, cur=None + self, bundle_type: str, swhid: CoreSWHID, status: str, db=None, cur=None ) -> bool: - obj_id = hashutil.hash_to_bytes(obj_id) req = ( """ UPDATE vault_bundle SET task_status = %s """ + (""", ts_done = NOW() """ if status == "done" else "") - + """WHERE type = %s AND object_id = %s""" + + """WHERE type = %s AND swhid = %s""" ) - cur.execute(req, (status, bundle_type, obj_id)) + cur.execute(req, (status, bundle_type, str(swhid))) return True @db_transaction() def set_progress( - self, bundle_type: str, obj_id: ObjectId, progress: str, db=None, cur=None + self, bundle_type: str, swhid: CoreSWHID, progress: str, db=None, cur=None ) -> bool: - obj_id = hashutil.hash_to_bytes(obj_id) cur.execute( """ UPDATE vault_bundle SET 
progress_msg = %s - WHERE type = %s AND object_id = %s""", - (progress, bundle_type, obj_id), + WHERE type = %s AND swhid = %s""", + (progress, bundle_type, str(swhid)), ) return True @db_transaction() - def send_notif(self, bundle_type: str, obj_id: ObjectId, db=None, cur=None) -> bool: - hex_id, obj_id = self._compute_ids(obj_id) + def send_notif(self, bundle_type: str, swhid: CoreSWHID, db=None, cur=None) -> bool: cur.execute( """ SELECT vault_notif_email.id AS id, email, task_status, progress_msg FROM vault_notif_email INNER JOIN vault_bundle ON bundle_id = vault_bundle.id - WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s""", - (bundle_type, obj_id), + WHERE vault_bundle.type = %s AND vault_bundle.swhid = %s""", + (bundle_type, str(swhid)), ) for d in cur: self.send_notification( d["id"], d["email"], bundle_type, - hex_id, + swhid, status=d["task_status"], progress_msg=d["progress_msg"], ) @@ -452,14 +426,14 @@ n_id: Optional[int], email: str, bundle_type: str, - hex_id: str, + swhid: CoreSWHID, status: str, progress_msg: Optional[str] = None, db=None, cur=None, ) -> None: """Send the notification of a bundle to a specific e-mail""" - short_id = hex_id[:7] + short_id = swhid.object_id.hex()[:7] # TODO: instead of hardcoding this, we should probably: # * add a "fetch_url" field in the vault_notif_email table @@ -468,12 +442,12 @@ # the table # * use this url for the notification e-mail url = "https://archive.softwareheritage.org/api/1/vault/{}/{}/" "raw".format( - bundle_type, hex_id + bundle_type, swhid ) if status == "done": text = NOTIF_EMAIL_BODY_SUCCESS.strip() - text = text.format(bundle_type=bundle_type, hex_id=hex_id, url=url) + text = text.format(bundle_type=bundle_type, swhid=swhid, url=url) msg = MIMEText(text) msg["Subject"] = NOTIF_EMAIL_SUBJECT_SUCCESS.format( bundle_type=bundle_type, short_id=short_id @@ -481,7 +455,7 @@ elif status == "failed": text = NOTIF_EMAIL_BODY_FAILURE.strip() text = text.format( - bundle_type=bundle_type, 
hex_id=hex_id, progress_msg=progress_msg + bundle_type=bundle_type, swhid=swhid, progress_msg=progress_msg ) msg = MIMEText(text) msg["Subject"] = NOTIF_EMAIL_SUBJECT_FAILURE.format( @@ -530,7 +504,7 @@ WHERE sticky = false {} ) - RETURNING type, object_id + RETURNING type, swhid """.format( cond ), @@ -538,7 +512,7 @@ ) for d in cur: - self.cache.delete(d["type"], bytes(d["object_id"])) + self.cache.delete(d["type"], CoreSWHID.from_string(d["swhid"])) @db_transaction() def cache_expire_oldest(self, n=1, by="last_access", db=None, cur=None) -> None: diff --git a/swh/vault/cache.py b/swh/vault/cache.py --- a/swh/vault/cache.py +++ b/swh/vault/cache.py @@ -4,6 +4,7 @@ # See top-level LICENSE file for more information from swh.model import hashutil +from swh.model.identifiers import CoreSWHID from swh.objstorage.factory import get_objstorage from swh.objstorage.objstorage import compute_hash @@ -11,37 +12,36 @@ class VaultCache: """The Vault cache is an object storage that stores Vault bundles. - This implementation computes sha1(':') as the + This implementation computes sha1(':') as the internal identifiers used in the underlying objstorage. 
""" def __init__(self, **objstorage): self.objstorage = get_objstorage(**objstorage) - def add(self, bundle_type, obj_id, content): - sid = self._get_internal_id(bundle_type, obj_id) + def add(self, bundle_type, swhid: CoreSWHID, content): + sid = self._get_internal_id(bundle_type, swhid) return self.objstorage.add(content, sid) - def get(self, bundle_type, obj_id): - sid = self._get_internal_id(bundle_type, obj_id) + def get(self, bundle_type, swhid: CoreSWHID): + sid = self._get_internal_id(bundle_type, swhid) return self.objstorage.get(hashutil.hash_to_bytes(sid)) - def delete(self, bundle_type, obj_id): - sid = self._get_internal_id(bundle_type, obj_id) + def delete(self, bundle_type, swhid: CoreSWHID): + sid = self._get_internal_id(bundle_type, swhid) return self.objstorage.delete(hashutil.hash_to_bytes(sid)) - def add_stream(self, bundle_type, obj_id, content_iter): - sid = self._get_internal_id(bundle_type, obj_id) + def add_stream(self, bundle_type, swhid: CoreSWHID, content_iter): + sid = self._get_internal_id(bundle_type, swhid) return self.objstorage.add_stream(content_iter, sid) - def get_stream(self, bundle_type, obj_id): - sid = self._get_internal_id(bundle_type, obj_id) + def get_stream(self, bundle_type, swhid: CoreSWHID): + sid = self._get_internal_id(bundle_type, swhid) return self.objstorage.get_stream(hashutil.hash_to_bytes(sid)) - def is_cached(self, bundle_type, obj_id): - sid = self._get_internal_id(bundle_type, obj_id) + def is_cached(self, bundle_type, swhid: CoreSWHID): + sid = self._get_internal_id(bundle_type, swhid) return hashutil.hash_to_bytes(sid) in self.objstorage - def _get_internal_id(self, bundle_type, obj_id): - obj_id = hashutil.hash_to_hex(obj_id) - return compute_hash("{}:{}".format(bundle_type, obj_id).encode()) + def _get_internal_id(self, bundle_type, swhid: CoreSWHID): + return compute_hash("{}:{}".format(bundle_type, swhid).encode()) diff --git a/swh/vault/cli.py b/swh/vault/cli.py --- a/swh/vault/cli.py +++ 
b/swh/vault/cli.py @@ -52,7 +52,7 @@ @click.argument("swhid", type=SwhidParamType()) @click.argument("outfile", type=click.File("wb")) @click.option( - "--cooker-type", + "--bundle-type", type=click.Choice(["flat", "gitfast", "git_bare"]), help="Selects which cooker to use, when there is more than one available " "for the given object type.", @@ -63,44 +63,23 @@ config_file: str, swhid: CoreSWHID, outfile: io.RawIOBase, - cooker_type: Optional[str], + bundle_type: Optional[str], ): """ Runs a vault cooker for a single object (identified by a SWHID), and outputs it to the given file. """ from swh.core import config + from swh.model.identifiers import ObjectType from swh.objstorage.exc import ObjNotFoundError from swh.objstorage.factory import get_objstorage from swh.storage import get_storage - from .cookers import COOKER_TYPES, get_cooker_cls + from .cookers import get_cooker_cls from .in_memory_backend import InMemoryVaultBackend conf = config.read(config_file) - supported_object_types = {name.split("_")[0] for name in COOKER_TYPES} - if swhid.object_type.name.lower() not in supported_object_types: - raise click.ClickException( - f"No cooker available for {swhid.object_type.name} objects." - ) - - cooker_name = swhid.object_type.name.lower() - - if cooker_type: - cooker_name = f"{cooker_name}_{cooker_type}" - if cooker_name not in COOKER_TYPES: - raise click.ClickException( - f"{swhid.object_type.name.lower()} objects do not have " - f"a {cooker_type} cooker." - ) - else: - if cooker_name not in COOKER_TYPES: - raise click.ClickException( - f"{swhid.object_type.name.lower()} objects need " - f"an explicit --cooker-type." 
- ) - try: from swh.graph.client import RemoteGraphClient # optional dependency @@ -114,12 +93,27 @@ graph = None backend = InMemoryVaultBackend() + + if bundle_type is None: + if swhid.object_type in (ObjectType.RELEASE, ObjectType.SNAPSHOT,): + bundle_type = "git_bare" + elif swhid.object_type in (ObjectType.DIRECTORY,): + bundle_type = "flat" + else: + raise click.ClickException( + "No default bundle type for this kind of object, " + "use --bundle-type to choose one" + ) + + try: + cooker_cls = get_cooker_cls(bundle_type, swhid.object_type) + except ValueError as e: + raise click.ClickException(*e.args) + storage = get_storage(**conf["storage"]) objstorage = get_objstorage(**conf["objstorage"]) if "objstorage" in conf else None - cooker_cls = get_cooker_cls(cooker_name) cooker = cooker_cls( - bundle_type=cooker_name, - obj_id=swhid.object_id, + swhid=swhid, backend=backend, storage=storage, graph=graph, @@ -129,7 +123,7 @@ cooker.cook() try: - bundle = backend.fetch(cooker_name, swhid.object_id) + bundle = backend.fetch(cooker_cls.BUNDLE_TYPE, swhid) except ObjNotFoundError: bundle = None if bundle is None: diff --git a/swh/vault/cookers/__init__.py b/swh/vault/cookers/__init__.py --- a/swh/vault/cookers/__init__.py +++ b/swh/vault/cookers/__init__.py @@ -6,30 +6,49 @@ from __future__ import annotations import os -from typing import Any, Dict +from typing import Any, Dict, List, Type from swh.core.config import load_named_config from swh.core.config import read as read_config +from swh.model.identifiers import CoreSWHID, ObjectType from swh.storage import get_storage from swh.vault import get_vault -from swh.vault.cookers.base import DEFAULT_CONFIG, DEFAULT_CONFIG_PATH +from swh.vault.cookers.base import DEFAULT_CONFIG, DEFAULT_CONFIG_PATH, BaseVaultCooker from swh.vault.cookers.directory import DirectoryCooker from swh.vault.cookers.git_bare import GitBareCooker from swh.vault.cookers.revision_flat import RevisionFlatCooker from 
swh.vault.cookers.revision_gitfast import RevisionGitfastCooker -COOKER_TYPES = { - "directory": DirectoryCooker, - "revision_flat": RevisionFlatCooker, - "revision_gitfast": RevisionGitfastCooker, - "snapshot_git_bare": GitBareCooker, - "revision_git_bare": GitBareCooker, - "directory_git_bare": GitBareCooker, -} +_COOKER_CLS: List[Type[BaseVaultCooker]] = [ + DirectoryCooker, + RevisionFlatCooker, + RevisionGitfastCooker, + GitBareCooker, +] +COOKER_TYPES: Dict[str, List[Type[BaseVaultCooker]]] = {} -def get_cooker_cls(bundle_type): - return COOKER_TYPES[bundle_type] +for _cooker_cls in _COOKER_CLS: + COOKER_TYPES.setdefault(_cooker_cls.BUNDLE_TYPE, []).append(_cooker_cls) + + +def get_cooker_cls(bundle_type: str, object_type: ObjectType): + cookers = COOKER_TYPES.get(bundle_type) + + if not cookers: + raise ValueError(f"{bundle_type} is not a valid bundle type.") + + for cooker in cookers: + try: + cooker.check_object_type(object_type) + except ValueError: + pass + else: + return cooker + + raise ValueError( + f"{object_type.name.lower()} objects do not have a {bundle_type} cooker" + ) def check_config(cfg: Dict[str, Any]) -> Dict[str, Any]: @@ -67,11 +86,11 @@ return cfg -def get_cooker(bundle_type: str, obj_id: str): +def get_cooker(bundle_type: str, swhid: CoreSWHID): """Instantiate a cooker class of type bundle_type. Returns: - Cooker class in charge of cooking the bundle_type with id obj_id. + Cooker class in charge of cooking the bundle_type with id swhid. 
Raises: ValueError in case of a missing top-level vault key configuration or a storage @@ -83,7 +102,7 @@ cfg = read_config(os.environ["SWH_CONFIG_FILENAME"], DEFAULT_CONFIG) else: cfg = load_named_config(DEFAULT_CONFIG_PATH, DEFAULT_CONFIG) - cooker_cls = get_cooker_cls(bundle_type) + cooker_cls = get_cooker_cls(bundle_type, swhid.object_type) cfg = check_config(cfg) vcfg = cfg["vault"] @@ -104,8 +123,7 @@ graph = None return cooker_cls( - bundle_type, - obj_id, + swhid, backend=backend, storage=storage, graph=graph, diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py --- a/swh/vault/cookers/base.py +++ b/swh/vault/cookers/base.py @@ -6,12 +6,11 @@ import abc import io import logging -from typing import Optional +from typing import ClassVar, Set from psycopg2.extensions import QueryCanceledError -from swh.model import hashutil -from swh.model.model import Sha1Git +from swh.model.identifiers import CoreSWHID, ObjectType from swh.storage.interface import StorageInterface MAX_BUNDLE_SIZE = 2 ** 29 # 512 MiB @@ -60,12 +59,12 @@ - def cook(): cook the object into a bundle """ - CACHE_TYPE_KEY = None # type: Optional[str] + SUPPORTED_OBJECT_TYPES: ClassVar[Set[ObjectType]] + BUNDLE_TYPE: ClassVar[str] def __init__( self, - bundle_type: str, - obj_id: Sha1Git, + swhid: CoreSWHID, backend, storage: StorageInterface, graph=None, @@ -79,20 +78,23 @@ own cooker class. Args: - bundle_type: type of the object to be cooked into a bundle (directory, - revision_flat or revision_gitfast; see - swh.vault.cooker.COOKER_TYPES). - obj_id: id of the object to be cooked into a bundle. + swhid: id of the object to be cooked into a bundle. backend: the vault backend (swh.vault.backend.VaultBackend). 
""" - self.bundle_type = bundle_type - self.obj_id = hashutil.hash_to_bytes(obj_id) + self.check_object_type(swhid.object_type) + self.swhid = swhid + self.obj_id = swhid.object_id self.backend = backend self.storage = storage self.objstorage = objstorage self.graph = graph self.max_bundle_size = max_bundle_size + @classmethod + def check_object_type(cls, object_type: ObjectType) -> None: + if object_type not in cls.SUPPORTED_OBJECT_TYPES: + raise ValueError(f"{cls.__name__} does not support {object_type} objects.") + @abc.abstractmethod def check_exists(self): """Checks that the requested object exists and can be cooked. @@ -110,8 +112,8 @@ raise NotImplementedError def cache_type_key(self) -> str: - assert self.CACHE_TYPE_KEY - return self.CACHE_TYPE_KEY + assert self.BUNDLE_TYPE + return self.BUNDLE_TYPE def write(self, chunk): self.fileobj.write(chunk) @@ -119,8 +121,8 @@ def cook(self): """Cook the requested object into a bundle """ - self.backend.set_status(self.bundle_type, self.obj_id, "pending") - self.backend.set_progress(self.bundle_type, self.obj_id, "Processing...") + self.backend.set_status(self.BUNDLE_TYPE, self.swhid, "pending") + self.backend.set_progress(self.BUNDLE_TYPE, self.swhid, "Processing...") self.fileobj = BytesIOBundleSizeLimit(size_limit=self.max_bundle_size) try: @@ -132,21 +134,21 @@ ) bundle = self.fileobj.getvalue() # TODO: use proper content streaming instead of put_bundle() - self.backend.put_bundle(self.cache_type_key(), self.obj_id, bundle) + self.backend.put_bundle(self.cache_type_key(), self.swhid, bundle) except PolicyError as e: logging.info("Bundle cooking violated policy: %s", e) - self.backend.set_status(self.bundle_type, self.obj_id, "failed") - self.backend.set_progress(self.bundle_type, self.obj_id, str(e)) + self.backend.set_status(self.BUNDLE_TYPE, self.swhid, "failed") + self.backend.set_progress(self.BUNDLE_TYPE, self.swhid, str(e)) except Exception: - self.backend.set_status(self.bundle_type, self.obj_id, 
"failed") + self.backend.set_status(self.BUNDLE_TYPE, self.swhid, "failed") self.backend.set_progress( - self.bundle_type, - self.obj_id, + self.BUNDLE_TYPE, + self.swhid, "Internal Server Error. This incident will be reported.", ) logging.exception("Bundle cooking failed.") else: - self.backend.set_status(self.bundle_type, self.obj_id, "done") - self.backend.set_progress(self.bundle_type, self.obj_id, None) + self.backend.set_status(self.BUNDLE_TYPE, self.swhid, "done") + self.backend.set_progress(self.BUNDLE_TYPE, self.swhid, None) finally: - self.backend.send_notif(self.bundle_type, self.obj_id) + self.backend.send_notif(self.BUNDLE_TYPE, self.swhid) diff --git a/swh/vault/cookers/directory.py b/swh/vault/cookers/directory.py --- a/swh/vault/cookers/directory.py +++ b/swh/vault/cookers/directory.py @@ -6,7 +6,7 @@ import tarfile import tempfile -from swh.model import hashutil +from swh.model.identifiers import ObjectType from swh.vault.cookers.base import BaseVaultCooker from swh.vault.to_disk import DirectoryBuilder @@ -14,7 +14,8 @@ class DirectoryCooker(BaseVaultCooker): """Cooker to create a directory bundle """ - CACHE_TYPE_KEY = "directory" + BUNDLE_TYPE = "flat" + SUPPORTED_OBJECT_TYPES = {ObjectType.DIRECTORY} def check_exists(self): return not list(self.storage.directory_missing([self.obj_id])) @@ -24,4 +25,4 @@ directory_builder = DirectoryBuilder(self.storage, td.encode(), self.obj_id) directory_builder.build() with tarfile.open(fileobj=self.fileobj, mode="w:gz") as tar: - tar.add(td, arcname=hashutil.hash_to_hex(self.obj_id)) + tar.add(td, arcname=str(self.swhid)) diff --git a/swh/vault/cookers/git_bare.py b/swh/vault/cookers/git_bare.py --- a/swh/vault/cookers/git_bare.py +++ b/swh/vault/cookers/git_bare.py @@ -76,16 +76,18 @@ class GitBareCooker(BaseVaultCooker): + BUNDLE_TYPE = "git_bare" + SUPPORTED_OBJECT_TYPES = { + identifiers.ObjectType[obj_type.name] for obj_type in RootObjectType + } + use_fsck = True obj_type: RootObjectType def 
__init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.obj_type = RootObjectType(self.bundle_type.split("_")[0]) - - def cache_type_key(self) -> str: - return self.bundle_type + self.obj_type = RootObjectType[self.swhid.object_type.name] def check_exists(self) -> bool: if self.obj_type is RootObjectType.REVISION: @@ -97,12 +99,6 @@ else: assert_never(self.obj_type, f"Unexpected root object type: {self.obj_type}") - def obj_swhid(self) -> identifiers.CoreSWHID: - return identifiers.CoreSWHID( - object_type=identifiers.ObjectType[self.obj_type.name], - object_id=self.obj_id, - ) - def _push(self, stack: List[Sha1Git], obj_ids: Iterable[Sha1Git]) -> None: assert not isinstance(obj_ids, bytes) revision_ids = [id_ for id_ in obj_ids if id_ not in self._seen] @@ -250,7 +246,7 @@ def write_archive(self): with tarfile.TarFile(mode="w", fileobj=self.fileobj) as tf: - tf.add(self.gitdir, arcname=f"{self.obj_swhid()}.git", recursive=True) + tf.add(self.gitdir, arcname=f"{self.swhid}.git", recursive=True) def _obj_path(self, obj_id: Sha1Git): return os.path.join(self.gitdir, self._obj_relative_path(obj_id)) @@ -470,20 +466,15 @@ """Given a list of release ids, loads these releases and adds their target to the list of objects to visit""" for release in self.load_releases(obj_ids): + assert release.target, f"{release.swhid()} has no target" if release.target_type is ObjectType.REVISION: - assert release.target, "{release.swhid(}) has no target" self.push_revision_subgraph(release.target) elif release.target_type is ObjectType.DIRECTORY: - assert release.target, "{release.swhid(}) has no target" self._push(self._dir_stack, [release.target]) elif release.target_type is ObjectType.CONTENT: - raise NotImplementedError( - f"{release.swhid()} targets a content: {release.target!r}" - ) + self._push(self._cnt_stack, [release.target]) elif release.target_type is ObjectType.RELEASE: - raise NotImplementedError( - f"{release.swhid()} targets another release:
{release.target!r}" - ) + self.push_releases_subgraphs([release.target]) elif release.target_type is ObjectType.SNAPSHOT: raise NotImplementedError( f"{release.swhid()} targets a snapshot: {release.target!r}" diff --git a/swh/vault/cookers/revision_flat.py b/swh/vault/cookers/revision_flat.py --- a/swh/vault/cookers/revision_flat.py +++ b/swh/vault/cookers/revision_flat.py @@ -8,6 +8,7 @@ import tempfile from swh.model import hashutil +from swh.model.identifiers import ObjectType from swh.vault.cookers.base import BaseVaultCooker from swh.vault.cookers.utils import revision_log from swh.vault.to_disk import DirectoryBuilder @@ -16,15 +17,16 @@ class RevisionFlatCooker(BaseVaultCooker): """Cooker to create a revision_flat bundle """ - CACHE_TYPE_KEY = "revision_flat" + BUNDLE_TYPE = "flat" + SUPPORTED_OBJECT_TYPES = {ObjectType.REVISION} def check_exists(self): - return not list(self.storage.revision_missing([self.obj_id])) + return not list(self.storage.revision_missing([self.swhid.object_id])) def prepare_bundle(self): with tempfile.TemporaryDirectory(prefix="tmp-vault-revision-") as td: root = Path(td) - for revision in revision_log(self.storage, self.obj_id): + for revision in revision_log(self.storage, self.swhid.object_id): revdir = root / hashutil.hash_to_hex(revision["id"]) revdir.mkdir() directory_builder = DirectoryBuilder( @@ -32,4 +34,4 @@ ) directory_builder.build() with tarfile.open(fileobj=self.fileobj, mode="w:gz") as tar: - tar.add(td, arcname=hashutil.hash_to_hex(self.obj_id)) + tar.add(td, arcname=str(self.swhid)) diff --git a/swh/vault/cookers/revision_gitfast.py b/swh/vault/cookers/revision_gitfast.py --- a/swh/vault/cookers/revision_gitfast.py +++ b/swh/vault/cookers/revision_gitfast.py @@ -18,6 +18,7 @@ from swh.model import hashutil from swh.model.from_disk import DentryPerms, mode_to_perms +from swh.model.identifiers import ObjectType from swh.model.toposort import toposort from swh.vault.cookers.base import BaseVaultCooker from
swh.vault.cookers.utils import revision_log @@ -27,7 +28,8 @@ class RevisionGitfastCooker(BaseVaultCooker): """Cooker to create a git fast-import bundle """ - CACHE_TYPE_KEY = "revision_gitfast" + BUNDLE_TYPE = "gitfast" + SUPPORTED_OBJECT_TYPES = {ObjectType.REVISION} def check_exists(self): return not list(self.storage.revision_missing([self.obj_id])) @@ -58,7 +60,7 @@ if last_progress_report is None or last_progress_report + 2 <= ct: last_progress_report = ct pg = "Computing revision {}/{}".format(i, len(self.log)) - self.backend.set_progress(self.bundle_type, self.obj_id, pg) + self.backend.set_progress(self.BUNDLE_TYPE, self.swhid, pg) # Compute the current commit self._compute_commit_command(rev) diff --git a/swh/vault/cooking_tasks.py b/swh/vault/cooking_tasks.py --- a/swh/vault/cooking_tasks.py +++ b/swh/vault/cooking_tasks.py @@ -1,21 +1,22 @@ -# Copyright (C) 2016-2017 The Software Heritage developers +# Copyright (C) 2016-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from celery import current_app as app +from swh.model.identifiers import CoreSWHID from swh.vault.cookers import get_cooker @app.task(name=__name__ + ".SWHCookingTask") -def cook_bundle(bundle_type, obj_id): +def cook_bundle(bundle_type, swhid): """Main task to cook a bundle.""" - get_cooker(bundle_type, obj_id).cook() + get_cooker(bundle_type, CoreSWHID.from_string(swhid)).cook() # TODO: remove once the scheduler handles priority tasks @app.task(name=__name__ + ".SWHBatchCookingTask") -def batch_cook_bundle(bundle_type, obj_id): +def batch_cook_bundle(bundle_type, swhid): """Temporary task for the batch queue.""" - get_cooker(bundle_type, obj_id).cook() + get_cooker(bundle_type, CoreSWHID.from_string(swhid)).cook() diff --git a/swh/vault/in_memory_backend.py b/swh/vault/in_memory_backend.py --- 
a/swh/vault/in_memory_backend.py +++ b/swh/vault/in_memory_backend.py @@ -3,14 +3,12 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Dict, List, Optional, Tuple -from swh.model.hashutil import hash_to_bytes +from swh.model.identifiers import CoreSWHID from .cache import VaultCache -ObjectId = Union[str, bytes] - class InMemoryVaultBackend: """Stub vault backend, for use in the CLI.""" @@ -18,30 +16,30 @@ def __init__(self): self._cache = VaultCache(cls="memory") - def fetch(self, bundle_type: str, obj_id: ObjectId) -> Optional[bytes]: - return self._cache.get(bundle_type, hash_to_bytes(obj_id)) + def fetch(self, bundle_type: str, swhid: CoreSWHID) -> Optional[bytes]: + return self._cache.get(bundle_type, swhid) def cook( - self, bundle_type: str, obj_id: ObjectId, email: Optional[str] = None + self, bundle_type: str, swhid: CoreSWHID, email: Optional[str] = None ) -> Dict[str, Any]: raise NotImplementedError("InMemoryVaultBackend.cook()") - def progress(self, bundle_type: str, obj_id: ObjectId): + def progress(self, bundle_type: str, swhid: CoreSWHID): raise NotImplementedError("InMemoryVaultBackend.progress()") # Cookers endpoints - def set_progress(self, bundle_type: str, obj_id: ObjectId, progress: str) -> None: + def set_progress(self, bundle_type: str, swhid: CoreSWHID, progress: str) -> None: pass - def set_status(self, bundle_type: str, obj_id: ObjectId, status: str) -> None: + def set_status(self, bundle_type: str, swhid: CoreSWHID, status: str) -> None: pass - def put_bundle(self, bundle_type: str, obj_id: ObjectId, bundle) -> bool: - self._cache.add(bundle_type, hash_to_bytes(obj_id), bundle) + def put_bundle(self, bundle_type: str, swhid: CoreSWHID, bundle) -> bool: + self._cache.add(bundle_type, swhid, bundle) return True - def send_notif(self, bundle_type: str, obj_id: ObjectId): + def 
send_notif(self, bundle_type: str, swhid: CoreSWHID): pass # Batch endpoints diff --git a/swh/vault/interface.py b/swh/vault/interface.py --- a/swh/vault/interface.py +++ b/swh/vault/interface.py @@ -3,13 +3,12 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Dict, List, Optional, Tuple from typing_extensions import Protocol, runtime_checkable from swh.core.api import remote_api_endpoint - -ObjectId = Union[str, bytes] +from swh.model.identifiers import CoreSWHID @runtime_checkable @@ -19,41 +18,41 @@ """ @remote_api_endpoint("fetch") - def fetch(self, bundle_type: str, obj_id: ObjectId) -> Optional[bytes]: + def fetch(self, bundle_type: str, swhid: CoreSWHID) -> Optional[bytes]: """Fetch information from a bundle""" ... @remote_api_endpoint("cook") def cook( - self, bundle_type: str, obj_id: ObjectId, email: Optional[str] = None + self, bundle_type: str, swhid: CoreSWHID, email: Optional[str] = None ) -> Dict[str, Any]: """Main entry point for cooking requests. This starts a cooking task if needed, and add the given e-mail to the notify list""" ... @remote_api_endpoint("progress") - def progress(self, bundle_type: str, obj_id: ObjectId): + def progress(self, bundle_type: str, swhid: CoreSWHID): ... # Cookers endpoints @remote_api_endpoint("set_progress") - def set_progress(self, bundle_type: str, obj_id: ObjectId, progress: str) -> None: + def set_progress(self, bundle_type: str, swhid: CoreSWHID, progress: str) -> None: """Set the cooking progress of a bundle""" ... @remote_api_endpoint("set_status") - def set_status(self, bundle_type: str, obj_id: ObjectId, status: str) -> bool: + def set_status(self, bundle_type: str, swhid: CoreSWHID, status: str) -> bool: """Set the cooking status of a bundle""" ... 
@remote_api_endpoint("put_bundle") - def put_bundle(self, bundle_type: str, obj_id: ObjectId, bundle): + def put_bundle(self, bundle_type: str, swhid: CoreSWHID, bundle): """Store bundle in vault cache""" ... @remote_api_endpoint("send_notif") - def send_notif(self, bundle_type: str, obj_id: ObjectId): + def send_notif(self, bundle_type: str, swhid: CoreSWHID): """Send all the e-mails in the notification list of a bundle""" ... diff --git a/swh/vault/sql/30-schema.sql b/swh/vault/sql/30-schema.sql --- a/swh/vault/sql/30-schema.sql +++ b/swh/vault/sql/30-schema.sql @@ -6,12 +6,12 @@ ); comment on table dbversion is 'Schema update tracking'; insert into dbversion (version, release, description) - values (1, now(), 'Initial version'); + values (4, now(), 'Initial version'); create domain obj_hash as bytea; -create type cook_type as enum ('directory', 'revision_gitfast'); -comment on type cook_type is 'Type of the requested bundle'; +create type bundle_type as enum ('flat', 'gitfast', 'git_bare'); +comment on type bundle_type is 'Type of the requested bundle'; create type cook_status as enum ('new', 'pending', 'done', 'failed'); comment on type cook_status is 'Status of the cooking'; @@ -19,8 +19,8 @@ create table vault_bundle ( id bigserial primary key, - type cook_type not null, -- requested cooking type - object_id obj_hash not null, -- requested object ID + type bundle_type not null, + swhid text not null, -- requested object ID task_id integer, -- scheduler task id task_status cook_status not null default 'new', -- status of the task @@ -32,8 +32,8 @@ progress_msg text -- progress message ); -create unique index concurrently vault_bundle_type_object - on vault_bundle (type, object_id); +create unique index concurrently vault_bundle_type_swhid + on vault_bundle (type, swhid); create index concurrently vault_bundle_task_id on vault_bundle (task_id); diff --git a/swh/vault/tests/test_backend.py b/swh/vault/tests/test_backend.py --- a/swh/vault/tests/test_backend.py 
+++ b/swh/vault/tests/test_backend.py @@ -11,7 +11,8 @@ import psycopg2 import pytest -from swh.model import hashutil +from swh.model.identifiers import CoreSWHID +from swh.model.model import Content from swh.vault.exc import NotFoundExc from swh.vault.tests.vault_testing import hash_content @@ -42,26 +43,26 @@ def fake_cook(backend, bundle_type, result_content, sticky=False): + swhid = Content.from_data(result_content).swhid() content, obj_id = hash_content(result_content) with mock_cooking(backend): - backend.create_task(bundle_type, obj_id, sticky) - backend.cache.add(bundle_type, obj_id, b"content") - backend.set_status(bundle_type, obj_id, "done") - return obj_id, content + backend.create_task(bundle_type, swhid, sticky) + backend.cache.add(bundle_type, swhid, b"content") + backend.set_status(bundle_type, swhid, "done") + return swhid, content -def fail_cook(backend, bundle_type, obj_id, failure_reason): +def fail_cook(backend, bundle_type, swhid, failure_reason): with mock_cooking(backend): - backend.create_task(bundle_type, obj_id) - backend.set_status(bundle_type, obj_id, "failed") - backend.set_progress(bundle_type, obj_id, failure_reason) + backend.create_task(bundle_type, swhid) + backend.set_status(bundle_type, swhid, "failed") + backend.set_progress(bundle_type, swhid, failure_reason) -TEST_TYPE = "revision_gitfast" -TEST_HEX_ID = "4a4b9771542143cf070386f86b4b92d42966bdbc" -TEST_OBJ_ID = hashutil.hash_to_bytes(TEST_HEX_ID) +TEST_TYPE = "gitfast" +TEST_SWHID = CoreSWHID.from_string("swh:1:rev:4a4b9771542143cf070386f86b4b92d42966bdbc") TEST_PROGRESS = ( - "Mr. White, You're telling me you're cooking again?" " \N{ASTONISHED FACE} " + "Mr. White, You're telling me you're cooking again? 
\N{ASTONISHED FACE} " ) TEST_EMAIL = "ouiche@lorraine.fr" @@ -69,30 +70,29 @@ @pytest.fixture def swh_vault(swh_vault, sample_data): # make the vault's storage consistent with test data - revision = attr.evolve(sample_data.revision, id=TEST_OBJ_ID) + revision = attr.evolve(sample_data.revision, id=TEST_SWHID.object_id) swh_vault.storage.revision_add([revision]) return swh_vault def test_create_task_simple(swh_vault): with mock_cooking(swh_vault) as m: - swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) + swh_vault.create_task(TEST_TYPE, TEST_SWHID) - m["get_cooker_cls"].assert_called_once_with(TEST_TYPE) + m["get_cooker_cls"].assert_called_once_with(TEST_TYPE, TEST_SWHID.object_type) args = m["cooker_cls"].call_args[0] - assert args[0] == TEST_TYPE - assert args[1] == TEST_HEX_ID + assert args[0] == TEST_SWHID assert m["cooker"].check_exists.call_count == 1 assert m["_send_task"].call_count == 1 args = m["_send_task"].call_args[0] assert args[0] == TEST_TYPE - assert args[1] == TEST_HEX_ID + assert args[1] == TEST_SWHID - info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) - assert info["object_id"] == TEST_HEX_ID + info = swh_vault.progress(TEST_TYPE, TEST_SWHID) + assert info["swhid"] == TEST_SWHID assert info["type"] == TEST_TYPE assert info["task_status"] == "new" assert info["task_id"] == 42 @@ -105,63 +105,63 @@ def test_create_fail_duplicate_task(swh_vault): with mock_cooking(swh_vault): - swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) + swh_vault.create_task(TEST_TYPE, TEST_SWHID) with pytest.raises(psycopg2.IntegrityError): - swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) + swh_vault.create_task(TEST_TYPE, TEST_SWHID) def test_create_fail_nonexisting_object(swh_vault): with mock_cooking(swh_vault) as m: m["cooker"].check_exists.side_effect = ValueError("Nothing here.") with pytest.raises(ValueError): - swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) + swh_vault.create_task(TEST_TYPE, TEST_SWHID) def test_create_set_progress(swh_vault): with mock_cooking(swh_vault): 
- swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) + swh_vault.create_task(TEST_TYPE, TEST_SWHID) - info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) + info = swh_vault.progress(TEST_TYPE, TEST_SWHID) assert info["progress_msg"] is None - swh_vault.set_progress(TEST_TYPE, TEST_HEX_ID, TEST_PROGRESS) - info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) + swh_vault.set_progress(TEST_TYPE, TEST_SWHID, TEST_PROGRESS) + info = swh_vault.progress(TEST_TYPE, TEST_SWHID) assert info["progress_msg"] == TEST_PROGRESS def test_create_set_status(swh_vault): with mock_cooking(swh_vault): - swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) + swh_vault.create_task(TEST_TYPE, TEST_SWHID) - info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) + info = swh_vault.progress(TEST_TYPE, TEST_SWHID) assert info["task_status"] == "new" assert info["ts_done"] is None - swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "pending") - info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) + swh_vault.set_status(TEST_TYPE, TEST_SWHID, "pending") + info = swh_vault.progress(TEST_TYPE, TEST_SWHID) assert info["task_status"] == "pending" assert info["ts_done"] is None - swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "done") - info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) + swh_vault.set_status(TEST_TYPE, TEST_SWHID, "done") + info = swh_vault.progress(TEST_TYPE, TEST_SWHID) assert info["task_status"] == "done" assertTimestampAlmostNow(info["ts_done"]) def test_create_update_access_ts(swh_vault): with mock_cooking(swh_vault): - swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) + swh_vault.create_task(TEST_TYPE, TEST_SWHID) - info = swh_vault.progress(TEST_TYPE, TEST_OBJ_ID) + info = swh_vault.progress(TEST_TYPE, TEST_SWHID) access_ts_1 = info["ts_last_access"] assertTimestampAlmostNow(access_ts_1) - swh_vault.update_access_ts(TEST_TYPE, TEST_OBJ_ID) - info = swh_vault.progress(TEST_TYPE, TEST_OBJ_ID) + swh_vault.update_access_ts(TEST_TYPE, TEST_SWHID) + info = swh_vault.progress(TEST_TYPE, TEST_SWHID) access_ts_2 = 
info["ts_last_access"] assertTimestampAlmostNow(access_ts_2) - swh_vault.update_access_ts(TEST_TYPE, TEST_OBJ_ID) - info = swh_vault.progress(TEST_TYPE, TEST_OBJ_ID) + swh_vault.update_access_ts(TEST_TYPE, TEST_SWHID) + info = swh_vault.progress(TEST_TYPE, TEST_SWHID) access_ts_3 = info["ts_last_access"] assertTimestampAlmostNow(access_ts_3) @@ -172,9 +172,9 @@ def test_cook_idempotent(swh_vault, sample_data): with mock_cooking(swh_vault): - info1 = swh_vault.cook(TEST_TYPE, TEST_HEX_ID) - info2 = swh_vault.cook(TEST_TYPE, TEST_HEX_ID) - info3 = swh_vault.cook(TEST_TYPE, TEST_HEX_ID) + info1 = swh_vault.cook(TEST_TYPE, TEST_SWHID) + info2 = swh_vault.cook(TEST_TYPE, TEST_SWHID) + info3 = swh_vault.cook(TEST_TYPE, TEST_SWHID) assert info1 == info2 assert info1 == info3 @@ -184,23 +184,23 @@ swh_vault, "add_notif_email" ) as madd, patch.object(swh_vault, "send_notification") as msend: - swh_vault.cook(TEST_TYPE, TEST_HEX_ID) + swh_vault.cook(TEST_TYPE, TEST_SWHID) madd.assert_not_called() msend.assert_not_called() madd.reset_mock() msend.reset_mock() - swh_vault.cook(TEST_TYPE, TEST_HEX_ID, email=TEST_EMAIL) - madd.assert_called_once_with(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL) + swh_vault.cook(TEST_TYPE, TEST_SWHID, email=TEST_EMAIL) + madd.assert_called_once_with(TEST_TYPE, TEST_SWHID, TEST_EMAIL) msend.assert_not_called() madd.reset_mock() msend.reset_mock() - swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "done") - swh_vault.cook(TEST_TYPE, TEST_HEX_ID, email=TEST_EMAIL) - msend.assert_called_once_with(None, TEST_EMAIL, TEST_TYPE, TEST_HEX_ID, "done") + swh_vault.set_status(TEST_TYPE, TEST_SWHID, "done") + swh_vault.cook(TEST_TYPE, TEST_SWHID, email=TEST_EMAIL) + msend.assert_called_once_with(None, TEST_EMAIL, TEST_TYPE, TEST_SWHID, "done") madd.assert_not_called() @@ -208,12 +208,12 @@ with mock_cooking(swh_vault): emails = ("a@example.com", "billg@example.com", "test+42@example.org") for email in emails: - swh_vault.cook(TEST_TYPE, TEST_HEX_ID, email=email) + 
swh_vault.cook(TEST_TYPE, TEST_SWHID, email=email) - swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "done") + swh_vault.set_status(TEST_TYPE, TEST_SWHID, "done") with patch.object(swh_vault, "smtp_server") as m: - swh_vault.send_notif(TEST_TYPE, TEST_HEX_ID) + swh_vault.send_notif(TEST_TYPE, TEST_SWHID) sent_emails = {k[0][0] for k in m.send_message.call_args_list} assert {k["To"] for k in sent_emails} == set(emails) @@ -221,49 +221,49 @@ for e in sent_emails: assert "bot@softwareheritage.org" in e["From"] assert TEST_TYPE in e["Subject"] - assert TEST_HEX_ID[:5] in e["Subject"] + assert TEST_SWHID.object_id.hex()[:5] in e["Subject"] assert TEST_TYPE in str(e) assert "https://archive.softwareheritage.org/" in str(e) - assert TEST_HEX_ID[:5] in str(e) + assert TEST_SWHID.object_id.hex()[:5] in str(e) assert "--\x20\n" in str(e) # Well-formated signature!!! # Check that the entries have been deleted and recalling the # function does not re-send the e-mails m.reset_mock() - swh_vault.send_notif(TEST_TYPE, TEST_HEX_ID) + swh_vault.send_notif(TEST_TYPE, TEST_SWHID) m.assert_not_called() def test_available(swh_vault): - assert not swh_vault.is_available(TEST_TYPE, TEST_HEX_ID) + assert not swh_vault.is_available(TEST_TYPE, TEST_SWHID) with mock_cooking(swh_vault): - swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) - assert not swh_vault.is_available(TEST_TYPE, TEST_HEX_ID) + swh_vault.create_task(TEST_TYPE, TEST_SWHID) + assert not swh_vault.is_available(TEST_TYPE, TEST_SWHID) - swh_vault.cache.add(TEST_TYPE, TEST_HEX_ID, b"content") - assert not swh_vault.is_available(TEST_TYPE, TEST_HEX_ID) + swh_vault.cache.add(TEST_TYPE, TEST_SWHID, b"content") + assert not swh_vault.is_available(TEST_TYPE, TEST_SWHID) - swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "done") - assert swh_vault.is_available(TEST_TYPE, TEST_HEX_ID) + swh_vault.set_status(TEST_TYPE, TEST_SWHID, "done") + assert swh_vault.is_available(TEST_TYPE, TEST_SWHID) def test_fetch(swh_vault): - assert 
swh_vault.fetch(TEST_TYPE, TEST_HEX_ID, raise_notfound=False) is None + assert swh_vault.fetch(TEST_TYPE, TEST_SWHID, raise_notfound=False) is None with pytest.raises( - NotFoundExc, match=f"{TEST_TYPE} {TEST_HEX_ID} is not available." + NotFoundExc, match=f"{TEST_TYPE} {TEST_SWHID} is not available." ): - swh_vault.fetch(TEST_TYPE, TEST_HEX_ID) + swh_vault.fetch(TEST_TYPE, TEST_SWHID) - obj_id, content = fake_cook(swh_vault, TEST_TYPE, b"content") + swhid, content = fake_cook(swh_vault, TEST_TYPE, b"content") - info = swh_vault.progress(TEST_TYPE, obj_id) + info = swh_vault.progress(TEST_TYPE, swhid) access_ts_before = info["ts_last_access"] - assert swh_vault.fetch(TEST_TYPE, obj_id) == b"content" + assert swh_vault.fetch(TEST_TYPE, swhid) == b"content" - info = swh_vault.progress(TEST_TYPE, obj_id) + info = swh_vault.progress(TEST_TYPE, swhid) access_ts_after = info["ts_last_access"] assertTimestampAlmostNow(access_ts_after) @@ -276,8 +276,8 @@ for i in r: sticky = i == 5 content = b"content%s" % str(i).encode() - obj_id, content = fake_cook(swh_vault, TEST_TYPE, content, sticky) - inserted[i] = (obj_id, content) + swhid, content = fake_cook(swh_vault, TEST_TYPE, content, sticky) + inserted[i] = (swhid, content) swh_vault.update_access_ts(TEST_TYPE, inserted[2][0]) swh_vault.update_access_ts(TEST_TYPE, inserted[3][0]) @@ -296,8 +296,8 @@ for i in r: sticky = i == 5 content = b"content%s" % str(i).encode() - obj_id, content = fake_cook(swh_vault, TEST_TYPE, content, sticky) - inserted[i] = (obj_id, content) + swhid, content = fake_cook(swh_vault, TEST_TYPE, content, sticky) + inserted[i] = (swhid, content) if i == 7: cutoff_date = datetime.datetime.now() @@ -314,40 +314,40 @@ def test_fail_cook_simple(swh_vault): - fail_cook(swh_vault, TEST_TYPE, TEST_HEX_ID, "error42") - assert not swh_vault.is_available(TEST_TYPE, TEST_HEX_ID) - info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) + fail_cook(swh_vault, TEST_TYPE, TEST_SWHID, "error42") + assert not 
swh_vault.is_available(TEST_TYPE, TEST_SWHID) + info = swh_vault.progress(TEST_TYPE, TEST_SWHID) assert info["progress_msg"] == "error42" def test_send_failure_email(swh_vault): with mock_cooking(swh_vault): - swh_vault.cook(TEST_TYPE, TEST_HEX_ID, email="a@example.com") + swh_vault.cook(TEST_TYPE, TEST_SWHID, email="a@example.com") - swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "failed") - swh_vault.set_progress(TEST_TYPE, TEST_HEX_ID, "test error") + swh_vault.set_status(TEST_TYPE, TEST_SWHID, "failed") + swh_vault.set_progress(TEST_TYPE, TEST_SWHID, "test error") with patch.object(swh_vault, "smtp_server") as m: - swh_vault.send_notif(TEST_TYPE, TEST_HEX_ID) + swh_vault.send_notif(TEST_TYPE, TEST_SWHID) e = [k[0][0] for k in m.send_message.call_args_list][0] assert e["To"] == "a@example.com" assert "bot@softwareheritage.org" in e["From"] assert TEST_TYPE in e["Subject"] - assert TEST_HEX_ID[:5] in e["Subject"] + assert TEST_SWHID.object_id.hex()[:5] in e["Subject"] assert "fail" in e["Subject"] assert TEST_TYPE in str(e) - assert TEST_HEX_ID[:5] in str(e) + assert TEST_SWHID.object_id.hex()[:5] in str(e) assert "test error" in str(e) assert "--\x20\n" in str(e) # Well-formated signature def test_retry_failed_bundle(swh_vault): - fail_cook(swh_vault, TEST_TYPE, TEST_HEX_ID, "error42") - info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) + fail_cook(swh_vault, TEST_TYPE, TEST_SWHID, "error42") + info = swh_vault.progress(TEST_TYPE, TEST_SWHID) assert info["task_status"] == "failed" with mock_cooking(swh_vault): - swh_vault.cook(TEST_TYPE, TEST_HEX_ID) - info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) + swh_vault.cook(TEST_TYPE, TEST_SWHID) + info = swh_vault.progress(TEST_TYPE, TEST_SWHID) assert info["task_status"] == "new" diff --git a/swh/vault/tests/test_cache.py b/swh/vault/tests/test_cache.py --- a/swh/vault/tests/test_cache.py +++ b/swh/vault/tests/test_cache.py @@ -5,15 +5,17 @@ from swh.model import hashutil +from swh.model.identifiers import CoreSWHID 
TEST_TYPE_1 = "revision_gitfast" TEST_TYPE_2 = "directory" -TEST_HEX_ID_1 = "4a4b9771542143cf070386f86b4b92d42966bdbc" -TEST_HEX_ID_2 = "17a3e48bce37be5226490e750202ad3a9a1a3fe9" - -TEST_OBJ_ID_1 = hashutil.hash_to_bytes(TEST_HEX_ID_1) -TEST_OBJ_ID_2 = hashutil.hash_to_bytes(TEST_HEX_ID_2) +TEST_SWHID_1 = CoreSWHID.from_string( + "swh:1:rev:4a4b9771542143cf070386f86b4b92d42966bdbc" +) +TEST_SWHID_2 = CoreSWHID.from_string( + "swh:1:dir:17a3e48bce37be5226490e750202ad3a9a1a3fe9" +) TEST_CONTENT_1 = b"test content 1" TEST_CONTENT_2 = b"test content 2" @@ -25,38 +27,38 @@ def test_internal_id(swh_vault): - sid = swh_vault.cache._get_internal_id(TEST_TYPE_1, TEST_OBJ_ID_1) - assert hashutil.hash_to_hex(sid) == "6829cda55b54c295aa043a611a4e0320239988d9" + sid = swh_vault.cache._get_internal_id(TEST_TYPE_1, TEST_SWHID_1) + assert hashutil.hash_to_hex(sid) == "ec2a99d6b21a68648a9d0c99c5d7c35f69268564" def test_simple_add_get(swh_vault): - swh_vault.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1) - assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1) == TEST_CONTENT_1 - assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1) + swh_vault.cache.add(TEST_TYPE_1, TEST_SWHID_1, TEST_CONTENT_1) + assert swh_vault.cache.get(TEST_TYPE_1, TEST_SWHID_1) == TEST_CONTENT_1 + assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_SWHID_1) def test_different_type_same_id(swh_vault): - swh_vault.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1) - swh_vault.cache.add(TEST_TYPE_2, TEST_OBJ_ID_1, TEST_CONTENT_2) - assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1) == TEST_CONTENT_1 - assert swh_vault.cache.get(TEST_TYPE_2, TEST_OBJ_ID_1) == TEST_CONTENT_2 - assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1) - assert swh_vault.cache.is_cached(TEST_TYPE_2, TEST_OBJ_ID_1) + swh_vault.cache.add(TEST_TYPE_1, TEST_SWHID_1, TEST_CONTENT_1) + swh_vault.cache.add(TEST_TYPE_2, TEST_SWHID_1, TEST_CONTENT_2) + assert swh_vault.cache.get(TEST_TYPE_1, TEST_SWHID_1) == 
TEST_CONTENT_1 + assert swh_vault.cache.get(TEST_TYPE_2, TEST_SWHID_1) == TEST_CONTENT_2 + assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_SWHID_1) + assert swh_vault.cache.is_cached(TEST_TYPE_2, TEST_SWHID_1) def test_different_type_same_content(swh_vault): - swh_vault.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1) - swh_vault.cache.add(TEST_TYPE_2, TEST_OBJ_ID_1, TEST_CONTENT_1) - assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1) == TEST_CONTENT_1 - assert swh_vault.cache.get(TEST_TYPE_2, TEST_OBJ_ID_1) == TEST_CONTENT_1 - assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1) - assert swh_vault.cache.is_cached(TEST_TYPE_2, TEST_OBJ_ID_1) + swh_vault.cache.add(TEST_TYPE_1, TEST_SWHID_1, TEST_CONTENT_1) + swh_vault.cache.add(TEST_TYPE_2, TEST_SWHID_1, TEST_CONTENT_1) + assert swh_vault.cache.get(TEST_TYPE_1, TEST_SWHID_1) == TEST_CONTENT_1 + assert swh_vault.cache.get(TEST_TYPE_2, TEST_SWHID_1) == TEST_CONTENT_1 + assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_SWHID_1) + assert swh_vault.cache.is_cached(TEST_TYPE_2, TEST_SWHID_1) def test_different_id_same_type(swh_vault): - swh_vault.cache.add(TEST_TYPE_1, TEST_OBJ_ID_1, TEST_CONTENT_1) - swh_vault.cache.add(TEST_TYPE_1, TEST_OBJ_ID_2, TEST_CONTENT_2) - assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_1) == TEST_CONTENT_1 - assert swh_vault.cache.get(TEST_TYPE_1, TEST_OBJ_ID_2) == TEST_CONTENT_2 - assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_1) - assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_OBJ_ID_2) + swh_vault.cache.add(TEST_TYPE_1, TEST_SWHID_1, TEST_CONTENT_1) + swh_vault.cache.add(TEST_TYPE_1, TEST_SWHID_2, TEST_CONTENT_2) + assert swh_vault.cache.get(TEST_TYPE_1, TEST_SWHID_1) == TEST_CONTENT_1 + assert swh_vault.cache.get(TEST_TYPE_1, TEST_SWHID_2) == TEST_CONTENT_2 + assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_SWHID_1) + assert swh_vault.cache.is_cached(TEST_TYPE_1, TEST_SWHID_2) diff --git a/swh/vault/tests/test_cli.py b/swh/vault/tests/test_cli.py 
--- a/swh/vault/tests/test_cli.py +++ b/swh/vault/tests/test_cli.py @@ -10,6 +10,7 @@ import click.testing import pytest +from swh.model.identifiers import CoreSWHID from swh.vault.cli import vault as vault_cli_group from swh.vault.cookers.base import BaseVaultCooker from swh.vault.in_memory_backend import InMemoryVaultBackend @@ -26,24 +27,20 @@ assert isinstance(result.exception, SystemExit) assert "expected core SWHID" in result.stdout - result = runner.invoke(vault_cli_group, ["cook", "swh:1:cnt:" + "0" * 40, "-"]) - assert isinstance(result.exception, SystemExit) - assert "No cooker available for CONTENT" in result.stdout - def test_cook_unknown_cooker(): runner = click.testing.CliRunner() result = runner.invoke( vault_cli_group, - ["cook", "swh:1:dir:" + "0" * 40, "-", "--cooker-type", "gitfast"], + ["cook", "swh:1:dir:" + "0" * 40, "-", "--bundle-type", "gitfast"], ) assert isinstance(result.exception, SystemExit) assert "do not have a gitfast cooker" in result.stdout result = runner.invoke(vault_cli_group, ["cook", "swh:1:rev:" + "0" * 40, "-"]) assert isinstance(result.exception, SystemExit) - assert "explicit --cooker-type" in result.stdout + assert "use --bundle-type" in result.stdout @pytest.mark.parametrize( @@ -66,6 +63,8 @@ runner = click.testing.CliRunner() + swhid = CoreSWHID.from_string(f"swh:1:{swhid_type}:{'0'*40}") + with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: config_fd.write('{"storage": {}}') config_fd.seek(0) @@ -78,24 +77,20 @@ "-", "-C", config_fd.name, - "--cooker-type", + "--bundle-type", cooker_name_suffix, ], ) else: result = runner.invoke( - vault_cli_group, - ["cook", f"swh:1:{swhid_type}:{'0'*40}", "-", "-C", config_fd.name], + vault_cli_group, ["cook", str(swhid), "-", "-C", config_fd.name], ) if result.exception is not None: raise result.exception cooker_cls.assert_called_once_with( - bundle_type=f"{bundle_type}_{cooker_name_suffix}" - if cooker_name_suffix - else bundle_type, - obj_id=b"\x00" * 20, + 
swhid=swhid, backend=backend, storage=storage, graph=None, diff --git a/swh/vault/tests/test_cookers.py b/swh/vault/tests/test_cookers.py --- a/swh/vault/tests/test_cookers.py +++ b/swh/vault/tests/test_cookers.py @@ -25,7 +25,8 @@ import pytest from swh.loader.git.from_disk import GitLoaderFromDisk -from swh.model import from_disk, hashutil, identifiers +from swh.model import from_disk, hashutil +from swh.model.identifiers import CoreSWHID, ObjectType from swh.model.model import ( Directory, DirectoryEntry, @@ -172,11 +173,11 @@ @contextlib.contextmanager -def cook_extract_directory_dircooker(storage, obj_id, fsck=True): +def cook_extract_directory_dircooker(storage, swhid, fsck=True): """Context manager that cooks a directory and extract it.""" backend = unittest.mock.MagicMock() backend.storage = storage - cooker = DirectoryCooker("directory", obj_id, backend=backend, storage=storage) + cooker = DirectoryCooker(swhid, backend=backend, storage=storage) cooker.fileobj = io.BytesIO() assert cooker.check_exists() cooker.prepare_bundle() @@ -184,12 +185,12 @@ with tempfile.TemporaryDirectory(prefix="tmp-vault-extract-") as td: with tarfile.open(fileobj=cooker.fileobj, mode="r") as tar: tar.extractall(td) - yield pathlib.Path(td) / hashutil.hash_to_hex(obj_id) + yield pathlib.Path(td) / str(swhid) cooker.storage = None @contextlib.contextmanager -def cook_extract_directory_gitfast(storage, obj_id, fsck=True): +def cook_extract_directory_gitfast(storage, swhid, fsck=True): """Context manager that cooks a revision containing a directory and extract it, using RevisionGitfastCooker""" test_repo = TestRepo() @@ -198,7 +199,7 @@ datetime.datetime.now(datetime.timezone.utc) ) revision = Revision( - directory=obj_id, + directory=swhid.object_id, message=b"dummy message", author=Person.from_fullname(b"someone"), committer=Person.from_fullname(b"someone"), @@ -209,7 +210,9 @@ ) storage.revision_add([revision]) - with cook_stream_revision_gitfast(storage, revision.id) as stream, 
test_repo as p: + with cook_stream_revision_gitfast( + storage, revision.swhid() + ) as stream, test_repo as p: processor = dulwich.fastexport.GitImportProcessor(test_repo.repo) processor.import_stream(stream) test_repo.checkout(b"HEAD") @@ -218,9 +221,7 @@ @contextlib.contextmanager -def cook_extract_directory_git_bare( - storage, obj_id, fsck=True, direct_objstorage=False -): +def cook_extract_directory_git_bare(storage, swhid, fsck=True, direct_objstorage=False): """Context manager that cooks a revision and extract it, using GitBareCooker""" backend = unittest.mock.MagicMock() @@ -228,8 +229,7 @@ # Cook the object cooker = GitBareCooker( - "directory", - obj_id, + swhid, backend=backend, storage=storage, objstorage=storage.objstorage if direct_objstorage else None, @@ -249,12 +249,7 @@ with tempfile.TemporaryDirectory(prefix="tmp-vault-clone-") as clone_dir: clone_dir = pathlib.Path(clone_dir) subprocess.check_call( - [ - "git", - "clone", - os.path.join(td, f"swh:1:dir:{obj_id.hex()}.git"), - clone_dir, - ] + ["git", "clone", os.path.join(td, f"{swhid}.git"), clone_dir,] ) shutil.rmtree(clone_dir / ".git") yield clone_dir @@ -275,13 +270,11 @@ @contextlib.contextmanager -def cook_stream_revision_gitfast(storage, obj_id): +def cook_stream_revision_gitfast(storage, swhid): """Context manager that cooks a revision and stream its fastexport.""" backend = unittest.mock.MagicMock() backend.storage = storage - cooker = RevisionGitfastCooker( - "revision_gitfast", obj_id, backend=backend, storage=storage - ) + cooker = RevisionGitfastCooker(swhid, backend=backend, storage=storage) cooker.fileobj = io.BytesIO() assert cooker.check_exists() cooker.prepare_bundle() @@ -292,11 +285,11 @@ @contextlib.contextmanager -def cook_extract_revision_gitfast(storage, obj_id, fsck=True): +def cook_extract_revision_gitfast(storage, swhid, fsck=True): """Context manager that cooks a revision and extract it, using RevisionGitfastCooker""" test_repo = TestRepo() - with 
cook_stream_revision_gitfast(storage, obj_id) as stream, test_repo as p: + with cook_stream_revision_gitfast(storage, swhid) as stream, test_repo as p: processor = dulwich.fastexport.GitImportProcessor(test_repo.repo) processor.import_stream(stream) yield test_repo, p @@ -310,12 +303,7 @@ backend.storage = storage # Cook the object - cooker = GitBareCooker( - swhid.object_type.name.lower(), - swhid.object_id, - backend=backend, - storage=storage, - ) + cooker = GitBareCooker(swhid, backend=backend, storage=storage) cooker.use_fsck = fsck # Some tests try edge-cases that git-fsck rejects cooker.fileobj = io.BytesIO() assert cooker.check_exists() @@ -339,14 +327,8 @@ @contextlib.contextmanager -def cook_extract_revision_git_bare(storage, obj_id, fsck=True): - with cook_extract_git_bare( - storage, - identifiers.CoreSWHID( - object_type=identifiers.ObjectType.REVISION, object_id=obj_id - ), - fsck=fsck, - ) as res: +def cook_extract_revision_git_bare(storage, swhid, fsck=True): + with cook_extract_git_bare(storage, swhid, fsck=fsck,) as res: yield res @@ -361,14 +343,8 @@ @contextlib.contextmanager -def cook_extract_snapshot_git_bare(storage, obj_id, fsck=True): - with cook_extract_git_bare( - storage, - identifiers.CoreSWHID( - object_type=identifiers.ObjectType.SNAPSHOT, object_id=obj_id - ), - fsck=fsck, - ) as res: +def cook_extract_snapshot_git_bare(storage, swhid, fsck=True): + with cook_extract_git_bare(storage, swhid, fsck=fsck,) as res: yield res @@ -403,8 +379,9 @@ obj_id_hex = repo.repo[c].tree.decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) + swhid = CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=obj_id) - with cook_extract_directory(loader.storage, obj_id) as p: + with cook_extract_directory(loader.storage, swhid) as p: assert (p / "file").stat().st_mode == 0o100644 assert (p / "file").read_text() == TEST_CONTENT assert (p / "executable").stat().st_mode == 0o100755 @@ -434,6 +411,7 @@ obj_id_hex = repo.repo[c].tree.decode() obj_id = 
hashutil.hash_to_bytes(obj_id_hex) + swhid = CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=obj_id) # FIXME: storage.content_update() should be changed to allow things # like that @@ -462,7 +440,7 @@ cur.execute("delete from content where sha1 = %s", (id_3,)) - with cook_extract_directory(loader.storage, obj_id) as p: + with cook_extract_directory(loader.storage, swhid) as p: assert (p / "file").read_bytes() == b"test1" assert (p / "hidden_file").read_bytes() == HIDDEN_MESSAGE assert (p / "absent_file").read_bytes() == SKIPPED_MESSAGE @@ -508,8 +486,9 @@ obj_id_hex = repo.repo[c].tree.decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) + swhid = CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=obj_id) - with cook_extract_directory(loader.storage, obj_id) as p: + with cook_extract_directory(loader.storage, swhid) as p: assert (p / "file").stat().st_mode == 0o100644 assert (p / "executable").stat().st_mode == 0o100755 assert (p / "wat").stat().st_mode == 0o100644 @@ -534,6 +513,7 @@ obj_id_hex = repo.repo[c].tree.decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) + swhid = CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=obj_id) # Set-up spies storage_content_get_data = mocker.patch.object( @@ -544,7 +524,7 @@ ) with cook_extract_directory_git_bare( - loader.storage, obj_id, direct_objstorage=direct_objstorage + loader.storage, swhid, direct_objstorage=direct_objstorage ) as p: assert (p / "file").stat().st_mode == 0o100644 assert (p / "file").read_text() == TEST_CONTENT @@ -580,7 +560,9 @@ ) swh_storage.directory_add([dir]) - with cook_extract_directory_dircooker(swh_storage, dir.id, fsck=False) as p: + with cook_extract_directory_dircooker( + swh_storage, dir.swhid(), fsck=False + ) as p: assert (p / "submodule").is_symlink() assert os.readlink(str(p / "submodule")) == target_rev @@ -601,7 +583,7 @@ repo.commit("add file2") (rp / "dir1/dir2").mkdir(parents=True) (rp / "dir1/dir2/file").write_text(TEST_CONTENT) - repo.commit("add dir1/dir2/file") 
+ (rp / "bin1").write_bytes(TEST_EXECUTABLE) (rp / "bin1").chmod(0o755) repo.commit("add bin1") @@ -615,9 +597,10 @@ loader.load() obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) - return (loader, obj_id) + swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=obj_id) + return (loader, swhid) - def check_revision_simple(self, ert, p, obj_id): + def check_revision_simple(self, ert, p, swhid): ert.checkout(b"HEAD") assert (p / "file1").stat().st_mode == 0o100644 assert (p / "file1").read_text() == TEST_CONTENT @@ -627,7 +610,7 @@ assert (p / "bin").read_bytes() == TEST_EXECUTABLE assert (p / "dir1/dir2/file").read_text() == TEST_CONTENT assert (p / "dir1/dir2/file").stat().st_mode == 0o100644 - assert ert.repo.refs[b"HEAD"].decode() == obj_id.hex() + assert ert.repo.refs[b"HEAD"].decode() == swhid.object_id.hex() def load_repo_two_roots(self, git_loader): # @@ -647,14 +630,15 @@ repo.commit("add file3") obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) + swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=obj_id) loader = git_loader(str(rp)) loader.load() - return (loader, obj_id) + return (loader, swhid) - def check_revision_two_roots(self, ert, p, obj_id): - assert ert.repo.refs[b"HEAD"].decode() == obj_id.hex() + def check_revision_two_roots(self, ert, p, swhid): + assert ert.repo.refs[b"HEAD"].decode() == swhid.object_id.hex() - (c3,) = ert.repo[hashutil.hash_to_bytehex(obj_id)].parents + (c3,) = ert.repo[hashutil.hash_to_bytehex(swhid.object_id)].parents assert len(ert.repo[c3].parents) == 2 def load_repo_two_heads(self, git_loader): @@ -682,13 +666,14 @@ obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) + swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=obj_id) loader = git_loader(str(rp)) loader.load() - return (loader, obj_id) + return (loader, swhid) - def check_snapshot_two_heads(self, ert, p, obj_id): + def 
check_snapshot_two_heads(self, ert, p, swhid): assert ( - hashutil.hash_to_bytehex(obj_id) + hashutil.hash_to_bytehex(swhid.object_id) == ert.repo.refs[b"HEAD"] == ert.repo.refs[b"refs/heads/master"] == ert.repo.refs[b"refs/remotes/origin/HEAD"] @@ -696,7 +681,7 @@ == ert.repo.refs[b"refs/remotes/origin/b1"] ) - c4_id = hashutil.hash_to_bytehex(obj_id) + c4_id = hashutil.hash_to_bytehex(swhid.object_id) c3_id = ert.repo.refs[b"refs/remotes/origin/b2"] assert ert.repo[c3_id].parents == ert.repo[c4_id].parents @@ -729,23 +714,24 @@ obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) + swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=obj_id) loader = git_loader(str(rp)) loader.load() - return (loader, obj_id) + return (loader, swhid) - def check_revision_two_double_fork_merge(self, ert, p, obj_id): - assert ert.repo.refs[b"HEAD"].decode() == obj_id.hex() + def check_revision_two_double_fork_merge(self, ert, p, swhid): + assert ert.repo.refs[b"HEAD"].decode() == swhid.object_id.hex() - def check_snapshot_two_double_fork_merge(self, ert, p, obj_id): + def check_snapshot_two_double_fork_merge(self, ert, p, swhid): assert ( - hashutil.hash_to_bytehex(obj_id) + hashutil.hash_to_bytehex(swhid.object_id) == ert.repo.refs[b"HEAD"] == ert.repo.refs[b"refs/heads/master"] == ert.repo.refs[b"refs/remotes/origin/HEAD"] == ert.repo.refs[b"refs/remotes/origin/master"] ) - (c4_id, c5_id) = ert.repo[obj_id.hex().encode()].parents + (c4_id, c5_id) = ert.repo[swhid.object_id.hex().encode()].parents assert c5_id == ert.repo.refs[b"refs/remotes/origin/c3"] (c2_id, c3_id) = ert.repo[c4_id].parents @@ -773,23 +759,24 @@ obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) + swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=obj_id) loader = git_loader(str(rp)) loader.load() - return (loader, obj_id) + return (loader, swhid) - def check_revision_triple_merge(self, ert, p, obj_id): - assert 
ert.repo.refs[b"HEAD"].decode() == obj_id.hex() + def check_revision_triple_merge(self, ert, p, swhid): + assert ert.repo.refs[b"HEAD"].decode() == swhid.object_id.hex() - def check_snapshot_triple_merge(self, ert, p, obj_id): + def check_snapshot_triple_merge(self, ert, p, swhid): assert ( - hashutil.hash_to_bytehex(obj_id) + hashutil.hash_to_bytehex(swhid.object_id) == ert.repo.refs[b"HEAD"] == ert.repo.refs[b"refs/heads/master"] == ert.repo.refs[b"refs/remotes/origin/HEAD"] == ert.repo.refs[b"refs/remotes/origin/master"] ) - (c2_id, c3_id, c4_id) = ert.repo[obj_id.hex().encode()].parents + (c2_id, c3_id, c4_id) = ert.repo[swhid.object_id.hex().encode()].parents assert c3_id == ert.repo.refs[b"refs/remotes/origin/b1"] assert c4_id == ert.repo.refs[b"refs/remotes/origin/b2"] @@ -813,6 +800,7 @@ repo.commit() obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) + swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=obj_id) loader = git_loader(str(rp)) loader.load() @@ -842,9 +830,9 @@ ) cur.execute("delete from content where sha1 = %s", (id_3,)) - return (loader, obj_id) + return (loader, swhid) - def check_revision_filtered_objects(self, ert, p, obj_id): + def check_revision_filtered_objects(self, ert, p, swhid): ert.checkout(b"HEAD") assert (p / "file").read_bytes() == b"test1" assert (p / "hidden_file").read_bytes() == HIDDEN_MESSAGE @@ -878,9 +866,9 @@ storage = loader.storage storage.revision_add([test_revision]) - return (loader, test_revision.id) + return (loader, test_revision.swhid()) - def check_revision_null_fields(self, ert, p, obj_id): + def check_revision_null_fields(self, ert, p, swhid): ert.checkout(b"HEAD") assert (p / "file").stat().st_mode == 0o100644 @@ -920,13 +908,14 @@ obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) + swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=obj_id) loader = git_loader(str(rp)) loader.load() - return (loader, obj_id) + 
return (loader, swhid) - def check_snapshot_tags(self, ert, p, obj_id): + def check_snapshot_tags(self, ert, p, swhid): assert ( - hashutil.hash_to_bytehex(obj_id) + hashutil.hash_to_bytehex(swhid.object_id) == ert.repo.refs[b"HEAD"] == ert.repo.refs[b"refs/heads/master"] == ert.repo.refs[b"refs/remotes/origin/HEAD"] @@ -935,7 +924,7 @@ ) c2_id = ert.repo.refs[b"refs/tags/t2"] - c5_id = hashutil.hash_to_bytehex(obj_id) + c5_id = hashutil.hash_to_bytehex(swhid.object_id) assert ert.repo[c5_id].parents == [c2_id] @@ -953,34 +942,34 @@ class TestRevisionCooker(RepoFixtures): def test_revision_simple(self, git_loader, cook_extract_revision): - (loader, obj_id) = self.load_repo_simple(git_loader) - with cook_extract_revision(loader.storage, obj_id) as (ert, p): - self.check_revision_simple(ert, p, obj_id) + (loader, swhid) = self.load_repo_simple(git_loader) + with cook_extract_revision(loader.storage, swhid) as (ert, p): + self.check_revision_simple(ert, p, swhid) def test_revision_two_roots(self, git_loader, cook_extract_revision): - (loader, obj_id) = self.load_repo_two_roots(git_loader) - with cook_extract_revision(loader.storage, obj_id) as (ert, p): - self.check_revision_two_roots(ert, p, obj_id) + (loader, swhid) = self.load_repo_two_roots(git_loader) + with cook_extract_revision(loader.storage, swhid) as (ert, p): + self.check_revision_two_roots(ert, p, swhid) def test_revision_two_double_fork_merge(self, git_loader, cook_extract_revision): - (loader, obj_id) = self.load_repo_two_double_fork_merge(git_loader) - with cook_extract_revision(loader.storage, obj_id) as (ert, p): - self.check_revision_two_double_fork_merge(ert, p, obj_id) + (loader, swhid) = self.load_repo_two_double_fork_merge(git_loader) + with cook_extract_revision(loader.storage, swhid) as (ert, p): + self.check_revision_two_double_fork_merge(ert, p, swhid) def test_revision_triple_merge(self, git_loader, cook_extract_revision): - (loader, obj_id) = self.load_repo_triple_merge(git_loader) - with 
cook_extract_revision(loader.storage, obj_id) as (ert, p): - self.check_revision_triple_merge(ert, p, obj_id) + (loader, swhid) = self.load_repo_triple_merge(git_loader) + with cook_extract_revision(loader.storage, swhid) as (ert, p): + self.check_revision_triple_merge(ert, p, swhid) def test_revision_filtered_objects(self, git_loader, cook_extract_revision): - (loader, obj_id) = self.load_repo_filtered_objects(git_loader) - with cook_extract_revision(loader.storage, obj_id) as (ert, p): - self.check_revision_filtered_objects(ert, p, obj_id) + (loader, swhid) = self.load_repo_filtered_objects(git_loader) + with cook_extract_revision(loader.storage, swhid) as (ert, p): + self.check_revision_filtered_objects(ert, p, swhid) def test_revision_null_fields(self, git_loader, cook_extract_revision): - (loader, obj_id) = self.load_repo_null_fields(git_loader) - with cook_extract_revision(loader.storage, obj_id, fsck=False) as (ert, p): - self.check_revision_null_fields(ert, p, obj_id) + (loader, swhid) = self.load_repo_null_fields(git_loader) + with cook_extract_revision(loader.storage, swhid, fsck=False) as (ert, p): + self.check_revision_null_fields(ert, p, swhid) def test_revision_revision_data(self, swh_storage): target_rev = "0e8a3ad980ec179856012b7eecf4327e99cd44cd" @@ -1011,7 +1000,7 @@ ) swh_storage.revision_add([rev]) - with cook_stream_revision_gitfast(swh_storage, rev.id) as stream: + with cook_stream_revision_gitfast(swh_storage, rev.swhid()) as stream: pattern = "M 160000 {} submodule".format(target_rev).encode() assert pattern in stream.read() @@ -1020,43 +1009,50 @@ def test_snapshot_simple(self, git_loader, cook_extract_snapshot): (loader, main_rev_id) = self.load_repo_simple(git_loader) snp_id = loader.loaded_snapshot_id - with cook_extract_snapshot(loader.storage, snp_id) as (ert, p): + swhid = CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=snp_id) + with cook_extract_snapshot(loader.storage, swhid) as (ert, p): self.check_revision_simple(ert, p, 
main_rev_id) def test_snapshot_two_roots(self, git_loader, cook_extract_snapshot): (loader, main_rev_id) = self.load_repo_two_roots(git_loader) snp_id = loader.loaded_snapshot_id - with cook_extract_snapshot(loader.storage, snp_id) as (ert, p): + swhid = CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=snp_id) + with cook_extract_snapshot(loader.storage, swhid) as (ert, p): self.check_revision_two_roots(ert, p, main_rev_id) def test_snapshot_two_heads(self, git_loader, cook_extract_snapshot): (loader, main_rev_id) = self.load_repo_two_heads(git_loader) snp_id = loader.loaded_snapshot_id - with cook_extract_snapshot(loader.storage, snp_id) as (ert, p): + swhid = CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=snp_id) + with cook_extract_snapshot(loader.storage, swhid) as (ert, p): self.check_snapshot_two_heads(ert, p, main_rev_id) def test_snapshot_two_double_fork_merge(self, git_loader, cook_extract_snapshot): (loader, main_rev_id) = self.load_repo_two_double_fork_merge(git_loader) snp_id = loader.loaded_snapshot_id - with cook_extract_snapshot(loader.storage, snp_id) as (ert, p): + swhid = CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=snp_id) + with cook_extract_snapshot(loader.storage, swhid) as (ert, p): self.check_revision_two_double_fork_merge(ert, p, main_rev_id) self.check_snapshot_two_double_fork_merge(ert, p, main_rev_id) def test_snapshot_triple_merge(self, git_loader, cook_extract_snapshot): (loader, main_rev_id) = self.load_repo_triple_merge(git_loader) snp_id = loader.loaded_snapshot_id - with cook_extract_snapshot(loader.storage, snp_id) as (ert, p): + swhid = CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=snp_id) + with cook_extract_snapshot(loader.storage, swhid) as (ert, p): self.check_revision_triple_merge(ert, p, main_rev_id) self.check_snapshot_triple_merge(ert, p, main_rev_id) def test_snapshot_filtered_objects(self, git_loader, cook_extract_snapshot): (loader, main_rev_id) = self.load_repo_filtered_objects(git_loader) 
snp_id = loader.loaded_snapshot_id - with cook_extract_snapshot(loader.storage, snp_id) as (ert, p): + swhid = CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=snp_id) + with cook_extract_snapshot(loader.storage, swhid) as (ert, p): self.check_revision_filtered_objects(ert, p, main_rev_id) def test_snapshot_tags(self, git_loader, cook_extract_snapshot): (loader, main_rev_id) = self.load_repo_tags(git_loader) snp_id = loader.loaded_snapshot_id - with cook_extract_snapshot(loader.storage, snp_id) as (ert, p): + swhid = CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=snp_id) + with cook_extract_snapshot(loader.storage, swhid) as (ert, p): self.check_snapshot_tags(ert, p, main_rev_id) diff --git a/swh/vault/tests/test_cookers_base.py b/swh/vault/tests/test_cookers_base.py --- a/swh/vault/tests/test_cookers_base.py +++ b/swh/vault/tests/test_cookers_base.py @@ -5,18 +5,17 @@ from unittest.mock import MagicMock -from swh.model import hashutil +from swh.model.identifiers import CoreSWHID from swh.vault.cookers.base import BaseVaultCooker TEST_BUNDLE_CHUNKS = [b"test content 1\n", b"test content 2\n", b"test content 3\n"] TEST_BUNDLE_CONTENT = b"".join(TEST_BUNDLE_CHUNKS) -TEST_OBJ_TYPE = "test_type" -TEST_HEX_ID = "17a3e48bce37be5226490e750202ad3a9a1a3fe9" -TEST_OBJ_ID = hashutil.hash_to_bytes(TEST_HEX_ID) +TEST_BUNDLE_TYPE = "test_type" +TEST_SWHID = CoreSWHID.from_string("swh:1:cnt:17a3e48bce37be5226490e750202ad3a9a1a3fe9") class BaseVaultCookerMock(BaseVaultCooker): - CACHE_TYPE_KEY = TEST_OBJ_TYPE + BUNDLE_TYPE = TEST_BUNDLE_TYPE def __init__(self): # we do not call super() here to bypass the building of db objects from @@ -24,8 +23,8 @@ self.config = {} self.storage = MagicMock() self.backend = MagicMock() - self.bundle_type = self.CACHE_TYPE_KEY - self.obj_id = hashutil.hash_to_bytes(TEST_OBJ_ID) + self.swhid = TEST_SWHID + self.obj_id = TEST_SWHID.object_id self.max_bundle_size = 1024 def check_exists(self): @@ -40,11 +39,11 @@ cooker = 
BaseVaultCookerMock() cooker.cook() cooker.backend.put_bundle.assert_called_once_with( - TEST_OBJ_TYPE, TEST_OBJ_ID, TEST_BUNDLE_CONTENT + TEST_BUNDLE_TYPE, TEST_SWHID, TEST_BUNDLE_CONTENT ) - cooker.backend.set_status.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID, "done") - cooker.backend.set_progress.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID, None) - cooker.backend.send_notif.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID) + cooker.backend.set_status.assert_called_with(TEST_BUNDLE_TYPE, TEST_SWHID, "done") + cooker.backend.set_progress.assert_called_with(TEST_BUNDLE_TYPE, TEST_SWHID, None) + cooker.backend.send_notif.assert_called_with(TEST_BUNDLE_TYPE, TEST_SWHID) def test_code_exception_cook(): @@ -56,9 +55,9 @@ # Potentially remove this when we have objstorage streaming cooker.backend.put_bundle.assert_not_called() - cooker.backend.set_status.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID, "failed") + cooker.backend.set_status.assert_called_with(TEST_BUNDLE_TYPE, TEST_SWHID, "failed") assert "Nope" not in cooker.backend.set_progress.call_args[0][2] - cooker.backend.send_notif.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID) + cooker.backend.send_notif.assert_called_with(TEST_BUNDLE_TYPE, TEST_SWHID) def test_policy_exception_cook(): @@ -69,6 +68,6 @@ # Potentially remove this when we have objstorage streaming cooker.backend.put_bundle.assert_not_called() - cooker.backend.set_status.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID, "failed") + cooker.backend.set_status.assert_called_with(TEST_BUNDLE_TYPE, TEST_SWHID, "failed") assert "exceeds" in cooker.backend.set_progress.call_args[0][2] - cooker.backend.send_notif.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID) + cooker.backend.send_notif.assert_called_with(TEST_BUNDLE_TYPE, TEST_SWHID) diff --git a/swh/vault/tests/test_git_bare_cooker.py b/swh/vault/tests/test_git_bare_cooker.py --- a/swh/vault/tests/test_git_bare_cooker.py +++ b/swh/vault/tests/test_git_bare_cooker.py @@ -10,7 +10,6 @@ """ import datetime 
-import glob import io import subprocess import tarfile @@ -61,23 +60,26 @@ r""" Build objects:: - rel2 <------ snp - | / | \ - v / v \ - rev1 <------ rev2 <----° dir4 \ - | | | \ - v v v \ - dir1 dir2 dir3 \ - | / | | | - v / v v v - cnt1 <----° cnt2 cnt3 cnt4 + snp + /|||\ + / ||| \ + rel2 <----° /|\ \----> rel4 + | / | \ | + v / v \ v + rev1 <------ rev2 <----° dir4 \ rel3 + | | | \ | + v v v \ | + dir1 dir2 dir3 | | + | / | | | | + v / v v v v + cnt1 <----° cnt2 cnt3 cnt4 cnt5 If up_to_date_graph is true, then swh-graph contains all objects. - Else, cnt4, dir4, rev2, rel2, and snp are missing from the graph. + Else, cnt4, cnt5, dir4, rev2, rel2, rel3, and snp are missing from the graph. If tag is False, rel2 is excluded. - If weird_branches is False, dir4 and cnt4 are excluded. + If weird_branches is False, dir4, cnt4, rel3, rel4, and cnt5 are excluded. """ from swh.graph.naive_client import NaiveClient as GraphClient @@ -91,6 +93,7 @@ cnt2 = Content.from_data(b"horse") cnt3 = Content.from_data(b"battery") cnt4 = Content.from_data(b"staple") + cnt5 = Content.from_data(b"Tr0ub4dor&3") dir1 = Directory( entries=( DirectoryEntry( @@ -166,6 +169,20 @@ target=rev2.id, synthetic=True, ) + rel3 = Release( + name=b"1.0.0-blob", + message=b"tagged-blob", + target_type=ObjectType.CONTENT, + target=cnt5.sha1_git, + synthetic=True, + ) + rel4 = Release( + name=b"1.0.0-weird", + message=b"weird release", + target_type=ObjectType.RELEASE, + target=rel3.id, + synthetic=True, + ) # Create snapshot: @@ -185,6 +202,9 @@ branches[b"refs/heads/blob-ref"] = SnapshotBranch( target=cnt4.sha1_git, target_type=TargetType.CONTENT ) + branches[b"refs/tags/1.0.0-weird"] = SnapshotBranch( + target=rel4.id, target_type=TargetType.RELEASE + ) snp = Snapshot(branches=branches) # "Fill" swh-graph @@ -205,8 +225,18 @@ edges.append((rel2, rev2)) edges.append((snp, rel2)) if weird_branches: - nodes.extend([cnt3, cnt4, dir3, dir4]) - edges.extend([(dir3, cnt3), (dir4, dir3), (snp, dir4), (snp, 
cnt4)]) + nodes.extend([cnt3, cnt4, cnt5, dir3, dir4, rel3, rel4]) + edges.extend( + [ + (dir3, cnt3), + (dir4, dir3), + (snp, dir4), + (snp, cnt4), + (snp, rel4), + (rel4, rel3), + (rel3, cnt5), + ] + ) else: nodes = [cnt1, cnt2, cnt3, dir1, dir2, dir3, rev1] edges = [ @@ -226,10 +256,10 @@ edges = [(str(s.swhid()), str(d.swhid())) for (s, d) in edges] # Add all objects to storage - swh_storage.content_add([cnt1, cnt2, cnt3, cnt4]) + swh_storage.content_add([cnt1, cnt2, cnt3, cnt4, cnt5]) swh_storage.directory_add([dir1, dir2, dir3, dir4]) swh_storage.revision_add([rev1, rev2]) - swh_storage.release_add([rel2]) + swh_storage.release_add([rel2, rel3, rel4]) swh_storage.snapshot_add([snp]) # Add spy on swh_storage, to make sure revision_log is not called @@ -242,13 +272,11 @@ # Cook backend = InMemoryVaultBackend() if snapshot: - cooker_name = "snapshot_gitbare" - cooked_id = snp.id + cooked_swhid = snp.swhid() else: - cooker_name = "revision_gitbare" - cooked_id = rev2.id + cooked_swhid = rev2.swhid() cooker = GitBareCooker( - cooker_name, cooked_id, backend=backend, storage=swh_storage, graph=swh_graph, + cooked_swhid, backend=backend, storage=swh_storage, graph=swh_graph, ) if weird_branches: @@ -259,7 +287,7 @@ cooker.cook() # Get bundle - bundle = backend.fetch(cooker_name, cooked_id) + bundle = backend.fetch("git_bare", cooked_swhid) # Extract bundle and make sure both revisions are in it with tempfile.TemporaryDirectory("swh-vault-test-bare") as tempdir: @@ -270,7 +298,7 @@ [ "git", "-C", - glob.glob(f"{tempdir}/*{cooked_id.hex()}.git")[0], + f"{tempdir}/{cooked_swhid}.git", "log", "--format=oneline", "--decorate=", diff --git a/swh/vault/tests/test_init_cookers.py b/swh/vault/tests/test_init_cookers.py --- a/swh/vault/tests/test_init_cookers.py +++ b/swh/vault/tests/test_init_cookers.py @@ -10,7 +10,7 @@ import yaml from swh.vault.cookers import COOKER_TYPES, get_cooker -from swh.vault.tests.test_backend import TEST_HEX_ID +from swh.vault.tests.test_backend 
import TEST_SWHID @pytest.fixture @@ -74,7 +74,7 @@ write_config_to_env(config_ko, tmp_path, monkeypatch) with pytest.raises(exception_class, match=exception_msg): - get_cooker("directory", TEST_HEX_ID) + get_cooker("flat", TEST_SWHID) @pytest.mark.parametrize( @@ -106,7 +106,7 @@ for cooker_type in COOKER_TYPES.keys(): write_config_to_env(config_ok, tmp_path, monkeypatch) - cooker = get_cooker(cooker_type, TEST_HEX_ID) + cooker = get_cooker(cooker_type, TEST_SWHID) assert cooker is not None - assert isinstance(cooker, COOKER_TYPES[cooker_type]) + assert isinstance(cooker, tuple(COOKER_TYPES[cooker_type])) diff --git a/swh/vault/tests/test_server.py b/swh/vault/tests/test_server.py --- a/swh/vault/tests/test_server.py +++ b/swh/vault/tests/test_server.py @@ -11,13 +11,14 @@ import yaml from swh.core.api.serializers import json_dumps, msgpack_dumps, msgpack_loads +from swh.vault.api.serializers import ENCODERS from swh.vault.api.server import ( VaultServerApp, check_config, make_app, make_app_from_configfile, ) -from swh.vault.tests.test_backend import TEST_HEX_ID +from swh.vault.tests.test_backend import TEST_SWHID def test_make_app_from_file_missing(): @@ -78,25 +79,29 @@ async def test_client_cook_notfound(cli): resp = await cli.post( "/cook", - data=json_dumps({"bundle_type": "directory", "obj_id": TEST_HEX_ID}), + data=json_dumps( + {"bundle_type": "flat", "swhid": TEST_SWHID}, extra_encoders=ENCODERS + ), headers=[("Content-Type", "application/json")], ) assert resp.status == 400 content = msgpack_loads(await resp.content.read()) assert content["type"] == "NotFoundExc" - assert content["args"] == [f"directory {TEST_HEX_ID} was not found."] + assert content["args"] == [f"flat {TEST_SWHID} was not found."] async def test_client_progress_notfound(cli): resp = await cli.post( "/progress", - data=json_dumps({"bundle_type": "directory", "obj_id": TEST_HEX_ID}), + data=json_dumps( + {"bundle_type": "flat", "swhid": TEST_SWHID}, extra_encoders=ENCODERS + ), 
headers=[("Content-Type", "application/json")], ) assert resp.status == 400 content = msgpack_loads(await resp.content.read()) assert content["type"] == "NotFoundExc" - assert content["args"] == [f"directory {TEST_HEX_ID} was not found."] + assert content["args"] == [f"flat {TEST_SWHID} was not found."] async def test_client_batch_cook_invalid_type(cli):