diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 3cc45b3..05398bb 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,49 +1,42 @@ repos: -- repo: https://github.com/pre-commit/pre-commit-hooks - rev: v2.4.0 - hooks: - - id: trailing-whitespace - - id: check-json - - id: check-yaml + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.1.0 + hooks: + - id: trailing-whitespace + - id: check-json + - id: check-yaml -- repo: https://gitlab.com/pycqa/flake8 - rev: 3.8.3 - hooks: - - id: flake8 + - repo: https://gitlab.com/pycqa/flake8 + rev: 4.0.1 + hooks: + - id: flake8 -- repo: https://github.com/codespell-project/codespell - rev: v1.16.0 - hooks: - - id: codespell + - repo: https://github.com/codespell-project/codespell + rev: v2.1.0 + hooks: + - id: codespell + name: Check source code spelling + stages: [commit] + - id: codespell + name: Check commit message spelling + stages: [commit-msg] -- repo: local - hooks: - - id: mypy - name: mypy - entry: mypy - args: [swh] - pass_filenames: false - language: system - types: [python] + - repo: local + hooks: + - id: mypy + name: mypy + entry: mypy + args: [swh] + pass_filenames: false + language: system + types: [python] -- repo: https://github.com/PyCQA/isort - rev: 5.5.2 - hooks: - - id: isort - -- repo: https://github.com/python/black - rev: 19.10b0 - hooks: - - id: black - -# unfortunately, we are far from being able to enable this... -# - repo: https://github.com/PyCQA/pydocstyle.git -# rev: 4.0.0 -# hooks: -# - id: pydocstyle -# name: pydocstyle -# description: pydocstyle is a static analysis tool for checking compliance with Python docstring conventions. -# entry: pydocstyle --convention=google -# language: python -# types: [python] + - repo: https://github.com/PyCQA/isort + rev: 5.10.1 + hooks: + - id: isort + - repo: https://github.com/python/black + rev: 19.10b0 + hooks: + - id: black diff --git a/PKG-INFO b/PKG-INFO index a5473b6..7cf58bb 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,42 +1,42 @@ Metadata-Version: 2.1 Name: swh.vault -Version: 1.4.2 +Version: 1.5.0 Summary: Software Heritage vault Home-page: https://forge.softwareheritage.org/diffusion/DVAU/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-vault Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-vault/ Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/x-rst Provides-Extra: testing Provides-Extra: graph License-File: LICENSE License-File: AUTHORS Software Heritage - Vault ========================= User-facing service that allows to retrieve parts of the archive as self-contained bundles (e.g., individual releases, entire repository snapshots, etc.) The creation of a bundle is called "cooking" a bundle. Architecture ------------ The vault is made of two main parts: 1. a stateful RPC server called the **backend** 2. 
Celery tasks, called **cookers** diff --git a/requirements-swh.txt b/requirements-swh.txt index 9d34234..6d62ca0 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,5 +1,5 @@ -swh.core[db,http] >= 0.14.4 +swh.core[db,http] >= 2 swh.model >= 3.0.0 swh.objstorage >= 0.0.17 swh.scheduler >= 0.7.0 swh.storage >= 0.43.1 diff --git a/swh.vault.egg-info/PKG-INFO b/swh.vault.egg-info/PKG-INFO index a5473b6..7cf58bb 100644 --- a/swh.vault.egg-info/PKG-INFO +++ b/swh.vault.egg-info/PKG-INFO @@ -1,42 +1,42 @@ Metadata-Version: 2.1 Name: swh.vault -Version: 1.4.2 +Version: 1.5.0 Summary: Software Heritage vault Home-page: https://forge.softwareheritage.org/diffusion/DVAU/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-vault Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-vault/ Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/x-rst Provides-Extra: testing Provides-Extra: graph License-File: LICENSE License-File: AUTHORS Software Heritage - Vault ========================= User-facing service that allows to retrieve parts of the archive as self-contained bundles (e.g., individual releases, entire repository snapshots, etc.) The creation of a bundle is called "cooking" a bundle. Architecture ------------ The vault is made of two main parts: 1. a stateful RPC server called the **backend** 2. 
Celery tasks, called **cookers** diff --git a/swh.vault.egg-info/SOURCES.txt b/swh.vault.egg-info/SOURCES.txt index 0185d5f..d48f9a9 100644 --- a/swh.vault.egg-info/SOURCES.txt +++ b/swh.vault.egg-info/SOURCES.txt @@ -1,75 +1,75 @@ .gitignore .pre-commit-config.yaml AUTHORS CODE_OF_CONDUCT.md CONTRIBUTORS LICENSE MANIFEST.in Makefile README.rst conftest.py mypy.ini pyproject.toml pytest.ini requirements-swh-graph.txt requirements-swh.txt requirements-test.txt requirements.txt setup.cfg setup.py tox.ini docs/.gitignore docs/Makefile docs/README.rst docs/api.rst docs/cli.rst docs/conf.py docs/getting-started.rst docs/index.rst docs/_static/.placeholder docs/_templates/.placeholder -sql/upgrades/002.sql -sql/upgrades/003.sql swh/__init__.py swh.vault.egg-info/PKG-INFO swh.vault.egg-info/SOURCES.txt swh.vault.egg-info/dependency_links.txt swh.vault.egg-info/entry_points.txt swh.vault.egg-info/not-zip-safe swh.vault.egg-info/requires.txt swh.vault.egg-info/top_level.txt swh/vault/__init__.py swh/vault/backend.py swh/vault/cache.py swh/vault/cli.py swh/vault/cooking_tasks.py swh/vault/exc.py swh/vault/in_memory_backend.py swh/vault/interface.py swh/vault/py.typed swh/vault/to_disk.py swh/vault/api/__init__.py swh/vault/api/client.py swh/vault/api/serializers.py swh/vault/api/server.py swh/vault/cookers/__init__.py swh/vault/cookers/base.py swh/vault/cookers/directory.py swh/vault/cookers/git_bare.py swh/vault/cookers/revision_flat.py swh/vault/cookers/revision_gitfast.py swh/vault/cookers/utils.py swh/vault/sql/30-schema.sql +swh/vault/sql/upgrades/002.sql +swh/vault/sql/upgrades/003.sql swh/vault/tests/__init__.py swh/vault/tests/conftest.py swh/vault/tests/test_backend.py swh/vault/tests/test_cache.py swh/vault/tests/test_cli.py swh/vault/tests/test_cookers.py swh/vault/tests/test_cookers_base.py swh/vault/tests/test_git_bare_cooker.py swh/vault/tests/test_init.py swh/vault/tests/test_init_cookers.py swh/vault/tests/test_server.py swh/vault/tests/test_to_disk.py swh/vault/tests/vault_testing.py \ No newline at end of file diff --git a/swh.vault.egg-info/entry_points.txt b/swh.vault.egg-info/entry_points.txt index 8f7ddf3..844f927 100644 --- a/swh.vault.egg-info/entry_points.txt +++ b/swh.vault.egg-info/entry_points.txt @@ -1,4 +1,2 @@ - - [swh.cli.subcommands] - vault=swh.vault.cli - \ No newline at end of file +[swh.cli.subcommands] +vault = swh.vault.cli diff --git a/swh.vault.egg-info/requires.txt b/swh.vault.egg-info/requires.txt index c9a8f15..55158dd 100644 --- a/swh.vault.egg-info/requires.txt +++ b/swh.vault.egg-info/requires.txt @@ -1,26 +1,26 @@ click flask psycopg2 python-dateutil fastimport typing-extensions -swh.core[db,http]>=0.14.4 +swh.core[db,http]>=2 swh.model>=3.0.0 swh.objstorage>=0.0.17 swh.scheduler>=0.7.0 swh.storage>=0.43.1 [graph] swh.graph>=0.3.2 [testing] pytest<7.0.0 dulwich>=0.18.7 swh.loader.core swh.loader.git>=0.8 swh.storage[testing] pytest-mock types-click types-python-dateutil types-pyyaml types-requests diff --git a/swh/vault/__init__.py b/swh/vault/__init__.py index 5a57fd7..425a5b2 100644 --- a/swh/vault/__init__.py +++ b/swh/vault/__init__.py @@ -1,55 +1,60 @@ -# Copyright (C) 2018-2020 The Software Heritage developers +# Copyright (C) 2018-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations import importlib import logging from typing 
import Dict import warnings logger = logging.getLogger(__name__) BACKEND_TYPES: Dict[str, str] = { "remote": ".api.client.RemoteVaultClient", - "local": ".backend.VaultBackend", + "postgresql": ".backend.VaultBackend", "memory": ".in_memory_backend.InMemoryVaultBackend", + # deprecated + "local": ".backend.VaultBackend", } def get_vault(cls: str = "remote", **kwargs): """ Get a vault object of class `vault_class` with arguments `vault_args`. Args: cls: vault's class, either 'remote' or 'local' kwargs: arguments to pass to the class' constructor Returns: an instance of VaultBackend (either local or remote) Raises: ValueError if passed an unknown storage class. """ if "args" in kwargs: warnings.warn( 'Explicit "args" key is deprecated, use keys directly instead.', DeprecationWarning, ) kwargs = kwargs["args"] class_path = BACKEND_TYPES.get(cls) if class_path is None: raise ValueError( f"Unknown Vault class `{cls}`. " f"Supported: {', '.join(BACKEND_TYPES)}" ) (module_path, class_name) = class_path.rsplit(".", 1) module = importlib.import_module(module_path, package=__package__) Vault = getattr(module, class_name) return Vault(**kwargs) + + +get_datastore = get_vault diff --git a/swh/vault/backend.py b/swh/vault/backend.py index b5c8ac1..c75bd00 100644 --- a/swh/vault/backend.py +++ b/swh/vault/backend.py @@ -1,529 +1,531 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import collections from email.mime.text import MIMEText import smtplib from typing import Any, Dict, List, Optional, Tuple import psycopg2.extras import psycopg2.pool from swh.core.db import BaseDb from swh.core.db.common import db_transaction from swh.model.swhids import CoreSWHID from swh.scheduler import get_scheduler from swh.scheduler.utils import create_oneshot_task_dict from swh.storage import get_storage from swh.vault.cache import VaultCache from swh.vault.cookers import COOKER_TYPES, get_cooker_cls from swh.vault.exc import NotFoundExc cooking_task_name = "swh.vault.cooking_tasks.SWHCookingTask" NOTIF_EMAIL_FROM = '"Software Heritage Vault" ' "" NOTIF_EMAIL_SUBJECT_SUCCESS = "Bundle ready: {bundle_type} {short_id}" NOTIF_EMAIL_SUBJECT_FAILURE = "Bundle failed: {bundle_type} {short_id}" NOTIF_EMAIL_BODY_SUCCESS = """ You have requested the following bundle from the Software Heritage Vault: Bundle Type: {bundle_type} Object SWHID: {swhid} This bundle is now available for download at the following address: {url} Please keep in mind that this link might expire at some point, in which case you will need to request the bundle again. --\x20 The Software Heritage Developers """ NOTIF_EMAIL_BODY_FAILURE = """ You have requested the following bundle from the Software Heritage Vault: Bundle Type: {bundle_type} Object SWHID: {swhid} This bundle could not be cooked for the following reason: {progress_msg} We apologize for the inconvenience. --\x20 The Software Heritage Developers """ class VaultBackend: """ Backend for the Software Heritage Vault. 
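    Instances are typically obtained through :func:`swh.vault.get_vault` with
    ``cls="postgresql"`` (or the deprecated ``"local"`` alias), passing the
    ``db``, ``cache``, ``storage`` and ``scheduler`` configuration entries the
    constructor expects, for example
    ``get_vault("postgresql", db=dsn, cache={...}, storage={...}, scheduler={...})``.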
""" + current_version = 4 + def __init__(self, **config): self.config = config self.cache = VaultCache(**config["cache"]) self.scheduler = get_scheduler(**config["scheduler"]) self.storage = get_storage(**config["storage"]) self.smtp_server = smtplib.SMTP(**config.get("smtp", {})) db_conn = config["db"] self._pool = psycopg2.pool.ThreadedConnectionPool( config.get("min_pool_conns", 1), config.get("max_pool_conns", 10), db_conn, cursor_factory=psycopg2.extras.RealDictCursor, ) self._db = None def get_db(self): if self._db: return self._db return BaseDb.from_pool(self._pool) def put_db(self, db): if db is not self._db: db.put_conn() @db_transaction() def progress( self, bundle_type: str, swhid: CoreSWHID, raise_notfound: bool = True, db=None, cur=None, ) -> Optional[Dict[str, Any]]: cur.execute( """ SELECT id, type, swhid, task_id, task_status, sticky, ts_created, ts_done, ts_last_access, progress_msg FROM vault_bundle WHERE type = %s AND swhid = %s""", (bundle_type, str(swhid)), ) res = cur.fetchone() if not res: if raise_notfound: raise NotFoundExc(f"{bundle_type} {swhid} was not found.") return None res["swhid"] = CoreSWHID.from_string(res["swhid"]) return res def _send_task(self, bundle_type: str, swhid: CoreSWHID): """Send a cooking task to the celery scheduler""" task = create_oneshot_task_dict("cook-vault-bundle", bundle_type, str(swhid)) added_tasks = self.scheduler.create_tasks([task]) return added_tasks[0]["id"] @db_transaction() def create_task( self, bundle_type: str, swhid: CoreSWHID, sticky: bool = False, db=None, cur=None, ): """Create and send a cooking task""" cooker_class = get_cooker_cls(bundle_type, swhid.object_type) cooker = cooker_class(swhid, backend=self, storage=self.storage) if not cooker.check_exists(): raise NotFoundExc(f"{bundle_type} {swhid} was not found.") cur.execute( """ INSERT INTO vault_bundle (type, swhid, sticky) VALUES (%s, %s, %s)""", (bundle_type, str(swhid), sticky), ) db.conn.commit() task_id = self._send_task(bundle_type, swhid) cur.execute( """ UPDATE vault_bundle SET task_id = %s WHERE type = %s AND swhid = %s""", (task_id, bundle_type, str(swhid)), ) @db_transaction() def add_notif_email( self, bundle_type: str, swhid: CoreSWHID, email: str, db=None, cur=None ): """Add an e-mail address to notify when a given bundle is ready""" cur.execute( """ INSERT INTO vault_notif_email (email, bundle_id) VALUES (%s, (SELECT id FROM vault_bundle WHERE type = %s AND swhid = %s))""", (email, bundle_type, str(swhid)), ) def put_bundle(self, bundle_type: str, swhid: CoreSWHID, bundle) -> bool: self.cache.add(bundle_type, swhid, bundle) return True @db_transaction() def cook( self, bundle_type: str, swhid: CoreSWHID, *, sticky: bool = False, email: Optional[str] = None, db=None, cur=None, ) -> Dict[str, Any]: info = self.progress(bundle_type, swhid, raise_notfound=False) if bundle_type not in COOKER_TYPES: raise NotFoundExc(f"{bundle_type} is an unknown type.") # If there's a failed bundle entry, delete it first. if info is not None and info["task_status"] == "failed": cur.execute( "DELETE FROM vault_bundle WHERE type = %s AND swhid = %s", (bundle_type, str(swhid)), ) db.conn.commit() info = None # If there's no bundle entry, create the task. 
if info is None: self.create_task(bundle_type, swhid, sticky) if email is not None: # If the task is already done, send the email directly if info is not None and info["task_status"] == "done": self.send_notification( None, email, bundle_type, swhid, info["task_status"] ) # Else, add it to the notification queue else: self.add_notif_email(bundle_type, swhid, email) return self.progress(bundle_type, swhid) @db_transaction() def batch_cook( self, batch: List[Tuple[str, str]], db=None, cur=None ) -> Dict[str, int]: # Import execute_values at runtime only, because it requires # psycopg2 >= 2.7 (only available on postgresql servers) from psycopg2.extras import execute_values for bundle_type, _ in batch: if bundle_type not in COOKER_TYPES: raise NotFoundExc(f"{bundle_type} is an unknown type.") cur.execute( """ INSERT INTO vault_batch (id) VALUES (DEFAULT) RETURNING id""" ) batch_id = cur.fetchone()["id"] # Delete all failed bundles from the batch cur.execute( """ DELETE FROM vault_bundle WHERE task_status = 'failed' AND (type, swhid) IN %s""", (tuple(batch),), ) # Insert all the bundles, return the new ones execute_values( cur, """ INSERT INTO vault_bundle (type, swhid) VALUES %s ON CONFLICT DO NOTHING""", batch, ) # Get the bundle ids and task status cur.execute( """ SELECT id, type, swhid, task_id FROM vault_bundle WHERE (type, swhid) IN %s""", (tuple(batch),), ) bundles = cur.fetchall() # Insert the batch-bundle entries batch_id_bundle_ids = [(batch_id, row["id"]) for row in bundles] execute_values( cur, """ INSERT INTO vault_batch_bundle (batch_id, bundle_id) VALUES %s ON CONFLICT DO NOTHING""", batch_id_bundle_ids, ) db.conn.commit() # Get the tasks to fetch batch_new = [ (row["type"], CoreSWHID.from_string(row["swhid"])) for row in bundles if row["task_id"] is None ] # Send the tasks args_batch = [(bundle_type, swhid) for bundle_type, swhid in batch_new] # TODO: change once the scheduler handles priority tasks tasks = [ create_oneshot_task_dict("swh-vault-batch-cooking", *args) for args in args_batch ] added_tasks = self.scheduler.create_tasks(tasks) tasks_ids_bundle_ids = [ (task_id, bundle_type, swhid) for task_id, (bundle_type, swhid) in zip( [task["id"] for task in added_tasks], batch_new ) ] # Update the task ids execute_values( cur, """ UPDATE vault_bundle SET task_id = s_task_id FROM (VALUES %s) AS sub (s_task_id, s_type, s_swhid) WHERE type = s_type::cook_type AND swhid = s_swhid """, tasks_ids_bundle_ids, ) return {"id": batch_id} @db_transaction() def batch_progress(self, batch_id: int, db=None, cur=None) -> Dict[str, Any]: cur.execute( """ SELECT vault_bundle.id as id, type, swhid, task_id, task_status, sticky, ts_created, ts_done, ts_last_access, progress_msg FROM vault_batch_bundle LEFT JOIN vault_bundle ON vault_bundle.id = bundle_id WHERE batch_id = %s""", (batch_id,), ) bundles = cur.fetchall() if not bundles: raise NotFoundExc(f"Batch {batch_id} does not exist.") for bundle in bundles: bundle["swhid"] = CoreSWHID.from_string(bundle["swhid"]) counter = collections.Counter(b["status"] for b in bundles) res = { "bundles": bundles, "total": len(bundles), **{k: 0 for k in ("new", "pending", "done", "failed")}, **dict(counter), } return res @db_transaction() def is_available(self, bundle_type: str, swhid: CoreSWHID, db=None, cur=None): """Check whether a bundle is available for retrieval""" info = self.progress(bundle_type, swhid, raise_notfound=False, cur=cur) return ( info is not None and info["task_status"] == "done" and self.cache.is_cached(bundle_type, swhid) ) 
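    # Illustrative usage sketch (assumptions: ``backend`` is an already
    # configured VaultBackend, and the SWHID below is a placeholder rather
    # than a real archived object):
    #
    #   swhid = CoreSWHID.from_string("swh:1:dir:" + "00" * 20)
    #   backend.cook("git_bare", swhid, email="user@example.org")  # schedules the cooking task
    #   info = backend.progress("git_bare", swhid)  # task_status: new/pending/done/failed
    #   if backend.is_available("git_bare", swhid):
    #       tarball = backend.fetch("git_bare", swhid)  # raw bytes of the cooked bundle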
@db_transaction() def fetch( self, bundle_type: str, swhid: CoreSWHID, raise_notfound=True, db=None, cur=None ) -> Optional[bytes]: """Retrieve a bundle from the cache""" available = self.is_available(bundle_type, swhid, cur=cur) if not available: if raise_notfound: raise NotFoundExc(f"{bundle_type} {swhid} is not available.") return None self.update_access_ts(bundle_type, swhid, cur=cur) return self.cache.get(bundle_type, swhid) @db_transaction() def update_access_ts(self, bundle_type: str, swhid: CoreSWHID, db=None, cur=None): """Update the last access timestamp of a bundle""" cur.execute( """ UPDATE vault_bundle SET ts_last_access = NOW() WHERE type = %s AND swhid = %s""", (bundle_type, str(swhid)), ) @db_transaction() def set_status( self, bundle_type: str, swhid: CoreSWHID, status: str, db=None, cur=None ) -> bool: req = ( """ UPDATE vault_bundle SET task_status = %s """ + (""", ts_done = NOW() """ if status == "done" else "") + """WHERE type = %s AND swhid = %s""" ) cur.execute(req, (status, bundle_type, str(swhid))) return True @db_transaction() def set_progress( self, bundle_type: str, swhid: CoreSWHID, progress: str, db=None, cur=None ) -> bool: cur.execute( """ UPDATE vault_bundle SET progress_msg = %s WHERE type = %s AND swhid = %s""", (progress, bundle_type, str(swhid)), ) return True @db_transaction() def send_notif(self, bundle_type: str, swhid: CoreSWHID, db=None, cur=None) -> bool: cur.execute( """ SELECT vault_notif_email.id AS id, email, task_status, progress_msg FROM vault_notif_email INNER JOIN vault_bundle ON bundle_id = vault_bundle.id WHERE vault_bundle.type = %s AND vault_bundle.swhid = %s""", (bundle_type, str(swhid)), ) for d in cur: self.send_notification( d["id"], d["email"], bundle_type, swhid, status=d["task_status"], progress_msg=d["progress_msg"], ) return True @db_transaction() def send_notification( self, n_id: Optional[int], email: str, bundle_type: str, swhid: CoreSWHID, status: str, progress_msg: Optional[str] = None, db=None, cur=None, ) -> None: """Send the notification of a bundle to a specific e-mail""" short_id = swhid.object_id.hex()[:7] # TODO: instead of hardcoding this, we should probably: # * add a "fetch_url" field in the vault_notif_email table # * generate the url with flask.url_for() on the web-ui side # * send this url as part of the cook request and store it in # the table # * use this url for the notification e-mail url = "https://archive.softwareheritage.org/api/1/vault/{}/{}/" "raw".format( bundle_type, swhid ) if status == "done": text = NOTIF_EMAIL_BODY_SUCCESS.strip() text = text.format(bundle_type=bundle_type, swhid=swhid, url=url) msg = MIMEText(text) msg["Subject"] = NOTIF_EMAIL_SUBJECT_SUCCESS.format( bundle_type=bundle_type, short_id=short_id ) elif status == "failed": text = NOTIF_EMAIL_BODY_FAILURE.strip() text = text.format( bundle_type=bundle_type, swhid=swhid, progress_msg=progress_msg ) msg = MIMEText(text) msg["Subject"] = NOTIF_EMAIL_SUBJECT_FAILURE.format( bundle_type=bundle_type, short_id=short_id ) else: raise RuntimeError( "send_notification called on a '{}' bundle".format(status) ) msg["From"] = NOTIF_EMAIL_FROM msg["To"] = email self._smtp_send(msg) if n_id is not None: cur.execute( """ DELETE FROM vault_notif_email WHERE id = %s""", (n_id,), ) def _smtp_send(self, msg: MIMEText): # Reconnect if needed try: status = self.smtp_server.noop()[0] except smtplib.SMTPException: status = -1 if status != 250: self.smtp_server.connect("localhost", 25) # Send the message self.smtp_server.send_message(msg) @db_transaction() 
def _cache_expire(self, cond, *args, db=None, cur=None) -> None: """Low-level expiration method, used by cache_expire_* methods""" # Embedded SELECT query to be able to use ORDER BY and LIMIT cur.execute( """ DELETE FROM vault_bundle WHERE ctid IN ( SELECT ctid FROM vault_bundle WHERE sticky = false {} ) RETURNING type, swhid """.format( cond ), args, ) for d in cur: self.cache.delete(d["type"], CoreSWHID.from_string(d["swhid"])) @db_transaction() def cache_expire_oldest(self, n=1, by="last_access", db=None, cur=None) -> None: """Expire the `n` oldest bundles""" assert by in ("created", "done", "last_access") filter = """ORDER BY ts_{} LIMIT {}""".format(by, n) return self._cache_expire(filter) @db_transaction() def cache_expire_until(self, date, by="last_access", db=None, cur=None) -> None: """Expire all the bundles until a certain date""" assert by in ("created", "done", "last_access") filter = """AND ts_{} <= %s""".format(by) return self._cache_expire(filter, date) diff --git a/swh/vault/cookers/git_bare.py b/swh/vault/cookers/git_bare.py index de0ac1c..bb1d748 100644 --- a/swh/vault/cookers/git_bare.py +++ b/swh/vault/cookers/git_bare.py @@ -1,691 +1,720 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """ This cooker creates tarballs containing a bare .git directory, that can be unpacked and cloned like any git repository. It works in three steps: 1. Write objects one by one in :file:`.git/objects/` 2. Calls ``git repack`` to pack all these objects into git packfiles. 3. Creates a tarball of the resulting repository It keeps a set of all written (or about-to-be-written) object hashes in memory to avoid downloading and writing the same objects twice. The first step is the most complex. When swh-graph is available, this roughly does the following: 1. Find all the revisions and releases in the induced subgraph, adds them to todo-lists 2. Grab a batch from (release/revision/directory/content) todo-lists, and load them. Add directory and content objects they reference to the todo-list 3. If any todo-list is not empty, goto 1 When swh-graph is not available, steps 1 and 2 are merged, because revisions need to be loaded in order to compute the subgraph. 
""" import datetime import enum import glob import logging import multiprocessing.dummy import os.path import re import subprocess import tarfile import tempfile from typing import Any, Dict, Iterable, Iterator, List, NoReturn, Optional, Set, Tuple import zlib from swh.core.api.classes import stream_results_optional from swh.model import git_objects from swh.model.hashutil import hash_to_bytehex, hash_to_hex from swh.model.model import ( Person, Release, Revision, RevisionType, Sha1Git, Snapshot, SnapshotBranch, TargetType, TimestampWithTimezone, ) from swh.model.model import Content, Directory, DirectoryEntry from swh.model.model import ObjectType as ModelObjectType from swh.model.swhids import CoreSWHID, ObjectType from swh.storage.algos.revisions_walker import DFSRevisionsWalker from swh.storage.algos.snapshot import snapshot_get_all_branches from swh.vault.cookers.base import BaseVaultCooker from swh.vault.to_disk import HIDDEN_MESSAGE, SKIPPED_MESSAGE RELEASE_BATCH_SIZE = 10000 REVISION_BATCH_SIZE = 10000 DIRECTORY_BATCH_SIZE = 10000 CONTENT_BATCH_SIZE = 100 logger = logging.getLogger(__name__) class RootObjectType(enum.Enum): DIRECTORY = "directory" REVISION = "revision" + RELEASE = "release" SNAPSHOT = "snapshot" def assert_never(value: NoReturn, msg) -> NoReturn: """mypy makes sure this function is never called, through exhaustive checking of ``value`` in the parent function. See https://mypy.readthedocs.io/en/latest/literal_types.html#exhaustive-checks for details. """ assert False, msg class GitBareCooker(BaseVaultCooker): BUNDLE_TYPE = "git_bare" SUPPORTED_OBJECT_TYPES = {ObjectType[obj_type.name] for obj_type in RootObjectType} use_fsck = True obj_type: RootObjectType def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.obj_type = RootObjectType[self.swhid.object_type.name] def check_exists(self) -> bool: """Returns whether the root object is present in the archive.""" if self.obj_type is RootObjectType.REVISION: return not list(self.storage.revision_missing([self.obj_id])) + elif self.obj_type is RootObjectType.RELEASE: + return not list(self.storage.release_missing([self.obj_id])) elif self.obj_type is RootObjectType.DIRECTORY: return not list(self.storage.directory_missing([self.obj_id])) elif self.obj_type is RootObjectType.SNAPSHOT: return not list(self.storage.snapshot_missing([self.obj_id])) else: assert_never(self.obj_type, f"Unexpected root object type: {self.obj_type}") def _push(self, stack: List[Sha1Git], obj_ids: Iterable[Sha1Git]) -> None: """Adds all the given ``obj_ids`` to the given ``stack``, unless they are already in ``self._seen``, and adds them to ``self._seen``.""" assert not isinstance(obj_ids, bytes) revision_ids = [id_ for id_ in obj_ids if id_ not in self._seen] self._seen.update(revision_ids) stack.extend(revision_ids) def _pop(self, stack: List[Sha1Git], n: int) -> List[Sha1Git]: """Removes ``n`` object from the ``stack`` and returns them.""" obj_ids = stack[-n:] stack[-n:] = [] return obj_ids def prepare_bundle(self): """Main entry point. Initializes the state, creates the bundle, and sends it to the backend.""" # Objects we will visit soon (aka. 
"todo-lists"): self._rel_stack: List[Sha1Git] = [] self._rev_stack: List[Sha1Git] = [] self._dir_stack: List[Sha1Git] = [] self._cnt_stack: List[Sha1Git] = [] # Set of objects already in any of the stacks: self._seen: Set[Sha1Git] = set() self._walker_state: Optional[Any] = None # Set of errors we expect git-fsck to raise at the end: self._expected_fsck_errors = set() with tempfile.TemporaryDirectory(prefix="swh-vault-gitbare-") as workdir: # Initialize a Git directory self.workdir = workdir self.gitdir = os.path.join(workdir, "clone.git") os.mkdir(self.gitdir) self.init_git() self.nb_loaded = 0 # Add the root object to the stack of objects to visit self.push_subgraph(self.obj_type, self.obj_id) # Load and write all the objects to disk self.load_objects() self.backend.set_progress( self.BUNDLE_TYPE, self.swhid, "Writing references..." ) # Write the root object as a ref (this step is skipped if it's a snapshot) # This must be done before repacking; git-repack ignores orphan objects. self.write_refs() self.backend.set_progress( self.BUNDLE_TYPE, self.swhid, "Checking content integrity" ) if self.use_fsck: self.git_fsck() self.backend.set_progress( self.BUNDLE_TYPE, self.swhid, "Creating final bundle" ) self.repack() self.write_archive() self.backend.set_progress(self.BUNDLE_TYPE, self.swhid, "Uploading bundle") def init_git(self) -> None: """Creates an empty :file:`.git` directory.""" subprocess.run(["git", "-C", self.gitdir, "init", "--bare"], check=True) self.create_object_dirs() # Remove example hooks; they take ~40KB and we don't use them for filename in glob.glob(os.path.join(self.gitdir, "hooks", "*.sample")): os.unlink(filename) def create_object_dirs(self) -> None: """Creates all possible subdirectories of :file:`.git/objects/`""" # Create all possible dirs ahead of time, so we don't have to check for # existence every time. for byte in range(256): try: os.mkdir(os.path.join(self.gitdir, "objects", f"{byte:02x}")) except FileExistsError: pass def repack(self) -> None: """Moves all objects from :file:`.git/objects/` to a packfile.""" try: subprocess.run(["git", "-C", self.gitdir, "repack", "-d"], check=True) except subprocess.CalledProcessError: logging.exception("git-repack failed with:") # Remove their non-packed originals subprocess.run(["git", "-C", self.gitdir, "prune-packed"], check=True) def git_fsck(self) -> None: """Runs git-fsck and ignores expected errors (eg. because of missing objects).""" proc = subprocess.run( ["git", "-C", self.gitdir, "fsck"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env={"LANG": "C.utf8"}, ) # Split on newlines not followed by a space errors = re.split("\n(?! 
)", proc.stdout.decode()) errors = [ error for error in errors if error and not error.startswith("warning ") ] unexpected_errors = set(errors) - self._expected_fsck_errors if unexpected_errors: logging.error( "Unexpected errors from git-fsck after cooking %s: %s", self.swhid, "\n".join(sorted(unexpected_errors)), ) + def _make_stub_directory_revision(self, dir_id: Sha1Git) -> Sha1Git: + author = Person.from_fullname( + b"swh-vault, git-bare cooker " + ) + dt = datetime.datetime.now(tz=datetime.timezone.utc) + dt = dt.replace(microsecond=0) # not supported by git + date = TimestampWithTimezone.from_datetime(dt) + + revision = Revision( + author=author, + committer=author, + date=date, + committer_date=date, + message=b"Initial commit", + type=RevisionType.GIT, + directory=self.obj_id, + synthetic=True, + ) + self.write_revision_node(revision) + + return revision.id + def write_refs(self, snapshot=None): """Writes all files in :file:`.git/refs/`. For non-snapshot objects, this is only ``master``.""" refs: Dict[bytes, bytes] # ref name -> target if self.obj_type == RootObjectType.DIRECTORY: # We need a synthetic revision pointing to the directory - author = Person.from_fullname( - b"swh-vault, git-bare cooker " - ) - dt = datetime.datetime.now(tz=datetime.timezone.utc) - dt = dt.replace(microsecond=0) # not supported by git - date = TimestampWithTimezone.from_datetime(dt) - revision = Revision( - author=author, - committer=author, - date=date, - committer_date=date, - message=b"Initial commit", - type=RevisionType.GIT, - directory=self.obj_id, - synthetic=True, - ) - self.write_revision_node(revision) - refs = {b"refs/heads/master": hash_to_bytehex(revision.id)} + rev_id = self._make_stub_directory_revision(self.obj_id) + + refs = {b"refs/heads/master": hash_to_bytehex(rev_id)} elif self.obj_type == RootObjectType.REVISION: refs = {b"refs/heads/master": hash_to_bytehex(self.obj_id)} + elif self.obj_type == RootObjectType.RELEASE: + (release,) = self.storage.release_get([self.obj_id]) + + if release.name and re.match(br"^[a-zA-Z0-9_.-]+$", release.name): + release_name = release.name + else: + release_name = b"release" + + refs = { + b"refs/tags/" + release_name: hash_to_bytehex(self.obj_id), + } + + if release.target_type.value == ModelObjectType.REVISION: + # Not necessary, but makes it easier to browse + refs[b"ref/heads/master"] = hash_to_bytehex(release.target) + # TODO: synthetize a master branch for other target types + elif self.obj_type == RootObjectType.SNAPSHOT: if snapshot is None: # refs were already written in a previous step return branches = [] for (branch_name, branch) in snapshot.branches.items(): if branch is None: logging.error( "%s has dangling branch: %r", snapshot.swhid(), branch_name ) else: branches.append((branch_name, branch)) refs = { branch_name: ( b"ref: " + branch.target if branch.target_type == TargetType.ALIAS else hash_to_bytehex(branch.target) ) for (branch_name, branch) in branches } else: assert_never(self.obj_type, f"Unexpected root object type: {self.obj_type}") for (ref_name, ref_target) in refs.items(): path = os.path.join(self.gitdir.encode(), ref_name) os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "wb") as fd: fd.write(ref_target) def write_archive(self): """Creates the final .tar file.""" with tarfile.TarFile(mode="w", fileobj=self.fileobj) as tf: tf.add(self.gitdir, arcname=f"{self.swhid}.git", recursive=True) def _obj_path(self, obj_id: Sha1Git): """Returns the absolute path of file (in :file:`.git/objects/`) that will contain 
the git object identified by the ``obj_id``.""" return os.path.join(self.gitdir, self._obj_relative_path(obj_id)) def _obj_relative_path(self, obj_id: Sha1Git): """Same as :meth:`_obj_path`, but relative.""" obj_id_hex = hash_to_hex(obj_id) directory = obj_id_hex[0:2] filename = obj_id_hex[2:] return os.path.join("objects", directory, filename) def object_exists(self, obj_id: Sha1Git) -> bool: """Returns whether the object identified by the given ``obj_id`` was already written to a file in :file:`.git/object/`. This function ignores objects contained in a git pack.""" return os.path.exists(self._obj_path(obj_id)) def write_object(self, obj_id: Sha1Git, obj: bytes) -> bool: """Writes a git object on disk. Returns whether it was already written.""" # Git requires objects to be zlib-compressed; but repacking decompresses and # removes them, so we don't need to compress them too much. data = zlib.compress(obj, level=1) with open(self._obj_path(obj_id), "wb") as fd: fd.write(data) return True def push_subgraph(self, obj_type: RootObjectType, obj_id) -> None: """Adds graph induced by the given ``obj_id`` without recursing through directories, to the todo-lists. If swh-graph is not available, this immediately loads revisions, as they need to be fetched in order to compute the subgraph, and fetching them immediately avoids duplicate fetches.""" if self.obj_type is RootObjectType.REVISION: self.push_revision_subgraph(obj_id) elif self.obj_type is RootObjectType.DIRECTORY: self._push(self._dir_stack, [obj_id]) elif self.obj_type is RootObjectType.SNAPSHOT: self.push_snapshot_subgraph(obj_id) + elif self.obj_type is RootObjectType.RELEASE: + self.push_releases_subgraphs([obj_id]) else: assert_never(self.obj_type, f"Unexpected root object type: {self.obj_type}") def load_objects(self) -> None: """Repeatedly loads objects in the todo-lists, until all lists are empty.""" while self._rel_stack or self._rev_stack or self._dir_stack or self._cnt_stack: nb_remaining = ( len(self._rel_stack) + len(self._rev_stack) + len(self._dir_stack) + len(self._cnt_stack) ) # We assume assume nb_remaining is a lower bound. # When the snapshot was loaded with swh-graph, this should be the exact # value, though. self.backend.set_progress( self.BUNDLE_TYPE, self.swhid, f"Processing... {self.nb_loaded} objects processed\n" f"Over {nb_remaining} remaining", ) release_ids = self._pop(self._rel_stack, RELEASE_BATCH_SIZE) if release_ids: self.load_releases(release_ids) self.nb_loaded += len(release_ids) revision_ids = self._pop(self._rev_stack, REVISION_BATCH_SIZE) if revision_ids: self.load_revisions(revision_ids) self.nb_loaded += len(revision_ids) directory_ids = self._pop(self._dir_stack, DIRECTORY_BATCH_SIZE) if directory_ids: self.load_directories(directory_ids) self.nb_loaded += len(directory_ids) content_ids = self._pop(self._cnt_stack, CONTENT_BATCH_SIZE) if content_ids: self.load_contents(content_ids) self.nb_loaded += len(content_ids) def push_revision_subgraph(self, obj_id: Sha1Git) -> None: """Fetches the graph of revisions induced by the given ``obj_id`` and adds them to ``self._rev_stack``. 
If swh-graph is not available, this requires fetching the revisions themselves, so they are directly loaded instead.""" loaded_from_graph = False if self.graph: from swh.graph.client import GraphArgumentException # First, try to cook using swh-graph, as it is more efficient than # swh-storage for querying the history obj_swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=obj_id,) try: revision_ids = ( swhid.object_id for swhid in map( CoreSWHID.from_string, self.graph.visit_nodes(str(obj_swhid), edges="rev:rev"), ) ) self._push(self._rev_stack, revision_ids) except GraphArgumentException as e: logger.info( "Revision %s not found in swh-graph, falling back to fetching " "history using swh-storage. %s", hash_to_hex(obj_id), e.args[0], ) else: loaded_from_graph = True if not loaded_from_graph: # If swh-graph is not available, or the revision is not yet in # swh-graph, fall back to self.storage.revision_log. # self.storage.revision_log also gives us the full revisions, # so we load them right now instead of just pushing them on the stack. walker = DFSRevisionsWalker( self.storage, obj_id, state=self._walker_state, ignore_displayname=True ) for revision in walker: self.write_revision_node(Revision.from_dict(revision)) self.nb_loaded += 1 self._push(self._dir_stack, [revision["directory"]]) # Save the state, so the next call to the walker won't return the same # revisions self._walker_state = walker.export_state() def push_snapshot_subgraph(self, obj_id: Sha1Git) -> None: """Fetches a snapshot and all its children, excluding directories and contents, and pushes them to the todo-lists. Also loads revisions if swh-graph is not available, see :meth:`push_revision_subgraph`.""" loaded_from_graph = False if self.graph: revision_ids = [] release_ids = [] directory_ids = [] content_ids = [] from swh.graph.client import GraphArgumentException # First, try to cook using swh-graph, as it is more efficient than # swh-storage for querying the history obj_swhid = CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=obj_id,) try: swhids: Iterable[CoreSWHID] = map( CoreSWHID.from_string, self.graph.visit_nodes(str(obj_swhid), edges="snp:*,rel:*,rev:rev"), ) for swhid in swhids: if swhid.object_type is ObjectType.REVISION: revision_ids.append(swhid.object_id) elif swhid.object_type is ObjectType.RELEASE: release_ids.append(swhid.object_id) elif swhid.object_type is ObjectType.DIRECTORY: directory_ids.append(swhid.object_id) elif swhid.object_type is ObjectType.CONTENT: content_ids.append(swhid.object_id) elif swhid.object_type is ObjectType.SNAPSHOT: assert ( swhid.object_id == obj_id ), f"Snapshot {obj_id.hex()} references a different snapshot" else: assert_never( swhid.object_type, f"Unexpected SWHID object type: {swhid}" ) except GraphArgumentException as e: logger.info( "Snapshot %s not found in swh-graph, falling back to fetching " "history for each branch. %s", hash_to_hex(obj_id), e.args[0], ) else: self._push(self._rev_stack, revision_ids) self._push(self._rel_stack, release_ids) self._push(self._dir_stack, directory_ids) self._push(self._cnt_stack, content_ids) loaded_from_graph = True # TODO: when self.graph is available and supports edge labels, use it # directly to get branch names. 
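        # Even when swh-graph supplied the full object set above, the snapshot
        # itself still has to be read from storage: visit_nodes() only returns
        # node SWHIDs with no branch names, and write_refs() below needs the
        # branch name -> target mapping.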
snapshot: Optional[Snapshot] = snapshot_get_all_branches(self.storage, obj_id) assert snapshot, "Unknown snapshot" # should have been caught by check_exists() for branch in snapshot.branches.values(): if not loaded_from_graph: if branch is None: logging.warning("Dangling branch: %r", branch) continue assert isinstance(branch, SnapshotBranch) # for mypy if branch.target_type is TargetType.REVISION: self.push_revision_subgraph(branch.target) elif branch.target_type is TargetType.RELEASE: self.push_releases_subgraphs([branch.target]) elif branch.target_type is TargetType.ALIAS: # Nothing to do, this for loop also iterates on the target branch # (if it exists) pass elif branch.target_type is TargetType.DIRECTORY: self._push(self._dir_stack, [branch.target]) elif branch.target_type is TargetType.CONTENT: self._push(self._cnt_stack, [branch.target]) elif branch.target_type is TargetType.SNAPSHOT: if swhid.object_id != obj_id: raise NotImplementedError( f"{swhid} has a snapshot as a branch." ) else: assert_never( branch.target_type, f"Unexpected target type: {self.obj_type}" ) self.write_refs(snapshot=snapshot) def load_revisions(self, obj_ids: List[Sha1Git]) -> None: """Given a list of revision ids, loads these revisions and their directories; but not their parent revisions (ie. this is not recursive).""" ret: List[Optional[Revision]] = self.storage.revision_get( obj_ids, ignore_displayname=True ) revisions: List[Revision] = list(filter(None, ret)) if len(ret) != len(revisions): logger.error("Missing revision(s), ignoring them.") for revision in revisions: self.write_revision_node(revision) self._push(self._dir_stack, (rev.directory for rev in revisions)) def write_revision_node(self, revision: Revision) -> bool: """Writes a revision object to disk""" git_object = revision.raw_manifest or git_objects.revision_git_object(revision) return self.write_object(revision.id, git_object) def load_releases(self, obj_ids: List[Sha1Git]) -> List[Release]: """Loads release objects, and returns them.""" ret = self.storage.release_get(obj_ids, ignore_displayname=True) releases = list(filter(None, ret)) if len(ret) != len(releases): logger.error("Missing release(s), ignoring them.") for release in releases: self.write_release_node(release) return releases def push_releases_subgraphs(self, obj_ids: List[Sha1Git]) -> None: """Given a list of release ids, loads these releases and adds their target to the list of objects to visit""" for release in self.load_releases(obj_ids): self.nb_loaded += 1 assert release.target, "{release.swhid(}) has no target" if release.target_type is ModelObjectType.REVISION: self.push_revision_subgraph(release.target) elif release.target_type is ModelObjectType.DIRECTORY: self._push(self._dir_stack, [release.target]) elif release.target_type is ModelObjectType.CONTENT: self._push(self._cnt_stack, [release.target]) elif release.target_type is ModelObjectType.RELEASE: self.push_releases_subgraphs([release.target]) elif release.target_type is ModelObjectType.SNAPSHOT: raise NotImplementedError( f"{release.swhid()} targets a snapshot: {release.target!r}" ) else: assert_never( release.target_type, f"Unexpected release target type: {release.target_type}", ) def write_release_node(self, release: Release) -> bool: """Writes a release object to disk""" git_object = release.raw_manifest or git_objects.release_git_object(release) return self.write_object(release.id, git_object) def load_directories(self, obj_ids: List[Sha1Git]) -> None: if not obj_ids: return raw_manifests = 
self.storage.directory_get_raw_manifest(obj_ids) with multiprocessing.dummy.Pool(min(self.thread_pool_size, len(obj_ids))) as p: for _ in p.imap_unordered( lambda obj_id: self.load_directory(obj_id, raw_manifests.get(obj_id)), obj_ids, ): pass def load_directory(self, obj_id: Sha1Git, raw_manifest: Optional[bytes]) -> None: # Load the directory entries_it: Optional[Iterable[DirectoryEntry]] = stream_results_optional( self.storage.directory_get_entries, obj_id ) if entries_it is None: logger.error("Missing swh:1:dir:%s, ignoring.", hash_to_hex(obj_id)) return directory = Directory( id=obj_id, entries=tuple(entries_it), raw_manifest=raw_manifest ) git_object = raw_manifest or git_objects.directory_git_object(directory) self.write_object(obj_id, git_object) # Add children to the stack entry_loaders: Dict[str, Optional[List[Sha1Git]]] = { "file": self._cnt_stack, "dir": self._dir_stack, "rev": None, # Do not include submodule targets (rejected by git-fsck) } for entry in directory.entries: stack = entry_loaders[entry.type] if stack is not None: self._push(stack, [entry.target]) def load_contents(self, obj_ids: List[Sha1Git]) -> None: # TODO: add support of filtered objects, somehow? # It's tricky, because, by definition, we can't write a git object with # the expected hash, so git-fsck *will* choke on it. contents = self.storage.content_get(obj_ids, "sha1_git") visible_contents = [] for (obj_id, content) in zip(obj_ids, contents): if content is None: # FIXME: this may also happen for missing content self.write_content(obj_id, SKIPPED_MESSAGE) self._expect_mismatched_object_error(obj_id) elif content.status == "visible": visible_contents.append(content) elif content.status == "hidden": self.write_content(obj_id, HIDDEN_MESSAGE) self._expect_mismatched_object_error(obj_id) elif content.status == "absent": assert False, f"content_get returned absent content {content.swhid()}" else: # TODO: When content.status will have type Literal, replace this with # assert_never assert False, f"{content.swhid} has status: {content.status!r}" contents_and_data: Iterator[Tuple[Content, Optional[bytes]]] if self.objstorage is None: contents_and_data = ( (content, self.storage.content_get_data(content.sha1)) for content in visible_contents ) else: contents_and_data = zip( visible_contents, self.objstorage.get_batch(c.sha1 for c in visible_contents), ) for (content, datum) in contents_and_data: if datum is None: logger.error( "%s is visible, but is missing data. 
Skipping.", content.swhid() ) continue self.write_content(content.sha1_git, datum) def write_content(self, obj_id: Sha1Git, content: bytes) -> None: header = git_objects.git_object_header("blob", len(content)) self.write_object(obj_id, header + content) def _expect_mismatched_object_error(self, obj_id): obj_id_hex = hash_to_hex(obj_id) obj_path = self._obj_relative_path(obj_id) # For Git < 2.21: self._expected_fsck_errors.add( f"error: sha1 mismatch for ./{obj_path} (expected {obj_id_hex})" ) # For Git >= 2.21: self._expected_fsck_errors.add( f"error: hash mismatch for ./{obj_path} (expected {obj_id_hex})" ) self._expected_fsck_errors.add( f"error: {obj_id_hex}: object corrupt or missing: ./{obj_path}" ) self._expected_fsck_errors.add(f"missing blob {obj_id_hex}") diff --git a/swh/vault/sql/30-schema.sql b/swh/vault/sql/30-schema.sql index b170c5f..a66e8ce 100644 --- a/swh/vault/sql/30-schema.sql +++ b/swh/vault/sql/30-schema.sql @@ -1,59 +1,49 @@ -create table if not exists dbversion -( - version int primary key, - release timestamptz not null, - description text not null -); -comment on table dbversion is 'Schema update tracking'; -insert into dbversion (version, release, description) - values (4, now(), 'Initial version'); - create domain obj_hash as bytea; create type bundle_type as enum ('flat', 'gitfast', 'git_bare'); comment on type bundle_type is 'Type of the requested bundle'; create type cook_status as enum ('new', 'pending', 'done', 'failed'); comment on type cook_status is 'Status of the cooking'; create table vault_bundle ( id bigserial primary key, type bundle_type not null, swhid text not null, -- requested object ID task_id integer, -- scheduler task id task_status cook_status not null default 'new', -- status of the task sticky boolean not null default false, -- bundle cannot expire ts_created timestamptz not null default now(), -- timestamp of creation ts_done timestamptz, -- timestamp of the cooking result ts_last_access timestamptz not null default now(), -- last access progress_msg text -- progress message ); create unique index concurrently vault_bundle_type_swhid on vault_bundle (type, swhid); create index concurrently vault_bundle_task_id on vault_bundle (task_id); create table vault_notif_email ( id bigserial primary key, email text not null, -- e-mail to notify bundle_id bigint not null references vault_bundle(id) on delete cascade ); create index concurrently vault_notif_email_bundle on vault_notif_email (bundle_id); create index concurrently vault_notif_email_email on vault_notif_email (email); create table vault_batch ( id bigserial primary key ); create table vault_batch_bundle ( batch_id bigint not null references vault_batch(id) on delete cascade, bundle_id bigint not null references vault_bundle(id) on delete cascade ); create unique index concurrently vault_batch_bundle_pkey on vault_batch_bundle (batch_id, bundle_id); diff --git a/sql/upgrades/002.sql b/swh/vault/sql/upgrades/002.sql similarity index 100% rename from sql/upgrades/002.sql rename to swh/vault/sql/upgrades/002.sql diff --git a/sql/upgrades/003.sql b/swh/vault/sql/upgrades/003.sql similarity index 100% rename from sql/upgrades/003.sql rename to swh/vault/sql/upgrades/003.sql diff --git a/swh/vault/tests/conftest.py b/swh/vault/tests/conftest.py index e6d7efa..be1b266 100644 --- a/swh/vault/tests/conftest.py +++ b/swh/vault/tests/conftest.py @@ -1,93 +1,104 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: 
GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from functools import partial import os from typing import Any, Dict import pkg_resources.extern.packaging.version import pytest +from pytest_postgresql import factories import yaml -from swh.core.db.pytest_plugin import postgresql_fact -from swh.storage.tests import SQL_DIR as STORAGE_SQL_DIR -import swh.vault +from swh.core.db.pytest_plugin import initialize_database_for_module, postgresql_fact +from swh.storage.postgresql.db import Db as StorageDb from swh.vault import get_vault +from swh.vault.backend import VaultBackend os.environ["LC_ALL"] = "C.UTF-8" # needed for directory tests on git-cloned repositories # 022 is usually the default value, but some environments (eg. Debian builds) have # a different one. os.umask(0o022) pytest_v = pkg_resources.get_distribution("pytest").parsed_version if pytest_v < pkg_resources.extern.packaging.version.parse("3.9"): @pytest.fixture def tmp_path(): import pathlib import tempfile with tempfile.TemporaryDirectory() as tmpdir: yield pathlib.Path(tmpdir) -VAULT_SQL_DIR = os.path.join(os.path.dirname(swh.vault.__file__), "sql") - +storage_postgresql_proc = factories.postgresql_proc( + dbname="storage", + load=[ + partial(initialize_database_for_module, "storage", StorageDb.current_version) + ], +) -postgres_vault = postgresql_fact( - "postgresql_proc", dbname="vault", dump_files=f"{VAULT_SQL_DIR}/*.sql" +vault_postgresql_proc = factories.postgresql_proc( + dbname="vault", + load=[ + partial(initialize_database_for_module, "vault", VaultBackend.current_version) + ], ) + +postgres_vault = postgresql_fact("vault_postgresql_proc") postgres_storage = postgresql_fact( - "postgresql_proc", dbname="storage", dump_files=f"{STORAGE_SQL_DIR}/*.sql" + "storage_postgresql_proc", no_db_drop=True, # keep the db for performance reasons ) @pytest.fixture def swh_vault_config(postgres_vault, postgres_storage, tmp_path) -> Dict[str, Any]: tmp_path = str(tmp_path) return { "db": postgres_vault.dsn, "storage": { "cls": "postgresql", "db": postgres_storage.dsn, "objstorage": { "cls": "pathslicing", "args": {"root": tmp_path, "slicing": "0:1/1:5",}, }, }, "cache": { "cls": "pathslicing", "args": {"root": tmp_path, "slicing": "0:1/1:5", "allow_delete": True}, }, "scheduler": {"cls": "remote", "url": "http://swh-scheduler:5008",}, } @pytest.fixture def swh_local_vault_config(swh_vault_config: Dict[str, Any]) -> Dict[str, Any]: return { "vault": {"cls": "local", **swh_vault_config}, "client_max_size": 1024 ** 3, } @pytest.fixture def swh_vault_config_file(swh_local_vault_config, monkeypatch, tmp_path): conf_path = os.path.join(str(tmp_path), "vault-server.yml") with open(conf_path, "w") as f: f.write(yaml.dump(swh_local_vault_config)) monkeypatch.setenv("SWH_CONFIG_FILENAME", conf_path) return conf_path @pytest.fixture def swh_vault(swh_vault_config): return get_vault("local", **swh_vault_config) @pytest.fixture def swh_storage(swh_vault): return swh_vault.storage diff --git a/swh/vault/tests/test_git_bare_cooker.py b/swh/vault/tests/test_git_bare_cooker.py index 8a7c008..e16fa9b 100644 --- a/swh/vault/tests/test_git_bare_cooker.py +++ b/swh/vault/tests/test_git_bare_cooker.py @@ -1,567 +1,668 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """ This module contains 
additional tests for the bare cooker. Generic cooker tests (eg. without swh-graph) in test_cookers.py also run on the bare cooker. """ import datetime +import enum import io import subprocess import tarfile import tempfile import unittest.mock import attr import dulwich.repo import pytest from pytest import param from swh.model.from_disk import DentryPerms from swh.model.model import ( Content, Directory, DirectoryEntry, ObjectType, Person, Release, Revision, RevisionType, Snapshot, SnapshotBranch, TargetType, Timestamp, TimestampWithTimezone, ) from swh.vault.cookers.git_bare import GitBareCooker from swh.vault.in_memory_backend import InMemoryVaultBackend +class RootObjects(enum.Enum): + REVISION = enum.auto() + SNAPSHOT = enum.auto() + RELEASE = enum.auto() + WEIRD_RELEASE = enum.auto() # has a : in the name + points to another release + + @pytest.mark.graph @pytest.mark.parametrize( - "snapshot,up_to_date_graph,tag,weird_branches", + "root_object,up_to_date_graph,tag,weird_branches", [ - # 'no snp' implies no tag or tree, because there can only be one root object param( - False, False, False, False, id="no snp, outdated graph, no tag/tree/blob" + RootObjects.REVISION, + False, + False, + False, + id="rev, outdated graph, no tag/tree/blob", + ), + param( + RootObjects.REVISION, + True, + False, + False, + id="rev, updated graph, no tag/tree/blob", + ), + param( + RootObjects.RELEASE, + False, + False, + False, + id="rel, outdated graph, no tag/tree/blob", + ), + param( + RootObjects.RELEASE, + True, + False, + False, + id="rel, updated graph, no tag/tree/blob", + ), + param( + RootObjects.WEIRD_RELEASE, + True, + False, + False, + id="weird rel, updated graph, no tag/tree/blob", + ), + param( + RootObjects.SNAPSHOT, + False, + False, + False, + id="snp, outdated graph, no tag/tree/blob", + ), + param( + RootObjects.SNAPSHOT, + True, + False, + False, + id="snp, updated graph, no tag/tree/blob", + ), + param( + RootObjects.SNAPSHOT, + False, + True, + False, + id="snp, outdated graph, w/ tag, no tree/blob", + ), + param( + RootObjects.SNAPSHOT, + True, + True, + False, + id="snp, updated graph, w/ tag, no tree/blob", + ), + param( + RootObjects.SNAPSHOT, + False, + True, + True, + id="snp, outdated graph, w/ tag, tree, and blob", ), - param(False, True, False, False, id="no snp, updated graph, no tag/tree/blob"), - param(True, False, False, False, id="snp, outdated graph, no tag/tree/blob"), - param(True, True, False, False, id="snp, updated graph, no tag/tree/blob"), - param(True, False, True, False, id="snp, outdated graph, w/ tag, no tree/blob"), - param(True, True, True, False, id="snp, updated graph, w/ tag, no tree/blob"), param( - True, False, True, True, id="snp, outdated graph, w/ tag, tree, and blob" + RootObjects.SNAPSHOT, + True, + True, + True, + id="snp, updated graph, w/ tag, tree, and blob", ), - param(True, True, True, True, id="snp, updated graph, w/ tag, tree, and blob"), ], ) -def test_graph_revisions(swh_storage, up_to_date_graph, snapshot, tag, weird_branches): +def test_graph_revisions( + swh_storage, up_to_date_graph, root_object, tag, weird_branches +): r""" Build objects:: snp /|||\ / ||| \ rel2 <----° /|\ \----> rel4 | / | \ | v / v \ v rev1 <------ rev2 <----° dir4 \ rel3 | | | \ | v v v \ | dir1 dir2 dir3 | | | / | | | | v / v v v v cnt1 <----° cnt2 cnt3 cnt4 cnt5 If up_to_date_graph is true, then swh-graph contains all objects. Else, cnt4, cnt5, dir4, rev2, rel2, rel3, and snp are missing from the graph. If tag is False, rel2 is excluded. 
If weird_branches is False, dir4, cnt4, rel3, rel4, and cnt5 are excluded. """ from swh.graph.naive_client import NaiveClient as GraphClient # Create objects: date = TimestampWithTimezone.from_datetime( datetime.datetime(2021, 5, 7, 8, 43, 59, tzinfo=datetime.timezone.utc) ) author = Person.from_fullname(b"Foo ") cnt1 = Content.from_data(b"correct") cnt2 = Content.from_data(b"horse") cnt3 = Content.from_data(b"battery") cnt4 = Content.from_data(b"staple") cnt5 = Content.from_data(b"Tr0ub4dor&3") dir1 = Directory( entries=( DirectoryEntry( name=b"file1", type="file", perms=DentryPerms.content, target=cnt1.sha1_git, ), ) ) dir2 = Directory( entries=( DirectoryEntry( name=b"file1", type="file", perms=DentryPerms.content, target=cnt1.sha1_git, ), DirectoryEntry( name=b"file2", type="file", perms=DentryPerms.content, target=cnt2.sha1_git, ), ) ) dir3 = Directory( entries=( DirectoryEntry( name=b"file3", type="file", perms=DentryPerms.content, target=cnt3.sha1_git, ), ) ) dir4 = Directory( entries=( DirectoryEntry( name=b"directory3", type="dir", perms=DentryPerms.directory, target=dir3.id, ), ) ) rev1 = Revision( message=b"msg1", date=date, committer_date=date, author=author, committer=author, directory=dir1.id, type=RevisionType.GIT, synthetic=True, ) rev2 = Revision( message=b"msg2", date=date, committer_date=date, author=author, committer=author, directory=dir2.id, parents=(rev1.id,), type=RevisionType.GIT, synthetic=True, ) rel2 = Release( name=b"1.0.0", message=b"tag2", target_type=ObjectType.REVISION, target=rev2.id, synthetic=True, ) rel3 = Release( name=b"1.0.0-blob", message=b"tagged-blob", target_type=ObjectType.CONTENT, target=cnt5.sha1_git, synthetic=True, ) rel4 = Release( name=b"1.0.0-weird", message=b"weird release", target_type=ObjectType.RELEASE, target=rel3.id, synthetic=True, ) + rel5 = Release( + name=b"1.0.0:weirdname", + message=b"weird release", + target_type=ObjectType.RELEASE, + target=rel2.id, + synthetic=True, + ) # Create snapshot: branches = { b"refs/heads/master": SnapshotBranch( target=rev2.id, target_type=TargetType.REVISION ), } if tag: branches[b"refs/tags/1.0.0"] = SnapshotBranch( target=rel2.id, target_type=TargetType.RELEASE ) if weird_branches: branches[b"refs/heads/tree-ref"] = SnapshotBranch( target=dir4.id, target_type=TargetType.DIRECTORY ) branches[b"refs/heads/blob-ref"] = SnapshotBranch( target=cnt4.sha1_git, target_type=TargetType.CONTENT ) branches[b"refs/tags/1.0.0-weird"] = SnapshotBranch( target=rel4.id, target_type=TargetType.RELEASE ) snp = Snapshot(branches=branches) # "Fill" swh-graph if up_to_date_graph: nodes = [cnt1, cnt2, dir1, dir2, rev1, rev2, snp] edges = [ (dir1, cnt1), (dir2, cnt1), (dir2, cnt2), (rev1, dir1), (rev2, dir2), (rev2, rev1), (snp, rev2), ] if tag: nodes.append(rel2) edges.append((rel2, rev2)) edges.append((snp, rel2)) if weird_branches: - nodes.extend([cnt3, cnt4, cnt5, dir3, dir4, rel3, rel4]) + nodes.extend([cnt3, cnt4, cnt5, dir3, dir4, rel3, rel4, rel5]) edges.extend( [ (dir3, cnt3), (dir4, dir3), (snp, dir4), (snp, cnt4), (snp, rel4), (rel4, rel3), (rel3, cnt5), + (rel5, rev2), ] ) else: nodes = [cnt1, cnt2, cnt3, dir1, dir2, dir3, rev1] edges = [ (dir1, cnt1), (dir2, cnt1), (dir2, cnt2), (dir3, cnt3), (rev1, dir1), ] if tag: nodes.append(rel2) if weird_branches: nodes.extend([cnt3, dir3]) edges.extend([(dir3, cnt3)]) nodes = [str(n.swhid()) for n in nodes] edges = [(str(s.swhid()), str(d.swhid())) for (s, d) in edges] # Add all objects to storage swh_storage.content_add([cnt1, cnt2, cnt3, cnt4, cnt5]) 
swh_storage.directory_add([dir1, dir2, dir3, dir4]) swh_storage.revision_add([rev1, rev2]) - swh_storage.release_add([rel2, rel3, rel4]) + swh_storage.release_add([rel2, rel3, rel4, rel5]) swh_storage.snapshot_add([snp]) # Add spy on swh_storage, to make sure revision_log is not called # (the graph must be used instead) swh_storage = unittest.mock.MagicMock(wraps=swh_storage) # Add all objects to graph swh_graph = unittest.mock.Mock(wraps=GraphClient(nodes=nodes, edges=edges)) # Cook backend = InMemoryVaultBackend() - if snapshot: - cooked_swhid = snp.swhid() - else: - cooked_swhid = rev2.swhid() + cooked_swhid = { + RootObjects.SNAPSHOT: snp.swhid(), + RootObjects.REVISION: rev2.swhid(), + RootObjects.RELEASE: rel2.swhid(), + RootObjects.WEIRD_RELEASE: rel5.swhid(), + }[root_object] cooker = GitBareCooker( cooked_swhid, backend=backend, storage=swh_storage, graph=swh_graph, ) if weird_branches: # git-fsck now rejects refs pointing to trees and blobs, # but some old git repos have them. cooker.use_fsck = False cooker.cook() # Get bundle bundle = backend.fetch("git_bare", cooked_swhid) # Extract bundle and make sure both revisions are in it with tempfile.TemporaryDirectory("swh-vault-test-bare") as tempdir: with tarfile.open(fileobj=io.BytesIO(bundle)) as tf: tf.extractall(tempdir) + if root_object in (RootObjects.SNAPSHOT, RootObjects.REVISION): + log_head = "master" + elif root_object == RootObjects.RELEASE: + log_head = "1.0.0" + elif root_object == RootObjects.WEIRD_RELEASE: + log_head = "release" + else: + assert False, root_object + output = subprocess.check_output( [ "git", "-C", f"{tempdir}/{cooked_swhid}.git", "log", "--format=oneline", "--decorate=", + log_head, ] ) assert output.decode() == f"{rev2.id.hex()} msg2\n{rev1.id.hex()} msg1\n" # Make sure the graph was used instead of swh_storage.revision_log - if snapshot: + if root_object == RootObjects.SNAPSHOT: if up_to_date_graph: # The graph has everything, so the first call succeeds and returns # all objects transitively pointed by the snapshot swh_graph.visit_nodes.assert_has_calls( [unittest.mock.call(str(snp.swhid()), edges="snp:*,rel:*,rev:rev"),] ) else: # The graph does not have everything, so the first call returns nothing. 
# However, the second call (on the top rev) succeeds and returns # all objects but the rev and the rel swh_graph.visit_nodes.assert_has_calls( [ unittest.mock.call(str(snp.swhid()), edges="snp:*,rel:*,rev:rev"), unittest.mock.call(str(rev2.swhid()), edges="rev:rev"), ] ) - else: + elif root_object in ( + RootObjects.REVISION, + RootObjects.RELEASE, + RootObjects.WEIRD_RELEASE, + ): swh_graph.visit_nodes.assert_has_calls( [unittest.mock.call(str(rev2.swhid()), edges="rev:rev")] ) + else: + assert False, root_object + if up_to_date_graph: swh_storage.revision_log.assert_not_called() swh_storage.revision_shortlog.assert_not_called() else: swh_storage.revision_log.assert_called() @pytest.mark.parametrize( "mismatch_on", ["content", "directory", "revision1", "revision2", "none"] ) def test_checksum_mismatch(swh_storage, mismatch_on): date = TimestampWithTimezone.from_datetime( datetime.datetime(2021, 5, 7, 8, 43, 59, tzinfo=datetime.timezone.utc) ) author = Person.from_fullname(b"Foo ") wrong_hash = b"\x12\x34" * 10 cnt1 = Content.from_data(b"Tr0ub4dor&3") if mismatch_on == "content": cnt1 = attr.evolve(cnt1, sha1_git=wrong_hash) dir1 = Directory( entries=( DirectoryEntry( name=b"file1", type="file", perms=DentryPerms.content, target=cnt1.sha1_git, ), ) ) if mismatch_on == "directory": dir1 = attr.evolve(dir1, id=wrong_hash) rev1 = Revision( message=b"msg1", date=date, committer_date=date, author=author, committer=author, directory=dir1.id, type=RevisionType.GIT, synthetic=True, ) if mismatch_on == "revision1": rev1 = attr.evolve(rev1, id=wrong_hash) rev2 = Revision( message=b"msg2", date=date, committer_date=date, author=author, committer=author, directory=dir1.id, parents=(rev1.id,), type=RevisionType.GIT, synthetic=True, ) if mismatch_on == "revision2": rev2 = attr.evolve(rev2, id=wrong_hash) cooked_swhid = rev2.swhid() swh_storage.content_add([cnt1]) swh_storage.directory_add([dir1]) swh_storage.revision_add([rev1, rev2]) backend = InMemoryVaultBackend() cooker = GitBareCooker( cooked_swhid, backend=backend, storage=swh_storage, graph=None, ) cooker.cook() # Get bundle bundle = backend.fetch("git_bare", cooked_swhid) # Extract bundle and make sure both revisions are in it with tempfile.TemporaryDirectory("swh-vault-test-bare") as tempdir: with tarfile.open(fileobj=io.BytesIO(bundle)) as tf: tf.extractall(tempdir) if mismatch_on != "revision2": # git-log fails if the head revision is corrupted # TODO: we need to find a way to make this somewhat usable output = subprocess.check_output( [ "git", "-C", f"{tempdir}/{cooked_swhid}.git", "log", "--format=oneline", "--decorate=", ] ) assert output.decode() == f"{rev2.id.hex()} msg2\n{rev1.id.hex()} msg1\n" @pytest.mark.parametrize( "use_graph", [ pytest.param(False, id="without-graph"), pytest.param(True, id="with-graph", marks=pytest.mark.graph), ], ) def test_ignore_displayname(swh_storage, use_graph): """Tests the original authorship information is used instead of configured display names; otherwise objects would not match their hash, and git-fsck/git-clone would fail. This tests both with and without swh-graph, as both configurations use different code paths to fetch revisions. 
""" date = TimestampWithTimezone.from_numeric_offset(Timestamp(1643882820, 0), 0, False) legacy_person = Person.from_fullname(b"old me ") current_person = Person.from_fullname(b"me ") content = Content.from_data(b"foo") swh_storage.content_add([content]) directory = Directory( entries=( DirectoryEntry( name=b"file1", type="file", perms=0o100644, target=content.sha1_git ), ), ) swh_storage.directory_add([directory]) revision = Revision( message=b"rev", author=legacy_person, date=date, committer=legacy_person, committer_date=date, parents=(), type=RevisionType.GIT, directory=directory.id, synthetic=True, ) swh_storage.revision_add([revision]) release = Release( name=b"v1.1.0", message=None, author=legacy_person, date=date, target=revision.id, target_type=ObjectType.REVISION, synthetic=True, ) swh_storage.release_add([release]) snapshot = Snapshot( branches={ b"refs/tags/v1.1.0": SnapshotBranch( target=release.id, target_type=TargetType.RELEASE ), b"HEAD": SnapshotBranch( target=revision.id, target_type=TargetType.REVISION ), } ) swh_storage.snapshot_add([snapshot]) # Add all objects to graph if use_graph: from swh.graph.naive_client import NaiveClient as GraphClient nodes = [ str(x.swhid()) for x in [content, directory, revision, release, snapshot] ] edges = [ (str(x.swhid()), str(y.swhid())) for (x, y) in [ (directory, content), (revision, directory), (release, revision), (snapshot, release), (snapshot, revision), ] ] swh_graph = unittest.mock.Mock(wraps=GraphClient(nodes=nodes, edges=edges)) else: swh_graph = None # Set a display name with swh_storage.db() as db: with db.transaction() as cur: cur.execute( "UPDATE person set displayname = %s where fullname = %s", (current_person.fullname, legacy_person.fullname), ) # Check the display name did apply in the storage assert swh_storage.revision_get([revision.id])[0] == attr.evolve( revision, author=current_person, committer=current_person, ) # Cook cooked_swhid = snapshot.swhid() backend = InMemoryVaultBackend() cooker = GitBareCooker( cooked_swhid, backend=backend, storage=swh_storage, graph=swh_graph, ) cooker.cook() # Get bundle bundle = backend.fetch("git_bare", cooked_swhid) # Extract bundle and make sure both revisions are in it with tempfile.TemporaryDirectory("swh-vault-test-bare") as tempdir: with tarfile.open(fileobj=io.BytesIO(bundle)) as tf: tf.extractall(tempdir) # If we are here, it means git-fsck succeeded when called by cooker.cook(), # so we already know the original person was used. Let's double-check. 
repo = dulwich.repo.Repo(f"{tempdir}/{cooked_swhid}.git") tag = repo[b"refs/tags/v1.1.0"] assert tag.tagger == legacy_person.fullname commit = repo[tag.object[1]] assert commit.author == legacy_person.fullname diff --git a/swh/vault/tests/test_server.py b/swh/vault/tests/test_server.py index aed9e67..fbd5c12 100644 --- a/swh/vault/tests/test_server.py +++ b/swh/vault/tests/test_server.py @@ -1,162 +1,166 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import os from typing import Any, Dict import pytest import yaml from swh.core.api.serializers import json_dumps, msgpack_dumps, msgpack_loads from swh.vault.api.serializers import ENCODERS +import swh.vault.api.server from swh.vault.api.server import ( VaultServerApp, check_config, make_app, make_app_from_configfile, ) from swh.vault.tests.test_backend import TEST_SWHID def test_make_app_from_file_missing(): with pytest.raises(ValueError, match="Missing configuration path."): make_app_from_configfile() def test_make_app_from_file_does_not_exist(tmp_path): conf_path = os.path.join(str(tmp_path), "vault-server.yml") assert os.path.exists(conf_path) is False with pytest.raises( ValueError, match=f"Configuration path {conf_path} should exist." ): make_app_from_configfile(conf_path) def test_make_app_from_env_variable(swh_vault_config_file): """Server initialization happens through env variable when no path is provided """ app = make_app_from_configfile() assert app is not None def test_make_app_from_file(swh_local_vault_config, tmp_path): - """Server initialization happens trough path if provided + """Server initialization happens through path if provided """ conf_path = os.path.join(str(tmp_path), "vault-server.yml") with open(conf_path, "w") as f: f.write(yaml.dump(swh_local_vault_config)) app = make_app_from_configfile(conf_path) assert app is not None @pytest.fixture def async_app(swh_local_vault_config: Dict[str, Any],) -> VaultServerApp: """Instantiate the vault server application. 
Note: This requires the db setup to run (fixture swh_vault in charge of this) """ + # make sure a new VaultBackend is instantiated for each test to prevent + # side effects between tests + swh.vault.api.server.vault = None return make_app(swh_local_vault_config) @pytest.fixture def cli(async_app, aiohttp_client, loop): return loop.run_until_complete(aiohttp_client(async_app)) async def test_client_index(cli): resp = await cli.get("/") assert resp.status == 200 async def test_client_cook_notfound(cli): resp = await cli.post( "/cook", data=json_dumps( {"bundle_type": "flat", "swhid": TEST_SWHID}, extra_encoders=ENCODERS ), headers=[("Content-Type", "application/json")], ) assert resp.status == 400 content = msgpack_loads(await resp.content.read()) assert content["type"] == "NotFoundExc" assert content["args"] == [f"flat {TEST_SWHID} was not found."] async def test_client_progress_notfound(cli): resp = await cli.post( "/progress", data=json_dumps( {"bundle_type": "flat", "swhid": TEST_SWHID}, extra_encoders=ENCODERS ), headers=[("Content-Type", "application/json")], ) assert resp.status == 400 content = msgpack_loads(await resp.content.read()) assert content["type"] == "NotFoundExc" assert content["args"] == [f"flat {TEST_SWHID} was not found."] async def test_client_batch_cook_invalid_type(cli): resp = await cli.post( "/batch_cook", data=msgpack_dumps({"batch": [("foobar", [])]}), headers={"Content-Type": "application/x-msgpack"}, ) assert resp.status == 400 content = msgpack_loads(await resp.content.read()) assert content["type"] == "NotFoundExc" assert content["args"] == ["foobar is an unknown type."] async def test_client_batch_progress_notfound(cli): resp = await cli.post( "/batch_progress", data=msgpack_dumps({"batch_id": 1}), headers={"Content-Type": "application/x-msgpack"}, ) assert resp.status == 400 content = msgpack_loads(await resp.content.read()) assert content["type"] == "NotFoundExc" assert content["args"] == ["Batch 1 does not exist."] def test_check_config_missing_vault_configuration() -> None: """Irrelevant configuration file path raises""" with pytest.raises(ValueError, match="missing 'vault' configuration"): check_config({}) def test_check_config_not_local() -> None: """Wrong configuration raises""" expected_error = ( "The vault backend can only be started with a 'local' configuration" ) with pytest.raises(EnvironmentError, match=expected_error): check_config({"vault": {"cls": "remote"}}) @pytest.mark.parametrize("missing_key", ["storage", "cache", "scheduler"]) def test_check_config_missing_key(missing_key, swh_vault_config) -> None: """Any other configuration than 'local' (the default) is rejected""" config_ok = {"vault": {"cls": "local", **swh_vault_config}} config_ko = copy.deepcopy(config_ok) config_ko["vault"].pop(missing_key, None) expected_error = f"invalid configuration: missing {missing_key} config entry" with pytest.raises(ValueError, match=expected_error): check_config(config_ko) @pytest.mark.parametrize("missing_key", ["storage", "cache", "scheduler"]) def test_check_config_ok(missing_key, swh_vault_config) -> None: """Any other configuration than 'local' (the default) is rejected""" config_ok = {"vault": {"cls": "local", **swh_vault_config}} assert check_config(config_ok) is not None
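Note (not part of the diff): for readers tracing the new root-object test paths in test_git_bare_cooker.py, the sketch below distills the cooking flow those tests exercise. It is a hedged illustration, not the project's API surface beyond what the diff itself shows: the helper name cook_to_directory is hypothetical, swh_storage is assumed to be an already-populated storage instance (as the fixtures above provide), and only calls visible in the tests are used (GitBareCooker, InMemoryVaultBackend, backend.fetch("git_bare", ...), cooker.cook()).

import io
import tarfile

from swh.vault.cookers.git_bare import GitBareCooker
from swh.vault.in_memory_backend import InMemoryVaultBackend


def cook_to_directory(swh_storage, swhid, output_dir, graph=None):
    """Hypothetical helper: cook `swhid` into a git-bare bundle and extract it.

    `swhid` may be a revision, release, or snapshot SWHID, mirroring the
    RootObjects parametrization added in test_graph_revisions. `graph` is an
    optional swh-graph client; when None, the cooker falls back to storage.
    """
    backend = InMemoryVaultBackend()
    cooker = GitBareCooker(swhid, backend=backend, storage=swh_storage, graph=graph)
    cooker.cook()

    # The cooked bundle is a tarball containing a bare repository named <swhid>.git
    bundle = backend.fetch("git_bare", swhid)
    with tarfile.open(fileobj=io.BytesIO(bundle)) as tf:
        tf.extractall(output_dir)
    return f"{output_dir}/{swhid}.git"

For example, cooking rel2.swhid() from the test data would yield a repository whose refs/tags/1.0.0 resolves to rev2, which is what the RELEASE case of test_graph_revisions asserts via `git log 1.0.0`.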