diff --git a/swh/vault/backend.py b/swh/vault/backend.py index 4812f0b..3bfbfc9 100644 --- a/swh/vault/backend.py +++ b/swh/vault/backend.py @@ -1,552 +1,555 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import collections from email.mime.text import MIMEText import smtplib from typing import Any, Dict, List, Optional, Tuple import psycopg2.extras import psycopg2.pool from swh.core.db import BaseDb from swh.core.db.common import db_transaction from swh.model import hashutil from swh.scheduler import get_scheduler from swh.scheduler.utils import create_oneshot_task_dict from swh.storage import get_storage from swh.vault.cache import VaultCache from swh.vault.cookers import COOKER_TYPES, get_cooker_cls from swh.vault.exc import NotFoundExc from swh.vault.interface import ObjectId cooking_task_name = "swh.vault.cooking_tasks.SWHCookingTask" NOTIF_EMAIL_FROM = '"Software Heritage Vault" ' "" -NOTIF_EMAIL_SUBJECT_SUCCESS = "Bundle ready: {obj_type} {short_id}" -NOTIF_EMAIL_SUBJECT_FAILURE = "Bundle failed: {obj_type} {short_id}" +NOTIF_EMAIL_SUBJECT_SUCCESS = "Bundle ready: {bundle_type} {short_id}" +NOTIF_EMAIL_SUBJECT_FAILURE = "Bundle failed: {bundle_type} {short_id}" NOTIF_EMAIL_BODY_SUCCESS = """ You have requested the following bundle from the Software Heritage Vault: -Object Type: {obj_type} +Bundle Type: {bundle_type} Object ID: {hex_id} This bundle is now available for download at the following address: {url} Please keep in mind that this link might expire at some point, in which case you will need to request the bundle again. --\x20 The Software Heritage Developers """ NOTIF_EMAIL_BODY_FAILURE = """ You have requested the following bundle from the Software Heritage Vault: -Object Type: {obj_type} +Bundle Type: {bundle_type} Object ID: {hex_id} This bundle could not be cooked for the following reason: {progress_msg} We apologize for the inconvenience. --\x20 The Software Heritage Developers """ def batch_to_bytes(batch: List[Tuple[str, str]]) -> List[Tuple[str, bytes]]: - return [(obj_type, hashutil.hash_to_bytes(hex_id)) for obj_type, hex_id in batch] + return [ + (bundle_type, hashutil.hash_to_bytes(hex_id)) for bundle_type, hex_id in batch + ] class VaultBackend: """ Backend for the Software Heritage Vault. 
""" def __init__(self, **config): self.config = config self.cache = VaultCache(**config["cache"]) self.scheduler = get_scheduler(**config["scheduler"]) self.storage = get_storage(**config["storage"]) self.smtp_server = smtplib.SMTP() db_conn = config["db"] self._pool = psycopg2.pool.ThreadedConnectionPool( config.get("min_pool_conns", 1), config.get("max_pool_conns", 10), db_conn, cursor_factory=psycopg2.extras.RealDictCursor, ) self._db = None def get_db(self): if self._db: return self._db return BaseDb.from_pool(self._pool) def put_db(self, db): if db is not self._db: db.put_conn() def _compute_ids(self, obj_id: ObjectId) -> Tuple[str, bytes]: """Internal method to reconcile multiple possible inputs """ if isinstance(obj_id, str): return obj_id, hashutil.hash_to_bytes(obj_id) return hashutil.hash_to_hex(obj_id), obj_id @db_transaction() def progress( self, - obj_type: str, + bundle_type: str, obj_id: ObjectId, raise_notfound: bool = True, db=None, cur=None, ) -> Optional[Dict[str, Any]]: hex_id, obj_id = self._compute_ids(obj_id) cur.execute( """ SELECT id, type, object_id, task_id, task_status, sticky, ts_created, ts_done, ts_last_access, progress_msg FROM vault_bundle WHERE type = %s AND object_id = %s""", - (obj_type, obj_id), + (bundle_type, obj_id), ) res = cur.fetchone() if not res: if raise_notfound: - raise NotFoundExc(f"{obj_type} {hex_id} was not found.") + raise NotFoundExc(f"{bundle_type} {hex_id} was not found.") return None res["object_id"] = hashutil.hash_to_hex(res["object_id"]) return res - def _send_task(self, obj_type: str, hex_id: ObjectId): + def _send_task(self, bundle_type: str, hex_id: ObjectId): """Send a cooking task to the celery scheduler""" - task = create_oneshot_task_dict("cook-vault-bundle", obj_type, hex_id) + task = create_oneshot_task_dict("cook-vault-bundle", bundle_type, hex_id) added_tasks = self.scheduler.create_tasks([task]) return added_tasks[0]["id"] @db_transaction() def create_task( - self, obj_type: str, obj_id: bytes, sticky: bool = False, db=None, cur=None + self, bundle_type: str, obj_id: bytes, sticky: bool = False, db=None, cur=None ): """Create and send a cooking task""" hex_id, obj_id = self._compute_ids(obj_id) - cooker_class = get_cooker_cls(obj_type) - cooker = cooker_class(obj_type, hex_id, backend=self, storage=self.storage) + cooker_class = get_cooker_cls(bundle_type) + cooker = cooker_class(bundle_type, hex_id, backend=self, storage=self.storage) if not cooker.check_exists(): - raise NotFoundExc(f"{obj_type} {hex_id} was not found.") + raise NotFoundExc(f"{bundle_type} {hex_id} was not found.") cur.execute( """ INSERT INTO vault_bundle (type, object_id, sticky) VALUES (%s, %s, %s)""", - (obj_type, obj_id, sticky), + (bundle_type, obj_id, sticky), ) db.conn.commit() - task_id = self._send_task(obj_type, hex_id) + task_id = self._send_task(bundle_type, hex_id) cur.execute( """ UPDATE vault_bundle SET task_id = %s WHERE type = %s AND object_id = %s""", - (task_id, obj_type, obj_id), + (task_id, bundle_type, obj_id), ) @db_transaction() def add_notif_email( - self, obj_type: str, obj_id: bytes, email: str, db=None, cur=None + self, bundle_type: str, obj_id: bytes, email: str, db=None, cur=None ): """Add an e-mail address to notify when a given bundle is ready""" cur.execute( """ INSERT INTO vault_notif_email (email, bundle_id) VALUES (%s, (SELECT id FROM vault_bundle WHERE type = %s AND object_id = %s))""", - (email, obj_type, obj_id), + (email, bundle_type, obj_id), ) - def put_bundle(self, obj_type: str, obj_id: ObjectId, bundle) 
-> bool: + def put_bundle(self, bundle_type: str, obj_id: ObjectId, bundle) -> bool: _, obj_id = self._compute_ids(obj_id) - self.cache.add(obj_type, obj_id, bundle) + self.cache.add(bundle_type, obj_id, bundle) return True @db_transaction() def cook( self, - obj_type: str, + bundle_type: str, obj_id: ObjectId, *, sticky: bool = False, email: Optional[str] = None, db=None, cur=None, ) -> Dict[str, Any]: hex_id, obj_id = self._compute_ids(obj_id) - info = self.progress(obj_type, obj_id, raise_notfound=False) + info = self.progress(bundle_type, obj_id, raise_notfound=False) - if obj_type not in COOKER_TYPES: - raise NotFoundExc(f"{obj_type} is an unknown type.") + if bundle_type not in COOKER_TYPES: + raise NotFoundExc(f"{bundle_type} is an unknown type.") # If there's a failed bundle entry, delete it first. if info is not None and info["task_status"] == "failed": obj_id = hashutil.hash_to_bytes(obj_id) cur.execute( "DELETE FROM vault_bundle WHERE type = %s AND object_id = %s", - (obj_type, obj_id), + (bundle_type, obj_id), ) db.conn.commit() info = None # If there's no bundle entry, create the task. if info is None: - self.create_task(obj_type, obj_id, sticky) + self.create_task(bundle_type, obj_id, sticky) if email is not None: # If the task is already done, send the email directly if info is not None and info["task_status"] == "done": self.send_notification( - None, email, obj_type, hex_id, info["task_status"] + None, email, bundle_type, hex_id, info["task_status"] ) # Else, add it to the notification queue else: - self.add_notif_email(obj_type, obj_id, email) + self.add_notif_email(bundle_type, obj_id, email) - return self.progress(obj_type, obj_id) + return self.progress(bundle_type, obj_id) @db_transaction() def batch_cook( self, batch: List[Tuple[str, str]], db=None, cur=None ) -> Dict[str, int]: # Import execute_values at runtime only, because it requires # psycopg2 >= 2.7 (only available on postgresql servers) from psycopg2.extras import execute_values - for obj_type, _ in batch: - if obj_type not in COOKER_TYPES: - raise NotFoundExc(f"{obj_type} is an unknown type.") + for bundle_type, _ in batch: + if bundle_type not in COOKER_TYPES: + raise NotFoundExc(f"{bundle_type} is an unknown type.") cur.execute( """ INSERT INTO vault_batch (id) VALUES (DEFAULT) RETURNING id""" ) batch_id = cur.fetchone()["id"] batch_bytes = batch_to_bytes(batch) # Delete all failed bundles from the batch cur.execute( """ DELETE FROM vault_bundle WHERE task_status = 'failed' AND (type, object_id) IN %s""", (tuple(batch_bytes),), ) # Insert all the bundles, return the new ones execute_values( cur, """ INSERT INTO vault_bundle (type, object_id) VALUES %s ON CONFLICT DO NOTHING""", batch_bytes, ) # Get the bundle ids and task status cur.execute( """ SELECT id, type, object_id, task_id FROM vault_bundle WHERE (type, object_id) IN %s""", (tuple(batch_bytes),), ) bundles = cur.fetchall() # Insert the batch-bundle entries batch_id_bundle_ids = [(batch_id, row["id"]) for row in bundles] execute_values( cur, """ INSERT INTO vault_batch_bundle (batch_id, bundle_id) VALUES %s ON CONFLICT DO NOTHING""", batch_id_bundle_ids, ) db.conn.commit() # Get the tasks to fetch batch_new = [ (row["type"], bytes(row["object_id"])) for row in bundles if row["task_id"] is None ] # Send the tasks args_batch = [ - (obj_type, hashutil.hash_to_hex(obj_id)) for obj_type, obj_id in batch_new + (bundle_type, hashutil.hash_to_hex(obj_id)) + for bundle_type, obj_id in batch_new ] # TODO: change once the scheduler handles priority tasks tasks 
= [ create_oneshot_task_dict("swh-vault-batch-cooking", *args) for args in args_batch ] added_tasks = self.scheduler.create_tasks(tasks) tasks_ids_bundle_ids = [ - (task_id, obj_type, obj_id) - for task_id, (obj_type, obj_id) in zip( + (task_id, bundle_type, obj_id) + for task_id, (bundle_type, obj_id) in zip( [task["id"] for task in added_tasks], batch_new ) ] # Update the task ids execute_values( cur, """ UPDATE vault_bundle SET task_id = s_task_id FROM (VALUES %s) AS sub (s_task_id, s_type, s_object_id) WHERE type = s_type::cook_type AND object_id = s_object_id """, tasks_ids_bundle_ids, ) return {"id": batch_id} @db_transaction() def batch_progress(self, batch_id: int, db=None, cur=None) -> Dict[str, Any]: cur.execute( """ SELECT vault_bundle.id as id, type, object_id, task_id, task_status, sticky, ts_created, ts_done, ts_last_access, progress_msg FROM vault_batch_bundle LEFT JOIN vault_bundle ON vault_bundle.id = bundle_id WHERE batch_id = %s""", (batch_id,), ) bundles = cur.fetchall() if not bundles: raise NotFoundExc(f"Batch {batch_id} does not exist.") for bundle in bundles: bundle["object_id"] = hashutil.hash_to_hex(bundle["object_id"]) counter = collections.Counter(b["status"] for b in bundles) res = { "bundles": bundles, "total": len(bundles), **{k: 0 for k in ("new", "pending", "done", "failed")}, **dict(counter), } return res @db_transaction() - def is_available(self, obj_type: str, obj_id: ObjectId, db=None, cur=None): + def is_available(self, bundle_type: str, obj_id: ObjectId, db=None, cur=None): """Check whether a bundle is available for retrieval""" - info = self.progress(obj_type, obj_id, raise_notfound=False, cur=cur) + info = self.progress(bundle_type, obj_id, raise_notfound=False, cur=cur) obj_id = hashutil.hash_to_bytes(obj_id) return ( info is not None and info["task_status"] == "done" - and self.cache.is_cached(obj_type, obj_id) + and self.cache.is_cached(bundle_type, obj_id) ) @db_transaction() def fetch( - self, obj_type: str, obj_id: ObjectId, raise_notfound=True, db=None, cur=None + self, bundle_type: str, obj_id: ObjectId, raise_notfound=True, db=None, cur=None ) -> Optional[bytes]: """Retrieve a bundle from the cache""" hex_id, obj_id = self._compute_ids(obj_id) - available = self.is_available(obj_type, obj_id, cur=cur) + available = self.is_available(bundle_type, obj_id, cur=cur) if not available: if raise_notfound: - raise NotFoundExc(f"{obj_type} {hex_id} is not available.") + raise NotFoundExc(f"{bundle_type} {hex_id} is not available.") return None - self.update_access_ts(obj_type, obj_id, cur=cur) - return self.cache.get(obj_type, obj_id) + self.update_access_ts(bundle_type, obj_id, cur=cur) + return self.cache.get(bundle_type, obj_id) @db_transaction() - def update_access_ts(self, obj_type: str, obj_id: bytes, db=None, cur=None): + def update_access_ts(self, bundle_type: str, obj_id: bytes, db=None, cur=None): """Update the last access timestamp of a bundle""" cur.execute( """ UPDATE vault_bundle SET ts_last_access = NOW() WHERE type = %s AND object_id = %s""", - (obj_type, obj_id), + (bundle_type, obj_id), ) @db_transaction() def set_status( - self, obj_type: str, obj_id: ObjectId, status: str, db=None, cur=None + self, bundle_type: str, obj_id: ObjectId, status: str, db=None, cur=None ) -> bool: obj_id = hashutil.hash_to_bytes(obj_id) req = ( """ UPDATE vault_bundle SET task_status = %s """ + (""", ts_done = NOW() """ if status == "done" else "") + """WHERE type = %s AND object_id = %s""" ) - cur.execute(req, (status, obj_type, obj_id)) + 
cur.execute(req, (status, bundle_type, obj_id)) return True @db_transaction() def set_progress( - self, obj_type: str, obj_id: ObjectId, progress: str, db=None, cur=None + self, bundle_type: str, obj_id: ObjectId, progress: str, db=None, cur=None ) -> bool: obj_id = hashutil.hash_to_bytes(obj_id) cur.execute( """ UPDATE vault_bundle SET progress_msg = %s WHERE type = %s AND object_id = %s""", - (progress, obj_type, obj_id), + (progress, bundle_type, obj_id), ) return True @db_transaction() - def send_notif(self, obj_type: str, obj_id: ObjectId, db=None, cur=None) -> bool: + def send_notif(self, bundle_type: str, obj_id: ObjectId, db=None, cur=None) -> bool: hex_id, obj_id = self._compute_ids(obj_id) cur.execute( """ SELECT vault_notif_email.id AS id, email, task_status, progress_msg FROM vault_notif_email INNER JOIN vault_bundle ON bundle_id = vault_bundle.id WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s""", - (obj_type, obj_id), + (bundle_type, obj_id), ) for d in cur: self.send_notification( d["id"], d["email"], - obj_type, + bundle_type, hex_id, status=d["task_status"], progress_msg=d["progress_msg"], ) return True @db_transaction() def send_notification( self, n_id: Optional[int], email: str, - obj_type: str, + bundle_type: str, hex_id: str, status: str, progress_msg: Optional[str] = None, db=None, cur=None, ) -> None: """Send the notification of a bundle to a specific e-mail""" short_id = hex_id[:7] # TODO: instead of hardcoding this, we should probably: # * add a "fetch_url" field in the vault_notif_email table # * generate the url with flask.url_for() on the web-ui side # * send this url as part of the cook request and store it in # the table # * use this url for the notification e-mail url = "https://archive.softwareheritage.org/api/1/vault/{}/{}/" "raw".format( - obj_type, hex_id + bundle_type, hex_id ) if status == "done": text = NOTIF_EMAIL_BODY_SUCCESS.strip() - text = text.format(obj_type=obj_type, hex_id=hex_id, url=url) + text = text.format(bundle_type=bundle_type, hex_id=hex_id, url=url) msg = MIMEText(text) msg["Subject"] = NOTIF_EMAIL_SUBJECT_SUCCESS.format( - obj_type=obj_type, short_id=short_id + bundle_type=bundle_type, short_id=short_id ) elif status == "failed": text = NOTIF_EMAIL_BODY_FAILURE.strip() text = text.format( - obj_type=obj_type, hex_id=hex_id, progress_msg=progress_msg + bundle_type=bundle_type, hex_id=hex_id, progress_msg=progress_msg ) msg = MIMEText(text) msg["Subject"] = NOTIF_EMAIL_SUBJECT_FAILURE.format( - obj_type=obj_type, short_id=short_id + bundle_type=bundle_type, short_id=short_id ) else: raise RuntimeError( "send_notification called on a '{}' bundle".format(status) ) msg["From"] = NOTIF_EMAIL_FROM msg["To"] = email self._smtp_send(msg) if n_id is not None: cur.execute( """ DELETE FROM vault_notif_email WHERE id = %s""", (n_id,), ) def _smtp_send(self, msg: MIMEText): # Reconnect if needed try: status = self.smtp_server.noop()[0] except smtplib.SMTPException: status = -1 if status != 250: self.smtp_server.connect("localhost", 25) # Send the message self.smtp_server.send_message(msg) @db_transaction() def _cache_expire(self, cond, *args, db=None, cur=None) -> None: """Low-level expiration method, used by cache_expire_* methods""" # Embedded SELECT query to be able to use ORDER BY and LIMIT cur.execute( """ DELETE FROM vault_bundle WHERE ctid IN ( SELECT ctid FROM vault_bundle WHERE sticky = false {} ) RETURNING type, object_id """.format( cond ), args, ) for d in cur: self.cache.delete(d["type"], bytes(d["object_id"])) 
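    # Illustrative sketch (not part of this patch): how a caller typically drives
    # this backend, assuming a configured instance named `backend` and a directory
    # hex id `hex_id` -- both names are made up for the example.
    #
    #   backend.cook("directory", hex_id, email="user@example.org")
    #   backend.progress("directory", hex_id)  # -> row with "task_status", "progress_msg", ...
    #   # ...the cooker later calls put_bundle() and set_status(..., "done")...
    #   backend.fetch("directory", hex_id)     # -> bundle bytes from the cache
    #
    # Non-sticky bundles are eventually reclaimed by the cache_expire_* helpers below.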
@db_transaction() def cache_expire_oldest(self, n=1, by="last_access", db=None, cur=None) -> None: """Expire the `n` oldest bundles""" assert by in ("created", "done", "last_access") filter = """ORDER BY ts_{} LIMIT {}""".format(by, n) return self._cache_expire(filter) @db_transaction() def cache_expire_until(self, date, by="last_access", db=None, cur=None) -> None: """Expire all the bundles until a certain date""" assert by in ("created", "done", "last_access") filter = """AND ts_{} <= %s""".format(by) return self._cache_expire(filter, date) diff --git a/swh/vault/cache.py b/swh/vault/cache.py index 9691817..cdbb524 100644 --- a/swh/vault/cache.py +++ b/swh/vault/cache.py @@ -1,47 +1,47 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.model import hashutil from swh.objstorage.factory import get_objstorage from swh.objstorage.objstorage import compute_hash class VaultCache: """The Vault cache is an object storage that stores Vault bundles. This implementation computes sha1(':') as the internal identifiers used in the underlying objstorage. """ def __init__(self, **objstorage): self.objstorage = get_objstorage(**objstorage) - def add(self, obj_type, obj_id, content): - sid = self._get_internal_id(obj_type, obj_id) + def add(self, bundle_type, obj_id, content): + sid = self._get_internal_id(bundle_type, obj_id) return self.objstorage.add(content, sid) - def get(self, obj_type, obj_id): - sid = self._get_internal_id(obj_type, obj_id) + def get(self, bundle_type, obj_id): + sid = self._get_internal_id(bundle_type, obj_id) return self.objstorage.get(hashutil.hash_to_bytes(sid)) - def delete(self, obj_type, obj_id): - sid = self._get_internal_id(obj_type, obj_id) + def delete(self, bundle_type, obj_id): + sid = self._get_internal_id(bundle_type, obj_id) return self.objstorage.delete(hashutil.hash_to_bytes(sid)) - def add_stream(self, obj_type, obj_id, content_iter): - sid = self._get_internal_id(obj_type, obj_id) + def add_stream(self, bundle_type, obj_id, content_iter): + sid = self._get_internal_id(bundle_type, obj_id) return self.objstorage.add_stream(content_iter, sid) - def get_stream(self, obj_type, obj_id): - sid = self._get_internal_id(obj_type, obj_id) + def get_stream(self, bundle_type, obj_id): + sid = self._get_internal_id(bundle_type, obj_id) return self.objstorage.get_stream(hashutil.hash_to_bytes(sid)) - def is_cached(self, obj_type, obj_id): - sid = self._get_internal_id(obj_type, obj_id) + def is_cached(self, bundle_type, obj_id): + sid = self._get_internal_id(bundle_type, obj_id) return hashutil.hash_to_bytes(sid) in self.objstorage - def _get_internal_id(self, obj_type, obj_id): + def _get_internal_id(self, bundle_type, obj_id): obj_id = hashutil.hash_to_hex(obj_id) - return compute_hash("{}:{}".format(obj_type, obj_id).encode()) + return compute_hash("{}:{}".format(bundle_type, obj_id).encode()) diff --git a/swh/vault/cli.py b/swh/vault/cli.py index e5cc712..7aa5758 100644 --- a/swh/vault/cli.py +++ b/swh/vault/cli.py @@ -1,192 +1,195 @@ # Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations # WARNING: do not import unnecessary things here 
to keep cli startup time under # control import logging from typing import TYPE_CHECKING, Optional import click from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup from swh.core.cli import swh as swh_cli_group if TYPE_CHECKING: import io from swh.model.identifiers import CoreSWHID class SwhidParamType(click.ParamType): name = "swhid" def convert(self, value, param, ctx): from swh.model.exceptions import ValidationError from swh.model.identifiers import CoreSWHID try: return CoreSWHID.from_string(value) except ValidationError: self.fail(f"expected core SWHID, got {value!r}", param, ctx) @swh_cli_group.group(name="vault", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup) @click.pass_context def vault(ctx): """Software Heritage Vault tools.""" @vault.command() @click.option( "--config-file", "-C", default=None, metavar="CONFIGFILE", type=click.Path(exists=True, dir_okay=False,), help="Configuration file.", ) @click.argument("swhid", type=SwhidParamType()) @click.argument("outfile", type=click.File("wb")) @click.option( "--cooker-type", type=click.Choice(["flat", "gitfast", "git_bare"]), help="Selects which cooker to use, when there is more than one available " "for the given object type.", ) @click.pass_context def cook( ctx, config_file: str, swhid: CoreSWHID, outfile: io.RawIOBase, cooker_type: Optional[str], ): """ Runs a vault cooker for a single object (identified by a SWHID), and outputs it to the given file. """ from swh.core import config from swh.objstorage.exc import ObjNotFoundError from swh.objstorage.factory import get_objstorage from swh.storage import get_storage from .cookers import COOKER_TYPES, get_cooker_cls from .in_memory_backend import InMemoryVaultBackend conf = config.read(config_file) supported_object_types = {name.split("_")[0] for name in COOKER_TYPES} if swhid.object_type.name.lower() not in supported_object_types: raise click.ClickException( f"No cooker available for {swhid.object_type.name} objects." ) cooker_name = swhid.object_type.name.lower() if cooker_type: cooker_name = f"{cooker_name}_{cooker_type}" if cooker_name not in COOKER_TYPES: raise click.ClickException( f"{swhid.object_type.name.lower()} objects do not have " f"a {cooker_type} cooker." ) else: if cooker_name not in COOKER_TYPES: raise click.ClickException( f"{swhid.object_type.name.lower()} objects need " f"an explicit --cooker-type." ) try: from swh.graph.client import RemoteGraphClient # optional dependency graph = RemoteGraphClient(**conf["graph"]) if conf.get("graph") else None except ModuleNotFoundError: if conf.get("graph"): raise EnvironmentError( "Graph configuration required but module is not installed." 
) else: graph = None backend = InMemoryVaultBackend() storage = get_storage(**conf["storage"]) objstorage = get_objstorage(**conf["objstorage"]) if "objstorage" in conf else None cooker_cls = get_cooker_cls(cooker_name) cooker = cooker_cls( - obj_type=cooker_name, + bundle_type=cooker_name, obj_id=swhid.object_id, backend=backend, storage=storage, graph=graph, objstorage=objstorage, max_bundle_size=None, # No need for a size limit, we are running locally ) cooker.cook() try: bundle = backend.fetch(cooker_name, swhid.object_id) except ObjNotFoundError: bundle = None if bundle is None: raise click.ClickException("Cooker did not write a bundle to the backend.") outfile.write(bundle) @vault.command(name="rpc-serve") @click.option( "--config-file", "-C", default=None, metavar="CONFIGFILE", type=click.Path(exists=True, dir_okay=False,), help="Configuration file.", ) @click.option( "--host", default="0.0.0.0", metavar="IP", show_default=True, help="Host ip address to bind the server on", ) @click.option( "--port", default=5005, type=click.INT, metavar="PORT", help="Binding port of the server", ) @click.option( "--debug/--no-debug", default=True, help="Indicates if the server should run in debug mode", ) @click.pass_context def serve(ctx, config_file, host, port, debug): """Software Heritage Vault RPC server.""" import aiohttp from swh.vault.api.server import make_app_from_configfile ctx.ensure_object(dict) try: app = make_app_from_configfile(config_file, debug=debug) except EnvironmentError as e: click.echo(e.msg, err=True) ctx.exit(1) aiohttp.web.run_app(app, host=host, port=int(port)) def main(): logging.basicConfig() return serve(auto_envvar_prefix="SWH_VAULT") if __name__ == "__main__": main() diff --git a/swh/vault/cookers/__init__.py b/swh/vault/cookers/__init__.py index 1ab2a8d..ef3975e 100644 --- a/swh/vault/cookers/__init__.py +++ b/swh/vault/cookers/__init__.py @@ -1,113 +1,113 @@ # Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations import os from typing import Any, Dict from swh.core.config import load_named_config from swh.core.config import read as read_config from swh.storage import get_storage from swh.vault import get_vault from swh.vault.cookers.base import DEFAULT_CONFIG, DEFAULT_CONFIG_PATH from swh.vault.cookers.directory import DirectoryCooker from swh.vault.cookers.git_bare import GitBareCooker from swh.vault.cookers.revision_flat import RevisionFlatCooker from swh.vault.cookers.revision_gitfast import RevisionGitfastCooker COOKER_TYPES = { "directory": DirectoryCooker, "revision_flat": RevisionFlatCooker, "revision_gitfast": RevisionGitfastCooker, "snapshot_git_bare": GitBareCooker, "revision_git_bare": GitBareCooker, "directory_git_bare": GitBareCooker, } -def get_cooker_cls(obj_type): - return COOKER_TYPES[obj_type] +def get_cooker_cls(bundle_type): + return COOKER_TYPES[bundle_type] def check_config(cfg: Dict[str, Any]) -> Dict[str, Any]: """Ensure the configuration is ok to run a vault worker, and propagate defaults Raises: EnvironmentError if the configuration is not for remote instance ValueError if one of the following keys is missing: vault, storage Returns: New configuration dict to instantiate a vault worker instance """ cfg = cfg.copy() if "vault" not in cfg: raise ValueError("missing 'vault'
configuration") vcfg = cfg["vault"] if vcfg["cls"] != "remote": raise EnvironmentError( "This vault backend can only be a 'remote' configuration" ) # TODO: Soft-deprecation of args key. Remove when ready. vcfg.update(vcfg.get("args", {})) # Default to top-level value if any if "storage" not in vcfg: vcfg["storage"] = cfg.get("storage") if not vcfg.get("storage"): raise ValueError("invalid configuration: missing 'storage' config entry.") return cfg -def get_cooker(obj_type: str, obj_id: str): - """Instantiate a cooker class of type obj_type. +def get_cooker(bundle_type: str, obj_id: str): + """Instantiate a cooker class of type bundle_type. Returns: - Cooker class in charge of cooking the obj_type with id obj_id. + Cooker class in charge of cooking the bundle_type with id obj_id. Raises: ValueError in case of a missing top-level vault key configuration or a storage key. EnvironmentError in case the vault configuration reference a non remote class. """ if "SWH_CONFIG_FILENAME" in os.environ: cfg = read_config(os.environ["SWH_CONFIG_FILENAME"], DEFAULT_CONFIG) else: cfg = load_named_config(DEFAULT_CONFIG_PATH, DEFAULT_CONFIG) - cooker_cls = get_cooker_cls(obj_type) + cooker_cls = get_cooker_cls(bundle_type) cfg = check_config(cfg) vcfg = cfg["vault"] storage = get_storage(**vcfg.pop("storage")) backend = get_vault(**vcfg) try: from swh.graph.client import RemoteGraphClient # optional dependency graph = RemoteGraphClient(**vcfg["graph"]) if vcfg.get("graph") else None except ModuleNotFoundError: if vcfg.get("graph"): raise EnvironmentError( "Graph configuration required but module is not installed." ) else: graph = None return cooker_cls( - obj_type, + bundle_type, obj_id, backend=backend, storage=storage, graph=graph, max_bundle_size=cfg["max_bundle_size"], ) diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py index 5473ff6..fc796fe 100644 --- a/swh/vault/cookers/base.py +++ b/swh/vault/cookers/base.py @@ -1,151 +1,152 @@ # Copyright (C) 2016-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import io import logging from typing import Optional from psycopg2.extensions import QueryCanceledError from swh.model import hashutil from swh.model.model import Sha1Git from swh.storage.interface import StorageInterface MAX_BUNDLE_SIZE = 2 ** 29 # 512 MiB DEFAULT_CONFIG_PATH = "vault/cooker" DEFAULT_CONFIG = { "max_bundle_size": ("int", MAX_BUNDLE_SIZE), } class PolicyError(Exception): """Raised when the bundle violates the cooking policy.""" pass class BundleTooLargeError(PolicyError): """Raised when the bundle is too large to be cooked.""" pass class BytesIOBundleSizeLimit(io.BytesIO): def __init__(self, *args, size_limit=None, **kwargs): super().__init__(*args, **kwargs) self.size_limit = size_limit def write(self, chunk): if ( self.size_limit is not None and self.getbuffer().nbytes + len(chunk) > self.size_limit ): raise BundleTooLargeError( "The requested bundle exceeds the maximum allowed " "size of {} bytes.".format(self.size_limit) ) return super().write(chunk) class BaseVaultCooker(metaclass=abc.ABCMeta): """Abstract base class for the vault's bundle creators This class describes a common API for the cookers. 
To define a new cooker, inherit from this class and override: - CACHE_TYPE_KEY: key to use for the bundle to reference in cache - def cook(): cook the object into a bundle """ CACHE_TYPE_KEY = None # type: Optional[str] def __init__( self, - obj_type: str, + bundle_type: str, obj_id: Sha1Git, backend, storage: StorageInterface, graph=None, objstorage=None, max_bundle_size: int = MAX_BUNDLE_SIZE, ): """Initialize the cooker. The type of the object represented by the id depends on the concrete class. Very likely, each type of bundle will have its own cooker class. Args: - obj_type: type of the object to be cooked into a bundle (directory, + bundle_type: type of the object to be cooked into a bundle (directory, revision_flat or revision_gitfast; see swh.vault.cooker.COOKER_TYPES). obj_id: id of the object to be cooked into a bundle. backend: the vault backend (swh.vault.backend.VaultBackend). """ - self.obj_type = obj_type + self.bundle_type = bundle_type self.obj_id = hashutil.hash_to_bytes(obj_id) self.backend = backend self.storage = storage self.objstorage = objstorage self.graph = graph self.max_bundle_size = max_bundle_size @abc.abstractmethod def check_exists(self): """Checks that the requested object exists and can be cooked. Override this in the cooker implementation. """ raise NotImplementedError @abc.abstractmethod def prepare_bundle(self): """Implementation of the cooker. Yields chunks of the bundle bytes. Override this with the cooker implementation. """ raise NotImplementedError def cache_type_key(self) -> str: assert self.CACHE_TYPE_KEY return self.CACHE_TYPE_KEY def write(self, chunk): self.fileobj.write(chunk) def cook(self): """Cook the requested object into a bundle """ - self.backend.set_status(self.obj_type, self.obj_id, "pending") - self.backend.set_progress(self.obj_type, self.obj_id, "Processing...") + self.backend.set_status(self.bundle_type, self.obj_id, "pending") + self.backend.set_progress(self.bundle_type, self.obj_id, "Processing...") self.fileobj = BytesIOBundleSizeLimit(size_limit=self.max_bundle_size) try: try: self.prepare_bundle() except QueryCanceledError: raise PolicyError( "Timeout reached while assembling the requested bundle" ) bundle = self.fileobj.getvalue() # TODO: use proper content streaming instead of put_bundle() self.backend.put_bundle(self.cache_type_key(), self.obj_id, bundle) except PolicyError as e: - self.backend.set_status(self.obj_type, self.obj_id, "failed") - self.backend.set_progress(self.obj_type, self.obj_id, str(e)) + logging.info("Bundle cooking violated policy: %s", e) + self.backend.set_status(self.bundle_type, self.obj_id, "failed") + self.backend.set_progress(self.bundle_type, self.obj_id, str(e)) except Exception: - self.backend.set_status(self.obj_type, self.obj_id, "failed") + self.backend.set_status(self.bundle_type, self.obj_id, "failed") self.backend.set_progress( - self.obj_type, + self.bundle_type, self.obj_id, "Internal Server Error. 
This incident will be reported.", ) logging.exception("Bundle cooking failed.") else: - self.backend.set_status(self.obj_type, self.obj_id, "done") - self.backend.set_progress(self.obj_type, self.obj_id, None) + self.backend.set_status(self.bundle_type, self.obj_id, "done") + self.backend.set_progress(self.bundle_type, self.obj_id, None) finally: - self.backend.send_notif(self.obj_type, self.obj_id) + self.backend.send_notif(self.bundle_type, self.obj_id) diff --git a/swh/vault/cookers/git_bare.py b/swh/vault/cookers/git_bare.py index 17790c0..7a5ab9d 100644 --- a/swh/vault/cookers/git_bare.py +++ b/swh/vault/cookers/git_bare.py @@ -1,545 +1,545 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """ This cooker creates tarballs containing a bare .git directory, that can be unpacked and cloned like any git repository. It works in three steps: 1. Write objects one by one in :file:`.git/objects/` 2. Calls ``git repack`` to pack all these objects into git packfiles. 3. Creates a tarball of the resulting repository It keeps a set of all written (or about-to-be-written) object hashes in memory to avoid downloading and writing the same objects twice. """ import datetime import logging import os.path import re import subprocess import tarfile import tempfile from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple import zlib from swh.core.api.classes import stream_results_optional from swh.model import identifiers from swh.model.hashutil import hash_to_bytehex, hash_to_hex from swh.model.model import ( Content, DirectoryEntry, ObjectType, Person, Release, Revision, RevisionType, Sha1Git, TargetType, TimestampWithTimezone, ) from swh.storage.algos.revisions_walker import DFSRevisionsWalker from swh.storage.algos.snapshot import snapshot_get_all_branches from swh.vault.cookers.base import BaseVaultCooker from swh.vault.to_disk import HIDDEN_MESSAGE, SKIPPED_MESSAGE RELEASE_BATCH_SIZE = 10000 REVISION_BATCH_SIZE = 10000 DIRECTORY_BATCH_SIZE = 10000 CONTENT_BATCH_SIZE = 100 logger = logging.getLogger(__name__) class GitBareCooker(BaseVaultCooker): use_fsck = True def cache_type_key(self) -> str: - return self.obj_type + return self.bundle_type def check_exists(self): - obj_type = self.obj_type.split("_")[0] + obj_type = self.bundle_type.split("_")[0] if obj_type == "revision": return not list(self.storage.revision_missing([self.obj_id])) elif obj_type == "directory": return not list(self.storage.directory_missing([self.obj_id])) if obj_type == "snapshot": return not list(self.storage.snapshot_missing([self.obj_id])) else: raise NotImplementedError(f"GitBareCooker for {obj_type}") def obj_swhid(self) -> identifiers.CoreSWHID: - obj_type = self.obj_type.split("_")[0] + obj_type = self.bundle_type.split("_")[0] return identifiers.CoreSWHID( object_type=identifiers.ObjectType[obj_type.upper()], object_id=self.obj_id, ) def _push(self, stack: List[Sha1Git], obj_ids: Iterable[Sha1Git]) -> None: assert not isinstance(obj_ids, bytes) revision_ids = [id_ for id_ in obj_ids if id_ not in self._seen] self._seen.update(revision_ids) stack.extend(revision_ids) def _pop(self, stack: List[Sha1Git], n: int) -> List[Sha1Git]: obj_ids = stack[-n:] stack[-n:] = [] return obj_ids def prepare_bundle(self): # Objects we will visit soon: self._rel_stack: List[Sha1Git] = [] self._rev_stack: List[Sha1Git] = [] 
self._dir_stack: List[Sha1Git] = [] self._cnt_stack: List[Sha1Git] = [] # Set of objects already in any of the stacks: self._seen: Set[Sha1Git] = set() self._walker_state: Optional[Any] = None # Set of errors we expect git-fsck to raise at the end: self._expected_fsck_errors = set() with tempfile.TemporaryDirectory(prefix="swh-vault-gitbare-") as workdir: # Initialize a Git directory self.workdir = workdir self.gitdir = os.path.join(workdir, "clone.git") os.mkdir(self.gitdir) self.init_git() # Add the root object to the stack of objects to visit - self.push_subgraph(self.obj_type.split("_")[0], self.obj_id) + self.push_subgraph(self.bundle_type.split("_")[0], self.obj_id) # Load and write all the objects to disk self.load_objects() # Write the root object as a ref (this step is skipped if it's a snapshot) # This must be done before repacking; git-repack ignores orphan objects. self.write_refs() if self.use_fsck: self.git_fsck() self.repack() self.write_archive() def init_git(self) -> None: subprocess.run(["git", "-C", self.gitdir, "init", "--bare"], check=True) self.create_object_dirs() def create_object_dirs(self) -> None: # Create all possible dirs ahead of time, so we don't have to check for # existence every time. for byte in range(256): try: os.mkdir(os.path.join(self.gitdir, "objects", f"{byte:02x}")) except FileExistsError: pass def repack(self) -> None: # Add objects we wrote in a pack subprocess.run(["git", "-C", self.gitdir, "repack", "-d"], check=True) # Remove their non-packed originals subprocess.run(["git", "-C", self.gitdir, "prune-packed"], check=True) def git_fsck(self) -> None: proc = subprocess.run( ["git", "-C", self.gitdir, "fsck"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env={"LANG": "C.utf8"}, ) # Split on newlines not followed by a space errors = re.split("\n(?! 
)", proc.stdout.decode()) errors = [ error for error in errors if error and not error.startswith("warning ") ] unexpected_errors = set(errors) - self._expected_fsck_errors if unexpected_errors: raise Exception( "\n".join( ["Unexpected errors from git-fsck:"] + sorted(unexpected_errors) ) ) def write_refs(self, snapshot=None): refs: Dict[bytes, bytes] # ref name -> target - obj_type = self.obj_type.split("_")[0] + obj_type = self.bundle_type.split("_")[0] if obj_type == "directory": # We need a synthetic revision pointing to the directory author = Person.from_fullname( b"swh-vault, git-bare cooker " ) dt = datetime.datetime.now(tz=datetime.timezone.utc) dt = dt.replace(microsecond=0) # not supported by git date = TimestampWithTimezone.from_datetime(dt) revision = Revision( author=author, committer=author, date=date, committer_date=date, message=b"Initial commit", type=RevisionType.GIT, directory=self.obj_id, synthetic=True, ) self.write_revision_node(revision.to_dict()) refs = {b"refs/heads/master": hash_to_bytehex(revision.id)} elif obj_type == "revision": refs = {b"refs/heads/master": hash_to_bytehex(self.obj_id)} elif obj_type == "snapshot": if snapshot is None: # refs were already written in a previous step return branches = [] for (branch_name, branch) in snapshot.branches.items(): if branch is None: logging.error( "%s has dangling branch: %r", snapshot.swhid(), branch_name ) else: branches.append((branch_name, branch)) refs = { branch_name: ( b"ref: " + branch.target if branch.target_type == TargetType.ALIAS else hash_to_bytehex(branch.target) ) for (branch_name, branch) in branches } else: assert False, obj_type for (ref_name, ref_target) in refs.items(): path = os.path.join(self.gitdir.encode(), ref_name) os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "wb") as fd: fd.write(ref_target) def write_archive(self): with tarfile.TarFile(mode="w", fileobj=self.fileobj) as tf: tf.add(self.gitdir, arcname=f"{self.obj_swhid()}.git", recursive=True) def _obj_path(self, obj_id: Sha1Git): return os.path.join(self.gitdir, self._obj_relative_path(obj_id)) def _obj_relative_path(self, obj_id: Sha1Git): obj_id_hex = hash_to_hex(obj_id) directory = obj_id_hex[0:2] filename = obj_id_hex[2:] return os.path.join("objects", directory, filename) def object_exists(self, obj_id: Sha1Git) -> bool: return os.path.exists(self._obj_path(obj_id)) def write_object(self, obj_id: Sha1Git, obj: bytes) -> bool: """Writes a git object on disk. Returns whether it was already written.""" # Git requires objects to be zlib-compressed; but repacking decompresses and # removes them, so we don't need to compress them too much. 
data = zlib.compress(obj, level=1) with open(self._obj_path(obj_id), "wb") as fd: fd.write(data) return True def push_subgraph(self, obj_type, obj_id) -> None: if obj_type == "revision": self.push_revision_subgraph(obj_id) elif obj_type == "directory": self._push(self._dir_stack, [obj_id]) elif obj_type == "snapshot": self.push_snapshot_subgraph(obj_id) else: raise NotImplementedError( f"GitBareCooker.queue_subgraph({obj_type!r}, ...)" ) def load_objects(self) -> None: while self._rel_stack or self._rev_stack or self._dir_stack or self._cnt_stack: release_ids = self._pop(self._rel_stack, RELEASE_BATCH_SIZE) if release_ids: self.load_releases(release_ids) revision_ids = self._pop(self._rev_stack, REVISION_BATCH_SIZE) if revision_ids: self.load_revisions(revision_ids) directory_ids = self._pop(self._dir_stack, DIRECTORY_BATCH_SIZE) if directory_ids: self.load_directories(directory_ids) content_ids = self._pop(self._cnt_stack, CONTENT_BATCH_SIZE) if content_ids: self.load_contents(content_ids) def push_revision_subgraph(self, obj_id: Sha1Git) -> None: """Fetches a revision and all its children, and writes them to disk""" loaded_from_graph = False if self.graph: from swh.graph.client import GraphArgumentException # First, try to cook using swh-graph, as it is more efficient than # swh-storage for querying the history obj_swhid = identifiers.CoreSWHID( object_type=identifiers.ObjectType.REVISION, object_id=obj_id, ) try: revision_ids = ( swhid.object_id for swhid in map( identifiers.CoreSWHID.from_string, self.graph.visit_nodes(str(obj_swhid), edges="rev:rev"), ) ) self._push(self._rev_stack, revision_ids) except GraphArgumentException as e: logger.info( "Revision %s not found in swh-graph, falling back to fetching " "history using swh-storage. %s", hash_to_hex(obj_id), e.args[0], ) else: loaded_from_graph = True if not loaded_from_graph: # If swh-graph is not available, or the revision is not yet in # swh-graph, fall back to self.storage.revision_log. # self.storage.revision_log also gives us the full revisions, # so we load them right now instead of just pushing them on the stack. 
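            # The walker is resumed from self._walker_state (exported again after
            # the loop below), so revisions already written for a previous branch
            # of the same snapshot are not emitted a second time.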
walker = DFSRevisionsWalker(self.storage, obj_id, state=self._walker_state) for revision in walker: self.write_revision_node(revision) self._push(self._dir_stack, [revision["directory"]]) # Save the state, so the next call to the walker won't return the same # revisions self._walker_state = walker.export_state() def push_snapshot_subgraph(self, obj_id: Sha1Git) -> None: """Fetches a snapshot and all its children, and writes them to disk""" loaded_from_graph = False if self.graph: revision_ids = [] release_ids = [] directory_ids = [] from swh.graph.client import GraphArgumentException # First, try to cook using swh-graph, as it is more efficient than # swh-storage for querying the history obj_swhid = identifiers.CoreSWHID( object_type=identifiers.ObjectType.SNAPSHOT, object_id=obj_id, ) try: swhids = map( identifiers.CoreSWHID.from_string, self.graph.visit_nodes(str(obj_swhid), edges="snp:*,rel:*,rev:rev"), ) for swhid in swhids: if swhid.object_type == identifiers.ObjectType.REVISION: revision_ids.append(swhid.object_id) elif swhid.object_type == identifiers.ObjectType.RELEASE: release_ids.append(swhid.object_id) elif swhid.object_type == identifiers.ObjectType.DIRECTORY: directory_ids.append(swhid.object_id) elif swhid.object_type == identifiers.ObjectType.SNAPSHOT: assert ( swhid.object_id == obj_id ), f"Snapshot {obj_id.hex()} references a different snapshot" else: raise NotImplementedError( f"{swhid.object_type} objects in snapshot subgraphs." ) except GraphArgumentException as e: logger.info( "Snapshot %s not found in swh-graph, falling back to fetching " "history for each branch. %s", hash_to_hex(obj_id), e.args[0], ) else: self._push(self._rev_stack, revision_ids) self._push(self._rel_stack, release_ids) self._push(self._dir_stack, directory_ids) loaded_from_graph = True # TODO: when self.graph is available and supports edge labels, use it # directly to get branch names. 
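        # Branch names and targets are therefore always read from storage, even
        # when the object subgraph itself was obtained from swh-graph above.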
snapshot = snapshot_get_all_branches(self.storage, obj_id) assert snapshot, "Unknown snapshot" # should have been caught by check_exists() for branch in snapshot.branches.values(): if not loaded_from_graph: if branch is None: logging.warning("Dangling branch: %r", branch) elif branch.target_type == TargetType.REVISION: self.push_revision_subgraph(branch.target) elif branch.target_type == TargetType.RELEASE: self.push_releases_subgraphs([branch.target]) elif branch.target_type == TargetType.ALIAS: # Nothing to do, this for loop also iterates on the target branch # (if it exists) pass elif branch.target_type == TargetType.DIRECTORY: self._push(self._dir_stack, [branch.target]) else: raise NotImplementedError(f"{branch.target_type} branches") self.write_refs(snapshot=snapshot) def load_revisions(self, obj_ids: List[Sha1Git]) -> None: """Given a list of revision ids, loads these revisions and their directories; but not their parent revisions.""" ret: List[Optional[Revision]] = self.storage.revision_get(obj_ids) revisions: List[Revision] = list(filter(None, ret)) if len(ret) != len(revisions): logger.error("Missing revision(s), ignoring them.") for revision in revisions: self.write_revision_node(revision.to_dict()) self._push(self._dir_stack, (rev.directory for rev in revisions)) def write_revision_node(self, revision: Dict[str, Any]) -> bool: """Writes a revision object to disk""" git_object = identifiers.revision_git_object(revision) return self.write_object(revision["id"], git_object) def load_releases(self, obj_ids: List[Sha1Git]) -> List[Release]: """Loads release objects, and returns them.""" ret = self.storage.release_get(obj_ids) releases = list(filter(None, ret)) if len(ret) != len(releases): logger.error("Missing release(s), ignoring them.") for release in releases: self.write_release_node(release.to_dict()) return releases def push_releases_subgraphs(self, obj_ids: List[Sha1Git]) -> None: """Given a list of release ids, loads these releases and adds their target to the list of objects to visit""" for release in self.load_releases(obj_ids): if release.target_type == ObjectType.REVISION: assert release.target, "{release.swhid(}) has no target" self.push_revision_subgraph(release.target) elif release.target_type == ObjectType.DIRECTORY: assert release.target, "{release.swhid(}) has no target" self._push(self._dir_stack, [release.target]) else: raise NotImplementedError( f"{release.swhid()} targets {release.target_type}" ) def write_release_node(self, release: Dict[str, Any]) -> bool: """Writes a release object to disk""" git_object = identifiers.release_git_object(release) return self.write_object(release["id"], git_object) def load_directories(self, obj_ids: List[Sha1Git]) -> None: for obj_id in obj_ids: self.load_directory(obj_id) def load_directory(self, obj_id: Sha1Git) -> None: # Load the directory entries_it: Optional[Iterable[DirectoryEntry]] = stream_results_optional( self.storage.directory_get_entries, obj_id ) if entries_it is None: logger.error("Missing swh:1:dir:%s, ignoring.", hash_to_hex(obj_id)) return entries = [entry.to_dict() for entry in entries_it] directory = {"id": obj_id, "entries": entries} git_object = identifiers.directory_git_object(directory) self.write_object(obj_id, git_object) # Add children to the stack entry_loaders: Dict[str, List[Sha1Git]] = { "file": self._cnt_stack, "dir": self._dir_stack, "rev": self._rev_stack, } for entry in directory["entries"]: stack = entry_loaders[entry["type"]] self._push(stack, [entry["target"]]) def load_contents(self, 
obj_ids: List[Sha1Git]) -> None: # TODO: add support of filtered objects, somehow? # It's tricky, because, by definition, we can't write a git object with # the expected hash, so git-fsck *will* choke on it. contents = self.storage.content_get(obj_ids, "sha1_git") visible_contents = [] for (obj_id, content) in zip(obj_ids, contents): if content is None: # FIXME: this may also happen for missing content self.write_content(obj_id, SKIPPED_MESSAGE) self._expect_mismatched_object_error(obj_id) elif content.status == "visible": visible_contents.append(content) elif content.status == "hidden": self.write_content(obj_id, HIDDEN_MESSAGE) self._expect_mismatched_object_error(obj_id) else: assert False, ( f"unexpected status {content.status!r} " f"for content {hash_to_hex(content.sha1_git)}" ) contents_and_data: Iterator[Tuple[Content, Optional[bytes]]] if self.objstorage is None: contents_and_data = ( (content, self.storage.content_get_data(content.sha1)) for content in visible_contents ) else: contents_and_data = zip( visible_contents, self.objstorage.get_batch(c.sha1 for c in visible_contents), ) for (content, datum) in contents_and_data: if datum is None: logger.error( "{content.swhid()} is visible, but is missing data. Skipping." ) continue self.write_content(content.sha1_git, datum) def write_content(self, obj_id: Sha1Git, content: bytes) -> None: header = identifiers.git_object_header("blob", len(content)) self.write_object(obj_id, header + content) def _expect_mismatched_object_error(self, obj_id): obj_id_hex = hash_to_hex(obj_id) obj_path = self._obj_relative_path(obj_id) # For Git < 2.21: self._expected_fsck_errors.add( f"error: sha1 mismatch for ./{obj_path} (expected {obj_id_hex})" ) # For Git >= 2.21: self._expected_fsck_errors.add( f"error: hash mismatch for ./{obj_path} (expected {obj_id_hex})" ) self._expected_fsck_errors.add( f"error: {obj_id_hex}: object corrupt or missing: ./{obj_path}" ) self._expected_fsck_errors.add(f"missing blob {obj_id_hex}") diff --git a/swh/vault/cookers/revision_gitfast.py b/swh/vault/cookers/revision_gitfast.py index a1b97f3..7199c80 100644 --- a/swh/vault/cookers/revision_gitfast.py +++ b/swh/vault/cookers/revision_gitfast.py @@ -1,219 +1,219 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools import os import time import zlib from fastimport.commands import ( BlobCommand, CommitCommand, FileDeleteCommand, FileModifyCommand, ResetCommand, ) from swh.model import hashutil from swh.model.from_disk import DentryPerms, mode_to_perms from swh.model.toposort import toposort from swh.vault.cookers.base import BaseVaultCooker from swh.vault.cookers.utils import revision_log from swh.vault.to_disk import get_filtered_files_content class RevisionGitfastCooker(BaseVaultCooker): """Cooker to create a git fast-import bundle """ CACHE_TYPE_KEY = "revision_gitfast" def check_exists(self): return not list(self.storage.revision_missing([self.obj_id])) def prepare_bundle(self): self.log = list(toposort(revision_log(self.storage, self.obj_id))) self.gzobj = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16) self.fastexport() self.write(self.gzobj.flush()) def write_cmd(self, cmd): chunk = bytes(cmd) + b"\n" super().write(self.gzobj.compress(chunk)) def fastexport(self): """Generate all the git fast-import commands from a given log. 
""" self.rev_by_id = {r["id"]: r for r in self.log} self.obj_done = set() self.obj_to_mark = {} self.next_available_mark = 1 last_progress_report = None for i, rev in enumerate(self.log, 1): # Update progress if needed ct = time.time() if last_progress_report is None or last_progress_report + 2 <= ct: last_progress_report = ct pg = "Computing revision {}/{}".format(i, len(self.log)) - self.backend.set_progress(self.obj_type, self.obj_id, pg) + self.backend.set_progress(self.bundle_type, self.obj_id, pg) # Compute the current commit self._compute_commit_command(rev) def mark(self, obj_id): """Get the mark ID as bytes of a git object. If the object has not yet been marked, assign a new ID and add it to the mark dictionary. """ if obj_id not in self.obj_to_mark: self.obj_to_mark[obj_id] = self.next_available_mark self.next_available_mark += 1 return str(self.obj_to_mark[obj_id]).encode() def _compute_blob_command_content(self, file_data): """Compute the blob command of a file entry if it has not been computed yet. """ obj_id = file_data["sha1"] if obj_id in self.obj_done: return contents = list(get_filtered_files_content(self.storage, [file_data])) content = contents[0]["content"] self.write_cmd(BlobCommand(mark=self.mark(obj_id), data=content)) self.obj_done.add(obj_id) def _author_tuple_format(self, author, date): # We never want to have None values here so we replace null entries # by ''. if author is not None: author_tuple = (author.get("name") or b"", author.get("email") or b"") else: author_tuple = (b"", b"") if date is not None: date_tuple = ( date.get("timestamp", {}).get("seconds") or 0, (date.get("offset") or 0) * 60, ) else: date_tuple = (0, 0) return author_tuple + date_tuple def _compute_commit_command(self, rev): """Compute a commit command from a specific revision. """ if "parents" in rev and rev["parents"]: from_ = b":" + self.mark(rev["parents"][0]) merges = [b":" + self.mark(r) for r in rev["parents"][1:]] parent = self.rev_by_id[rev["parents"][0]] else: # We issue a reset command before all the new roots so that they # are not automatically added as children of the current branch. self.write_cmd(ResetCommand(b"refs/heads/master", None)) from_ = None merges = None parent = None # Retrieve the file commands while yielding new blob commands if # needed. files = list(self._compute_file_commands(rev, parent)) # Construct and write the commit command author = self._author_tuple_format(rev["author"], rev["date"]) committer = self._author_tuple_format(rev["committer"], rev["committer_date"]) self.write_cmd( CommitCommand( ref=b"refs/heads/master", mark=self.mark(rev["id"]), author=author, committer=committer, message=rev["message"] or b"", from_=from_, merges=merges, file_iter=files, ) ) @functools.lru_cache(maxsize=4096) def _get_dir_ents(self, dir_id=None): """Get the entities of a directory as a dictionary (name -> entity). This function has a cache to avoid doing multiple requests to retrieve the same entities, as doing a directory_ls() is expensive. """ data = self.storage.directory_ls(dir_id) if dir_id is not None else [] return {f["name"]: f for f in data} def _compute_file_commands(self, rev, parent=None): """Compute all the file commands of a revision. Generate a diff of the files between the revision and its main parent to find the necessary file commands to apply. """ # Initialize the stack with the root of the tree. 
cur_dir = rev["directory"] parent_dir = parent["directory"] if parent else None stack = [(b"", cur_dir, parent_dir)] while stack: # Retrieve the current directory and the directory of the parent # commit in order to compute the diff of the trees. root, cur_dir_id, prev_dir_id = stack.pop() cur_dir = self._get_dir_ents(cur_dir_id) prev_dir = self._get_dir_ents(prev_dir_id) # Find subtrees to delete: # - Subtrees that are not in the new tree (file or directory # deleted). # - Subtrees that do not have the same type in the new tree # (file -> directory or directory -> file) # After this step, every node remaining in the previous directory # has the same type than the one in the current directory. for fname, f in prev_dir.items(): if fname not in cur_dir or f["type"] != cur_dir[fname]["type"]: yield FileDeleteCommand(path=os.path.join(root, fname)) # Find subtrees to modify: # - Leaves (files) will be added or modified using `filemodify` # - Other subtrees (directories) will be added to the stack and # processed in the next iteration. for fname, f in cur_dir.items(): # A file is added or modified if it was not in the tree, if its # permissions changed or if its content changed. if f["type"] == "file" and ( fname not in prev_dir or f["sha1"] != prev_dir[fname]["sha1"] or f["perms"] != prev_dir[fname]["perms"] ): # Issue a blob command for the new blobs if needed. self._compute_blob_command_content(f) yield FileModifyCommand( path=os.path.join(root, fname), mode=mode_to_perms(f["perms"]).value, dataref=(b":" + self.mark(f["sha1"])), data=None, ) # A revision is added or modified if it was not in the tree or # if its target changed elif f["type"] == "rev" and ( fname not in prev_dir or f["target"] != prev_dir[fname]["target"] ): yield FileModifyCommand( path=os.path.join(root, fname), mode=DentryPerms.revision, dataref=hashutil.hash_to_hex(f["target"]).encode(), data=None, ) # A directory is added or modified if it was not in the tree or # if its target changed. 
elif f["type"] == "dir": f_prev_target = None if fname in prev_dir and prev_dir[fname]["type"] == "dir": f_prev_target = prev_dir[fname]["target"] if f_prev_target is None or f["target"] != f_prev_target: stack.append( (os.path.join(root, fname), f["target"], f_prev_target) ) diff --git a/swh/vault/cooking_tasks.py b/swh/vault/cooking_tasks.py index cb41261..004d19e 100644 --- a/swh/vault/cooking_tasks.py +++ b/swh/vault/cooking_tasks.py @@ -1,21 +1,21 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from celery import current_app as app from swh.vault.cookers import get_cooker @app.task(name=__name__ + ".SWHCookingTask") -def cook_bundle(obj_type, obj_id): +def cook_bundle(bundle_type, obj_id): """Main task to cook a bundle.""" - get_cooker(obj_type, obj_id).cook() + get_cooker(bundle_type, obj_id).cook() # TODO: remove once the scheduler handles priority tasks @app.task(name=__name__ + ".SWHBatchCookingTask") -def batch_cook_bundle(obj_type, obj_id): +def batch_cook_bundle(bundle_type, obj_id): """Temporary task for the batch queue.""" - get_cooker(obj_type, obj_id).cook() + get_cooker(bundle_type, obj_id).cook() diff --git a/swh/vault/in_memory_backend.py b/swh/vault/in_memory_backend.py index e7cbb2a..360ce31 100644 --- a/swh/vault/in_memory_backend.py +++ b/swh/vault/in_memory_backend.py @@ -1,53 +1,53 @@ # Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict, List, Optional, Tuple, Union from swh.model.hashutil import hash_to_bytes from .cache import VaultCache ObjectId = Union[str, bytes] class InMemoryVaultBackend: """Stub vault backend, for use in the CLI.""" def __init__(self): self._cache = VaultCache(cls="memory") - def fetch(self, obj_type: str, obj_id: ObjectId) -> Optional[bytes]: - return self._cache.get(obj_type, hash_to_bytes(obj_id)) + def fetch(self, bundle_type: str, obj_id: ObjectId) -> Optional[bytes]: + return self._cache.get(bundle_type, hash_to_bytes(obj_id)) def cook( - self, obj_type: str, obj_id: ObjectId, email: Optional[str] = None + self, bundle_type: str, obj_id: ObjectId, email: Optional[str] = None ) -> Dict[str, Any]: raise NotImplementedError("InMemoryVaultBackend.cook()") - def progress(self, obj_type: str, obj_id: ObjectId): + def progress(self, bundle_type: str, obj_id: ObjectId): raise NotImplementedError("InMemoryVaultBackend.progress()") # Cookers endpoints - def set_progress(self, obj_type: str, obj_id: ObjectId, progress: str) -> None: + def set_progress(self, bundle_type: str, obj_id: ObjectId, progress: str) -> None: pass - def set_status(self, obj_type: str, obj_id: ObjectId, status: str) -> None: + def set_status(self, bundle_type: str, obj_id: ObjectId, status: str) -> None: pass - def put_bundle(self, obj_type: str, obj_id: ObjectId, bundle) -> bool: - self._cache.add(obj_type, hash_to_bytes(obj_id), bundle) + def put_bundle(self, bundle_type: str, obj_id: ObjectId, bundle) -> bool: + self._cache.add(bundle_type, hash_to_bytes(obj_id), bundle) return True - def send_notif(self, obj_type: str, obj_id: ObjectId): + def send_notif(self, bundle_type: str, obj_id: ObjectId): pass # Batch endpoints def batch_cook(self, batch: 
List[Tuple[str, str]]) -> int: raise NotImplementedError("InMemoryVaultBackend.batch_cook()") def batch_progress(self, batch_id: int) -> Dict[str, Any]: pass diff --git a/swh/vault/interface.py b/swh/vault/interface.py index cfc3a5c..23e683b 100644 --- a/swh/vault/interface.py +++ b/swh/vault/interface.py @@ -1,70 +1,70 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict, List, Optional, Tuple, Union from typing_extensions import Protocol, runtime_checkable from swh.core.api import remote_api_endpoint ObjectId = Union[str, bytes] @runtime_checkable class VaultInterface(Protocol): """ Backend Interface for the Software Heritage vault. """ @remote_api_endpoint("fetch") - def fetch(self, obj_type: str, obj_id: ObjectId) -> Optional[bytes]: + def fetch(self, bundle_type: str, obj_id: ObjectId) -> Optional[bytes]: """Fetch information from a bundle""" ... @remote_api_endpoint("cook") def cook( - self, obj_type: str, obj_id: ObjectId, email: Optional[str] = None + self, bundle_type: str, obj_id: ObjectId, email: Optional[str] = None ) -> Dict[str, Any]: """Main entry point for cooking requests. This starts a cooking task if needed, and add the given e-mail to the notify list""" ... @remote_api_endpoint("progress") - def progress(self, obj_type: str, obj_id: ObjectId): + def progress(self, bundle_type: str, obj_id: ObjectId): ... # Cookers endpoints @remote_api_endpoint("set_progress") - def set_progress(self, obj_type: str, obj_id: ObjectId, progress: str) -> None: + def set_progress(self, bundle_type: str, obj_id: ObjectId, progress: str) -> None: """Set the cooking progress of a bundle""" ... @remote_api_endpoint("set_status") - def set_status(self, obj_type: str, obj_id: ObjectId, status: str) -> bool: + def set_status(self, bundle_type: str, obj_id: ObjectId, status: str) -> bool: """Set the cooking status of a bundle""" ... @remote_api_endpoint("put_bundle") - def put_bundle(self, obj_type: str, obj_id: ObjectId, bundle): + def put_bundle(self, bundle_type: str, obj_id: ObjectId, bundle): """Store bundle in vault cache""" ... @remote_api_endpoint("send_notif") - def send_notif(self, obj_type: str, obj_id: ObjectId): + def send_notif(self, bundle_type: str, obj_id: ObjectId): """Send all the e-mails in the notification list of a bundle""" ... # Batch endpoints @remote_api_endpoint("batch_cook") def batch_cook(self, batch: List[Tuple[str, str]]) -> int: """Cook a batch of bundles and returns the cooking id.""" ... @remote_api_endpoint("batch_progress") def batch_progress(self, batch_id: int) -> Dict[str, Any]: """Fetch information from a batch of bundles""" ... 
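Note (editor's illustration, not part of the patch): a minimal sketch of the renamed calling convention, driving the stub backend defined above with `bundle_type` as the first positional argument. The bundle type string, hex id, and payload below are placeholder values, and the sketch assumes an installed swh.vault with the in-memory cache backend available.

# Illustrative sketch only: exercise the renamed `bundle_type` parameter
# against the stub InMemoryVaultBackend introduced in this patch.
from swh.vault.in_memory_backend import InMemoryVaultBackend

backend = InMemoryVaultBackend()

# Placeholder values: a known bundle type name and a 40-character hex object id.
bundle_type = "revision_gitfast"
hex_id = "4a4b9771542143cf070386f86b4b92d42966bdbc"

# put_bundle() stores the bundle in the cache keyed by (bundle_type, object id)...
backend.put_bundle(bundle_type, hex_id, b"fake bundle content")

# ...and fetch() retrieves it with the same (bundle_type, object id) pair.
assert backend.fetch(bundle_type, hex_id) == b"fake bundle content"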
diff --git a/swh/vault/tests/test_backend.py b/swh/vault/tests/test_backend.py index 2ec68e5..f716fa1 100644 --- a/swh/vault/tests/test_backend.py +++ b/swh/vault/tests/test_backend.py @@ -1,353 +1,353 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import contextlib import datetime from unittest.mock import MagicMock, patch import attr import psycopg2 import pytest from swh.model import hashutil from swh.vault.exc import NotFoundExc from swh.vault.tests.vault_testing import hash_content @contextlib.contextmanager def mock_cooking(vault_backend): with patch.object(vault_backend, "_send_task") as mt: mt.return_value = 42 with patch("swh.vault.backend.get_cooker_cls") as mg: mcc = MagicMock() mc = MagicMock() mg.return_value = mcc mcc.return_value = mc mc.check_exists.return_value = True yield { "_send_task": mt, "get_cooker_cls": mg, "cooker_cls": mcc, "cooker": mc, } def assertTimestampAlmostNow(ts, tolerance_secs=1.0): # noqa now = datetime.datetime.now(datetime.timezone.utc) creation_delta_secs = (ts - now).total_seconds() assert creation_delta_secs < tolerance_secs -def fake_cook(backend, obj_type, result_content, sticky=False): +def fake_cook(backend, bundle_type, result_content, sticky=False): content, obj_id = hash_content(result_content) with mock_cooking(backend): - backend.create_task(obj_type, obj_id, sticky) - backend.cache.add(obj_type, obj_id, b"content") - backend.set_status(obj_type, obj_id, "done") + backend.create_task(bundle_type, obj_id, sticky) + backend.cache.add(bundle_type, obj_id, b"content") + backend.set_status(bundle_type, obj_id, "done") return obj_id, content -def fail_cook(backend, obj_type, obj_id, failure_reason): +def fail_cook(backend, bundle_type, obj_id, failure_reason): with mock_cooking(backend): - backend.create_task(obj_type, obj_id) - backend.set_status(obj_type, obj_id, "failed") - backend.set_progress(obj_type, obj_id, failure_reason) + backend.create_task(bundle_type, obj_id) + backend.set_status(bundle_type, obj_id, "failed") + backend.set_progress(bundle_type, obj_id, failure_reason) TEST_TYPE = "revision_gitfast" TEST_HEX_ID = "4a4b9771542143cf070386f86b4b92d42966bdbc" TEST_OBJ_ID = hashutil.hash_to_bytes(TEST_HEX_ID) TEST_PROGRESS = ( "Mr. White, You're telling me you're cooking again?" 
" \N{ASTONISHED FACE} " ) TEST_EMAIL = "ouiche@lorraine.fr" @pytest.fixture def swh_vault(swh_vault, sample_data): # make the vault's storage consistent with test data revision = attr.evolve(sample_data.revision, id=TEST_OBJ_ID) swh_vault.storage.revision_add([revision]) return swh_vault def test_create_task_simple(swh_vault): with mock_cooking(swh_vault) as m: swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) m["get_cooker_cls"].assert_called_once_with(TEST_TYPE) args = m["cooker_cls"].call_args[0] assert args[0] == TEST_TYPE assert args[1] == TEST_HEX_ID assert m["cooker"].check_exists.call_count == 1 assert m["_send_task"].call_count == 1 args = m["_send_task"].call_args[0] assert args[0] == TEST_TYPE assert args[1] == TEST_HEX_ID info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) assert info["object_id"] == TEST_HEX_ID assert info["type"] == TEST_TYPE assert info["task_status"] == "new" assert info["task_id"] == 42 assertTimestampAlmostNow(info["ts_created"]) assert info["ts_done"] is None assert info["progress_msg"] is None def test_create_fail_duplicate_task(swh_vault): with mock_cooking(swh_vault): swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) with pytest.raises(psycopg2.IntegrityError): swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) def test_create_fail_nonexisting_object(swh_vault): with mock_cooking(swh_vault) as m: m["cooker"].check_exists.side_effect = ValueError("Nothing here.") with pytest.raises(ValueError): swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) def test_create_set_progress(swh_vault): with mock_cooking(swh_vault): swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) assert info["progress_msg"] is None swh_vault.set_progress(TEST_TYPE, TEST_HEX_ID, TEST_PROGRESS) info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) assert info["progress_msg"] == TEST_PROGRESS def test_create_set_status(swh_vault): with mock_cooking(swh_vault): swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) assert info["task_status"] == "new" assert info["ts_done"] is None swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "pending") info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) assert info["task_status"] == "pending" assert info["ts_done"] is None swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "done") info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) assert info["task_status"] == "done" assertTimestampAlmostNow(info["ts_done"]) def test_create_update_access_ts(swh_vault): with mock_cooking(swh_vault): swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) info = swh_vault.progress(TEST_TYPE, TEST_OBJ_ID) access_ts_1 = info["ts_last_access"] assertTimestampAlmostNow(access_ts_1) swh_vault.update_access_ts(TEST_TYPE, TEST_OBJ_ID) info = swh_vault.progress(TEST_TYPE, TEST_OBJ_ID) access_ts_2 = info["ts_last_access"] assertTimestampAlmostNow(access_ts_2) swh_vault.update_access_ts(TEST_TYPE, TEST_OBJ_ID) info = swh_vault.progress(TEST_TYPE, TEST_OBJ_ID) access_ts_3 = info["ts_last_access"] assertTimestampAlmostNow(access_ts_3) assert access_ts_1 < access_ts_2 assert access_ts_2 < access_ts_3 def test_cook_idempotent(swh_vault, sample_data): with mock_cooking(swh_vault): info1 = swh_vault.cook(TEST_TYPE, TEST_HEX_ID) info2 = swh_vault.cook(TEST_TYPE, TEST_HEX_ID) info3 = swh_vault.cook(TEST_TYPE, TEST_HEX_ID) assert info1 == info2 assert info1 == info3 def test_cook_email_pending_done(swh_vault): with mock_cooking(swh_vault), patch.object( swh_vault, "add_notif_email" ) as madd, patch.object(swh_vault, "send_notification") as msend: 
swh_vault.cook(TEST_TYPE, TEST_HEX_ID) madd.assert_not_called() msend.assert_not_called() madd.reset_mock() msend.reset_mock() swh_vault.cook(TEST_TYPE, TEST_HEX_ID, email=TEST_EMAIL) madd.assert_called_once_with(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL) msend.assert_not_called() madd.reset_mock() msend.reset_mock() swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "done") swh_vault.cook(TEST_TYPE, TEST_HEX_ID, email=TEST_EMAIL) msend.assert_called_once_with(None, TEST_EMAIL, TEST_TYPE, TEST_HEX_ID, "done") madd.assert_not_called() def test_send_all_emails(swh_vault): with mock_cooking(swh_vault): emails = ("a@example.com", "billg@example.com", "test+42@example.org") for email in emails: swh_vault.cook(TEST_TYPE, TEST_HEX_ID, email=email) swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "done") with patch.object(swh_vault, "smtp_server") as m: swh_vault.send_notif(TEST_TYPE, TEST_HEX_ID) sent_emails = {k[0][0] for k in m.send_message.call_args_list} assert {k["To"] for k in sent_emails} == set(emails) for e in sent_emails: assert "bot@softwareheritage.org" in e["From"] assert TEST_TYPE in e["Subject"] assert TEST_HEX_ID[:5] in e["Subject"] assert TEST_TYPE in str(e) assert "https://archive.softwareheritage.org/" in str(e) assert TEST_HEX_ID[:5] in str(e) assert "--\x20\n" in str(e) # Well-formated signature!!! # Check that the entries have been deleted and recalling the # function does not re-send the e-mails m.reset_mock() swh_vault.send_notif(TEST_TYPE, TEST_HEX_ID) m.assert_not_called() def test_available(swh_vault): assert not swh_vault.is_available(TEST_TYPE, TEST_HEX_ID) with mock_cooking(swh_vault): swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) assert not swh_vault.is_available(TEST_TYPE, TEST_HEX_ID) swh_vault.cache.add(TEST_TYPE, TEST_HEX_ID, b"content") assert not swh_vault.is_available(TEST_TYPE, TEST_HEX_ID) swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "done") assert swh_vault.is_available(TEST_TYPE, TEST_HEX_ID) def test_fetch(swh_vault): assert swh_vault.fetch(TEST_TYPE, TEST_HEX_ID, raise_notfound=False) is None with pytest.raises( NotFoundExc, match=f"{TEST_TYPE} {TEST_HEX_ID} is not available." 
): swh_vault.fetch(TEST_TYPE, TEST_HEX_ID) obj_id, content = fake_cook(swh_vault, TEST_TYPE, b"content") info = swh_vault.progress(TEST_TYPE, obj_id) access_ts_before = info["ts_last_access"] assert swh_vault.fetch(TEST_TYPE, obj_id) == b"content" info = swh_vault.progress(TEST_TYPE, obj_id) access_ts_after = info["ts_last_access"] assertTimestampAlmostNow(access_ts_after) assert access_ts_before < access_ts_after def test_cache_expire_oldest(swh_vault): r = range(1, 10) inserted = {} for i in r: sticky = i == 5 content = b"content%s" % str(i).encode() obj_id, content = fake_cook(swh_vault, TEST_TYPE, content, sticky) inserted[i] = (obj_id, content) swh_vault.update_access_ts(TEST_TYPE, inserted[2][0]) swh_vault.update_access_ts(TEST_TYPE, inserted[3][0]) swh_vault.cache_expire_oldest(n=4) should_be_still_here = {2, 3, 5, 8, 9} for i in r: assert swh_vault.is_available(TEST_TYPE, inserted[i][0]) == ( i in should_be_still_here ) def test_cache_expire_until(swh_vault): r = range(1, 10) inserted = {} for i in r: sticky = i == 5 content = b"content%s" % str(i).encode() obj_id, content = fake_cook(swh_vault, TEST_TYPE, content, sticky) inserted[i] = (obj_id, content) if i == 7: cutoff_date = datetime.datetime.now() swh_vault.update_access_ts(TEST_TYPE, inserted[2][0]) swh_vault.update_access_ts(TEST_TYPE, inserted[3][0]) swh_vault.cache_expire_until(date=cutoff_date) should_be_still_here = {2, 3, 5, 8, 9} for i in r: assert swh_vault.is_available(TEST_TYPE, inserted[i][0]) == ( i in should_be_still_here ) def test_fail_cook_simple(swh_vault): fail_cook(swh_vault, TEST_TYPE, TEST_HEX_ID, "error42") assert not swh_vault.is_available(TEST_TYPE, TEST_HEX_ID) info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) assert info["progress_msg"] == "error42" def test_send_failure_email(swh_vault): with mock_cooking(swh_vault): swh_vault.cook(TEST_TYPE, TEST_HEX_ID, email="a@example.com") swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "failed") swh_vault.set_progress(TEST_TYPE, TEST_HEX_ID, "test error") with patch.object(swh_vault, "smtp_server") as m: swh_vault.send_notif(TEST_TYPE, TEST_HEX_ID) e = [k[0][0] for k in m.send_message.call_args_list][0] assert e["To"] == "a@example.com" assert "bot@softwareheritage.org" in e["From"] assert TEST_TYPE in e["Subject"] assert TEST_HEX_ID[:5] in e["Subject"] assert "fail" in e["Subject"] assert TEST_TYPE in str(e) assert TEST_HEX_ID[:5] in str(e) assert "test error" in str(e) assert "--\x20\n" in str(e) # Well-formated signature def test_retry_failed_bundle(swh_vault): fail_cook(swh_vault, TEST_TYPE, TEST_HEX_ID, "error42") info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) assert info["task_status"] == "failed" with mock_cooking(swh_vault): swh_vault.cook(TEST_TYPE, TEST_HEX_ID) info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) assert info["task_status"] == "new" diff --git a/swh/vault/tests/test_cli.py b/swh/vault/tests/test_cli.py index 783a20e..3958add 100644 --- a/swh/vault/tests/test_cli.py +++ b/swh/vault/tests/test_cli.py @@ -1,105 +1,107 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import tempfile from unittest.mock import MagicMock import click import click.testing import pytest from swh.vault.cli import vault as vault_cli_group from swh.vault.cookers.base import BaseVaultCooker from swh.vault.in_memory_backend import InMemoryVaultBackend def 
test_cook_unsupported_swhid(): runner = click.testing.CliRunner() result = runner.invoke(vault_cli_group, ["cook", "swh:1:dir:f00b4r", "-"]) assert isinstance(result.exception, SystemExit) assert "expected core SWHID" in result.stdout result = runner.invoke(vault_cli_group, ["cook", "swh:1:ori:" + "0" * 40, "-"]) assert isinstance(result.exception, SystemExit) assert "expected core SWHID" in result.stdout result = runner.invoke(vault_cli_group, ["cook", "swh:1:cnt:" + "0" * 40, "-"]) assert isinstance(result.exception, SystemExit) assert "No cooker available for CONTENT" in result.stdout def test_cook_unknown_cooker(): runner = click.testing.CliRunner() result = runner.invoke( vault_cli_group, ["cook", "swh:1:dir:" + "0" * 40, "-", "--cooker-type", "gitfast"], ) assert isinstance(result.exception, SystemExit) assert "do not have a gitfast cooker" in result.stdout result = runner.invoke(vault_cli_group, ["cook", "swh:1:rev:" + "0" * 40, "-"]) assert isinstance(result.exception, SystemExit) assert "explicit --cooker-type" in result.stdout @pytest.mark.parametrize( - "obj_type,cooker_name_suffix,swhid_type", + "bundle_type,cooker_name_suffix,swhid_type", [("directory", "", "dir"), ("revision", "gitfast", "rev"),], ) -def test_cook_directory(obj_type, cooker_name_suffix, swhid_type, mocker): +def test_cook_directory(bundle_type, cooker_name_suffix, swhid_type, mocker): storage = object() mocker.patch("swh.storage.get_storage", return_value=storage) backend = MagicMock(spec=InMemoryVaultBackend) backend.fetch.return_value = b"bundle content" mocker.patch( "swh.vault.in_memory_backend.InMemoryVaultBackend", return_value=backend ) cooker = MagicMock(spec=BaseVaultCooker) cooker_cls = MagicMock(return_value=cooker) mocker.patch("swh.vault.cookers.get_cooker_cls", return_value=cooker_cls) runner = click.testing.CliRunner() with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: config_fd.write('{"storage": {}}') config_fd.seek(0) if cooker_name_suffix: result = runner.invoke( vault_cli_group, [ "cook", f"swh:1:{swhid_type}:{'0'*40}", "-", "-C", config_fd.name, "--cooker-type", cooker_name_suffix, ], ) else: result = runner.invoke( vault_cli_group, ["cook", f"swh:1:{swhid_type}:{'0'*40}", "-", "-C", config_fd.name], ) if result.exception is not None: raise result.exception cooker_cls.assert_called_once_with( - obj_type=f"{obj_type}_{cooker_name_suffix}" if cooker_name_suffix else obj_type, + bundle_type=f"{bundle_type}_{cooker_name_suffix}" + if cooker_name_suffix + else bundle_type, obj_id=b"\x00" * 20, backend=backend, storage=storage, graph=None, objstorage=None, max_bundle_size=None, ) cooker.cook.assert_called_once_with() assert result.stdout_bytes == b"bundle content" diff --git a/swh/vault/tests/test_cookers_base.py b/swh/vault/tests/test_cookers_base.py index 3eacecf..e47c072 100644 --- a/swh/vault/tests/test_cookers_base.py +++ b/swh/vault/tests/test_cookers_base.py @@ -1,74 +1,74 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from unittest.mock import MagicMock from swh.model import hashutil from swh.vault.cookers.base import BaseVaultCooker TEST_BUNDLE_CHUNKS = [b"test content 1\n", b"test content 2\n", b"test content 3\n"] TEST_BUNDLE_CONTENT = b"".join(TEST_BUNDLE_CHUNKS) TEST_OBJ_TYPE = "test_type" TEST_HEX_ID = "17a3e48bce37be5226490e750202ad3a9a1a3fe9" TEST_OBJ_ID = 
hashutil.hash_to_bytes(TEST_HEX_ID) class BaseVaultCookerMock(BaseVaultCooker): CACHE_TYPE_KEY = TEST_OBJ_TYPE def __init__(self): # we do not call super() here to bypass the building of db objects from # config since we do mock these db objects self.config = {} self.storage = MagicMock() self.backend = MagicMock() - self.obj_type = self.CACHE_TYPE_KEY + self.bundle_type = self.CACHE_TYPE_KEY self.obj_id = hashutil.hash_to_bytes(TEST_OBJ_ID) self.max_bundle_size = 1024 def check_exists(self): return True def prepare_bundle(self): for chunk in TEST_BUNDLE_CHUNKS: self.write(chunk) def test_simple_cook(): cooker = BaseVaultCookerMock() cooker.cook() cooker.backend.put_bundle.assert_called_once_with( TEST_OBJ_TYPE, TEST_OBJ_ID, TEST_BUNDLE_CONTENT ) cooker.backend.set_status.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID, "done") cooker.backend.set_progress.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID, None) cooker.backend.send_notif.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID) def test_code_exception_cook(): cooker = BaseVaultCookerMock() cooker.prepare_bundle = MagicMock() cooker.prepare_bundle.side_effect = RuntimeError("Nope") cooker.cook() # Potentially remove this when we have objstorage streaming cooker.backend.put_bundle.assert_not_called() cooker.backend.set_status.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID, "failed") assert "Nope" not in cooker.backend.set_progress.call_args[0][2] cooker.backend.send_notif.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID) def test_policy_exception_cook(): cooker = BaseVaultCookerMock() cooker.max_bundle_size = 8 cooker.cook() # Potentially remove this when we have objstorage streaming cooker.backend.put_bundle.assert_not_called() cooker.backend.set_status.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID, "failed") assert "exceeds" in cooker.backend.set_progress.call_args[0][2] cooker.backend.send_notif.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID) diff --git a/swh/vault/tests/test_server.py b/swh/vault/tests/test_server.py index 6072e54..5d55293 100644 --- a/swh/vault/tests/test_server.py +++ b/swh/vault/tests/test_server.py @@ -1,157 +1,157 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import os from typing import Any, Dict import pytest import yaml from swh.core.api.serializers import json_dumps, msgpack_dumps, msgpack_loads from swh.vault.api.server import ( VaultServerApp, check_config, make_app, make_app_from_configfile, ) from swh.vault.tests.test_backend import TEST_HEX_ID def test_make_app_from_file_missing(): with pytest.raises(ValueError, match="Missing configuration path."): make_app_from_configfile() def test_make_app_from_file_does_not_exist(tmp_path): conf_path = os.path.join(str(tmp_path), "vault-server.yml") assert os.path.exists(conf_path) is False with pytest.raises( ValueError, match=f"Configuration path {conf_path} should exist." 
): make_app_from_configfile(conf_path) def test_make_app_from_env_variable(swh_vault_config_file): """Server initialization happens through env variable when no path is provided """ app = make_app_from_configfile() assert app is not None def test_make_app_from_file(swh_local_vault_config, tmp_path): """Server initialization happens trough path if provided """ conf_path = os.path.join(str(tmp_path), "vault-server.yml") with open(conf_path, "w") as f: f.write(yaml.dump(swh_local_vault_config)) app = make_app_from_configfile(conf_path) assert app is not None @pytest.fixture def async_app(swh_local_vault_config: Dict[str, Any],) -> VaultServerApp: """Instantiate the vault server application. Note: This requires the db setup to run (fixture swh_vault in charge of this) """ return make_app(swh_local_vault_config) @pytest.fixture def cli(async_app, aiohttp_client, loop): return loop.run_until_complete(aiohttp_client(async_app)) async def test_client_index(cli): resp = await cli.get("/") assert resp.status == 200 async def test_client_cook_notfound(cli): resp = await cli.post( "/cook", - data=json_dumps({"obj_type": "directory", "obj_id": TEST_HEX_ID}), + data=json_dumps({"bundle_type": "directory", "obj_id": TEST_HEX_ID}), headers=[("Content-Type", "application/json")], ) assert resp.status == 400 content = msgpack_loads(await resp.content.read()) assert content["type"] == "NotFoundExc" assert content["args"] == [f"directory {TEST_HEX_ID} was not found."] async def test_client_progress_notfound(cli): resp = await cli.post( "/progress", - data=json_dumps({"obj_type": "directory", "obj_id": TEST_HEX_ID}), + data=json_dumps({"bundle_type": "directory", "obj_id": TEST_HEX_ID}), headers=[("Content-Type", "application/json")], ) assert resp.status == 400 content = msgpack_loads(await resp.content.read()) assert content["type"] == "NotFoundExc" assert content["args"] == [f"directory {TEST_HEX_ID} was not found."] async def test_client_batch_cook_invalid_type(cli): resp = await cli.post( "/batch_cook", data=msgpack_dumps({"batch": [("foobar", [])]}), headers={"Content-Type": "application/x-msgpack"}, ) assert resp.status == 400 content = msgpack_loads(await resp.content.read()) assert content["type"] == "NotFoundExc" assert content["args"] == ["foobar is an unknown type."] async def test_client_batch_progress_notfound(cli): resp = await cli.post( "/batch_progress", data=msgpack_dumps({"batch_id": 1}), headers={"Content-Type": "application/x-msgpack"}, ) assert resp.status == 400 content = msgpack_loads(await resp.content.read()) assert content["type"] == "NotFoundExc" assert content["args"] == ["Batch 1 does not exist."] def test_check_config_missing_vault_configuration() -> None: """Irrelevant configuration file path raises""" with pytest.raises(ValueError, match="missing 'vault' configuration"): check_config({}) def test_check_config_not_local() -> None: """Wrong configuration raises""" expected_error = ( "The vault backend can only be started with a 'local' configuration" ) with pytest.raises(EnvironmentError, match=expected_error): check_config({"vault": {"cls": "remote"}}) @pytest.mark.parametrize("missing_key", ["storage", "cache", "scheduler"]) def test_check_config_missing_key(missing_key, swh_vault_config) -> None: """Any other configuration than 'local' (the default) is rejected""" config_ok = {"vault": {"cls": "local", **swh_vault_config}} config_ko = copy.deepcopy(config_ok) config_ko["vault"].pop(missing_key, None) expected_error = f"invalid configuration: missing {missing_key} config entry" 
with pytest.raises(ValueError, match=expected_error): check_config(config_ko) @pytest.mark.parametrize("missing_key", ["storage", "cache", "scheduler"]) def test_check_config_ok(missing_key, swh_vault_config) -> None: """A 'local' configuration with all required entries is accepted""" config_ok = {"vault": {"cls": "local", **swh_vault_config}} assert check_config(config_ok) is not None
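Note (editor's illustration, not part of the patch): the check_config tests above imply a configuration with a top-level "vault" entry, cls "local", and storage, cache and scheduler sub-entries. A hedged sketch of that shape follows; the nested values are deployment-specific placeholders, and the sketch assumes check_config only verifies the backend class and the presence of these entries, as the error messages in the tests suggest.

# Illustrative sketch only: the configuration layout implied by the server tests.
from swh.vault.api.server import check_config

config = {
    "vault": {
        "cls": "local",
        # Placeholder sub-configurations -- point these at real services in a
        # deployment; their exact contents are not exercised by these tests.
        "db": "dbname=swh-vault",
        "storage": {"cls": "remote", "url": "http://localhost:5002/"},
        "cache": {"cls": "memory"},
        "scheduler": {"cls": "remote", "url": "http://localhost:5008/"},
    }
}

# Assuming check_config() only checks for the 'local' class and the presence of
# the storage/cache/scheduler entries (as the tests suggest), this returns a
# usable configuration; removing any of those entries raises ValueError instead.
assert check_config(config) is not None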