diff --git a/PKG-INFO b/PKG-INFO index 11c8383..051f96a 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,38 +1,38 @@ Metadata-Version: 2.1 Name: swh.vault -Version: 1.7.0 +Version: 1.7.1 Summary: Software Heritage vault Home-page: https://forge.softwareheritage.org/diffusion/DVAU/ Author: Software Heritage developers Author-email: swh-devel@inria.fr Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-vault Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-vault/ Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/x-rst Provides-Extra: testing Provides-Extra: graph License-File: LICENSE License-File: AUTHORS Software Heritage - Vault ========================= User-facing service that allows to retrieve parts of the archive as self-contained bundles (e.g., individual releases, entire repository snapshots, etc.) The creation of a bundle is called "cooking" a bundle. Architecture ------------ The vault is made of two main parts: 1. a stateful RPC server called the **backend** 2. Celery tasks, called **cookers** diff --git a/swh.vault.egg-info/PKG-INFO b/swh.vault.egg-info/PKG-INFO index 11c8383..051f96a 100644 --- a/swh.vault.egg-info/PKG-INFO +++ b/swh.vault.egg-info/PKG-INFO @@ -1,38 +1,38 @@ Metadata-Version: 2.1 Name: swh.vault -Version: 1.7.0 +Version: 1.7.1 Summary: Software Heritage vault Home-page: https://forge.softwareheritage.org/diffusion/DVAU/ Author: Software Heritage developers Author-email: swh-devel@inria.fr Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-vault Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-vault/ Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/x-rst Provides-Extra: testing Provides-Extra: graph License-File: LICENSE License-File: AUTHORS Software Heritage - Vault ========================= User-facing service that allows to retrieve parts of the archive as self-contained bundles (e.g., individual releases, entire repository snapshots, etc.) The creation of a bundle is called "cooking" a bundle. Architecture ------------ The vault is made of two main parts: 1. a stateful RPC server called the **backend** 2. Celery tasks, called **cookers** diff --git a/swh/vault/api/server.py b/swh/vault/api/server.py index 91584c8..5b58d61 100644 --- a/swh/vault/api/server.py +++ b/swh/vault/api/server.py @@ -1,120 +1,121 @@ # Copyright (C) 2016-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations import os from typing import Any, Dict, Optional from swh.core.api import RPCServerApp from swh.core.api import encode_data_server as encode_data from swh.core.api import error_handler from swh.core.config import config_basepath, merge_configs, read_raw_config from swh.vault import get_vault as get_swhvault from swh.vault.backend import NotFoundExc from swh.vault.interface import VaultInterface from .serializers import DECODERS, ENCODERS # do not define default services here DEFAULT_CONFIG = { "client_max_size": 1024**3, } def get_vault(): global vault if not vault: vault = get_swhvault(**app.config["vault"]) return vault class VaultServerApp(RPCServerApp): extra_type_decoders = DECODERS extra_type_encoders = ENCODERS vault = None app = VaultServerApp( __name__, backend_class=VaultInterface, backend_factory=get_vault, ) @app.errorhandler(NotFoundExc) def argument_error_handler(exception): return error_handler(exception, encode_data, status_code=400) @app.errorhandler(Exception) def my_error_handler(exception): return error_handler(exception, encode_data) @app.route("/") def index(): return "SWH Vault API server" def check_config(cfg: Dict[str, Any]) -> Dict[str, Any]: - """Ensure the configuration is ok to run a local vault server, and propagate defaults. + """Ensure the configuration is ok to run a postgresql vault server, and propagate + defaults. Raises: - EnvironmentError if the configuration is not for local instance + EnvironmentError if the configuration is not for postgresql instance ValueError if one of the following keys is missing: vault, cache, storage, scheduler Returns: - New configuration dict to instantiate a local vault server instance. + New configuration dict to instantiate a postgresql vault server instance. """ cfg = cfg.copy() if "vault" not in cfg: raise ValueError("missing 'vault' configuration") vcfg = cfg["vault"] - if vcfg["cls"] != "local": + if vcfg["cls"] not in ("local", "postgresql"): raise EnvironmentError( - "The vault backend can only be started with a 'local' configuration", + "The vault backend can only be started with a 'postgresql' configuration", ) # TODO: Soft-deprecation of args key. Remove when ready. vcfg.update(vcfg.get("args", {})) # Default to top-level value if any vcfg = {**cfg, **vcfg} for key in ("cache", "storage", "scheduler"): if not vcfg.get(key): raise ValueError(f"invalid configuration: missing {key} config entry.") return vcfg def make_app_from_configfile( config_path: Optional[str] = None, **kwargs ) -> VaultServerApp: """Load and check configuration if ok, then instantiate (once) a vault server application. """ config_path = os.environ.get("SWH_CONFIG_FILENAME", config_path) if not config_path: raise ValueError("Missing configuration path.") if not os.path.isfile(config_path): raise ValueError(f"Configuration path {config_path} should exist.") app_config = read_raw_config(config_basepath(config_path)) app_config["vault"] = check_config(app_config) app.config.update(merge_configs(DEFAULT_CONFIG, app_config)) return app if __name__ == "__main__": print("Deprecated. Use swh-vault ") diff --git a/swh/vault/backend.py b/swh/vault/backend.py index c75bd00..b7d9bf3 100644 --- a/swh/vault/backend.py +++ b/swh/vault/backend.py @@ -1,531 +1,536 @@ -# Copyright (C) 2017-2020 The Software Heritage developers +# Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import collections from email.mime.text import MIMEText import smtplib from typing import Any, Dict, List, Optional, Tuple import psycopg2.extras import psycopg2.pool from swh.core.db import BaseDb from swh.core.db.common import db_transaction from swh.model.swhids import CoreSWHID from swh.scheduler import get_scheduler from swh.scheduler.utils import create_oneshot_task_dict from swh.storage import get_storage from swh.vault.cache import VaultCache from swh.vault.cookers import COOKER_TYPES, get_cooker_cls from swh.vault.exc import NotFoundExc cooking_task_name = "swh.vault.cooking_tasks.SWHCookingTask" NOTIF_EMAIL_FROM = '"Software Heritage Vault" ' "" NOTIF_EMAIL_SUBJECT_SUCCESS = "Bundle ready: {bundle_type} {short_id}" NOTIF_EMAIL_SUBJECT_FAILURE = "Bundle failed: {bundle_type} {short_id}" NOTIF_EMAIL_BODY_SUCCESS = """ You have requested the following bundle from the Software Heritage Vault: Bundle Type: {bundle_type} Object SWHID: {swhid} This bundle is now available for download at the following address: {url} Please keep in mind that this link might expire at some point, in which case you will need to request the bundle again. --\x20 The Software Heritage Developers """ NOTIF_EMAIL_BODY_FAILURE = """ You have requested the following bundle from the Software Heritage Vault: Bundle Type: {bundle_type} Object SWHID: {swhid} This bundle could not be cooked for the following reason: {progress_msg} We apologize for the inconvenience. --\x20 The Software Heritage Developers """ class VaultBackend: """ Backend for the Software Heritage Vault. """ current_version = 4 def __init__(self, **config): self.config = config self.cache = VaultCache(**config["cache"]) self.scheduler = get_scheduler(**config["scheduler"]) self.storage = get_storage(**config["storage"]) self.smtp_server = smtplib.SMTP(**config.get("smtp", {})) + if "db" not in self.config: + raise ValueError( + "The 'db' configuration entry is missing " + "in the vault configuration file" + ) db_conn = config["db"] self._pool = psycopg2.pool.ThreadedConnectionPool( config.get("min_pool_conns", 1), config.get("max_pool_conns", 10), db_conn, cursor_factory=psycopg2.extras.RealDictCursor, ) self._db = None def get_db(self): if self._db: return self._db return BaseDb.from_pool(self._pool) def put_db(self, db): if db is not self._db: db.put_conn() @db_transaction() def progress( self, bundle_type: str, swhid: CoreSWHID, raise_notfound: bool = True, db=None, cur=None, ) -> Optional[Dict[str, Any]]: cur.execute( """ SELECT id, type, swhid, task_id, task_status, sticky, ts_created, ts_done, ts_last_access, progress_msg FROM vault_bundle WHERE type = %s AND swhid = %s""", (bundle_type, str(swhid)), ) res = cur.fetchone() if not res: if raise_notfound: raise NotFoundExc(f"{bundle_type} {swhid} was not found.") return None res["swhid"] = CoreSWHID.from_string(res["swhid"]) return res def _send_task(self, bundle_type: str, swhid: CoreSWHID): """Send a cooking task to the celery scheduler""" task = create_oneshot_task_dict("cook-vault-bundle", bundle_type, str(swhid)) added_tasks = self.scheduler.create_tasks([task]) return added_tasks[0]["id"] @db_transaction() def create_task( self, bundle_type: str, swhid: CoreSWHID, sticky: bool = False, db=None, cur=None, ): """Create and send a cooking task""" cooker_class = get_cooker_cls(bundle_type, swhid.object_type) cooker = cooker_class(swhid, backend=self, storage=self.storage) if not cooker.check_exists(): raise NotFoundExc(f"{bundle_type} {swhid} was not found.") cur.execute( """ INSERT INTO vault_bundle (type, swhid, sticky) VALUES (%s, %s, %s)""", (bundle_type, str(swhid), sticky), ) db.conn.commit() task_id = self._send_task(bundle_type, swhid) cur.execute( """ UPDATE vault_bundle SET task_id = %s WHERE type = %s AND swhid = %s""", (task_id, bundle_type, str(swhid)), ) @db_transaction() def add_notif_email( self, bundle_type: str, swhid: CoreSWHID, email: str, db=None, cur=None ): """Add an e-mail address to notify when a given bundle is ready""" cur.execute( """ INSERT INTO vault_notif_email (email, bundle_id) VALUES (%s, (SELECT id FROM vault_bundle WHERE type = %s AND swhid = %s))""", (email, bundle_type, str(swhid)), ) def put_bundle(self, bundle_type: str, swhid: CoreSWHID, bundle) -> bool: self.cache.add(bundle_type, swhid, bundle) return True @db_transaction() def cook( self, bundle_type: str, swhid: CoreSWHID, *, sticky: bool = False, email: Optional[str] = None, db=None, cur=None, ) -> Dict[str, Any]: info = self.progress(bundle_type, swhid, raise_notfound=False) if bundle_type not in COOKER_TYPES: raise NotFoundExc(f"{bundle_type} is an unknown type.") # If there's a failed bundle entry, delete it first. if info is not None and info["task_status"] == "failed": cur.execute( "DELETE FROM vault_bundle WHERE type = %s AND swhid = %s", (bundle_type, str(swhid)), ) db.conn.commit() info = None # If there's no bundle entry, create the task. if info is None: self.create_task(bundle_type, swhid, sticky) if email is not None: # If the task is already done, send the email directly if info is not None and info["task_status"] == "done": self.send_notification( None, email, bundle_type, swhid, info["task_status"] ) # Else, add it to the notification queue else: self.add_notif_email(bundle_type, swhid, email) return self.progress(bundle_type, swhid) @db_transaction() def batch_cook( self, batch: List[Tuple[str, str]], db=None, cur=None ) -> Dict[str, int]: # Import execute_values at runtime only, because it requires # psycopg2 >= 2.7 (only available on postgresql servers) from psycopg2.extras import execute_values for bundle_type, _ in batch: if bundle_type not in COOKER_TYPES: raise NotFoundExc(f"{bundle_type} is an unknown type.") cur.execute( """ INSERT INTO vault_batch (id) VALUES (DEFAULT) RETURNING id""" ) batch_id = cur.fetchone()["id"] # Delete all failed bundles from the batch cur.execute( """ DELETE FROM vault_bundle WHERE task_status = 'failed' AND (type, swhid) IN %s""", (tuple(batch),), ) # Insert all the bundles, return the new ones execute_values( cur, """ INSERT INTO vault_bundle (type, swhid) VALUES %s ON CONFLICT DO NOTHING""", batch, ) # Get the bundle ids and task status cur.execute( """ SELECT id, type, swhid, task_id FROM vault_bundle WHERE (type, swhid) IN %s""", (tuple(batch),), ) bundles = cur.fetchall() # Insert the batch-bundle entries batch_id_bundle_ids = [(batch_id, row["id"]) for row in bundles] execute_values( cur, """ INSERT INTO vault_batch_bundle (batch_id, bundle_id) VALUES %s ON CONFLICT DO NOTHING""", batch_id_bundle_ids, ) db.conn.commit() # Get the tasks to fetch batch_new = [ (row["type"], CoreSWHID.from_string(row["swhid"])) for row in bundles if row["task_id"] is None ] # Send the tasks args_batch = [(bundle_type, swhid) for bundle_type, swhid in batch_new] # TODO: change once the scheduler handles priority tasks tasks = [ create_oneshot_task_dict("swh-vault-batch-cooking", *args) for args in args_batch ] added_tasks = self.scheduler.create_tasks(tasks) tasks_ids_bundle_ids = [ (task_id, bundle_type, swhid) for task_id, (bundle_type, swhid) in zip( [task["id"] for task in added_tasks], batch_new ) ] # Update the task ids execute_values( cur, """ UPDATE vault_bundle SET task_id = s_task_id FROM (VALUES %s) AS sub (s_task_id, s_type, s_swhid) WHERE type = s_type::cook_type AND swhid = s_swhid """, tasks_ids_bundle_ids, ) return {"id": batch_id} @db_transaction() def batch_progress(self, batch_id: int, db=None, cur=None) -> Dict[str, Any]: cur.execute( """ SELECT vault_bundle.id as id, type, swhid, task_id, task_status, sticky, ts_created, ts_done, ts_last_access, progress_msg FROM vault_batch_bundle LEFT JOIN vault_bundle ON vault_bundle.id = bundle_id WHERE batch_id = %s""", (batch_id,), ) bundles = cur.fetchall() if not bundles: raise NotFoundExc(f"Batch {batch_id} does not exist.") for bundle in bundles: bundle["swhid"] = CoreSWHID.from_string(bundle["swhid"]) counter = collections.Counter(b["status"] for b in bundles) res = { "bundles": bundles, "total": len(bundles), **{k: 0 for k in ("new", "pending", "done", "failed")}, **dict(counter), } return res @db_transaction() def is_available(self, bundle_type: str, swhid: CoreSWHID, db=None, cur=None): """Check whether a bundle is available for retrieval""" info = self.progress(bundle_type, swhid, raise_notfound=False, cur=cur) return ( info is not None and info["task_status"] == "done" and self.cache.is_cached(bundle_type, swhid) ) @db_transaction() def fetch( self, bundle_type: str, swhid: CoreSWHID, raise_notfound=True, db=None, cur=None ) -> Optional[bytes]: """Retrieve a bundle from the cache""" available = self.is_available(bundle_type, swhid, cur=cur) if not available: if raise_notfound: raise NotFoundExc(f"{bundle_type} {swhid} is not available.") return None self.update_access_ts(bundle_type, swhid, cur=cur) return self.cache.get(bundle_type, swhid) @db_transaction() def update_access_ts(self, bundle_type: str, swhid: CoreSWHID, db=None, cur=None): """Update the last access timestamp of a bundle""" cur.execute( """ UPDATE vault_bundle SET ts_last_access = NOW() WHERE type = %s AND swhid = %s""", (bundle_type, str(swhid)), ) @db_transaction() def set_status( self, bundle_type: str, swhid: CoreSWHID, status: str, db=None, cur=None ) -> bool: req = ( """ UPDATE vault_bundle SET task_status = %s """ + (""", ts_done = NOW() """ if status == "done" else "") + """WHERE type = %s AND swhid = %s""" ) cur.execute(req, (status, bundle_type, str(swhid))) return True @db_transaction() def set_progress( self, bundle_type: str, swhid: CoreSWHID, progress: str, db=None, cur=None ) -> bool: cur.execute( """ UPDATE vault_bundle SET progress_msg = %s WHERE type = %s AND swhid = %s""", (progress, bundle_type, str(swhid)), ) return True @db_transaction() def send_notif(self, bundle_type: str, swhid: CoreSWHID, db=None, cur=None) -> bool: cur.execute( """ SELECT vault_notif_email.id AS id, email, task_status, progress_msg FROM vault_notif_email INNER JOIN vault_bundle ON bundle_id = vault_bundle.id WHERE vault_bundle.type = %s AND vault_bundle.swhid = %s""", (bundle_type, str(swhid)), ) for d in cur: self.send_notification( d["id"], d["email"], bundle_type, swhid, status=d["task_status"], progress_msg=d["progress_msg"], ) return True @db_transaction() def send_notification( self, n_id: Optional[int], email: str, bundle_type: str, swhid: CoreSWHID, status: str, progress_msg: Optional[str] = None, db=None, cur=None, ) -> None: """Send the notification of a bundle to a specific e-mail""" short_id = swhid.object_id.hex()[:7] # TODO: instead of hardcoding this, we should probably: # * add a "fetch_url" field in the vault_notif_email table # * generate the url with flask.url_for() on the web-ui side # * send this url as part of the cook request and store it in # the table # * use this url for the notification e-mail url = "https://archive.softwareheritage.org/api/1/vault/{}/{}/" "raw".format( bundle_type, swhid ) if status == "done": text = NOTIF_EMAIL_BODY_SUCCESS.strip() text = text.format(bundle_type=bundle_type, swhid=swhid, url=url) msg = MIMEText(text) msg["Subject"] = NOTIF_EMAIL_SUBJECT_SUCCESS.format( bundle_type=bundle_type, short_id=short_id ) elif status == "failed": text = NOTIF_EMAIL_BODY_FAILURE.strip() text = text.format( bundle_type=bundle_type, swhid=swhid, progress_msg=progress_msg ) msg = MIMEText(text) msg["Subject"] = NOTIF_EMAIL_SUBJECT_FAILURE.format( bundle_type=bundle_type, short_id=short_id ) else: raise RuntimeError( "send_notification called on a '{}' bundle".format(status) ) msg["From"] = NOTIF_EMAIL_FROM msg["To"] = email self._smtp_send(msg) if n_id is not None: cur.execute( """ DELETE FROM vault_notif_email WHERE id = %s""", (n_id,), ) def _smtp_send(self, msg: MIMEText): # Reconnect if needed try: status = self.smtp_server.noop()[0] except smtplib.SMTPException: status = -1 if status != 250: self.smtp_server.connect("localhost", 25) # Send the message self.smtp_server.send_message(msg) @db_transaction() def _cache_expire(self, cond, *args, db=None, cur=None) -> None: """Low-level expiration method, used by cache_expire_* methods""" # Embedded SELECT query to be able to use ORDER BY and LIMIT cur.execute( """ DELETE FROM vault_bundle WHERE ctid IN ( SELECT ctid FROM vault_bundle WHERE sticky = false {} ) RETURNING type, swhid """.format( cond ), args, ) for d in cur: self.cache.delete(d["type"], CoreSWHID.from_string(d["swhid"])) @db_transaction() def cache_expire_oldest(self, n=1, by="last_access", db=None, cur=None) -> None: """Expire the `n` oldest bundles""" assert by in ("created", "done", "last_access") filter = """ORDER BY ts_{} LIMIT {}""".format(by, n) return self._cache_expire(filter) @db_transaction() def cache_expire_until(self, date, by="last_access", db=None, cur=None) -> None: """Expire all the bundles until a certain date""" assert by in ("created", "done", "last_access") filter = """AND ts_{} <= %s""".format(by) return self._cache_expire(filter, date) diff --git a/swh/vault/tests/test_cli.py b/swh/vault/tests/test_cli.py index ac6864e..60f18a6 100644 --- a/swh/vault/tests/test_cli.py +++ b/swh/vault/tests/test_cli.py @@ -1,106 +1,160 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import tempfile from unittest.mock import MagicMock import click import click.testing import pytest +from swh.core.cli.db import db as swhdb +from swh.core.db import BaseDb +from swh.core.db.db_utils import swh_db_module, swh_db_version +from swh.core.db.tests.test_cli import craft_conninfo from swh.model.swhids import CoreSWHID +from swh.vault.backend import VaultBackend from swh.vault.cli import vault as vault_cli_group from swh.vault.cookers.base import BaseVaultCooker from swh.vault.in_memory_backend import InMemoryVaultBackend def test_cook_unsupported_swhid(): runner = click.testing.CliRunner() result = runner.invoke(vault_cli_group, ["cook", "swh:1:dir:f00b4r", "-"]) assert isinstance(result.exception, SystemExit) assert "expected core SWHID" in result.stdout result = runner.invoke(vault_cli_group, ["cook", "swh:1:ori:" + "0" * 40, "-"]) assert isinstance(result.exception, SystemExit) assert "expected core SWHID" in result.stdout def test_cook_unknown_cooker(): runner = click.testing.CliRunner() result = runner.invoke( vault_cli_group, ["cook", "swh:1:dir:" + "0" * 40, "-", "--bundle-type", "gitfast"], ) assert isinstance(result.exception, SystemExit) assert "do not have a gitfast cooker" in result.stdout result = runner.invoke(vault_cli_group, ["cook", "swh:1:rev:" + "0" * 40, "-"]) assert isinstance(result.exception, SystemExit) assert "use --bundle-type" in result.stdout @pytest.mark.parametrize( "bundle_type,cooker_name_suffix,swhid_type", [ ("directory", "", "dir"), ("revision", "gitfast", "rev"), ], ) def test_cook_directory(bundle_type, cooker_name_suffix, swhid_type, mocker): storage = object() mocker.patch("swh.storage.get_storage", return_value=storage) backend = MagicMock(spec=InMemoryVaultBackend) backend.fetch.return_value = b"bundle content" mocker.patch( "swh.vault.in_memory_backend.InMemoryVaultBackend", return_value=backend ) cooker = MagicMock(spec=BaseVaultCooker) cooker_cls = MagicMock(return_value=cooker) mocker.patch("swh.vault.cookers.get_cooker_cls", return_value=cooker_cls) runner = click.testing.CliRunner() swhid = CoreSWHID.from_string(f"swh:1:{swhid_type}:{'0'*40}") with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: config_fd.write('{"storage": {}}') config_fd.seek(0) if cooker_name_suffix: result = runner.invoke( vault_cli_group, [ "cook", f"swh:1:{swhid_type}:{'0'*40}", "-", "-C", config_fd.name, "--bundle-type", cooker_name_suffix, ], ) else: result = runner.invoke( vault_cli_group, ["cook", str(swhid), "-", "-C", config_fd.name], ) if result.exception is not None: raise result.exception cooker_cls.assert_called_once_with( swhid=swhid, backend=backend, storage=storage, graph=None, objstorage=None, max_bundle_size=None, ) cooker.cook.assert_called_once_with() assert result.stdout_bytes == b"bundle content" + + +def test_cli_swh_vault_db_create_and_init_db(postgresql, tmp_path): + """Test that 'swh db init vault' works""" + module_name = "vault" + conninfo = craft_conninfo(postgresql, "new-db") + + cfgfile = tmp_path / "config.yml" + CFG = f""" +vault: + cls: postgresql + db: {conninfo} + cache: + cls: memory + storage: + cls: memory + scheduler: + cls: remote + url: mock://scheduler + """ + cfgfile.write_text(CFG) + + cli_runner = click.testing.CliRunner() + # This creates the db and installs the necessary admin extensions + result = cli_runner.invoke(swhdb, ["create", module_name, "--dbname", conninfo]) + assert result.exit_code == 0, f"Unexpected output: {result.output}" + + result = cli_runner.invoke(swhdb, ["init-admin", module_name, "--dbname", conninfo]) + assert result.exit_code == 0, f"Unexpected output: {result.output}" + + # This initializes the schema and data + result = cli_runner.invoke(swhdb, ["-C", cfgfile, "init", module_name]) + assert result.exit_code == 0, f"Unexpected output: {result.output}" + + assert swh_db_module(conninfo) == "vault" + assert swh_db_version(conninfo) == VaultBackend.current_version + + with BaseDb.connect(conninfo).cursor() as cur: + cur.execute("select tablename from pg_tables where schemaname='public'") + tables = {table for table, in cur.fetchall()} + + assert tables == { + "dbmodule", + "dbversion", + "vault_bundle", + "vault_notif_email", + "vault_batch", + "vault_batch_bundle", + } diff --git a/swh/vault/tests/test_server.py b/swh/vault/tests/test_server.py index 184f426..1f67ccf 100644 --- a/swh/vault/tests/test_server.py +++ b/swh/vault/tests/test_server.py @@ -1,184 +1,187 @@ # Copyright (C) 2020-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import os from typing import Any, Dict import pytest import yaml from swh.core.api.serializers import json_dumps, msgpack_dumps, msgpack_loads from swh.vault.api.serializers import ENCODERS import swh.vault.api.server from swh.vault.api.server import app, check_config, get_vault, make_app_from_configfile from swh.vault.tests.test_backend import TEST_SWHID @pytest.fixture def swh_vault_server_config(swh_vault_config: Dict[str, Any]) -> Dict[str, Any]: """Returns a vault server configuration, with ``storage``, ``scheduler`` and ``cache`` set at the toplevel""" return { - "vault": {"cls": "local", "db": swh_vault_config["db"]}, + "vault": {"cls": "postgresql", "db": swh_vault_config["db"]}, "client_max_size": 1024**3, **{k: v for k, v in swh_vault_config.items() if k != "db"}, } @pytest.fixture def swh_vault_server_config_file(swh_vault_server_config, monkeypatch, tmp_path): """Creates a vault server configuration file and sets it into SWH_CONFIG_FILENAME""" conf_path = os.path.join(str(tmp_path), "vault-server.yml") with open(conf_path, "w") as f: f.write(yaml.dump(swh_vault_server_config)) monkeypatch.setenv("SWH_CONFIG_FILENAME", conf_path) return conf_path def test_make_app_from_file_missing(): with pytest.raises(ValueError, match="Missing configuration path."): make_app_from_configfile() def test_make_app_from_file_does_not_exist(tmp_path): conf_path = os.path.join(str(tmp_path), "vault-server.yml") assert os.path.exists(conf_path) is False with pytest.raises( ValueError, match=f"Configuration path {conf_path} should exist." ): make_app_from_configfile(conf_path) def test_make_app_from_env_variable(swh_vault_server_config_file): """Server initialization happens through env variable when no path is provided""" app = make_app_from_configfile() assert app is not None assert get_vault() is not None # Cleanup app del app.config["vault"] swh.vault.api.server.vault = None def test_make_app_from_file(swh_vault_server_config, tmp_path): """Server initialization happens through path if provided""" conf_path = os.path.join(str(tmp_path), "vault-server.yml") with open(conf_path, "w") as f: f.write(yaml.dump(swh_vault_server_config)) app = make_app_from_configfile(conf_path) assert app is not None assert get_vault() is not None # Cleanup app del app.config["vault"] swh.vault.api.server.vault = None @pytest.fixture def vault_app(swh_vault_server_config_file): yield make_app_from_configfile() # Cleanup app del app.config["vault"] swh.vault.api.server.vault = None @pytest.fixture def cli(vault_app): cli = vault_app.test_client() return cli def test_client_index(cli): resp = cli.get("/") assert resp.status == "200 OK" def test_client_cook_notfound(cli): resp = cli.post( "/cook", data=json_dumps( {"bundle_type": "flat", "swhid": TEST_SWHID}, extra_encoders=ENCODERS ), headers=[("Content-Type", "application/json")], ) assert resp.status == "400 BAD REQUEST" content = msgpack_loads(resp.data) assert content["type"] == "NotFoundExc" assert content["args"] == [f"flat {TEST_SWHID} was not found."] def test_client_progress_notfound(cli): resp = cli.post( "/progress", data=json_dumps( {"bundle_type": "flat", "swhid": TEST_SWHID}, extra_encoders=ENCODERS ), headers=[("Content-Type", "application/json")], ) assert resp.status == "400 BAD REQUEST" content = msgpack_loads(resp.data) assert content["type"] == "NotFoundExc" assert content["args"] == [f"flat {TEST_SWHID} was not found."] def test_client_batch_cook_invalid_type(cli): resp = cli.post( "/batch_cook", data=msgpack_dumps({"batch": [("foobar", [])]}), headers={"Content-Type": "application/x-msgpack"}, ) assert resp.status == "400 BAD REQUEST" content = msgpack_loads(resp.data) assert content["type"] == "NotFoundExc" assert content["args"] == ["foobar is an unknown type."] def test_client_batch_progress_notfound(cli): resp = cli.post( "/batch_progress", data=msgpack_dumps({"batch_id": 1}), headers={"Content-Type": "application/x-msgpack"}, ) assert resp.status == "400 BAD REQUEST" content = msgpack_loads(resp.data) assert content["type"] == "NotFoundExc" assert content["args"] == ["Batch 1 does not exist."] def test_check_config_missing_vault_configuration() -> None: """Irrelevant configuration file path raises""" with pytest.raises(ValueError, match="missing 'vault' configuration"): check_config({}) def test_check_config_not_local() -> None: """Wrong configuration raises""" expected_error = ( - "The vault backend can only be started with a 'local' configuration" + "The vault backend can only be started with a 'postgresql' configuration" ) with pytest.raises(EnvironmentError, match=expected_error): check_config({"vault": {"cls": "remote"}}) -def test_check_config_ok(swh_vault_server_config) -> None: +@pytest.mark.parametrize("clazz", ["local", "postgresql"]) +def test_check_config_ok(swh_vault_server_config, clazz) -> None: """Check that the default config is accepted""" + config = swh_vault_server_config.copy() + config["vault"]["cls"] = clazz assert check_config(swh_vault_server_config) is not None @pytest.mark.parametrize("missing_key", ["storage", "cache", "scheduler"]) def test_check_config_missing_key(missing_key, swh_vault_server_config) -> None: """Check that configs with a missing key get rejected""" config_ok = swh_vault_server_config config_ko = copy.deepcopy(config_ok) config_ko["vault"].pop(missing_key, None) config_ko.pop(missing_key, None) expected_error = f"invalid configuration: missing {missing_key} config entry" with pytest.raises(ValueError, match=expected_error): check_config(config_ko) diff --git a/swh/vault/tests/test_to_disk.py b/swh/vault/tests/test_to_disk.py index 21bcf67..71424e5 100644 --- a/swh/vault/tests/test_to_disk.py +++ b/swh/vault/tests/test_to_disk.py @@ -1,78 +1,184 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest -from swh.model.model import Content, SkippedContent -from swh.vault.to_disk import get_filtered_files_content +from swh.model.from_disk import DentryPerms +from swh.model.model import Content, Directory, DirectoryEntry, SkippedContent +from swh.vault.to_disk import DirectoryBuilder, get_filtered_files_content def test_get_filtered_files_content(swh_storage): content = Content.from_data(b"foo bar") skipped_content = SkippedContent( sha1=None, sha1_git=b"c" * 20, sha256=None, blake2s256=None, length=42, status="absent", reason="for some reason", ) swh_storage.content_add([content]) swh_storage.skipped_content_add([skipped_content]) files_data = [ { "status": "visible", "sha1": content.sha1, "sha1_git": content.sha1_git, "target": content.sha1_git, }, { "status": "absent", "target": skipped_content.sha1_git, }, ] res = list(get_filtered_files_content(swh_storage, files_data)) assert res == [ { "content": content.data, "status": "visible", "sha1": content.sha1, "sha1_git": content.sha1_git, "target": content.sha1_git, }, { "content": ( b"This content has not been retrieved in the " b"Software Heritage archive due to its size." ), "status": "absent", "target": skipped_content.sha1_git, }, ] def test_get_filtered_files_content__unknown_status(swh_storage): content = Content.from_data(b"foo bar") swh_storage.content_add([content]) files_data = [ { "status": "visible", "sha1": content.sha1, "sha1_git": content.sha1_git, "target": content.sha1_git, }, { - "status": None, + "status": "blah", "target": b"c" * 20, }, ] - with pytest.raises(AssertionError, match="unexpected status None"): + with pytest.raises(AssertionError, match="unexpected status 'blah'"): list(get_filtered_files_content(swh_storage, files_data)) + + +def _fill_storage(swh_storage, exclude_cnt3=False, exclude_dir1=False): + cnt1 = Content.from_data(b"foo bar") + cnt2 = Content.from_data(b"bar baz") + cnt3 = Content.from_data(b"baz qux") + dir1 = Directory( + entries=( + DirectoryEntry( + name=b"content1", + type="file", + target=cnt1.sha1_git, + perms=DentryPerms.content, + ), + DirectoryEntry( + name=b"content2", + type="file", + target=cnt2.sha1_git, + perms=DentryPerms.content, + ), + ) + ) + dir2 = Directory( + entries=( + DirectoryEntry( + name=b"content3", + type="file", + target=cnt3.sha1_git, + perms=DentryPerms.content, + ), + DirectoryEntry( + name=b"subdirectory", + type="dir", + target=dir1.id, + perms=DentryPerms.directory, + ), + ) + ) + if exclude_cnt3: + swh_storage.content_add([cnt1, cnt2]) + else: + swh_storage.content_add([cnt1, cnt2, cnt3]) + if exclude_dir1: + swh_storage.directory_add([dir2]) + else: + swh_storage.directory_add([dir1, dir2]) + + return dir2 + + +def test_directory_builder(swh_storage, tmp_path): + dir2 = _fill_storage(swh_storage) + + root = tmp_path / "root" + builder = DirectoryBuilder(swh_storage, bytes(root), dir2.id) + + assert not root.exists() + + builder.build() + + assert root.is_dir() + assert set(root.glob("**/*")) == { + root / "subdirectory", + root / "subdirectory" / "content1", + root / "subdirectory" / "content2", + root / "content3", + } + + assert (root / "subdirectory" / "content1").open().read() == "foo bar" + assert (root / "subdirectory" / "content2").open().read() == "bar baz" + assert (root / "content3").open().read() == "baz qux" + + +def test_directory_builder_missing_content(swh_storage, tmp_path): + dir2 = _fill_storage(swh_storage, exclude_cnt3=True) + + root = tmp_path / "root" + builder = DirectoryBuilder(swh_storage, bytes(root), dir2.id) + + assert not root.exists() + + builder.build() + + assert root.is_dir() + + assert "This content is missing" in (root / "content3").open().read() + + +def test_directory_builder_missing_directory(swh_storage, tmp_path): + dir2 = _fill_storage(swh_storage, exclude_dir1=True) + + root = tmp_path / "root" + builder = DirectoryBuilder(swh_storage, bytes(root), dir2.id) + + assert not root.exists() + + builder.build() + + assert root.is_dir() + assert set(root.glob("**/*")) == { + root / "subdirectory", + root / "content3", + } + + assert (root / "content3").open().read() == "baz qux" diff --git a/swh/vault/to_disk.py b/swh/vault/to_disk.py index 105e85a..2721642 100644 --- a/swh/vault/to_disk.py +++ b/swh/vault/to_disk.py @@ -1,137 +1,146 @@ # Copyright (C) 2016-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import collections import functools import os from typing import Any, Dict, Iterator, List from swh.model import hashutil from swh.model.from_disk import DentryPerms, mode_to_perms from swh.storage.algos.dir_iterators import dir_iterator from swh.storage.interface import StorageInterface +MISSING_MESSAGE = ( + b"This content is missing from the Software Heritage archive " + b"(or from the mirror used while retrieving it)." +) + SKIPPED_MESSAGE = ( b"This content has not been retrieved in the " b"Software Heritage archive due to its size." ) HIDDEN_MESSAGE = b"This content is hidden." def get_filtered_files_content( storage: StorageInterface, files_data: List[Dict] ) -> Iterator[Dict[str, Any]]: """Retrieve the files specified by files_data and apply filters for skipped and missing contents. Args: storage: the storage from which to retrieve the objects files_data: list of file entries as returned by directory_ls() Yields: The entries given in files_data with a new 'content' key that points to the file content in bytes. The contents can be replaced by a specific message to indicate that they could not be retrieved (either due to privacy policy or because their sizes were too big for us to archive it). """ for file_data in files_data: status = file_data["status"] - if status == "absent": - content = SKIPPED_MESSAGE - elif status == "hidden": - content = HIDDEN_MESSAGE - elif status == "visible": + if status == "visible": sha1 = file_data["sha1"] data = storage.content_get_data(sha1) if data is None: content = SKIPPED_MESSAGE else: content = data + elif status == "absent": + content = SKIPPED_MESSAGE + elif status == "hidden": + content = HIDDEN_MESSAGE + elif status is None: + content = MISSING_MESSAGE else: assert False, ( f"unexpected status {status!r} " f"for content {hashutil.hash_to_hex(file_data['target'])}" ) yield {"content": content, **file_data} def apply_chunked(func, input_list, chunk_size): """Apply func on input_list divided in chunks of size chunk_size""" for i in range(0, len(input_list), chunk_size): yield from func(input_list[i : i + chunk_size]) class DirectoryBuilder: """Reconstructs the on-disk representation of a directory in the storage.""" - def __init__(self, storage, root, dir_id): + def __init__(self, storage: StorageInterface, root: bytes, dir_id: bytes): """Initialize the directory builder. Args: storage: the storage object root: the path where the directory should be reconstructed dir_id: the identifier of the directory in the storage """ self.storage = storage self.root = root self.dir_id = dir_id - def build(self): + def build(self) -> None: """Perform the reconstruction of the directory in the given root.""" # Retrieve data from the database. # Split into files, revisions and directory data. entries = collections.defaultdict(list) for entry in dir_iterator(self.storage, self.dir_id): entries[entry["type"]].append(entry) # Recreate the directory's subtree and then the files into it. self._create_tree(entries["dir"]) self._create_files(entries["file"]) self._create_revisions(entries["rev"]) - def _create_tree(self, directories): + def _create_tree(self, directories: List[Dict[str, Any]]) -> None: """Create a directory tree from the given paths The tree is created from `root` and each given directory in `directories` will be created. """ # Directories are sorted by depth so they are created in the # right order bsep = os.path.sep.encode() directories = sorted(directories, key=lambda x: len(x["path"].split(bsep))) for dir in directories: os.makedirs(os.path.join(self.root, dir["path"])) - def _create_files(self, files_data): + def _create_files(self, files_data: List[Dict[str, Any]]) -> None: """Create the files in the tree and fetch their contents.""" f = functools.partial(get_filtered_files_content, self.storage) files_data = apply_chunked(f, files_data, 1000) for file_data in files_data: path = os.path.join(self.root, file_data["path"]) self._create_file(path, file_data["content"], file_data["perms"]) - def _create_revisions(self, revs_data): + def _create_revisions(self, revs_data: List[Dict[str, Any]]) -> None: """Create the revisions in the tree as broken symlinks to the target identifier.""" for file_data in revs_data: path = os.path.join(self.root, file_data["path"]) - target = hashutil.hash_to_hex(file_data["target"]) + target = hashutil.hash_to_bytehex(file_data["target"]) self._create_file(path, target, mode=DentryPerms.symlink) - def _create_file(self, path, content, mode=DentryPerms.content): + def _create_file( + self, path: bytes, content: bytes, mode: int = DentryPerms.content + ) -> None: """Create the given file and fill it with content.""" perms = mode_to_perms(mode) if perms == DentryPerms.symlink: os.symlink(content, path) else: with open(path, "wb") as f: f.write(content) os.chmod(path, perms.value)