diff --git a/requirements.txt b/requirements.txt index 328c1d5..bbc6f49 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ click flask psycopg2 python-dateutil fastimport - +typing-extensions diff --git a/swh/vault/api/client.py b/swh/vault/api/client.py index 79a2fd4..0a5fd42 100644 --- a/swh/vault/api/client.py +++ b/swh/vault/api/client.py @@ -1,59 +1,15 @@ # Copyright (C) 2016-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.core.api import RPCClient -from swh.model import hashutil from swh.vault.exc import NotFoundExc +from swh.vault.interface import VaultInterface class RemoteVaultClient(RPCClient): """Client to the Software Heritage vault cache.""" + backend_class = VaultInterface reraise_exceptions = [NotFoundExc] - - # Web API endpoints - - def fetch(self, obj_type, obj_id): - hex_id = hashutil.hash_to_hex(obj_id) - return self.get("fetch/{}/{}".format(obj_type, hex_id)) - - def cook(self, obj_type, obj_id, email=None): - hex_id = hashutil.hash_to_hex(obj_id) - return self.post( - "cook/{}/{}".format(obj_type, hex_id), - data={}, - params=({"email": email} if email else None), - ) - - def progress(self, obj_type, obj_id): - hex_id = hashutil.hash_to_hex(obj_id) - return self.get("progress/{}/{}".format(obj_type, hex_id)) - - # Cookers endpoints - - def set_progress(self, obj_type, obj_id, progress): - hex_id = hashutil.hash_to_hex(obj_id) - return self.post("set_progress/{}/{}".format(obj_type, hex_id), data=progress) - - def set_status(self, obj_type, obj_id, status): - hex_id = hashutil.hash_to_hex(obj_id) - return self.post("set_status/{}/{}".format(obj_type, hex_id), data=status) - - # TODO: handle streaming properly - def put_bundle(self, obj_type, obj_id, bundle): - hex_id = hashutil.hash_to_hex(obj_id) - return self.post("put_bundle/{}/{}".format(obj_type, hex_id), data=bundle) - - def send_notif(self, obj_type, obj_id): - hex_id = hashutil.hash_to_hex(obj_id) - return self.post("send_notif/{}/{}".format(obj_type, hex_id), data=None) - - # Batch endpoints - - def batch_cook(self, batch): - return self.post("batch_cook", data=batch) - - def batch_progress(self, batch_id): - return self.get("batch_progress/{}".format(batch_id)) diff --git a/swh/vault/api/server.py b/swh/vault/api/server.py index 10336bc..31ef3cc 100644 --- a/swh/vault/api/server.py +++ b/swh/vault/api/server.py @@ -1,235 +1,124 @@ # Copyright (C) 2016-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio -import collections import os from typing import Any, Dict, Optional import aiohttp.web -from swh.core import config -from swh.core.api.asynchronous import RPCServerApp, decode_request -from swh.core.api.asynchronous import encode_data_server as encode_data -from swh.model import hashutil -from swh.vault import get_vault +from swh.core.api.asynchronous import RPCServerApp +from swh.core.config import config_basepath, merge_configs, read_raw_config +from swh.vault import get_vault as get_swhvault from swh.vault.backend import NotFoundExc -from swh.vault.cookers import COOKER_TYPES +from swh.vault.interface import VaultInterface -DEFAULT_CONFIG_PATH = "vault/server" DEFAULT_CONFIG = { - "storage": ("dict", {"cls": 
"remote", "args": {"url": "http://localhost:5002/",},}), - "cache": ( - "dict", - { - "cls": "pathslicing", - "args": {"root": "/srv/softwareheritage/vault", "slicing": "0:1/1:5",}, - }, - ), - "client_max_size": ("int", 1024 ** 3), - "vault": ( - "dict", - {"cls": "local", "args": {"db": "dbname=softwareheritage-vault-dev",},}, - ), - "scheduler": ("dict", {"cls": "remote", "url": "http://localhost:5008/",},), + "storage": {"cls": "remote", "url": "http://localhost:5002/"}, + "cache": { + "cls": "pathslicing", + "args": {"root": "/srv/softwareheritage/vault", "slicing": "0:1/1:5"}, + }, + "client_max_size": 1024 ** 3, + "vault": {"cls": "local", "args": {"db": "dbname=softwareheritage-vault-dev",}}, + "scheduler": {"cls": "remote", "url": "http://localhost:5008/"}, } -@asyncio.coroutine -def index(request): - return aiohttp.web.Response(body="SWH Vault API server") - - -# Web API endpoints - - -@asyncio.coroutine -def vault_fetch(request): - obj_type = request.match_info["type"] - obj_id = request.match_info["id"] - - if not request.app["backend"].is_available(obj_type, obj_id): - raise NotFoundExc(f"{obj_type} {obj_id} is not available.") - - return encode_data(request.app["backend"].fetch(obj_type, obj_id)) - - -def user_info(task_info): - return { - "id": task_info["id"], - "status": task_info["task_status"], - "progress_message": task_info["progress_msg"], - "obj_type": task_info["type"], - "obj_id": hashutil.hash_to_hex(task_info["object_id"]), - } - - -@asyncio.coroutine -def vault_cook(request): - obj_type = request.match_info["type"] - obj_id = request.match_info["id"] - email = request.query.get("email") - sticky = request.query.get("sticky") in ("true", "1") +vault = None +app = None - if obj_type not in COOKER_TYPES: - raise NotFoundExc(f"{obj_type} is an unknown type.") - info = request.app["backend"].cook_request( - obj_type, obj_id, email=email, sticky=sticky - ) - - # TODO: return 201 status (Created) once the api supports it - return encode_data(user_info(info)) - - -@asyncio.coroutine -def vault_progress(request): - obj_type = request.match_info["type"] - obj_id = request.match_info["id"] - - info = request.app["backend"].task_info(obj_type, obj_id) - if not info: - raise NotFoundExc(f"{obj_type} {obj_id} was not found.") +def get_vault(config: Optional[Dict[str, Any]] = None) -> VaultInterface: + global vault + if not vault: + assert config is not None + vault = get_swhvault(**config) + return vault - return encode_data(user_info(info)) - -# Cookers endpoints +class VaultServerApp(RPCServerApp): + client_exception_classes = (NotFoundExc,) @asyncio.coroutine -def set_progress(request): - obj_type = request.match_info["type"] - obj_id = request.match_info["id"] - progress = yield from decode_request(request) - request.app["backend"].set_progress(obj_type, obj_id, progress) - return encode_data(True) # FIXME: success value? - - -@asyncio.coroutine -def set_status(request): - obj_type = request.match_info["type"] - obj_id = request.match_info["id"] - status = yield from decode_request(request) - request.app["backend"].set_status(obj_type, obj_id, status) - return encode_data(True) # FIXME: success value? - - -@asyncio.coroutine -def put_bundle(request): - obj_type = request.match_info["type"] - obj_id = request.match_info["id"] - - # TODO: handle streaming properly - content = yield from decode_request(request) - request.app["backend"].cache.add(obj_type, obj_id, content) - return encode_data(True) # FIXME: success value? 
- - -@asyncio.coroutine -def send_notif(request): - obj_type = request.match_info["type"] - obj_id = request.match_info["id"] - request.app["backend"].send_all_notifications(obj_type, obj_id) - return encode_data(True) # FIXME: success value? - - -# Batch endpoints - - -@asyncio.coroutine -def batch_cook(request): - batch = yield from decode_request(request) - for obj_type, obj_id in batch: - if obj_type not in COOKER_TYPES: - raise NotFoundExc(f"{obj_type} is an unknown type.") - batch_id = request.app["backend"].batch_cook(batch) - return encode_data({"id": batch_id}) - - -@asyncio.coroutine -def batch_progress(request): - batch_id = request.match_info["batch_id"] - bundles = request.app["backend"].batch_info(batch_id) - if not bundles: - raise NotFoundExc(f"Batch {batch_id} does not exist.") - bundles = [user_info(bundle) for bundle in bundles] - counter = collections.Counter(b["status"] for b in bundles) - res = { - "bundles": bundles, - "total": len(bundles), - **{k: 0 for k in ("new", "pending", "done", "failed")}, - **dict(counter), - } - return encode_data(res) - - -# Web server - - -def make_app(backend, **kwargs): - app = RPCServerApp(**kwargs) - app.router.add_route("GET", "/", index) - app.client_exception_classes = (NotFoundExc,) +def index(request): + return aiohttp.web.Response(body="SWH Vault API server") - # Endpoints used by the web API - app.router.add_route("GET", "/fetch/{type}/{id}", vault_fetch) - app.router.add_route("POST", "/cook/{type}/{id}", vault_cook) - app.router.add_route("GET", "/progress/{type}/{id}", vault_progress) - # Endpoints used by the Cookers - app.router.add_route("POST", "/set_progress/{type}/{id}", set_progress) - app.router.add_route("POST", "/set_status/{type}/{id}", set_status) - app.router.add_route("POST", "/put_bundle/{type}/{id}", put_bundle) - app.router.add_route("POST", "/send_notif/{type}/{id}", send_notif) +def check_config(cfg: Dict[str, Any]) -> Dict[str, Any]: + """Ensure the configuration is ok to run a local vault server - # Endpoints for batch requests - app.router.add_route("POST", "/batch_cook", batch_cook) - app.router.add_route("GET", "/batch_progress/{batch_id}", batch_progress) + Raises: + EnvironmentError if the configuration is not for local instance + ValueError if one of the following keys is missing: vault, cache, storage, + scheduler - app["backend"] = backend - return app + Returns: + Configuration dict to instantiate a local vault server instance - -def check_config(cfg: Dict[str, Any]) -> Dict[str, Any]: + """ if "vault" not in cfg: raise ValueError("missing 'vault' configuration") vcfg = cfg["vault"] if vcfg["cls"] != "local": raise EnvironmentError( "The vault backend can only be started with a 'local' configuration", ) args = vcfg["args"] if "cache" not in args: args["cache"] = cfg.get("cache") if "storage" not in args: args["storage"] = cfg.get("storage") if "scheduler" not in args: args["scheduler"] = cfg.get("scheduler") for key in ("cache", "storage", "scheduler"): if not args.get(key): raise ValueError(f"invalid configuration: missing {key} config entry.") - return args - - -def make_app_from_configfile(config_file: Optional[str] = None, **kwargs): - if config_file is None: - config_file = DEFAULT_CONFIG_PATH - config_file = os.environ.get("SWH_CONFIG_FILENAME", config_file) - assert config_file is not None - if os.path.isfile(config_file): - cfg = config.read(config_file, DEFAULT_CONFIG) - else: - cfg = config.load_named_config(config_file, DEFAULT_CONFIG) - kwargs = check_config(cfg) - vault = 
get_vault("local", **kwargs) - return make_app(backend=vault, client_max_size=cfg["client_max_size"], **kwargs) + return cfg + + +def make_app(config_to_check: Dict[str, Any]) -> VaultServerApp: + """Ensure the configuration is ok, then instantiate the server application + + """ + config_ok = check_config(config_to_check) + app = VaultServerApp( + __name__, + backend_class=VaultInterface, + backend_factory=lambda: get_vault(config_ok["vault"]), + client_max_size=config_ok["client_max_size"], + ) + app.router.add_route("GET", "/", index) + return app + + +def make_app_from_configfile( + config_path: Optional[str] = None, **kwargs +) -> VaultServerApp: + """Load and check configuration if ok, then instantiate (once) a vault server + application. + + """ + global app + if not app: + config_path = os.environ.get("SWH_CONFIG_FILENAME", config_path) + if not config_path: + raise ValueError("Missing configuration path.") + if not os.path.isfile(config_path): + raise ValueError(f"Configuration path {config_path} should exist.") + + app_config = read_raw_config(config_basepath(config_path)) + app_config = merge_configs(DEFAULT_CONFIG, app_config) + app = make_app(app_config) + + return app if __name__ == "__main__": print("Deprecated. Use swh-vault ") diff --git a/swh/vault/backend.py b/swh/vault/backend.py index 69d4690..9355b06 100644 --- a/swh/vault/backend.py +++ b/swh/vault/backend.py @@ -1,490 +1,551 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import collections from email.mime.text import MIMEText import smtplib +from typing import Any, Dict, List, Optional, Tuple import psycopg2.extras import psycopg2.pool from swh.core.db import BaseDb from swh.core.db.common import db_transaction from swh.model import hashutil from swh.scheduler import get_scheduler from swh.scheduler.utils import create_oneshot_task_dict from swh.storage import get_storage from swh.vault.cache import VaultCache -from swh.vault.cookers import get_cooker_cls +from swh.vault.cookers import COOKER_TYPES, get_cooker_cls from swh.vault.exc import NotFoundExc +from swh.vault.interface import ObjectId cooking_task_name = "swh.vault.cooking_tasks.SWHCookingTask" NOTIF_EMAIL_FROM = '"Software Heritage Vault" ' "" NOTIF_EMAIL_SUBJECT_SUCCESS = "Bundle ready: {obj_type} {short_id}" NOTIF_EMAIL_SUBJECT_FAILURE = "Bundle failed: {obj_type} {short_id}" NOTIF_EMAIL_BODY_SUCCESS = """ You have requested the following bundle from the Software Heritage Vault: Object Type: {obj_type} Object ID: {hex_id} This bundle is now available for download at the following address: {url} Please keep in mind that this link might expire at some point, in which case you will need to request the bundle again. --\x20 The Software Heritage Developers """ NOTIF_EMAIL_BODY_FAILURE = """ You have requested the following bundle from the Software Heritage Vault: Object Type: {obj_type} Object ID: {hex_id} This bundle could not be cooked for the following reason: {progress_msg} We apologize for the inconvenience. 
--\x20 The Software Heritage Developers """ -def batch_to_bytes(batch): - return [(obj_type, hashutil.hash_to_bytes(obj_id)) for obj_type, obj_id in batch] +def batch_to_bytes(batch: List[Tuple[str, str]]) -> List[Tuple[str, bytes]]: + return [(obj_type, hashutil.hash_to_bytes(hex_id)) for obj_type, hex_id in batch] class VaultBackend: """ Backend for the Software Heritage vault. """ def __init__(self, db, **config): self.config = config self.cache = VaultCache(**config["cache"]) self.scheduler = get_scheduler(**config["scheduler"]) self.storage = get_storage(**config["storage"]) self.smtp_server = smtplib.SMTP() self._pool = psycopg2.pool.ThreadedConnectionPool( config.get("min_pool_conns", 1), config.get("max_pool_conns", 10), db, cursor_factory=psycopg2.extras.RealDictCursor, ) self._db = None def get_db(self): if self._db: return self._db return BaseDb.from_pool(self._pool) def put_db(self, db): if db is not self._db: db.put_conn() + def _compute_ids(self, obj_id: ObjectId) -> Tuple[str, bytes]: + """Internal method to reconcile multiple possible inputs + + """ + if isinstance(obj_id, str): + return obj_id, hashutil.hash_to_bytes(obj_id) + return hashutil.hash_to_hex(obj_id), obj_id + @db_transaction() - def task_info(self, obj_type, obj_id, db=None, cur=None): - """Fetch information from a bundle""" - obj_id = hashutil.hash_to_bytes(obj_id) + def progress( + self, + obj_type: str, + obj_id: ObjectId, + raise_notfound: bool = True, + db=None, + cur=None, + ) -> Optional[Dict[str, Any]]: + hex_id, obj_id = self._compute_ids(obj_id) cur.execute( """ SELECT id, type, object_id, task_id, task_status, sticky, ts_created, ts_done, ts_last_access, progress_msg FROM vault_bundle WHERE type = %s AND object_id = %s""", (obj_type, obj_id), ) res = cur.fetchone() - if res: - res["object_id"] = bytes(res["object_id"]) + if not res: + if raise_notfound: + raise NotFoundExc(f"{obj_type} {hex_id} was not found.") + return None + + res["object_id"] = hashutil.hash_to_hex(res["object_id"]) return res - def _send_task(self, *args): + def _send_task(self, obj_type: str, hex_id: ObjectId): """Send a cooking task to the celery scheduler""" - task = create_oneshot_task_dict("cook-vault-bundle", *args) + task = create_oneshot_task_dict("cook-vault-bundle", obj_type, hex_id) added_tasks = self.scheduler.create_tasks([task]) return added_tasks[0]["id"] @db_transaction() - def create_task(self, obj_type, obj_id, sticky=False, db=None, cur=None): + def create_task( + self, obj_type: str, obj_id: bytes, sticky: bool = False, db=None, cur=None + ): """Create and send a cooking task""" - obj_id = hashutil.hash_to_bytes(obj_id) - hex_id = hashutil.hash_to_hex(obj_id) + hex_id, obj_id = self._compute_ids(obj_id) cooker_class = get_cooker_cls(obj_type) cooker = cooker_class(obj_type, hex_id, backend=self, storage=self.storage) + if not cooker.check_exists(): - raise NotFoundExc("Object {} was not found.".format(hex_id)) + raise NotFoundExc(f"{obj_type} {hex_id} was not found.") cur.execute( """ INSERT INTO vault_bundle (type, object_id, sticky) VALUES (%s, %s, %s)""", (obj_type, obj_id, sticky), ) db.conn.commit() task_id = self._send_task(obj_type, hex_id) cur.execute( """ UPDATE vault_bundle SET task_id = %s WHERE type = %s AND object_id = %s""", (task_id, obj_type, obj_id), ) @db_transaction() - def add_notif_email(self, obj_type, obj_id, email, db=None, cur=None): + def add_notif_email( + self, obj_type: str, obj_id: bytes, email: str, db=None, cur=None + ): """Add an e-mail address to notify when a given bundle is 
ready""" - obj_id = hashutil.hash_to_bytes(obj_id) cur.execute( """ INSERT INTO vault_notif_email (email, bundle_id) VALUES (%s, (SELECT id FROM vault_bundle WHERE type = %s AND object_id = %s))""", (email, obj_type, obj_id), ) + def put_bundle(self, obj_type: str, obj_id: ObjectId, bundle) -> bool: + _, obj_id = self._compute_ids(obj_id) + self.cache.add(obj_type, obj_id, bundle) + return True + @db_transaction() - def cook_request( - self, obj_type, obj_id, *, sticky=False, email=None, db=None, cur=None - ): - """Main entry point for cooking requests. This starts a cooking task if - needed, and add the given e-mail to the notify list""" - obj_id = hashutil.hash_to_bytes(obj_id) - info = self.task_info(obj_type, obj_id) + def cook( + self, + obj_type: str, + obj_id: ObjectId, + *, + sticky: bool = False, + email: Optional[str] = None, + db=None, + cur=None, + ) -> Dict[str, Any]: + hex_id, obj_id = self._compute_ids(obj_id) + info = self.progress(obj_type, obj_id, raise_notfound=False) + + if obj_type not in COOKER_TYPES: + raise NotFoundExc(f"{obj_type} is an unknown type.") # If there's a failed bundle entry, delete it first. if info is not None and info["task_status"] == "failed": + obj_id = hashutil.hash_to_bytes(obj_id) cur.execute( - """DELETE FROM vault_bundle - WHERE type = %s AND object_id = %s""", + "DELETE FROM vault_bundle WHERE type = %s AND object_id = %s", (obj_type, obj_id), ) db.conn.commit() info = None # If there's no bundle entry, create the task. if info is None: self.create_task(obj_type, obj_id, sticky) if email is not None: # If the task is already done, send the email directly if info is not None and info["task_status"] == "done": self.send_notification( - None, email, obj_type, obj_id, info["task_status"] + None, email, obj_type, hex_id, info["task_status"] ) # Else, add it to the notification queue else: self.add_notif_email(obj_type, obj_id, email) - info = self.task_info(obj_type, obj_id) - return info + return self.progress(obj_type, obj_id) @db_transaction() - def batch_cook(self, batch, db=None, cur=None): - """Cook a batch of bundles and returns the cooking id.""" + def batch_cook( + self, batch: List[Tuple[str, str]], db=None, cur=None + ) -> Dict[str, int]: # Import execute_values at runtime only, because it requires # psycopg2 >= 2.7 (only available on postgresql servers) from psycopg2.extras import execute_values + for obj_type, _ in batch: + if obj_type not in COOKER_TYPES: + raise NotFoundExc(f"{obj_type} is an unknown type.") + cur.execute( """ INSERT INTO vault_batch (id) VALUES (DEFAULT) RETURNING id""" ) batch_id = cur.fetchone()["id"] - batch = batch_to_bytes(batch) + batch_bytes = batch_to_bytes(batch) # Delete all failed bundles from the batch cur.execute( """ DELETE FROM vault_bundle WHERE task_status = 'failed' AND (type, object_id) IN %s""", - (tuple(batch),), + (tuple(batch_bytes),), ) # Insert all the bundles, return the new ones execute_values( cur, """ INSERT INTO vault_bundle (type, object_id) VALUES %s ON CONFLICT DO NOTHING""", - batch, + batch_bytes, ) # Get the bundle ids and task status cur.execute( """ SELECT id, type, object_id, task_id FROM vault_bundle WHERE (type, object_id) IN %s""", - (tuple(batch),), + (tuple(batch_bytes),), ) bundles = cur.fetchall() # Insert the batch-bundle entries batch_id_bundle_ids = [(batch_id, row["id"]) for row in bundles] execute_values( cur, """ INSERT INTO vault_batch_bundle (batch_id, bundle_id) VALUES %s ON CONFLICT DO NOTHING""", batch_id_bundle_ids, ) db.conn.commit() # Get the tasks to 
fetch batch_new = [ (row["type"], bytes(row["object_id"])) for row in bundles if row["task_id"] is None ] # Send the tasks args_batch = [ (obj_type, hashutil.hash_to_hex(obj_id)) for obj_type, obj_id in batch_new ] # TODO: change once the scheduler handles priority tasks tasks = [ create_oneshot_task_dict("swh-vault-batch-cooking", *args) for args in args_batch ] added_tasks = self.scheduler.create_tasks(tasks) - tasks_ids_bundle_ids = zip([task["id"] for task in added_tasks], batch_new) tasks_ids_bundle_ids = [ (task_id, obj_type, obj_id) - for task_id, (obj_type, obj_id) in tasks_ids_bundle_ids + for task_id, (obj_type, obj_id) in zip( + [task["id"] for task in added_tasks], batch_new + ) ] # Update the task ids execute_values( cur, """ UPDATE vault_bundle SET task_id = s_task_id FROM (VALUES %s) AS sub (s_task_id, s_type, s_object_id) WHERE type = s_type::cook_type AND object_id = s_object_id """, tasks_ids_bundle_ids, ) - return batch_id + return {"id": batch_id} @db_transaction() - def batch_info(self, batch_id, db=None, cur=None): - """Fetch information from a batch of bundles""" + def batch_progress(self, batch_id: int, db=None, cur=None) -> Dict[str, Any]: cur.execute( """ SELECT vault_bundle.id as id, type, object_id, task_id, task_status, sticky, ts_created, ts_done, ts_last_access, progress_msg FROM vault_batch_bundle LEFT JOIN vault_bundle ON vault_bundle.id = bundle_id WHERE batch_id = %s""", (batch_id,), ) - res = cur.fetchall() - if res: - for d in res: - d["object_id"] = bytes(d["object_id"]) + bundles = cur.fetchall() + if not bundles: + raise NotFoundExc(f"Batch {batch_id} does not exist.") + + for bundle in bundles: + bundle["object_id"] = hashutil.hash_to_hex(bundle["object_id"]) + + counter = collections.Counter(b["status"] for b in bundles) + res = { + "bundles": bundles, + "total": len(bundles), + **{k: 0 for k in ("new", "pending", "done", "failed")}, + **dict(counter), + } + return res @db_transaction() - def is_available(self, obj_type, obj_id, db=None, cur=None): + def is_available(self, obj_type: str, obj_id: ObjectId, db=None, cur=None): """Check whether a bundle is available for retrieval""" - info = self.task_info(obj_type, obj_id, cur=cur) + info = self.progress(obj_type, obj_id, raise_notfound=False, cur=cur) + obj_id = hashutil.hash_to_bytes(obj_id) return ( info is not None and info["task_status"] == "done" and self.cache.is_cached(obj_type, obj_id) ) @db_transaction() - def fetch(self, obj_type, obj_id, db=None, cur=None): + def fetch( + self, obj_type: str, obj_id: ObjectId, raise_notfound=True, db=None, cur=None + ): """Retrieve a bundle from the cache""" - if not self.is_available(obj_type, obj_id, cur=cur): + hex_id, obj_id = self._compute_ids(obj_id) + available = self.is_available(obj_type, obj_id, cur=cur) + if not available: + if raise_notfound: + raise NotFoundExc(f"{obj_type} {hex_id} is not available.") return None self.update_access_ts(obj_type, obj_id, cur=cur) return self.cache.get(obj_type, obj_id) @db_transaction() - def update_access_ts(self, obj_type, obj_id, db=None, cur=None): + def update_access_ts(self, obj_type: str, obj_id: bytes, db=None, cur=None): """Update the last access timestamp of a bundle""" - obj_id = hashutil.hash_to_bytes(obj_id) cur.execute( """ UPDATE vault_bundle SET ts_last_access = NOW() WHERE type = %s AND object_id = %s""", (obj_type, obj_id), ) @db_transaction() - def set_status(self, obj_type, obj_id, status, db=None, cur=None): - """Set the cooking status of a bundle""" + def set_status( + self, obj_type: 
str, obj_id: ObjectId, status: str, db=None, cur=None + ) -> bool: obj_id = hashutil.hash_to_bytes(obj_id) req = ( """ UPDATE vault_bundle SET task_status = %s """ + (""", ts_done = NOW() """ if status == "done" else "") + """WHERE type = %s AND object_id = %s""" ) cur.execute(req, (status, obj_type, obj_id)) + return True @db_transaction() - def set_progress(self, obj_type, obj_id, progress, db=None, cur=None): - """Set the cooking progress of a bundle""" + def set_progress( + self, obj_type: str, obj_id: ObjectId, progress: str, db=None, cur=None + ) -> bool: obj_id = hashutil.hash_to_bytes(obj_id) cur.execute( """ UPDATE vault_bundle SET progress_msg = %s WHERE type = %s AND object_id = %s""", (progress, obj_type, obj_id), ) + return True @db_transaction() - def send_all_notifications(self, obj_type, obj_id, db=None, cur=None): - """Send all the e-mails in the notification list of a bundle""" - obj_id = hashutil.hash_to_bytes(obj_id) + def send_notif(self, obj_type: str, obj_id: ObjectId, db=None, cur=None) -> bool: + hex_id, obj_id = self._compute_ids(obj_id) cur.execute( """ SELECT vault_notif_email.id AS id, email, task_status, progress_msg FROM vault_notif_email INNER JOIN vault_bundle ON bundle_id = vault_bundle.id WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s""", (obj_type, obj_id), ) for d in cur: self.send_notification( d["id"], d["email"], obj_type, - obj_id, + hex_id, status=d["task_status"], progress_msg=d["progress_msg"], ) + return True @db_transaction() def send_notification( self, - n_id, - email, - obj_type, - obj_id, - status, - progress_msg=None, + n_id: Optional[int], + email: str, + obj_type: str, + hex_id: str, + status: str, + progress_msg: Optional[str] = None, db=None, cur=None, - ): + ) -> None: """Send the notification of a bundle to a specific e-mail""" - hex_id = hashutil.hash_to_hex(obj_id) short_id = hex_id[:7] # TODO: instead of hardcoding this, we should probably: # * add a "fetch_url" field in the vault_notif_email table # * generate the url with flask.url_for() on the web-ui side # * send this url as part of the cook request and store it in # the table # * use this url for the notification e-mail url = "https://archive.softwareheritage.org/api/1/vault/{}/{}/" "raw".format( obj_type, hex_id ) if status == "done": text = NOTIF_EMAIL_BODY_SUCCESS.strip() text = text.format(obj_type=obj_type, hex_id=hex_id, url=url) msg = MIMEText(text) msg["Subject"] = NOTIF_EMAIL_SUBJECT_SUCCESS.format( obj_type=obj_type, short_id=short_id ) elif status == "failed": text = NOTIF_EMAIL_BODY_FAILURE.strip() text = text.format( obj_type=obj_type, hex_id=hex_id, progress_msg=progress_msg ) msg = MIMEText(text) msg["Subject"] = NOTIF_EMAIL_SUBJECT_FAILURE.format( obj_type=obj_type, short_id=short_id ) else: raise RuntimeError( "send_notification called on a '{}' bundle".format(status) ) msg["From"] = NOTIF_EMAIL_FROM msg["To"] = email self._smtp_send(msg) if n_id is not None: cur.execute( """ DELETE FROM vault_notif_email WHERE id = %s""", (n_id,), ) - def _smtp_send(self, msg): + def _smtp_send(self, msg: MIMEText): # Reconnect if needed try: status = self.smtp_server.noop()[0] except smtplib.SMTPException: status = -1 if status != 250: self.smtp_server.connect("localhost", 25) # Send the message self.smtp_server.send_message(msg) @db_transaction() - def _cache_expire(self, cond, *args, db=None, cur=None): + def _cache_expire(self, cond, *args, db=None, cur=None) -> None: """Low-level expiration method, used by cache_expire_* methods""" # Embedded SELECT query 
to be able to use ORDER BY and LIMIT cur.execute( """ DELETE FROM vault_bundle WHERE ctid IN ( SELECT ctid FROM vault_bundle WHERE sticky = false {} ) RETURNING type, object_id """.format( cond ), args, ) for d in cur: self.cache.delete(d["type"], bytes(d["object_id"])) @db_transaction() - def cache_expire_oldest(self, n=1, by="last_access", db=None, cur=None): + def cache_expire_oldest(self, n=1, by="last_access", db=None, cur=None) -> None: """Expire the `n` oldest bundles""" assert by in ("created", "done", "last_access") filter = """ORDER BY ts_{} LIMIT {}""".format(by, n) return self._cache_expire(filter) @db_transaction() - def cache_expire_until(self, date, by="last_access", db=None, cur=None): + def cache_expire_until(self, date, by="last_access", db=None, cur=None) -> None: """Expire all the bundles until a certain date""" assert by in ("created", "done", "last_access") filter = """AND ts_{} <= %s""".format(by) return self._cache_expire(filter, date) diff --git a/swh/vault/interface.py b/swh/vault/interface.py new file mode 100644 index 0000000..6657c2a --- /dev/null +++ b/swh/vault/interface.py @@ -0,0 +1,71 @@ +# Copyright (C) 2017-2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Any, Dict, List, Optional, Tuple, Union + +from typing_extensions import Protocol, runtime_checkable + +from swh.core.api import remote_api_endpoint + +ObjectId = Union[str, bytes] + + +@runtime_checkable +class VaultInterface(Protocol): + """ + Backend Interface for the Software Heritage vault. + """ + + @remote_api_endpoint("fetch") + def fetch(self, obj_type: str, obj_id: ObjectId) -> Dict[str, Any]: + """Retrieve a bundle from the cache""" + ... + + @remote_api_endpoint("cook") + def cook( + self, obj_type: str, obj_id: ObjectId, email: Optional[str] = None + ) -> Dict[str, Any]: + """Main entry point for cooking requests. This starts a cooking task if + needed, and adds the given e-mail to the notify list""" + ... + + @remote_api_endpoint("progress") + def progress(self, obj_type: str, obj_id: ObjectId): + """Fetch information from a bundle""" + ... + + # Cookers endpoints + + @remote_api_endpoint("set_progress") + def set_progress(self, obj_type: str, obj_id: ObjectId, progress: str) -> bool: + """Set the cooking progress of a bundle""" + ... + + @remote_api_endpoint("set_status") + def set_status(self, obj_type: str, obj_id: ObjectId, status: str) -> bool: + """Set the cooking status of a bundle""" + ... + + @remote_api_endpoint("put_bundle") + def put_bundle(self, obj_type: str, obj_id: ObjectId, bundle) -> bool: + """Store bundle in vault cache""" + ... + + @remote_api_endpoint("send_notif") + def send_notif(self, obj_type: str, obj_id: ObjectId) -> bool: + """Send all the e-mails in the notification list of a bundle""" + ... + + # Batch endpoints + + @remote_api_endpoint("batch_cook") + def batch_cook(self, batch: List[Tuple[str, str]]) -> Dict[str, int]: + """Cook a batch of bundles and return the cooking batch id.""" + ... + + @remote_api_endpoint("batch_progress") + def batch_progress(self, batch_id: int) -> Dict[str, Any]: + """Fetch information from a batch of bundles""" + ...
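The hand-written HTTP methods deleted from client.py and the per-endpoint aiohttp routes deleted from server.py are both regenerated from this single Protocol: swh.core.api derives the client methods from backend_class on RemoteVaultClient and the server routes from backend_factory on VaultServerApp, using the @remote_api_endpoint declarations above. A minimal client-side sketch, assuming a vault RPC server is reachable at a hypothetical URL (the object id is the test value used later in this diff):

# Sketch only: the server URL is an assumption, not part of this diff.
from swh.vault.api.client import RemoteVaultClient

client = RemoteVaultClient(url="http://localhost:5005/")
hex_id = "4a4b9771542143cf070386f86b4b92d42966bdbc"

# These methods are generated from the @remote_api_endpoint declarations:
# cook() maps to POST /cook, progress() to POST /progress, and so on.
info = client.cook("revision_gitfast", hex_id, email="user@example.com")
print(info["task_status"])
print(client.progress("revision_gitfast", hex_id))

Because NotFoundExc is listed in reraise_exceptions on the client, a missing object raises the same exception type on both sides of the RPC boundary.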
diff --git a/swh/vault/tests/conftest.py b/swh/vault/tests/conftest.py index 1a0a71c..adc1a8b 100644 --- a/swh/vault/tests/conftest.py +++ b/swh/vault/tests/conftest.py @@ -1,79 +1,97 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os from typing import Any, Dict import pkg_resources.extern.packaging.version import pytest +import yaml from swh.core.db.pytest_plugin import postgresql_fact from swh.storage.tests import SQL_DIR as STORAGE_SQL_DIR import swh.vault from swh.vault import get_vault os.environ["LC_ALL"] = "C.UTF-8" pytest_v = pkg_resources.get_distribution("pytest").parsed_version if pytest_v < pkg_resources.extern.packaging.version.parse("3.9"): @pytest.fixture def tmp_path(request): import pathlib import tempfile with tempfile.TemporaryDirectory() as tmpdir: yield pathlib.Path(tmpdir) def db_url(name, postgresql_proc): return "postgresql://{user}@{host}:{port}/{dbname}".format( host=postgresql_proc.host, port=postgresql_proc.port, user="postgres", dbname=name, ) VAULT_SQL_DIR = os.path.join(os.path.dirname(swh.vault.__file__), "sql") postgres_vault = postgresql_fact( "postgresql_proc", db_name="vault", dump_files=f"{VAULT_SQL_DIR}/*.sql" ) postgres_storage = postgresql_fact( "postgresql_proc", db_name="storage", dump_files=f"{STORAGE_SQL_DIR}/*.sql" ) @pytest.fixture def swh_vault_config(postgres_vault, postgres_storage, tmp_path) -> Dict[str, Any]: tmp_path = str(tmp_path) return { "db": postgres_vault.dsn, "storage": { "cls": "local", "db": postgres_storage.dsn, "objstorage": { "cls": "pathslicing", "args": {"root": tmp_path, "slicing": "0:1/1:5",}, }, }, "cache": { "cls": "pathslicing", - "args": {"root": tmp_path, "slicing": "0:1/1:5", "allow_delete": True,}, + "args": {"root": tmp_path, "slicing": "0:1/1:5", "allow_delete": True}, }, "scheduler": {"cls": "remote", "url": "http://swh-scheduler:5008",}, } +@pytest.fixture +def swh_local_vault_config(swh_vault_config: Dict[str, Any]) -> Dict[str, Any]: + return { + "vault": {"cls": "local", "args": swh_vault_config}, + "client_max_size": 1024 ** 3, + } + + +@pytest.fixture +def swh_vault_config_file(swh_local_vault_config, monkeypatch, tmp_path): + conf_path = os.path.join(str(tmp_path), "vault-server.yml") + with open(conf_path, "w") as f: + f.write(yaml.dump(swh_local_vault_config)) + monkeypatch.setenv("SWH_CONFIG_FILENAME", conf_path) + return conf_path + + @pytest.fixture def swh_vault(request, swh_vault_config): return get_vault("local", **swh_vault_config) @pytest.fixture def swh_storage(swh_vault): return swh_vault.storage diff --git a/swh/vault/tests/test_backend.py b/swh/vault/tests/test_backend.py index c699e55..2ec68e5 100644 --- a/swh/vault/tests/test_backend.py +++ b/swh/vault/tests/test_backend.py @@ -1,336 +1,353 @@ -# Copyright (C) 2017 The Software Heritage developers +# Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import contextlib import datetime from unittest.mock import MagicMock, patch +import attr import psycopg2 import pytest from swh.model import hashutil +from swh.vault.exc import NotFoundExc from swh.vault.tests.vault_testing import hash_content @contextlib.contextmanager def 
mock_cooking(vault_backend): with patch.object(vault_backend, "_send_task") as mt: mt.return_value = 42 with patch("swh.vault.backend.get_cooker_cls") as mg: mcc = MagicMock() mc = MagicMock() mg.return_value = mcc mcc.return_value = mc mc.check_exists.return_value = True yield { "_send_task": mt, "get_cooker_cls": mg, "cooker_cls": mcc, "cooker": mc, } def assertTimestampAlmostNow(ts, tolerance_secs=1.0): # noqa now = datetime.datetime.now(datetime.timezone.utc) creation_delta_secs = (ts - now).total_seconds() assert creation_delta_secs < tolerance_secs def fake_cook(backend, obj_type, result_content, sticky=False): content, obj_id = hash_content(result_content) with mock_cooking(backend): backend.create_task(obj_type, obj_id, sticky) backend.cache.add(obj_type, obj_id, b"content") backend.set_status(obj_type, obj_id, "done") return obj_id, content def fail_cook(backend, obj_type, obj_id, failure_reason): with mock_cooking(backend): backend.create_task(obj_type, obj_id) backend.set_status(obj_type, obj_id, "failed") backend.set_progress(obj_type, obj_id, failure_reason) TEST_TYPE = "revision_gitfast" TEST_HEX_ID = "4a4b9771542143cf070386f86b4b92d42966bdbc" TEST_OBJ_ID = hashutil.hash_to_bytes(TEST_HEX_ID) TEST_PROGRESS = ( "Mr. White, You're telling me you're cooking again?" " \N{ASTONISHED FACE} " ) TEST_EMAIL = "ouiche@lorraine.fr" +@pytest.fixture +def swh_vault(swh_vault, sample_data): + # make the vault's storage consistent with test data + revision = attr.evolve(sample_data.revision, id=TEST_OBJ_ID) + swh_vault.storage.revision_add([revision]) + return swh_vault + + def test_create_task_simple(swh_vault): with mock_cooking(swh_vault) as m: - swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) + swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) m["get_cooker_cls"].assert_called_once_with(TEST_TYPE) args = m["cooker_cls"].call_args[0] assert args[0] == TEST_TYPE assert args[1] == TEST_HEX_ID assert m["cooker"].check_exists.call_count == 1 assert m["_send_task"].call_count == 1 args = m["_send_task"].call_args[0] assert args[0] == TEST_TYPE assert args[1] == TEST_HEX_ID - info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) - assert info["object_id"] == TEST_OBJ_ID + info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) + assert info["object_id"] == TEST_HEX_ID assert info["type"] == TEST_TYPE assert info["task_status"] == "new" assert info["task_id"] == 42 assertTimestampAlmostNow(info["ts_created"]) assert info["ts_done"] is None assert info["progress_msg"] is None def test_create_fail_duplicate_task(swh_vault): with mock_cooking(swh_vault): - swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) + swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) with pytest.raises(psycopg2.IntegrityError): - swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) + swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) def test_create_fail_nonexisting_object(swh_vault): with mock_cooking(swh_vault) as m: m["cooker"].check_exists.side_effect = ValueError("Nothing here.") with pytest.raises(ValueError): - swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) + swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) def test_create_set_progress(swh_vault): with mock_cooking(swh_vault): - swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) + swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) - info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) assert info["progress_msg"] is None - swh_vault.set_progress(TEST_TYPE, TEST_OBJ_ID, TEST_PROGRESS) - info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + 
swh_vault.set_progress(TEST_TYPE, TEST_HEX_ID, TEST_PROGRESS) + info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) assert info["progress_msg"] == TEST_PROGRESS def test_create_set_status(swh_vault): with mock_cooking(swh_vault): - swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) + swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) - info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) assert info["task_status"] == "new" assert info["ts_done"] is None - swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, "pending") - info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "pending") + info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) assert info["task_status"] == "pending" assert info["ts_done"] is None - swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, "done") - info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "done") + info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) assert info["task_status"] == "done" assertTimestampAlmostNow(info["ts_done"]) def test_create_update_access_ts(swh_vault): with mock_cooking(swh_vault): swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) - info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + info = swh_vault.progress(TEST_TYPE, TEST_OBJ_ID) access_ts_1 = info["ts_last_access"] assertTimestampAlmostNow(access_ts_1) swh_vault.update_access_ts(TEST_TYPE, TEST_OBJ_ID) - info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + info = swh_vault.progress(TEST_TYPE, TEST_OBJ_ID) access_ts_2 = info["ts_last_access"] assertTimestampAlmostNow(access_ts_2) swh_vault.update_access_ts(TEST_TYPE, TEST_OBJ_ID) - info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + info = swh_vault.progress(TEST_TYPE, TEST_OBJ_ID) + access_ts_3 = info["ts_last_access"] assertTimestampAlmostNow(access_ts_3) assert access_ts_1 < access_ts_2 assert access_ts_2 < access_ts_3 -def test_cook_request_idempotent(swh_vault): +def test_cook_idempotent(swh_vault, sample_data): with mock_cooking(swh_vault): - info1 = swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) - info2 = swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) - info3 = swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) + info1 = swh_vault.cook(TEST_TYPE, TEST_HEX_ID) + info2 = swh_vault.cook(TEST_TYPE, TEST_HEX_ID) + info3 = swh_vault.cook(TEST_TYPE, TEST_HEX_ID) assert info1 == info2 assert info1 == info3 def test_cook_email_pending_done(swh_vault): with mock_cooking(swh_vault), patch.object( swh_vault, "add_notif_email" ) as madd, patch.object(swh_vault, "send_notification") as msend: - swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) + swh_vault.cook(TEST_TYPE, TEST_HEX_ID) madd.assert_not_called() msend.assert_not_called() madd.reset_mock() msend.reset_mock() - swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, email=TEST_EMAIL) + swh_vault.cook(TEST_TYPE, TEST_HEX_ID, email=TEST_EMAIL) madd.assert_called_once_with(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL) msend.assert_not_called() madd.reset_mock() msend.reset_mock() - swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, "done") - swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, email=TEST_EMAIL) - msend.assert_called_once_with(None, TEST_EMAIL, TEST_TYPE, TEST_OBJ_ID, "done") + swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "done") + swh_vault.cook(TEST_TYPE, TEST_HEX_ID, email=TEST_EMAIL) + msend.assert_called_once_with(None, TEST_EMAIL, TEST_TYPE, TEST_HEX_ID, "done") madd.assert_not_called() def test_send_all_emails(swh_vault): with mock_cooking(swh_vault): emails = ("a@example.com", 
"billg@example.com", "test+42@example.org") for email in emails: - swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, email=email) + swh_vault.cook(TEST_TYPE, TEST_HEX_ID, email=email) - swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, "done") + swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "done") with patch.object(swh_vault, "smtp_server") as m: - swh_vault.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) + swh_vault.send_notif(TEST_TYPE, TEST_HEX_ID) sent_emails = {k[0][0] for k in m.send_message.call_args_list} assert {k["To"] for k in sent_emails} == set(emails) for e in sent_emails: assert "bot@softwareheritage.org" in e["From"] assert TEST_TYPE in e["Subject"] assert TEST_HEX_ID[:5] in e["Subject"] assert TEST_TYPE in str(e) assert "https://archive.softwareheritage.org/" in str(e) assert TEST_HEX_ID[:5] in str(e) assert "--\x20\n" in str(e) # Well-formated signature!!! # Check that the entries have been deleted and recalling the # function does not re-send the e-mails m.reset_mock() - swh_vault.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) + swh_vault.send_notif(TEST_TYPE, TEST_HEX_ID) m.assert_not_called() def test_available(swh_vault): - assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID) + assert not swh_vault.is_available(TEST_TYPE, TEST_HEX_ID) with mock_cooking(swh_vault): - swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) - assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID) + swh_vault.create_task(TEST_TYPE, TEST_HEX_ID) + assert not swh_vault.is_available(TEST_TYPE, TEST_HEX_ID) - swh_vault.cache.add(TEST_TYPE, TEST_OBJ_ID, b"content") - assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID) + swh_vault.cache.add(TEST_TYPE, TEST_HEX_ID, b"content") + assert not swh_vault.is_available(TEST_TYPE, TEST_HEX_ID) - swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, "done") - assert swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID) + swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "done") + assert swh_vault.is_available(TEST_TYPE, TEST_HEX_ID) def test_fetch(swh_vault): - assert swh_vault.fetch(TEST_TYPE, TEST_OBJ_ID) is None + assert swh_vault.fetch(TEST_TYPE, TEST_HEX_ID, raise_notfound=False) is None + + with pytest.raises( + NotFoundExc, match=f"{TEST_TYPE} {TEST_HEX_ID} is not available." 
+ ): + swh_vault.fetch(TEST_TYPE, TEST_HEX_ID) + obj_id, content = fake_cook(swh_vault, TEST_TYPE, b"content") - info = swh_vault.task_info(TEST_TYPE, obj_id) + info = swh_vault.progress(TEST_TYPE, obj_id) access_ts_before = info["ts_last_access"] assert swh_vault.fetch(TEST_TYPE, obj_id) == b"content" - info = swh_vault.task_info(TEST_TYPE, obj_id) + info = swh_vault.progress(TEST_TYPE, obj_id) access_ts_after = info["ts_last_access"] assertTimestampAlmostNow(access_ts_after) assert access_ts_before < access_ts_after def test_cache_expire_oldest(swh_vault): r = range(1, 10) inserted = {} for i in r: sticky = i == 5 content = b"content%s" % str(i).encode() obj_id, content = fake_cook(swh_vault, TEST_TYPE, content, sticky) inserted[i] = (obj_id, content) swh_vault.update_access_ts(TEST_TYPE, inserted[2][0]) swh_vault.update_access_ts(TEST_TYPE, inserted[3][0]) swh_vault.cache_expire_oldest(n=4) should_be_still_here = {2, 3, 5, 8, 9} for i in r: assert swh_vault.is_available(TEST_TYPE, inserted[i][0]) == ( i in should_be_still_here ) def test_cache_expire_until(swh_vault): r = range(1, 10) inserted = {} for i in r: sticky = i == 5 content = b"content%s" % str(i).encode() obj_id, content = fake_cook(swh_vault, TEST_TYPE, content, sticky) inserted[i] = (obj_id, content) if i == 7: cutoff_date = datetime.datetime.now() swh_vault.update_access_ts(TEST_TYPE, inserted[2][0]) swh_vault.update_access_ts(TEST_TYPE, inserted[3][0]) swh_vault.cache_expire_until(date=cutoff_date) should_be_still_here = {2, 3, 5, 8, 9} for i in r: assert swh_vault.is_available(TEST_TYPE, inserted[i][0]) == ( i in should_be_still_here ) def test_fail_cook_simple(swh_vault): - fail_cook(swh_vault, TEST_TYPE, TEST_OBJ_ID, "error42") - assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID) - info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + fail_cook(swh_vault, TEST_TYPE, TEST_HEX_ID, "error42") + assert not swh_vault.is_available(TEST_TYPE, TEST_HEX_ID) + info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) assert info["progress_msg"] == "error42" def test_send_failure_email(swh_vault): with mock_cooking(swh_vault): - swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, email="a@example.com") + swh_vault.cook(TEST_TYPE, TEST_HEX_ID, email="a@example.com") - swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, "failed") - swh_vault.set_progress(TEST_TYPE, TEST_OBJ_ID, "test error") + swh_vault.set_status(TEST_TYPE, TEST_HEX_ID, "failed") + swh_vault.set_progress(TEST_TYPE, TEST_HEX_ID, "test error") with patch.object(swh_vault, "smtp_server") as m: - swh_vault.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) + swh_vault.send_notif(TEST_TYPE, TEST_HEX_ID) e = [k[0][0] for k in m.send_message.call_args_list][0] assert e["To"] == "a@example.com" assert "bot@softwareheritage.org" in e["From"] assert TEST_TYPE in e["Subject"] assert TEST_HEX_ID[:5] in e["Subject"] assert "fail" in e["Subject"] assert TEST_TYPE in str(e) assert TEST_HEX_ID[:5] in str(e) assert "test error" in str(e) assert "--\x20\n" in str(e) # Well-formatted signature def test_retry_failed_bundle(swh_vault): - fail_cook(swh_vault, TEST_TYPE, TEST_OBJ_ID, "error42") - info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + fail_cook(swh_vault, TEST_TYPE, TEST_HEX_ID, "error42") + info = swh_vault.progress(TEST_TYPE, TEST_HEX_ID) assert info["task_status"] == "failed" with mock_cooking(swh_vault): - swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) - info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) + swh_vault.cook(TEST_TYPE, TEST_HEX_ID) + info = swh_vault.progress(TEST_TYPE,
TEST_HEX_ID) assert info["task_status"] == "new" diff --git a/swh/vault/tests/test_server.py b/swh/vault/tests/test_server.py index 8aba030..98af6e9 100644 --- a/swh/vault/tests/test_server.py +++ b/swh/vault/tests/test_server.py @@ -1,91 +1,161 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy +import os +from typing import Any, Dict import pytest +import yaml -from swh.core.api.serializers import msgpack_dumps, msgpack_loads -from swh.vault.api.server import check_config, make_app +from swh.core.api.serializers import json_dumps, msgpack_dumps, msgpack_loads +from swh.vault.api.server import ( + VaultServerApp, + check_config, + make_app, + make_app_from_configfile, +) +from swh.vault.tests.test_backend import TEST_HEX_ID + + +def test_make_app_from_file_missing(): + with pytest.raises(ValueError, match="Missing configuration path."): + make_app_from_configfile() + + +def test_make_app_from_file_does_not_exist(tmp_path): + conf_path = os.path.join(str(tmp_path), "vault-server.yml") + assert os.path.exists(conf_path) is False + + with pytest.raises( + ValueError, match=f"Configuration path {conf_path} should exist." + ): + make_app_from_configfile(conf_path) + + +def test_make_app_from_env_variable(swh_vault_config_file): + """Instantiation of the server should happen once (through environment variable) + + """ + app0 = make_app_from_configfile() + assert app0 is not None + app1 = make_app_from_configfile() + assert app1 == app0 + + +def test_make_app_from_file(swh_local_vault_config, tmp_path): + """Instantiation of the server should happen once (through the configuration file) + + """ + conf_path = os.path.join(str(tmp_path), "vault-server.yml") + with open(conf_path, "w") as f: + f.write(yaml.dump(swh_local_vault_config)) + + app0 = make_app_from_configfile(conf_path) + assert app0 is not None + app1 = make_app_from_configfile(conf_path) + assert app1 == app0 + + +@pytest.fixture +def async_app(swh_local_vault_config: Dict[str, Any],) -> VaultServerApp: + """Instantiate the vault server application.
+ + Note: This requires the db setup to run (fixture swh_vault in charge of this) + + """ + return make_app(swh_local_vault_config) @pytest.fixture -def client(swh_vault, loop, aiohttp_client): - app = make_app(backend=swh_vault) - return loop.run_until_complete(aiohttp_client(app)) +def cli(async_app, aiohttp_client, loop): + return loop.run_until_complete(aiohttp_client(async_app)) -async def test_index(client): - resp = await client.get("/") +async def test_client_index(cli): + resp = await cli.get("/") assert resp.status == 200 -async def test_cook_notfound(client): - resp = await client.post("/cook/directory/000000") +async def test_client_cook_notfound(cli): + resp = await cli.post( + "/cook", + data=json_dumps({"obj_type": "directory", "obj_id": TEST_HEX_ID}), + headers=[("Content-Type", "application/json")], + ) assert resp.status == 400 content = msgpack_loads(await resp.content.read()) assert content["exception"]["type"] == "NotFoundExc" - assert content["exception"]["args"] == ["Object 000000 was not found."] + assert content["exception"]["args"] == [f"directory {TEST_HEX_ID} was not found."] -async def test_progress_notfound(client): - resp = await client.get("/progress/directory/000000") +async def test_client_progress_notfound(cli): + resp = await cli.post( + "/progress", + data=json_dumps({"obj_type": "directory", "obj_id": TEST_HEX_ID}), + headers=[("Content-Type", "application/json")], + ) assert resp.status == 400 content = msgpack_loads(await resp.content.read()) assert content["exception"]["type"] == "NotFoundExc" - assert content["exception"]["args"] == ["directory 000000 was not found."] + assert content["exception"]["args"] == [f"directory {TEST_HEX_ID} was not found."] -async def test_batch_cook_invalid_type(client): - data = msgpack_dumps([("foobar", [])]) - resp = await client.post( - "/batch_cook", data=data, headers={"Content-Type": "application/x-msgpack"} +async def test_client_batch_cook_invalid_type(cli): + resp = await cli.post( + "/batch_cook", + data=msgpack_dumps({"batch": [("foobar", [])]}), + headers={"Content-Type": "application/x-msgpack"}, ) assert resp.status == 400 content = msgpack_loads(await resp.content.read()) assert content["exception"]["type"] == "NotFoundExc" assert content["exception"]["args"] == ["foobar is an unknown type."] -async def test_batch_progress_notfound(client): - resp = await client.get("/batch_progress/1") +async def test_client_batch_progress_notfound(cli): + resp = await cli.post( + "/batch_progress", + data=msgpack_dumps({"batch_id": 1}), + headers={"Content-Type": "application/x-msgpack"}, + ) assert resp.status == 400 content = msgpack_loads(await resp.content.read()) assert content["exception"]["type"] == "NotFoundExc" assert content["exception"]["args"] == ["Batch 1 does not exist."] def test_check_config_missing_vault_configuration() -> None: """Irrelevant configuration file path raises""" with pytest.raises(ValueError, match="missing 'vault' configuration"): check_config({}) def test_check_config_not_local() -> None: """Wrong configuration raises""" expected_error = ( "The vault backend can only be started with a 'local' configuration" ) with pytest.raises(EnvironmentError, match=expected_error): check_config({"vault": {"cls": "remote"}}) @pytest.mark.parametrize("missing_key", ["storage", "cache", "scheduler"]) def test_check_config_missing_key(missing_key, swh_vault_config) -> None: """Any other configuration than 'local' (the default) is rejected""" config_ok = {"vault": {"cls": "local", "args": 
swh_vault_config}} config_ko = copy.deepcopy(config_ok) config_ko["vault"]["args"].pop(missing_key, None) expected_error = f"invalid configuration: missing {missing_key} config entry" with pytest.raises(ValueError, match=expected_error): check_config(config_ko) @pytest.mark.parametrize("missing_key", ["storage", "cache", "scheduler"]) def test_check_config_ok(missing_key, swh_vault_config) -> None: """Any other configuration than 'local' (the default) is rejected""" config_ok = {"vault": {"cls": "local", "args": swh_vault_config}} assert check_config(config_ok) is not None
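A note on the ObjectId = Union[str, bytes] alias introduced in swh/vault/interface.py: backend methods now accept either a hex string or raw bytes and normalize through _compute_ids(). A small sketch of the equivalence, reusing the test id from this diff:

# Sketch: both ObjectId spellings refer to the same bundle; this mirrors
# what VaultBackend._compute_ids() does internally.
from swh.model import hashutil

hex_id = "4a4b9771542143cf070386f86b4b92d42966bdbc"
obj_id = hashutil.hash_to_bytes(hex_id)
assert hashutil.hash_to_hex(obj_id) == hex_id

# Equivalent calls (backend is assumed to be a VaultBackend instance):
# backend.progress("revision_gitfast", hex_id)
# backend.progress("revision_gitfast", obj_id)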
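Finally, a sketch of how the reworked server is meant to be configured and started, mirroring make_app_from_configfile() and the swh_vault_config_file test fixture; the DSN, paths and port below are placeholders, not values from this diff:

# Sketch: boot the vault RPC server from a YAML file (placeholder values).
import os

import aiohttp.web
import yaml

from swh.vault.api.server import make_app_from_configfile

config = {
    "storage": {"cls": "remote", "url": "http://localhost:5002/"},
    "scheduler": {"cls": "remote", "url": "http://localhost:5008/"},
    "cache": {
        "cls": "pathslicing",
        "args": {"root": "/srv/softwareheritage/vault", "slicing": "0:1/1:5"},
    },
    # check_config() copies the three entries above into the vault args
    # when they are missing there.
    "vault": {"cls": "local", "args": {"db": "dbname=softwareheritage-vault-dev"}},
}
with open("vault-server.yml", "w") as f:
    yaml.dump(config, f)

os.environ["SWH_CONFIG_FILENAME"] = "vault-server.yml"
app = make_app_from_configfile()  # later calls return the same app instance
aiohttp.web.run_app(app, port=5005)  # port is an assumption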