diff --git a/PKG-INFO b/PKG-INFO
index 351c8fb..4e10db7 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,31 +1,35 @@
 Metadata-Version: 2.1
 Name: swh.vault
-Version: 0.5.1
+Version: 0.6.0
 Summary: Software Heritage vault
 Home-page: https://forge.softwareheritage.org/diffusion/DVAU/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
 Project-URL: Funding, https://www.softwareheritage.org/donate
 Project-URL: Source, https://forge.softwareheritage.org/source/swh-vault
 Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-vault/
-Description: swh-vault
-        =========
-
-        User-facing service that allows to retrieve parts of the archive as
-        self-contained bundles.
-
-        See the
-        [documentation](https://docs.softwareheritage.org/devel/swh-vault/index.html)
-        for more details.
-
 Platform: UNKNOWN
 Classifier: Programming Language :: Python :: 3
 Classifier: Intended Audience :: Developers
 Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
 Classifier: Operating System :: OS Independent
 Classifier: Development Status :: 5 - Production/Stable
 Requires-Python: >=3.7
 Description-Content-Type: text/markdown
 Provides-Extra: testing
+License-File: LICENSE
+License-File: AUTHORS
+
+swh-vault
+=========
+
+User-facing service that allows to retrieve parts of the archive as
+self-contained bundles.
+
+See the
+[documentation](https://docs.softwareheritage.org/devel/swh-vault/index.html)
+for more details.
+
+
diff --git a/requirements-swh.txt b/requirements-swh.txt
index 6a1f292..cc81268 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,5 +1,6 @@
-swh.core[db,http] >= 0.5
+swh.core[db,http] >= 0.14.0
+swh.graph >= v0.3.2
 swh.model >= 0.3
 swh.objstorage >= 0.0.17
 swh.scheduler >= 0.7.0
-swh.storage >= 0.0.106
+swh.storage >= 0.29.0
diff --git a/requirements-test.txt b/requirements-test.txt
index 6e6306f..9c23fb1 100644
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,6 +1,10 @@
 pytest
 dulwich >= 0.18.7
 swh.loader.core
 swh.loader.git >= 0.8
 swh.storage[testing]
 pytest-mock
+types-click
+types-python-dateutil
+types-pyyaml
+types-requests
diff --git a/swh.vault.egg-info/PKG-INFO b/swh.vault.egg-info/PKG-INFO
index 351c8fb..4e10db7 100644
--- a/swh.vault.egg-info/PKG-INFO
+++ b/swh.vault.egg-info/PKG-INFO
@@ -1,31 +1,35 @@
 Metadata-Version: 2.1
 Name: swh.vault
-Version: 0.5.1
+Version: 0.6.0
 Summary: Software Heritage vault
 Home-page: https://forge.softwareheritage.org/diffusion/DVAU/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
 Project-URL: Funding, https://www.softwareheritage.org/donate
 Project-URL: Source, https://forge.softwareheritage.org/source/swh-vault
 Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-vault/
-Description: swh-vault
-        =========
-
-        User-facing service that allows to retrieve parts of the archive as
-        self-contained bundles.
-
-        See the
-        [documentation](https://docs.softwareheritage.org/devel/swh-vault/index.html)
-        for more details.
-
 Platform: UNKNOWN
 Classifier: Programming Language :: Python :: 3
 Classifier: Intended Audience :: Developers
 Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
 Classifier: Operating System :: OS Independent
 Classifier: Development Status :: 5 - Production/Stable
 Requires-Python: >=3.7
 Description-Content-Type: text/markdown
 Provides-Extra: testing
+License-File: LICENSE
+License-File: AUTHORS
+
+swh-vault
+=========
+
+User-facing service that allows to retrieve parts of the archive as
+self-contained bundles.
+
+See the
+[documentation](https://docs.softwareheritage.org/devel/swh-vault/index.html)
+for more details.
+
+
diff --git a/swh.vault.egg-info/SOURCES.txt b/swh.vault.egg-info/SOURCES.txt
index e24b29f..03bb049 100644
--- a/swh.vault.egg-info/SOURCES.txt
+++ b/swh.vault.egg-info/SOURCES.txt
@@ -1,68 +1,72 @@
 .gitignore
 .pre-commit-config.yaml
 AUTHORS
 CODE_OF_CONDUCT.md
 CONTRIBUTORS
 LICENSE
 MANIFEST.in
 Makefile
 README.md
 conftest.py
 mypy.ini
 pyproject.toml
 pytest.ini
 requirements-swh.txt
 requirements-test.txt
 requirements.txt
 setup.cfg
 setup.py
 tox.ini
 docs/.gitignore
 docs/Makefile
 docs/api.rst
 docs/cli.rst
 docs/conf.py
 docs/getting-started.rst
 docs/index.rst
 docs/_static/.placeholder
 docs/_templates/.placeholder
 sql/upgrades/002.sql
 sql/upgrades/003.sql
 swh/__init__.py
 swh.vault.egg-info/PKG-INFO
 swh.vault.egg-info/SOURCES.txt
 swh.vault.egg-info/dependency_links.txt
 swh.vault.egg-info/entry_points.txt
 swh.vault.egg-info/not-zip-safe
 swh.vault.egg-info/requires.txt
 swh.vault.egg-info/top_level.txt
 swh/vault/__init__.py
 swh/vault/backend.py
 swh/vault/cache.py
 swh/vault/cli.py
 swh/vault/cooking_tasks.py
 swh/vault/exc.py
+swh/vault/in_memory_backend.py
 swh/vault/interface.py
 swh/vault/py.typed
 swh/vault/to_disk.py
 swh/vault/api/__init__.py
 swh/vault/api/client.py
 swh/vault/api/server.py
 swh/vault/cookers/__init__.py
 swh/vault/cookers/base.py
 swh/vault/cookers/directory.py
+swh/vault/cookers/git_bare.py
 swh/vault/cookers/revision_flat.py
 swh/vault/cookers/revision_gitfast.py
 swh/vault/cookers/utils.py
 swh/vault/sql/30-schema.sql
 swh/vault/tests/__init__.py
 swh/vault/tests/conftest.py
 swh/vault/tests/test_backend.py
 swh/vault/tests/test_cache.py
+swh/vault/tests/test_cli.py
 swh/vault/tests/test_cookers.py
 swh/vault/tests/test_cookers_base.py
+swh/vault/tests/test_git_bare_cooker.py
 swh/vault/tests/test_init.py
 swh/vault/tests/test_init_cookers.py
 swh/vault/tests/test_server.py
 swh/vault/tests/test_to_disk.py
 swh/vault/tests/vault_testing.py
\ No newline at end of file
diff --git a/swh.vault.egg-info/requires.txt b/swh.vault.egg-info/requires.txt
index 64ca232..3af889f 100644
--- a/swh.vault.egg-info/requires.txt
+++ b/swh.vault.egg-info/requires.txt
@@ -1,19 +1,24 @@
 click
 flask
 psycopg2
 python-dateutil
 fastimport
 typing-extensions
-swh.core[db,http]>=0.5
+swh.core[db,http]>=0.14.0
+swh.graph>=v0.3.2
 swh.model>=0.3
 swh.objstorage>=0.0.17
 swh.scheduler>=0.7.0
-swh.storage>=0.0.106
+swh.storage>=0.29.0
 
 [testing]
 pytest
 dulwich>=0.18.7
 swh.loader.core
 swh.loader.git>=0.8
 swh.storage[testing]
 pytest-mock
+types-click
+types-python-dateutil
+types-pyyaml
+types-requests
diff --git a/swh/__init__.py b/swh/__init__.py
index f14e196..8d9f151 100644
--- a/swh/__init__.py
+++ b/swh/__init__.py
@@ -1,4 +1,4 @@
 from pkgutil import extend_path
-from typing import Iterable
+from typing import List
 
-__path__ = extend_path(__path__, __name__)  # type: Iterable[str]
+__path__: List[str] = extend_path(__path__, __name__)
diff --git a/swh/vault/__init__.py b/swh/vault/__init__.py
index db16ff9..5a57fd7 100644
--- a/swh/vault/__init__.py
+++ b/swh/vault/__init__.py
@@ -1,54 +1,55 @@
 # Copyright (C) 2018-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from __future__ import annotations
 
 import importlib
 import logging
 from typing import Dict
 import warnings
 
 logger = logging.getLogger(__name__)
 
 BACKEND_TYPES: Dict[str, str] = {
     "remote": ".api.client.RemoteVaultClient",
     "local": ".backend.VaultBackend",
+    "memory": ".in_memory_backend.InMemoryVaultBackend",
 }
 
 
 def get_vault(cls: str = "remote", **kwargs):
     """
     Get a vault object of class `vault_class` with arguments `vault_args`.
 
     Args:
         cls: vault's class, either 'remote' or 'local'
         kwargs: arguments to pass to the class' constructor
 
     Returns:
         an instance of VaultBackend (either local or remote)
 
     Raises:
         ValueError if passed an unknown storage class.
     """
     if "args" in kwargs:
         warnings.warn(
             'Explicit "args" key is deprecated, use keys directly instead.',
             DeprecationWarning,
         )
         kwargs = kwargs["args"]
 
     class_path = BACKEND_TYPES.get(cls)
     if class_path is None:
         raise ValueError(
             f"Unknown Vault class `{cls}`. "
             f"Supported: {', '.join(BACKEND_TYPES)}"
         )
 
     (module_path, class_name) = class_path.rsplit(".", 1)
     module = importlib.import_module(module_path, package=__package__)
     Vault = getattr(module, class_name)
     return Vault(**kwargs)
diff --git a/swh/vault/backend.py b/swh/vault/backend.py
index 3009cdd..4812f0b 100644
--- a/swh/vault/backend.py
+++ b/swh/vault/backend.py
@@ -1,552 +1,552 @@
 # Copyright (C) 2017-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import collections
 from email.mime.text import MIMEText
 import smtplib
 from typing import Any, Dict, List, Optional, Tuple
 
 import psycopg2.extras
 import psycopg2.pool
 
 from swh.core.db import BaseDb
 from swh.core.db.common import db_transaction
 from swh.model import hashutil
 from swh.scheduler import get_scheduler
 from swh.scheduler.utils import create_oneshot_task_dict
 from swh.storage import get_storage
 from swh.vault.cache import VaultCache
 from swh.vault.cookers import COOKER_TYPES, get_cooker_cls
 from swh.vault.exc import NotFoundExc
 from swh.vault.interface import ObjectId
 
 cooking_task_name = "swh.vault.cooking_tasks.SWHCookingTask"
 
 NOTIF_EMAIL_FROM = '"Software Heritage Vault" ' "<bot@softwareheritage.org>"
 NOTIF_EMAIL_SUBJECT_SUCCESS = "Bundle ready: {obj_type} {short_id}"
 NOTIF_EMAIL_SUBJECT_FAILURE = "Bundle failed: {obj_type} {short_id}"
 
 NOTIF_EMAIL_BODY_SUCCESS = """
 You have requested the following bundle from the Software Heritage
 Vault:
 
 Object Type: {obj_type}
 Object ID: {hex_id}
 
 This bundle is now available for download at the following address:
 
 {url}
 
 Please keep in mind that this link might expire at some point, in which
 case you will need to request the bundle again.
 
 --\x20
 The Software Heritage Developers
 """
 
 NOTIF_EMAIL_BODY_FAILURE = """
 You have requested the following bundle from the Software Heritage
 Vault:
 
 Object Type: {obj_type}
 Object ID: {hex_id}
 
 This bundle could not be cooked for the following reason:
 
 {progress_msg}
 
 We apologize for the inconvenience.
 
 --\x20
 The Software Heritage Developers
 """
 
 
 def batch_to_bytes(batch: List[Tuple[str, str]]) -> List[Tuple[str, bytes]]:
     return [(obj_type, hashutil.hash_to_bytes(hex_id)) for obj_type, hex_id in batch]
 
 
 class VaultBackend:
     """
     Backend for the Software Heritage Vault.
     """
 
     def __init__(self, **config):
         self.config = config
         self.cache = VaultCache(**config["cache"])
         self.scheduler = get_scheduler(**config["scheduler"])
         self.storage = get_storage(**config["storage"])
         self.smtp_server = smtplib.SMTP()
 
         db_conn = config["db"]
         self._pool = psycopg2.pool.ThreadedConnectionPool(
             config.get("min_pool_conns", 1),
             config.get("max_pool_conns", 10),
             db_conn,
             cursor_factory=psycopg2.extras.RealDictCursor,
         )
         self._db = None
 
     def get_db(self):
         if self._db:
             return self._db
         return BaseDb.from_pool(self._pool)
 
     def put_db(self, db):
         if db is not self._db:
             db.put_conn()
 
     def _compute_ids(self, obj_id: ObjectId) -> Tuple[str, bytes]:
         """Internal method to reconcile multiple possible inputs
         """
         if isinstance(obj_id, str):
             return obj_id, hashutil.hash_to_bytes(obj_id)
         return hashutil.hash_to_hex(obj_id), obj_id
 
     @db_transaction()
     def progress(
         self,
         obj_type: str,
         obj_id: ObjectId,
         raise_notfound: bool = True,
         db=None,
         cur=None,
     ) -> Optional[Dict[str, Any]]:
         hex_id, obj_id = self._compute_ids(obj_id)
         cur.execute(
             """
             SELECT id, type, object_id, task_id, task_status, sticky,
                    ts_created, ts_done, ts_last_access, progress_msg
             FROM vault_bundle
             WHERE type = %s AND object_id = %s""",
             (obj_type, obj_id),
         )
         res = cur.fetchone()
 
         if not res:
             if raise_notfound:
                 raise NotFoundExc(f"{obj_type} {hex_id} was not found.")
             return None
 
         res["object_id"] = hashutil.hash_to_hex(res["object_id"])
         return res
 
     def _send_task(self, obj_type: str, hex_id: ObjectId):
         """Send a cooking task to the celery scheduler"""
         task = create_oneshot_task_dict("cook-vault-bundle", obj_type, hex_id)
         added_tasks = self.scheduler.create_tasks([task])
         return added_tasks[0]["id"]
 
     @db_transaction()
     def create_task(
         self, obj_type: str, obj_id: bytes, sticky: bool = False, db=None, cur=None
     ):
         """Create and send a cooking task"""
         hex_id, obj_id = self._compute_ids(obj_id)
 
         cooker_class = get_cooker_cls(obj_type)
         cooker = cooker_class(obj_type, hex_id, backend=self, storage=self.storage)
         if not cooker.check_exists():
             raise NotFoundExc(f"{obj_type} {hex_id} was not found.")
 
         cur.execute(
             """
             INSERT INTO vault_bundle (type, object_id, sticky)
             VALUES (%s, %s, %s)""",
             (obj_type, obj_id, sticky),
         )
         db.conn.commit()
         task_id = self._send_task(obj_type, hex_id)
         cur.execute(
             """
             UPDATE vault_bundle
             SET task_id = %s
             WHERE type = %s AND object_id = %s""",
             (task_id, obj_type, obj_id),
         )
 
     @db_transaction()
     def add_notif_email(
         self, obj_type: str, obj_id: bytes, email: str, db=None, cur=None
     ):
         """Add an e-mail address to notify when a given bundle is ready"""
         cur.execute(
             """
             INSERT INTO vault_notif_email (email, bundle_id)
             VALUES (%s, (SELECT id FROM vault_bundle
                          WHERE type = %s AND object_id = %s))""",
             (email, obj_type, obj_id),
         )
 
     def put_bundle(self, obj_type: str, obj_id: ObjectId, bundle) -> bool:
         _, obj_id = self._compute_ids(obj_id)
         self.cache.add(obj_type, obj_id, bundle)
         return True
 
     @db_transaction()
     def cook(
         self,
         obj_type: str,
         obj_id: ObjectId,
         *,
         sticky: bool = False,
         email: Optional[str] = None,
         db=None,
         cur=None,
     ) -> Dict[str, Any]:
         hex_id, obj_id = self._compute_ids(obj_id)
         info = self.progress(obj_type, obj_id, raise_notfound=False)
 
         if obj_type not in COOKER_TYPES:
             raise NotFoundExc(f"{obj_type} is an unknown type.")
 
         # If there's a failed bundle entry, delete it first.
         if info is not None and info["task_status"] == "failed":
             obj_id = hashutil.hash_to_bytes(obj_id)
             cur.execute(
                 "DELETE FROM vault_bundle WHERE type = %s AND object_id = %s",
                 (obj_type, obj_id),
             )
             db.conn.commit()
             info = None
 
         # If there's no bundle entry, create the task.
         if info is None:
             self.create_task(obj_type, obj_id, sticky)
 
         if email is not None:
             # If the task is already done, send the email directly
             if info is not None and info["task_status"] == "done":
                 self.send_notification(
                     None, email, obj_type, hex_id, info["task_status"]
                 )
             # Else, add it to the notification queue
             else:
                 self.add_notif_email(obj_type, obj_id, email)
 
         return self.progress(obj_type, obj_id)
 
     @db_transaction()
     def batch_cook(
         self, batch: List[Tuple[str, str]], db=None, cur=None
     ) -> Dict[str, int]:
         # Import execute_values at runtime only, because it requires
         # psycopg2 >= 2.7 (only available on postgresql servers)
         from psycopg2.extras import execute_values
 
         for obj_type, _ in batch:
             if obj_type not in COOKER_TYPES:
                 raise NotFoundExc(f"{obj_type} is an unknown type.")
 
         cur.execute(
             """
             INSERT INTO vault_batch (id)
             VALUES (DEFAULT)
             RETURNING id"""
         )
         batch_id = cur.fetchone()["id"]
         batch_bytes = batch_to_bytes(batch)
 
         # Delete all failed bundles from the batch
         cur.execute(
             """
             DELETE FROM vault_bundle
             WHERE task_status = 'failed'
               AND (type, object_id) IN %s""",
             (tuple(batch_bytes),),
         )
 
         # Insert all the bundles, return the new ones
         execute_values(
             cur,
             """
             INSERT INTO vault_bundle (type, object_id)
             VALUES %s ON CONFLICT DO NOTHING""",
             batch_bytes,
         )
 
         # Get the bundle ids and task status
         cur.execute(
             """
             SELECT id, type, object_id, task_id
             FROM vault_bundle
             WHERE (type, object_id) IN %s""",
             (tuple(batch_bytes),),
         )
         bundles = cur.fetchall()
 
         # Insert the batch-bundle entries
         batch_id_bundle_ids = [(batch_id, row["id"]) for row in bundles]
         execute_values(
             cur,
             """
             INSERT INTO vault_batch_bundle (batch_id, bundle_id)
             VALUES %s ON CONFLICT DO NOTHING""",
             batch_id_bundle_ids,
         )
         db.conn.commit()
 
         # Get the tasks to fetch
         batch_new = [
             (row["type"], bytes(row["object_id"]))
             for row in bundles
             if row["task_id"] is None
         ]
 
         # Send the tasks
         args_batch = [
             (obj_type, hashutil.hash_to_hex(obj_id)) for obj_type, obj_id in batch_new
         ]
         # TODO: change once the scheduler handles priority tasks
         tasks = [
             create_oneshot_task_dict("swh-vault-batch-cooking", *args)
             for args in args_batch
         ]
 
         added_tasks = self.scheduler.create_tasks(tasks)
         tasks_ids_bundle_ids = [
             (task_id, obj_type, obj_id)
             for task_id, (obj_type, obj_id) in zip(
                 [task["id"] for task in added_tasks], batch_new
             )
         ]
 
         # Update the task ids
         execute_values(
             cur,
             """
             UPDATE vault_bundle
             SET task_id = s_task_id
             FROM (VALUES %s) AS sub (s_task_id, s_type, s_object_id)
             WHERE type = s_type::cook_type AND object_id = s_object_id """,
             tasks_ids_bundle_ids,
         )
         return {"id": batch_id}
 
     @db_transaction()
     def batch_progress(self, batch_id: int, db=None, cur=None) -> Dict[str, Any]:
         cur.execute(
             """
             SELECT vault_bundle.id as id, type, object_id, task_id,
                    task_status, sticky, ts_created, ts_done, ts_last_access,
                    progress_msg
             FROM vault_batch_bundle
             LEFT JOIN vault_bundle ON vault_bundle.id = bundle_id
             WHERE batch_id = %s""",
             (batch_id,),
         )
         bundles = cur.fetchall()
         if not bundles:
             raise NotFoundExc(f"Batch {batch_id} does not exist.")
 
         for bundle in bundles:
             bundle["object_id"] = hashutil.hash_to_hex(bundle["object_id"])
 
         counter = collections.Counter(b["status"] for b in bundles)
         res = {
             "bundles": bundles,
             "total": len(bundles),
             **{k: 0 for k in ("new", "pending", "done", "failed")},
             **dict(counter),
         }
         return res
 
     @db_transaction()
     def is_available(self, obj_type: str, obj_id: ObjectId, db=None, cur=None):
         """Check whether a bundle is available for retrieval"""
         info = self.progress(obj_type, obj_id, raise_notfound=False, cur=cur)
         obj_id = hashutil.hash_to_bytes(obj_id)
         return (
             info is not None
             and info["task_status"] == "done"
             and self.cache.is_cached(obj_type, obj_id)
         )
 
     @db_transaction()
     def fetch(
         self, obj_type: str, obj_id: ObjectId, raise_notfound=True, db=None, cur=None
-    ):
+    ) -> Optional[bytes]:
         """Retrieve a bundle from the cache"""
         hex_id, obj_id = self._compute_ids(obj_id)
         available = self.is_available(obj_type, obj_id, cur=cur)
         if not available:
             if raise_notfound:
                 raise NotFoundExc(f"{obj_type} {hex_id} is not available.")
             return None
         self.update_access_ts(obj_type, obj_id, cur=cur)
         return self.cache.get(obj_type, obj_id)
 
     @db_transaction()
     def update_access_ts(self, obj_type: str, obj_id: bytes, db=None, cur=None):
         """Update the last access timestamp of a bundle"""
         cur.execute(
             """
             UPDATE vault_bundle
             SET ts_last_access = NOW()
             WHERE type = %s AND object_id = %s""",
             (obj_type, obj_id),
         )
 
     @db_transaction()
     def set_status(
         self, obj_type: str, obj_id: ObjectId, status: str, db=None, cur=None
     ) -> bool:
         obj_id = hashutil.hash_to_bytes(obj_id)
         req = (
             """
             UPDATE vault_bundle
             SET task_status = %s """
             + (""", ts_done = NOW() """ if status == "done" else "")
             + """WHERE type = %s AND object_id = %s"""
         )
         cur.execute(req, (status, obj_type, obj_id))
         return True
 
     @db_transaction()
     def set_progress(
         self, obj_type: str, obj_id: ObjectId, progress: str, db=None, cur=None
     ) -> bool:
         obj_id = hashutil.hash_to_bytes(obj_id)
         cur.execute(
             """
             UPDATE vault_bundle
             SET progress_msg = %s
             WHERE type = %s AND object_id = %s""",
             (progress, obj_type, obj_id),
         )
         return True
 
     @db_transaction()
     def send_notif(self, obj_type: str, obj_id: ObjectId, db=None, cur=None) -> bool:
         hex_id, obj_id = self._compute_ids(obj_id)
         cur.execute(
             """
             SELECT vault_notif_email.id AS id, email, task_status, progress_msg
             FROM vault_notif_email
             INNER JOIN vault_bundle ON bundle_id = vault_bundle.id
             WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s""",
             (obj_type, obj_id),
         )
         for d in cur:
             self.send_notification(
                 d["id"],
                 d["email"],
                 obj_type,
                 hex_id,
                 status=d["task_status"],
                 progress_msg=d["progress_msg"],
             )
         return True
 
     @db_transaction()
     def send_notification(
         self,
         n_id: Optional[int],
         email: str,
         obj_type: str,
         hex_id: str,
         status: str,
         progress_msg: Optional[str] = None,
         db=None,
         cur=None,
     ) -> None:
         """Send the notification of a bundle to a specific e-mail"""
         short_id = hex_id[:7]
 
         # TODO: instead of hardcoding this, we should probably:
         # * add a "fetch_url" field in the vault_notif_email table
         # * generate the url with flask.url_for() on the web-ui side
         # * send this url as part of the cook request and store it in
         #   the table
         # * use this url for the notification e-mail
         url = "https://archive.softwareheritage.org/api/1/vault/{}/{}/" "raw".format(
             obj_type, hex_id
         )
 
         if status == "done":
             text = NOTIF_EMAIL_BODY_SUCCESS.strip()
             text = text.format(obj_type=obj_type, hex_id=hex_id, url=url)
             msg = MIMEText(text)
             msg["Subject"] = NOTIF_EMAIL_SUBJECT_SUCCESS.format(
                 obj_type=obj_type, short_id=short_id
             )
         elif status == "failed":
             text = NOTIF_EMAIL_BODY_FAILURE.strip()
             text = text.format(
                 obj_type=obj_type, hex_id=hex_id, progress_msg=progress_msg
             )
             msg = MIMEText(text)
             msg["Subject"] = NOTIF_EMAIL_SUBJECT_FAILURE.format(
                 obj_type=obj_type, short_id=short_id
             )
         else:
             raise RuntimeError(
                 "send_notification called on a '{}' bundle".format(status)
             )
 
         msg["From"] = NOTIF_EMAIL_FROM
         msg["To"] = email
 
         self._smtp_send(msg)
 
         if n_id is not None:
             cur.execute(
                 """
                 DELETE FROM vault_notif_email
                 WHERE id = %s""",
                 (n_id,),
             )
 
     def _smtp_send(self, msg: MIMEText):
         # Reconnect if needed
         try:
             status = self.smtp_server.noop()[0]
         except smtplib.SMTPException:
             status = -1
         if status != 250:
             self.smtp_server.connect("localhost", 25)
 
         # Send the message
         self.smtp_server.send_message(msg)
 
     @db_transaction()
     def _cache_expire(self, cond, *args, db=None, cur=None) -> None:
         """Low-level expiration method, used by cache_expire_* methods"""
         # Embedded SELECT query to be able to use ORDER BY and LIMIT
         cur.execute(
             """
             DELETE FROM vault_bundle
             WHERE ctid IN (
                 SELECT ctid
                 FROM vault_bundle
                 WHERE sticky = false
                 {}
             )
             RETURNING type, object_id
             """.format(
                 cond
             ),
             args,
         )
 
         for d in cur:
             self.cache.delete(d["type"], bytes(d["object_id"]))
 
     @db_transaction()
     def cache_expire_oldest(self, n=1, by="last_access", db=None, cur=None) -> None:
         """Expire the `n` oldest bundles"""
         assert by in ("created", "done", "last_access")
         filter = """ORDER BY ts_{} LIMIT {}""".format(by, n)
         return self._cache_expire(filter)
 
     @db_transaction()
     def cache_expire_until(self, date, by="last_access", db=None, cur=None) -> None:
         """Expire all the bundles until a certain date"""
         assert by in ("created", "done", "last_access")
         filter = """AND ts_{} <= %s""".format(by)
         return self._cache_expire(filter, date)
diff --git a/swh/vault/cli.py b/swh/vault/cli.py
index e99322b..8cc4e35 100644
--- a/swh/vault/cli.py
+++ b/swh/vault/cli.py
@@ -1,76 +1,176 @@
-# Copyright (C) 2015-2020  The Software Heritage developers
+# Copyright (C) 2015-2021  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from __future__ import annotations
+
 # WARNING: do not import unnecessary things here to keep cli startup time under
 # control
 import logging
+from typing import TYPE_CHECKING, Optional
 
 import click
 
 from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup
 from swh.core.cli import swh as swh_cli_group
 
-CFG_HELP = """Software Heritage Vault RPC server."""
+if TYPE_CHECKING:
+    import io
+
+    from swh.model.identifiers import CoreSWHID
+
+
+class SwhidParamType(click.ParamType):
+    name = "swhid"
+
+    def convert(self, value, param, ctx):
+        from swh.model.exceptions import ValidationError
+        from swh.model.identifiers import CoreSWHID
+
+        try:
+            return CoreSWHID.from_string(value)
+        except ValidationError:
+            self.fail(f"expected core SWHID, got {value!r}", param, ctx)
 
 
 @swh_cli_group.group(name="vault", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup)
 @click.pass_context
 def vault(ctx):
     """Software Heritage Vault tools."""
-    pass
 
 
-@vault.command(name="rpc-serve", help=CFG_HELP)
+@vault.command()
+@click.option(
+    "--config-file",
+    "-C",
+    default=None,
+    metavar="CONFIGFILE",
+    type=click.Path(exists=True, dir_okay=False,),
+    help="Configuration file.",
+)
+@click.argument("swhid", type=SwhidParamType())
+@click.argument("outfile", type=click.File("wb"))
+@click.option(
+    "--cooker-type",
+    type=click.Choice(["flat", "gitfast", "git_bare"]),
+    help="Selects which cooker to use when there is more than one available "
+    "for the given object type.",
+)
+@click.pass_context
+def cook(
+    ctx,
+    config_file: str,
+    swhid: CoreSWHID,
+    outfile: io.RawIOBase,
+    cooker_type: Optional[str],
+):
+    """
+    Runs a vault cooker for a single object (identified by a SWHID),
+    and outputs it to the given file.
+    """
+    from swh.core import config
+    from swh.graph.client import RemoteGraphClient
+    from swh.objstorage.factory import get_objstorage
+    from swh.storage import get_storage
+
+    from .cookers import COOKER_TYPES, get_cooker_cls
+    from .in_memory_backend import InMemoryVaultBackend
+
+    conf = config.read(config_file)
+
+    supported_object_types = {name.split("_")[0] for name in COOKER_TYPES}
+    if swhid.object_type.name.lower() not in supported_object_types:
+        raise click.ClickException(
+            f"No cooker available for {swhid.object_type.name} objects."
+        )
+
+    cooker_name = swhid.object_type.name.lower()
+
+    if cooker_type:
+        cooker_name = f"{cooker_name}_{cooker_type}"
+        if cooker_name not in COOKER_TYPES:
+            raise click.ClickException(
+                f"{swhid.object_type.name.lower()} objects do not have "
+                f"a {cooker_type} cooker."
+            )
+    else:
+        if cooker_name not in COOKER_TYPES:
+            raise click.ClickException(
+                f"{swhid.object_type.name.lower()} objects need "
+                f"an explicit --cooker-type."
+            )
+
+    backend = InMemoryVaultBackend()
+    storage = get_storage(**conf["storage"])
+    objstorage = get_objstorage(**conf["objstorage"]) if "objstorage" in conf else None
+    graph = RemoteGraphClient(**conf["graph"]) if "graph" in conf else None
+    cooker_cls = get_cooker_cls(cooker_name)
+    cooker = cooker_cls(
+        obj_type=cooker_name,
+        obj_id=swhid.object_id,
+        backend=backend,
+        storage=storage,
+        graph=graph,
+        objstorage=objstorage,
+    )
+    cooker.cook()
+
+    bundle = backend.fetch(cooker_name, swhid.object_id)
+    assert bundle, "Cooker did not write a bundle to the backend."
+    outfile.write(bundle)
+
+
+@vault.command(name="rpc-serve")
 @click.option(
     "--config-file",
     "-C",
     default=None,
     metavar="CONFIGFILE",
     type=click.Path(exists=True, dir_okay=False,),
     help="Configuration file.",
 )
 @click.option(
     "--host",
     default="0.0.0.0",
     metavar="IP",
     show_default=True,
     help="Host ip address to bind the server on",
 )
 @click.option(
     "--port",
     default=5005,
     type=click.INT,
     metavar="PORT",
     help="Binding port of the server",
 )
 @click.option(
     "--debug/--no-debug",
     default=True,
     help="Indicates if the server should run in debug mode",
 )
 @click.pass_context
 def serve(ctx, config_file, host, port, debug):
+    """Software Heritage Vault RPC server."""
     import aiohttp
 
     from swh.vault.api.server import make_app_from_configfile
 
     ctx.ensure_object(dict)
     try:
         app = make_app_from_configfile(config_file, debug=debug)
     except EnvironmentError as e:
         click.echo(e.msg, err=True)
         ctx.exit(1)
 
     aiohttp.web.run_app(app, host=host, port=int(port))
 
 
 def main():
     logging.basicConfig()
     return serve(auto_envvar_prefix="SWH_VAULT")
 
 
 if __name__ == "__main__":
     main()
diff --git a/swh/vault/cookers/__init__.py b/swh/vault/cookers/__init__.py
index 0a9df00..8ef8b36 100644
--- a/swh/vault/cookers/__init__.py
+++ b/swh/vault/cookers/__init__.py
@@ -1,96 +1,102 @@
 # Copyright (C) 2017-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from __future__ import annotations
 
 import os
 from typing import Any, Dict
 
 from swh.core.config import load_named_config
 from swh.core.config import read as read_config
+from swh.graph.client import RemoteGraphClient
 from swh.storage import get_storage
 from swh.vault import get_vault
 from swh.vault.cookers.base import DEFAULT_CONFIG, DEFAULT_CONFIG_PATH
 from swh.vault.cookers.directory import DirectoryCooker
+from swh.vault.cookers.git_bare import GitBareCooker
 from swh.vault.cookers.revision_flat import RevisionFlatCooker
 from swh.vault.cookers.revision_gitfast import RevisionGitfastCooker
 
 COOKER_TYPES = {
     "directory": DirectoryCooker,
     "revision_flat": RevisionFlatCooker,
     "revision_gitfast": RevisionGitfastCooker,
+    "revision_git_bare": GitBareCooker,
+    "directory_git_bare": GitBareCooker,
 }
 
 
 def get_cooker_cls(obj_type):
     return COOKER_TYPES[obj_type]
 
 
 def check_config(cfg: Dict[str, Any]) -> Dict[str, Any]:
     """Ensure the configuration is ok to run a vault worker, and propagate defaults
 
     Raises:
         EnvironmentError if the configuration is not for remote instance
         ValueError if one of the following keys is missing: vault, storage
 
     Returns:
         New configuration dict to instantiate a vault worker instance
 
     """
     cfg = cfg.copy()
 
     if "vault" not in cfg:
         raise ValueError("missing 'vault' configuration")
 
     vcfg = cfg["vault"]
     if vcfg["cls"] != "remote":
         raise EnvironmentError(
             "This vault backend can only be a 'remote' configuration"
         )
 
     # TODO: Soft-deprecation of args key. Remove when ready.
     vcfg.update(vcfg.get("args", {}))
 
     # Default to top-level value if any
     if "storage" not in vcfg:
         vcfg["storage"] = cfg.get("storage")
 
     if not vcfg.get("storage"):
         raise ValueError("invalid configuration: missing 'storage' config entry.")
 
     return cfg
 
 
 def get_cooker(obj_type: str, obj_id: str):
     """Instantiate a cooker class of type obj_type.
 
     Returns:
         Cooker class in charge of cooking the obj_type with id obj_id.
 
     Raises:
         ValueError in case of a missing top-level vault key configuration or a
         storage key.
         EnvironmentError in case the vault configuration reference a non remote class.
 
     """
     if "SWH_CONFIG_FILENAME" in os.environ:
         cfg = read_config(os.environ["SWH_CONFIG_FILENAME"], DEFAULT_CONFIG)
     else:
         cfg = load_named_config(DEFAULT_CONFIG_PATH, DEFAULT_CONFIG)
     cooker_cls = get_cooker_cls(obj_type)
     cfg = check_config(cfg)
     vcfg = cfg["vault"]
 
     storage = get_storage(**vcfg.pop("storage"))
     backend = get_vault(**vcfg)
+    graph = RemoteGraphClient(**vcfg["graph"]) if "graph" in vcfg else None
 
     return cooker_cls(
         obj_type,
         obj_id,
         backend=backend,
         storage=storage,
+        graph=graph,
         max_bundle_size=cfg["max_bundle_size"],
     )
diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py
index a9c32f5..f88f49f 100644
--- a/swh/vault/cookers/base.py
+++ b/swh/vault/cookers/base.py
@@ -1,136 +1,149 @@
 # Copyright (C) 2016-2018  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import abc
 import io
 import logging
 from typing import Optional
 
 from psycopg2.extensions import QueryCanceledError
 
 from swh.model import hashutil
 
 MAX_BUNDLE_SIZE = 2 ** 29  # 512 MiB
 DEFAULT_CONFIG_PATH = "vault/cooker"
 DEFAULT_CONFIG = {
     "max_bundle_size": ("int", MAX_BUNDLE_SIZE),
 }
 
 
 class PolicyError(Exception):
     """Raised when the bundle violates the cooking policy."""
 
     pass
 
 
 class BundleTooLargeError(PolicyError):
     """Raised when the bundle is too large to be cooked."""
 
     pass
 
 
 class BytesIOBundleSizeLimit(io.BytesIO):
     def __init__(self, *args, size_limit=None, **kwargs):
         super().__init__(*args, **kwargs)
         self.size_limit = size_limit
 
     def write(self, chunk):
         if (
             self.size_limit is not None
             and self.getbuffer().nbytes + len(chunk) > self.size_limit
         ):
             raise BundleTooLargeError(
                 "The requested bundle exceeds the maximum allowed "
                 "size of {} bytes.".format(self.size_limit)
             )
         return super().write(chunk)
 
 
 class BaseVaultCooker(metaclass=abc.ABCMeta):
     """Abstract base class for the vault's bundle creators
 
     This class describes a common API for the cookers.
 
     To define a new cooker, inherit from this class and override:
 
     - CACHE_TYPE_KEY: key to use for the bundle to reference in cache
 
     - def cook(): cook the object into a bundle
     """
 
     CACHE_TYPE_KEY = None  # type: Optional[str]
 
     def __init__(
-        self, obj_type, obj_id, backend, storage, max_bundle_size=MAX_BUNDLE_SIZE
+        self,
+        obj_type,
+        obj_id,
+        backend,
+        storage,
+        graph=None,
+        objstorage=None,
+        max_bundle_size=MAX_BUNDLE_SIZE,
     ):
         """Initialize the cooker.
 
         The type of the object represented by the id depends on the
         concrete class. Very likely, each type of bundle will have its
         own cooker class.
 
         Args:
             obj_type: type of the object to be cooked into a bundle (directory,
                       revision_flat or revision_gitfast; see
                       swh.vault.cooker.COOKER_TYPES).
             obj_id: id of the object to be cooked into a bundle.
             backend: the vault backend (swh.vault.backend.VaultBackend).
         """
         self.obj_type = obj_type
         self.obj_id = hashutil.hash_to_bytes(obj_id)
         self.backend = backend
         self.storage = storage
+        self.objstorage = objstorage
+        self.graph = graph
         self.max_bundle_size = max_bundle_size
 
     @abc.abstractmethod
     def check_exists(self):
         """Checks that the requested object exists and can be cooked.
 
         Override this in the cooker implementation.
         """
         raise NotImplementedError
 
     @abc.abstractmethod
     def prepare_bundle(self):
         """Implementation of the cooker. Yields chunks of the bundle bytes.
 
         Override this with the cooker implementation.
         """
         raise NotImplementedError
 
+    def cache_type_key(self) -> str:
+        assert self.CACHE_TYPE_KEY
+        return self.CACHE_TYPE_KEY
+
     def write(self, chunk):
         self.fileobj.write(chunk)
 
     def cook(self):
         """Cook the requested object into a bundle
         """
         self.backend.set_status(self.obj_type, self.obj_id, "pending")
         self.backend.set_progress(self.obj_type, self.obj_id, "Processing...")
 
         self.fileobj = BytesIOBundleSizeLimit(size_limit=self.max_bundle_size)
         try:
             try:
                 self.prepare_bundle()
             except QueryCanceledError:
                 raise PolicyError(
                     "Timeout reached while assembling the requested bundle"
                 )
             bundle = self.fileobj.getvalue()
             # TODO: use proper content streaming instead of put_bundle()
-            self.backend.put_bundle(self.CACHE_TYPE_KEY, self.obj_id, bundle)
+            self.backend.put_bundle(self.cache_type_key(), self.obj_id, bundle)
         except PolicyError as e:
             self.backend.set_status(self.obj_type, self.obj_id, "failed")
             self.backend.set_progress(self.obj_type, self.obj_id, str(e))
         except Exception:
             self.backend.set_status(self.obj_type, self.obj_id, "failed")
             self.backend.set_progress(
                 self.obj_type,
                 self.obj_id,
                 "Internal Server Error. This incident will be reported.",
             )
             logging.exception("Bundle cooking failed.")
         else:
             self.backend.set_status(self.obj_type, self.obj_id, "done")
             self.backend.set_progress(self.obj_type, self.obj_id, None)
         finally:
             self.backend.send_notif(self.obj_type, self.obj_id)
diff --git a/swh/vault/cookers/git_bare.py b/swh/vault/cookers/git_bare.py
new file mode 100644
index 0000000..33264e9
--- /dev/null
+++ b/swh/vault/cookers/git_bare.py
@@ -0,0 +1,350 @@
+# Copyright (C) 2021  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""
+This cooker creates tarballs containing a bare .git directory,
+that can be unpacked and cloned like any git repository.
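+
+For instance, a bundle could be produced and used along these lines (the
+config file and SWHID here are illustrative, not taken from a real run)::
+
+    $ swh vault cook -C vault.yml --cooker-type git_bare \
+          swh:1:rev:<sha1> bundle.tar
+    $ tar -xf bundle.tar
+    $ git clone 'swh:1:rev:<sha1>.git' repo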
+
+It works in three steps:
+
+1. Writes objects one by one in :file:`.git/objects/`
+2. Calls ``git repack`` to pack all these objects into git packfiles.
+3. Creates a tarball of the resulting repository.
+
+It keeps a set of all written (or about-to-be-written) object hashes in memory
+to avoid downloading and writing the same objects twice.
+"""
+
+import datetime
+import os.path
+import re
+import subprocess
+import tarfile
+import tempfile
+from typing import Any, Dict, Iterable, List, Set
+import zlib
+
+from swh.core.api.classes import stream_results
+from swh.graph.client import GraphArgumentException
+from swh.model import identifiers
+from swh.model.hashutil import hash_to_bytehex, hash_to_hex
+from swh.model.model import (
+    Person,
+    Revision,
+    RevisionType,
+    Sha1Git,
+    TimestampWithTimezone,
+)
+from swh.storage.algos.revisions_walker import DFSRevisionsWalker
+from swh.vault.cookers.base import BaseVaultCooker
+from swh.vault.to_disk import HIDDEN_MESSAGE, SKIPPED_MESSAGE
+
+REVISION_BATCH_SIZE = 10000
+DIRECTORY_BATCH_SIZE = 10000
+CONTENT_BATCH_SIZE = 100
+
+
+class GitBareCooker(BaseVaultCooker):
+    use_fsck = True
+
+    def cache_type_key(self) -> str:
+        return self.obj_type
+
+    def check_exists(self):
+        obj_type = self.obj_type.split("_")[0]
+        if obj_type == "revision":
+            return not list(self.storage.revision_missing([self.obj_id]))
+        elif obj_type == "directory":
+            return not list(self.storage.directory_missing([self.obj_id]))
+        else:
+            raise NotImplementedError(f"GitBareCooker for {obj_type}")
+
+    def obj_swhid(self) -> identifiers.CoreSWHID:
+        obj_type = self.obj_type.split("_")[0]
+        return identifiers.CoreSWHID(
+            object_type=identifiers.ObjectType[obj_type.upper()], object_id=self.obj_id,
+        )
+
+    def _push(self, stack: List[Sha1Git], obj_ids: Iterable[Sha1Git]) -> None:
+        assert not isinstance(obj_ids, bytes)
+        new_ids = [id_ for id_ in obj_ids if id_ not in self._seen]
+        self._seen.update(new_ids)
+        stack.extend(new_ids)
+
+    def _pop(self, stack: List[Sha1Git], n: int) -> List[Sha1Git]:
+        obj_ids = stack[-n:]
+        stack[-n:] = []
+        return obj_ids
+
+    def prepare_bundle(self):
+        # Objects we will visit soon:
+        self._rev_stack: List[Sha1Git] = []
+        self._dir_stack: List[Sha1Git] = []
+        self._cnt_stack: List[Sha1Git] = []
+
+        # Set of objects already in any of the stacks:
+        self._seen: Set[Sha1Git] = set()
+
+        # Set of errors we expect git-fsck to raise at the end:
+        self._expected_fsck_errors = set()
+
+        with tempfile.TemporaryDirectory(prefix="swh-vault-gitbare-") as workdir:
+            # Initialize a Git directory
+            self.workdir = workdir
+            self.gitdir = os.path.join(workdir, "clone.git")
+            os.mkdir(self.gitdir)
+            self.init_git()
+
+            # Add the root object to the stack of objects to visit
+            self.push_subgraph(self.obj_type.split("_")[0], self.obj_id)
+
+            # Load and write all the objects to disk
+            self.load_objects()
+
+            # Write the root object as a ref.
+            # This must be done before repacking; git-repack ignores orphan objects.
+            self.write_refs()
+
+            self.repack()
+            self.write_archive()
+
+    def init_git(self) -> None:
+        subprocess.run(["git", "-C", self.gitdir, "init", "--bare"], check=True)
+
+        # Create all possible dirs ahead of time, so we don't have to check for
+        # existence every time.
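+        # (objects/00/ through objects/ff/, matching git's loose-object layout)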
+        for byte in range(256):
+            os.mkdir(os.path.join(self.gitdir, "objects", f"{byte:02x}"))
+
+    def repack(self) -> None:
+        if self.use_fsck:
+            self.git_fsck()
+
+        # Add objects we wrote in a pack
+        subprocess.run(["git", "-C", self.gitdir, "repack"], check=True)
+
+        # Remove their non-packed originals
+        subprocess.run(["git", "-C", self.gitdir, "prune-packed"], check=True)
+
+    def git_fsck(self) -> None:
+        proc = subprocess.run(
+            ["git", "-C", self.gitdir, "fsck"],
+            stdout=subprocess.PIPE,
+            stderr=subprocess.STDOUT,
+            env={"LANG": "C.utf8"},
+        )
+        if not self._expected_fsck_errors:
+            # All went well, there should not be any error
+            proc.check_returncode()
+            return
+
+        # Split on newlines not followed by a space
+        errors = re.split("\n(?! )", proc.stdout.decode())
+
+        unexpected_errors = set(filter(bool, errors)) - self._expected_fsck_errors
+        if unexpected_errors:
+            raise Exception(
+                "\n".join(
+                    ["Unexpected errors from git-fsck:"] + sorted(unexpected_errors)
+                )
+            )
+
+    def write_refs(self):
+        obj_type = self.obj_type.split("_")[0]
+        if obj_type == "directory":
+            # We need a synthetic revision pointing to the directory
+            author = Person.from_fullname(
+                b"swh-vault, git-bare cooker <robot@softwareheritage.org>"
+            )
+            dt = datetime.datetime.now(tz=datetime.timezone.utc)
+            dt = dt.replace(microsecond=0)  # not supported by git
+            date = TimestampWithTimezone.from_datetime(dt)
+            revision = Revision(
+                author=author,
+                committer=author,
+                date=date,
+                committer_date=date,
+                message=b"Initial commit",
+                type=RevisionType.GIT,
+                directory=self.obj_id,
+                synthetic=True,
+            )
+            self.write_revision_node(revision.to_dict())
+            head = revision.id
+        elif obj_type == "revision":
+            head = self.obj_id
+        else:
+            assert False, obj_type
+
+        with open(os.path.join(self.gitdir, "refs", "heads", "master"), "wb") as fd:
+            fd.write(hash_to_bytehex(head))
+
+    def write_archive(self):
+        with tarfile.TarFile(mode="w", fileobj=self.fileobj) as tf:
+            tf.add(self.gitdir, arcname=f"{self.obj_swhid()}.git", recursive=True)
+
+    def _obj_path(self, obj_id: Sha1Git):
+        return os.path.join(self.gitdir, self._obj_relative_path(obj_id))
+
+    def _obj_relative_path(self, obj_id: Sha1Git):
+        obj_id_hex = hash_to_hex(obj_id)
+        directory = obj_id_hex[0:2]
+        filename = obj_id_hex[2:]
+        return os.path.join("objects", directory, filename)
+
+    def object_exists(self, obj_id: Sha1Git) -> bool:
+        return os.path.exists(self._obj_path(obj_id))
+
+    def write_object(self, obj_id: Sha1Git, obj: bytes) -> bool:
+        """Writes a git object on disk.
+
+        Returns True once the object has been written."""
+        # Git requires objects to be zlib-compressed; but repacking decompresses and
+        # removes them, so we don't need to compress them too much.
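+        # level=1 is zlib's fastest compression level; git accepts any valid
+        # zlib stream for loose objects.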
+        data = zlib.compress(obj, level=1)
+
+        with open(self._obj_path(obj_id), "wb") as fd:
+            fd.write(data)
+        return True
+
+    def push_subgraph(self, obj_type, obj_id) -> None:
+        if obj_type == "revision":
+            self.push_revision_subgraph(obj_id)
+        elif obj_type == "directory":
+            self._push(self._dir_stack, [obj_id])
+        else:
+            raise NotImplementedError(
+                f"GitBareCooker.push_subgraph({obj_type!r}, ...)"
+            )
+
+    def load_objects(self) -> None:
+        while self._rev_stack or self._dir_stack or self._cnt_stack:
+            revision_ids = self._pop(self._rev_stack, REVISION_BATCH_SIZE)
+            self.load_revisions(revision_ids)
+
+            directory_ids = self._pop(self._dir_stack, DIRECTORY_BATCH_SIZE)
+            self.load_directories(directory_ids)
+
+            content_ids = self._pop(self._cnt_stack, CONTENT_BATCH_SIZE)
+            self.load_contents(content_ids)
+
+    def push_revision_subgraph(self, obj_id: Sha1Git) -> None:
+        """Fetches a revision and all its children, and writes them to disk"""
+        loaded_from_graph = False
+
+        if self.graph:
+            # First, try to cook using swh-graph, as it is more efficient than
+            # swh-storage for querying the history
+            obj_swhid = identifiers.CoreSWHID(
+                object_type=identifiers.ObjectType.REVISION, object_id=obj_id,
+            )
+            try:
+                revision_ids = (
+                    swhid.object_id
+                    for swhid in map(
+                        identifiers.CoreSWHID.from_string,
+                        self.graph.visit_nodes(str(obj_swhid), edges="rev:rev"),
+                    )
+                )
+                self._push(self._rev_stack, revision_ids)
+            except GraphArgumentException:
+                # Revision not found in the graph
+                pass
+            else:
+                loaded_from_graph = True
+
+        if not loaded_from_graph:
+            # If swh-graph is not available, or the revision is not yet in
+            # swh-graph, fall back to self.storage.revision_log.
+            # self.storage.revision_log also gives us the full revisions,
+            # so we load them right now instead of just pushing them on the stack.
+            walker = DFSRevisionsWalker(self.storage, obj_id)
+            for revision in walker:
+                self.write_revision_node(revision)
+                self._push(self._dir_stack, [revision["directory"]])
+
+    def load_revisions(self, obj_ids: List[Sha1Git]) -> None:
+        """Given a list of revision ids, loads these revisions and their directories,
+        but not their parent revisions."""
+        revisions = self.storage.revision_get(obj_ids)
+        for revision in revisions:
+            self.write_revision_node(revision.to_dict())
+        self._push(self._dir_stack, (rev.directory for rev in revisions))
+
+    def write_revision_node(self, revision: Dict[str, Any]) -> bool:
+        """Writes a revision object to disk"""
+        git_object = identifiers.revision_git_object(revision)
+        return self.write_object(revision["id"], git_object)
+
+    def load_directories(self, obj_ids: List[Sha1Git]) -> None:
+        for obj_id in obj_ids:
+            self.load_directory(obj_id)
+
+    def load_directory(self, obj_id: Sha1Git) -> None:
+        # Load the directory
+        entries = [
+            entry.to_dict()
+            for entry in stream_results(self.storage.directory_get_entries, obj_id)
+        ]
+        directory = {"id": obj_id, "entries": entries}
+        git_object = identifiers.directory_git_object(directory)
+        self.write_object(obj_id, git_object)
+
+        # Add children to the stack
+        entry_loaders: Dict[str, List[Sha1Git]] = {
+            "file": self._cnt_stack,
+            "dir": self._dir_stack,
+            "rev": self._rev_stack,
+        }
+        for entry in directory["entries"]:
+            stack = entry_loaders[entry["type"]]
+            self._push(stack, [entry["target"]])
+
+    def load_contents(self, obj_ids: List[Sha1Git]) -> None:
+        # TODO: add support of filtered objects, somehow?
+        # It's tricky, because, by definition, we can't write a git object with
+        # the expected hash, so git-fsck *will* choke on it.
+        contents = self.storage.content_get(obj_ids, "sha1_git")
+
+        visible_contents = []
+        for (obj_id, content) in zip(obj_ids, contents):
+            if content is None:
+                # FIXME: this may also happen for missing content
+                self.write_content(obj_id, SKIPPED_MESSAGE)
+                self._expect_mismatched_object_error(obj_id)
+            elif content.status == "visible":
+                visible_contents.append(content)
+            elif content.status == "hidden":
+                self.write_content(obj_id, HIDDEN_MESSAGE)
+                self._expect_mismatched_object_error(obj_id)
+            else:
+                assert False, (
+                    f"unexpected status {content.status!r} "
+                    f"for content {hash_to_hex(content.sha1_git)}"
+                )
+
+        if self.objstorage is None:
+            for content in visible_contents:
+                data = self.storage.content_get_data(content.sha1)
+                self.write_content(content.sha1_git, data)
+        else:
+            content_data = self.objstorage.get_batch(c.sha1 for c in visible_contents)
+            for (content, data) in zip(visible_contents, content_data):
+                self.write_content(content.sha1_git, data)
+
+    def write_content(self, obj_id: Sha1Git, content: bytes) -> None:
+        header = identifiers.git_object_header("blob", len(content))
+        self.write_object(obj_id, header + content)
+
+    def _expect_mismatched_object_error(self, obj_id):
+        obj_id_hex = hash_to_hex(obj_id)
+        obj_path = self._obj_relative_path(obj_id)
+        self._expected_fsck_errors.add(
+            f"error: sha1 mismatch for ./{obj_path} (expected {obj_id_hex})"
+        )
+        self._expected_fsck_errors.add(
+            f"error: {obj_id_hex}: object corrupt or missing: ./{obj_path}"
+        )
+        self._expected_fsck_errors.add(f"missing blob {obj_id_hex}")
diff --git a/swh/vault/in_memory_backend.py b/swh/vault/in_memory_backend.py
new file mode 100644
index 0000000..e7cbb2a
--- /dev/null
+++ b/swh/vault/in_memory_backend.py
@@ -0,0 +1,53 @@
+# Copyright (C) 2017-2021  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Any, Dict, List, Optional, Tuple, Union
+
+from swh.model.hashutil import hash_to_bytes
+
+from .cache import VaultCache
+
+ObjectId = Union[str, bytes]
+
+
+class InMemoryVaultBackend:
+    """Stub vault backend, for use in the CLI."""
+
+    def __init__(self):
+        self._cache = VaultCache(cls="memory")
+
+    def fetch(self, obj_type: str, obj_id: ObjectId) -> Optional[bytes]:
+        return self._cache.get(obj_type, hash_to_bytes(obj_id))
+
+    def cook(
+        self, obj_type: str, obj_id: ObjectId, email: Optional[str] = None
+    ) -> Dict[str, Any]:
+        raise NotImplementedError("InMemoryVaultBackend.cook()")
+
+    def progress(self, obj_type: str, obj_id: ObjectId):
+        raise NotImplementedError("InMemoryVaultBackend.progress()")
+
+    # Cookers endpoints
+
+    def set_progress(self, obj_type: str, obj_id: ObjectId, progress: str) -> None:
+        pass
+
+    def set_status(self, obj_type: str, obj_id: ObjectId, status: str) -> None:
+        pass
+
+    def put_bundle(self, obj_type: str, obj_id: ObjectId, bundle) -> bool:
+        self._cache.add(obj_type, hash_to_bytes(obj_id), bundle)
+        return True
+
+    def send_notif(self, obj_type: str, obj_id: ObjectId):
+        pass
+
+    # Batch endpoints
+
+    def batch_cook(self, batch: List[Tuple[str, str]]) -> int:
+        raise NotImplementedError("InMemoryVaultBackend.batch_cook()")
+
+    def batch_progress(self, batch_id: int) -> Dict[str, Any]:
+        raise NotImplementedError("InMemoryVaultBackend.batch_progress()")
diff --git a/swh/vault/interface.py b/swh/vault/interface.py
index 6657c2a..cfc3a5c 100644
--- a/swh/vault/interface.py
+++ b/swh/vault/interface.py
@@ -1,70 +1,70 @@
 # Copyright (C) 2017-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from typing import Any, Dict, List, Optional, Tuple, Union
 
 from typing_extensions import Protocol, runtime_checkable
 
 from swh.core.api import remote_api_endpoint
 
 ObjectId = Union[str, bytes]
 
 
 @runtime_checkable
 class VaultInterface(Protocol):
     """
     Backend Interface for the Software Heritage vault.
     """
 
     @remote_api_endpoint("fetch")
-    def fetch(self, obj_type: str, obj_id: ObjectId) -> Dict[str, Any]:
+    def fetch(self, obj_type: str, obj_id: ObjectId) -> Optional[bytes]:
         """Fetch information from a bundle"""
         ...
 
     @remote_api_endpoint("cook")
     def cook(
         self, obj_type: str, obj_id: ObjectId, email: Optional[str] = None
     ) -> Dict[str, Any]:
         """Main entry point for cooking requests. This starts a cooking task if
         needed, and add the given e-mail to the notify list"""
         ...
 
     @remote_api_endpoint("progress")
     def progress(self, obj_type: str, obj_id: ObjectId):
         ...
 
     # Cookers endpoints
 
     @remote_api_endpoint("set_progress")
     def set_progress(self, obj_type: str, obj_id: ObjectId, progress: str) -> None:
         """Set the cooking progress of a bundle"""
         ...
 
     @remote_api_endpoint("set_status")
-    def set_status(self, obj_type: str, obj_id: ObjectId, status: str) -> None:
+    def set_status(self, obj_type: str, obj_id: ObjectId, status: str) -> bool:
         """Set the cooking status of a bundle"""
         ...
 
     @remote_api_endpoint("put_bundle")
     def put_bundle(self, obj_type: str, obj_id: ObjectId, bundle):
         """Store bundle in vault cache"""
         ...
 
     @remote_api_endpoint("send_notif")
     def send_notif(self, obj_type: str, obj_id: ObjectId):
         """Send all the e-mails in the notification list of a bundle"""
         ...
 
     # Batch endpoints
 
     @remote_api_endpoint("batch_cook")
     def batch_cook(self, batch: List[Tuple[str, str]]) -> int:
         """Cook a batch of bundles and returns the cooking id."""
         ...
 
     @remote_api_endpoint("batch_progress")
     def batch_progress(self, batch_id: int) -> Dict[str, Any]:
         """Fetch information from a batch of bundles"""
         ...
diff --git a/swh/vault/tests/conftest.py b/swh/vault/tests/conftest.py
index a5356c2..3d394b2 100644
--- a/swh/vault/tests/conftest.py
+++ b/swh/vault/tests/conftest.py
@@ -1,88 +1,88 @@
 # Copyright (C) 2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import os
 from typing import Any, Dict
 
 import pkg_resources.extern.packaging.version
 import pytest
 import yaml
 
 from swh.core.db.pytest_plugin import postgresql_fact
 from swh.storage.tests import SQL_DIR as STORAGE_SQL_DIR
 import swh.vault
 from swh.vault import get_vault
 
 os.environ["LC_ALL"] = "C.UTF-8"
 
 pytest_v = pkg_resources.get_distribution("pytest").parsed_version
 if pytest_v < pkg_resources.extern.packaging.version.parse("3.9"):
 
     @pytest.fixture
     def tmp_path():
         import pathlib
         import tempfile
 
         with tempfile.TemporaryDirectory() as tmpdir:
             yield pathlib.Path(tmpdir)
 
 
 VAULT_SQL_DIR = os.path.join(os.path.dirname(swh.vault.__file__), "sql")
 
 postgres_vault = postgresql_fact(
-    "postgresql_proc", db_name="vault", dump_files=f"{VAULT_SQL_DIR}/*.sql"
+    "postgresql_proc", dbname="vault", dump_files=f"{VAULT_SQL_DIR}/*.sql"
 )
 postgres_storage = postgresql_fact(
-    "postgresql_proc", db_name="storage", dump_files=f"{STORAGE_SQL_DIR}/*.sql"
+    "postgresql_proc", dbname="storage", dump_files=f"{STORAGE_SQL_DIR}/*.sql"
 )
 
 
 @pytest.fixture
 def swh_vault_config(postgres_vault, postgres_storage, tmp_path) -> Dict[str, Any]:
     tmp_path = str(tmp_path)
     return {
         "db": postgres_vault.dsn,
         "storage": {
             "cls": "local",
             "db": postgres_storage.dsn,
             "objstorage": {
                 "cls": "pathslicing",
                 "args": {"root": tmp_path, "slicing": "0:1/1:5",},
             },
         },
         "cache": {
             "cls": "pathslicing",
             "args": {"root": tmp_path, "slicing": "0:1/1:5", "allow_delete": True},
         },
         "scheduler": {"cls": "remote", "url": "http://swh-scheduler:5008",},
     }
 
 
 @pytest.fixture
 def swh_local_vault_config(swh_vault_config: Dict[str, Any]) -> Dict[str, Any]:
     return {
         "vault": {"cls": "local", **swh_vault_config},
         "client_max_size": 1024 ** 3,
     }
 
 
 @pytest.fixture
 def swh_vault_config_file(swh_local_vault_config, monkeypatch, tmp_path):
     conf_path = os.path.join(str(tmp_path), "vault-server.yml")
     with open(conf_path, "w") as f:
         f.write(yaml.dump(swh_local_vault_config))
     monkeypatch.setenv("SWH_CONFIG_FILENAME", conf_path)
     return conf_path
 
 
 @pytest.fixture
 def swh_vault(swh_vault_config):
     return get_vault("local", **swh_vault_config)
 
 
 @pytest.fixture
 def swh_storage(swh_vault):
     return swh_vault.storage
diff --git a/swh/vault/tests/test_cli.py b/swh/vault/tests/test_cli.py
new file mode 100644
index 0000000..7a375a2
--- /dev/null
+++ b/swh/vault/tests/test_cli.py
@@ -0,0 +1,104 @@
+# Copyright (C) 2021  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import tempfile
+from unittest.mock import MagicMock
+
+import click
+import click.testing
+import pytest
+
+from swh.vault.cli import vault as vault_cli_group
+from swh.vault.cookers.base import BaseVaultCooker
+from swh.vault.in_memory_backend import InMemoryVaultBackend
+
+
+def test_cook_unsupported_swhid():
+    runner = click.testing.CliRunner()
+
+    result = runner.invoke(vault_cli_group, ["cook", "swh:1:dir:f00b4r", "-"])
+    assert isinstance(result.exception, SystemExit)
+    assert "expected core SWHID" in result.stdout
+
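+    # "ori" (origin) is not a *core* SWHID object type, so it must be rejected too: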
+    result = runner.invoke(vault_cli_group, ["cook", "swh:1:ori:" + "0" * 40, "-"])
+    assert isinstance(result.exception, SystemExit)
+    assert "expected core SWHID" in result.stdout
+
+    result = runner.invoke(vault_cli_group, ["cook", "swh:1:cnt:" + "0" * 40, "-"])
+    assert isinstance(result.exception, SystemExit)
+    assert "No cooker available for CONTENT" in result.stdout
+
+
+def test_cook_unknown_cooker():
+    runner = click.testing.CliRunner()
+
+    result = runner.invoke(
+        vault_cli_group,
+        ["cook", "swh:1:dir:" + "0" * 40, "-", "--cooker-type", "gitfast"],
+    )
+    assert isinstance(result.exception, SystemExit)
+    assert "do not have a gitfast cooker" in result.stdout
+
+    result = runner.invoke(vault_cli_group, ["cook", "swh:1:rev:" + "0" * 40, "-"])
+    assert isinstance(result.exception, SystemExit)
+    assert "explicit --cooker-type" in result.stdout
+
+
+@pytest.mark.parametrize(
+    "obj_type,cooker_name_suffix,swhid_type",
+    [("directory", "", "dir"), ("revision", "gitfast", "rev"),],
+)
+def test_cook_directory(obj_type, cooker_name_suffix, swhid_type, mocker):
+    storage = object()
+    mocker.patch("swh.storage.get_storage", return_value=storage)
+
+    backend = MagicMock(spec=InMemoryVaultBackend)
+    backend.fetch.return_value = b"bundle content"
+    mocker.patch(
+        "swh.vault.in_memory_backend.InMemoryVaultBackend", return_value=backend
+    )
+
+    cooker = MagicMock(spec=BaseVaultCooker)
+    cooker_cls = MagicMock(return_value=cooker)
+    mocker.patch("swh.vault.cookers.get_cooker_cls", return_value=cooker_cls)
+
+    runner = click.testing.CliRunner()
+
+    with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
+        config_fd.write('{"storage": {}}')
+        config_fd.seek(0)
+        if cooker_name_suffix:
+            result = runner.invoke(
+                vault_cli_group,
+                [
+                    "cook",
+                    f"swh:1:{swhid_type}:{'0'*40}",
+                    "-",
+                    "-C",
+                    config_fd.name,
+                    "--cooker-type",
+                    cooker_name_suffix,
+                ],
+            )
+        else:
+            result = runner.invoke(
+                vault_cli_group,
+                ["cook", f"swh:1:{swhid_type}:{'0'*40}", "-", "-C", config_fd.name],
+            )
+
+    if result.exception is not None:
+        raise result.exception
+
+    cooker_cls.assert_called_once_with(
+        obj_type=f"{obj_type}_{cooker_name_suffix}" if cooker_name_suffix else obj_type,
+        obj_id=b"\x00" * 20,
+        backend=backend,
+        storage=storage,
+        graph=None,
+        objstorage=None,
+    )
+    cooker.cook.assert_called_once_with()
+
+    assert result.stdout_bytes == b"bundle content"
diff --git a/swh/vault/tests/test_cookers.py b/swh/vault/tests/test_cookers.py
index 46d23c5..f917d42 100644
--- a/swh/vault/tests/test_cookers.py
+++ b/swh/vault/tests/test_cookers.py
@@ -1,565 +1,786 @@
 # Copyright (C) 2017-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import contextlib
 import datetime
+import glob
 import gzip
 import io
 import os
 import pathlib
+import shutil
 import subprocess
 import tarfile
 import tempfile
 import unittest
 import unittest.mock
 
 import dulwich.fastexport
 import dulwich.index
 import dulwich.objects
 import dulwich.porcelain
 import dulwich.repo
 import pytest
 
 from swh.loader.git.from_disk import GitLoaderFromDisk
 from swh.model import from_disk, hashutil
-from swh.model.model import Directory, DirectoryEntry, Person, Revision, RevisionType
-from swh.vault.cookers import DirectoryCooker, RevisionGitfastCooker
+from swh.model.model import (
+    Directory,
+    DirectoryEntry,
+    Person,
+    Revision,
+    RevisionType,
+    TimestampWithTimezone,
+)
diff --git a/swh/vault/tests/test_cookers.py b/swh/vault/tests/test_cookers.py
index 46d23c5..f917d42 100644
--- a/swh/vault/tests/test_cookers.py
+++ b/swh/vault/tests/test_cookers.py
@@ -1,565 +1,786 @@
 # Copyright (C) 2017-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import contextlib
 import datetime
+import glob
 import gzip
 import io
 import os
 import pathlib
+import shutil
 import subprocess
 import tarfile
 import tempfile
 import unittest
 import unittest.mock
 
 import dulwich.fastexport
 import dulwich.index
 import dulwich.objects
 import dulwich.porcelain
 import dulwich.repo
 import pytest
 
 from swh.loader.git.from_disk import GitLoaderFromDisk
 from swh.model import from_disk, hashutil
-from swh.model.model import Directory, DirectoryEntry, Person, Revision, RevisionType
-from swh.vault.cookers import DirectoryCooker, RevisionGitfastCooker
+from swh.model.model import (
+    Directory,
+    DirectoryEntry,
+    Person,
+    Revision,
+    RevisionType,
+    TimestampWithTimezone,
+)
+from swh.vault.cookers import DirectoryCooker, GitBareCooker, RevisionGitfastCooker
 from swh.vault.tests.vault_testing import hash_content
 from swh.vault.to_disk import HIDDEN_MESSAGE, SKIPPED_MESSAGE
 
 
 class TestRepo:
     """A tiny context manager for a test git repository, with some utility
     functions to perform basic git stuff.
     """
 
+    def __init__(self, repo_dir=None):
+        self.repo_dir = repo_dir
+
     def __enter__(self):
-        self.tmp_dir = tempfile.TemporaryDirectory(prefix="tmp-vault-repo-")
-        self.repo_dir = self.tmp_dir.__enter__()
-        self.repo = dulwich.repo.Repo.init(self.repo_dir)
+        if self.repo_dir:
+            self.tmp_dir = None
+            self.repo = dulwich.repo.Repo(self.repo_dir)
+        else:
+            self.tmp_dir = tempfile.TemporaryDirectory(prefix="tmp-vault-repo-")
+            self.repo_dir = self.tmp_dir.__enter__()
+            self.repo = dulwich.repo.Repo.init(self.repo_dir)
         self.author_name = b"Test Author"
         self.author_email = b"test@softwareheritage.org"
         self.author = b"%s <%s>" % (self.author_name, self.author_email)
         self.base_date = 258244200
         self.counter = 0
         return pathlib.Path(self.repo_dir)
 
     def __exit__(self, exc, value, tb):
-        self.tmp_dir.__exit__(exc, value, tb)
+        if self.tmp_dir is not None:
+            self.tmp_dir.__exit__(exc, value, tb)
+            self.repo_dir = None
 
     def checkout(self, rev_sha):
         rev = self.repo[rev_sha]
         dulwich.index.build_index_from_tree(
             self.repo_dir, self.repo.index_path(), self.repo.object_store, rev.tree
         )
 
     def git_shell(self, *cmd, stdout=subprocess.DEVNULL, **kwargs):
         name = self.author_name
         email = self.author_email
         date = "%d +0000" % (self.base_date + self.counter)
         env = {
             # Set git commit format
             "GIT_AUTHOR_NAME": name,
             "GIT_AUTHOR_EMAIL": email,
             "GIT_AUTHOR_DATE": date,
             "GIT_COMMITTER_NAME": name,
             "GIT_COMMITTER_EMAIL": email,
             "GIT_COMMITTER_DATE": date,
             # Ignore all the system-wide and user configurations
             "GIT_CONFIG_NOSYSTEM": "1",
             "HOME": str(self.tmp_dir),
             "XDG_CONFIG_HOME": str(self.tmp_dir),
         }
         kwargs.setdefault("env", {}).update(env)
 
         subprocess.check_call(
             ("git", "-C", self.repo_dir) + cmd, stdout=stdout, **kwargs
         )
 
     def commit(self, message="Commit test\n", ref=b"HEAD"):
         """Commit the current working tree in a new commit with message on
         the branch 'ref'.
 
         At the end of the commit, the reference should stay the same
         and the index should be clean.
 
         """
-        self.git_shell("add", ".")
+        paths = [
+            os.path.relpath(path, self.repo_dir)
+            for path in glob.glob(self.repo_dir + "/**/*", recursive=True)
+        ]
+        self.repo.stage(paths)
         message = message.encode() + b"\n"
         ret = self.repo.do_commit(
             message=message,
             committer=self.author,
             commit_timestamp=self.base_date + self.counter,
             commit_timezone=0,
             ref=ref,
         )
         self.counter += 1
 
         # committing on another branch leaves
         # dangling files in index
         if ref != b"HEAD":
             # XXX this should work (but does not)
             # dulwich.porcelain.reset(self.repo, 'hard')
             self.git_shell("reset", "--hard", "HEAD")
 
         return ret
 
     def merge(self, parent_sha_list, message="Merge branches."):
         self.git_shell(
             "merge",
             "--allow-unrelated-histories",
             "-m",
             message,
             *[p.decode() for p in parent_sha_list],
         )
         self.counter += 1
         return self.repo.refs[b"HEAD"]
 
     def print_debug_graph(self, reflog=False):
         args = ["log", "--all", "--graph", "--decorate"]
         if reflog:
             args.append("--reflog")
         self.git_shell(*args, stdout=None)
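The tests below all use the `TestRepo` helper above in the same two-step pattern: enter the context manager to get the working-tree path, then call `commit()` to record it. A minimal sketch (illustrative only, with a made-up file name):

```python
repo = TestRepo()
with repo as rp:  # rp is a pathlib.Path to a fresh temporary repository
    (rp / "hello.txt").write_text("hello\n")  # hypothetical content
    commit_sha = repo.commit("add hello.txt")  # stages everything, returns the new SHA
```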
""" def _create_loader(directory): return GitLoaderFromDisk( swh_storage, "fake_origin", directory=directory, visit_date=datetime.datetime.now(datetime.timezone.utc), ) return _create_loader @contextlib.contextmanager -def cook_extract_directory(storage, obj_id): +def cook_extract_directory_dircooker(storage, obj_id, fsck=True): """Context manager that cooks a directory and extract it.""" backend = unittest.mock.MagicMock() backend.storage = storage cooker = DirectoryCooker("directory", obj_id, backend=backend, storage=storage) cooker.fileobj = io.BytesIO() assert cooker.check_exists() cooker.prepare_bundle() cooker.fileobj.seek(0) with tempfile.TemporaryDirectory(prefix="tmp-vault-extract-") as td: with tarfile.open(fileobj=cooker.fileobj, mode="r") as tar: tar.extractall(td) yield pathlib.Path(td) / hashutil.hash_to_hex(obj_id) cooker.storage = None +@contextlib.contextmanager +def cook_extract_directory_gitfast(storage, obj_id, fsck=True): + """Context manager that cooks a revision containing a directory and extract it, + using RevisionGitfastCooker""" + test_repo = TestRepo() + with test_repo as p: + date = TimestampWithTimezone.from_datetime( + datetime.datetime.now(datetime.timezone.utc) + ) + revision = Revision( + directory=obj_id, + message=b"dummy message", + author=Person.from_fullname(b"someone"), + committer=Person.from_fullname(b"someone"), + date=date, + committer_date=date, + type=RevisionType.GIT, + synthetic=False, + ) + storage.revision_add([revision]) + + with cook_stream_revision_gitfast(storage, revision.id) as stream, test_repo as p: + processor = dulwich.fastexport.GitImportProcessor(test_repo.repo) + processor.import_stream(stream) + test_repo.checkout(b"HEAD") + shutil.rmtree(p / ".git") + yield p + + +@contextlib.contextmanager +def cook_extract_directory_git_bare( + storage, obj_id, fsck=True, direct_objstorage=False +): + """Context manager that cooks a revision and extract it, + using GitBareCooker""" + backend = unittest.mock.MagicMock() + backend.storage = storage + + # Cook the object + cooker = GitBareCooker( + "directory", + obj_id, + backend=backend, + storage=storage, + objstorage=storage.objstorage if direct_objstorage else None, + ) + cooker.use_fsck = fsck # Some tests try edge-cases that git-fsck rejects + cooker.fileobj = io.BytesIO() + assert cooker.check_exists() + cooker.prepare_bundle() + cooker.fileobj.seek(0) + + # Extract it + with tempfile.TemporaryDirectory(prefix="tmp-vault-extract-") as td: + with tarfile.open(fileobj=cooker.fileobj, mode="r") as tar: + tar.extractall(td) + + # Clone it with Dulwich + with tempfile.TemporaryDirectory(prefix="tmp-vault-clone-") as clone_dir: + clone_dir = pathlib.Path(clone_dir) + subprocess.check_call( + [ + "git", + "clone", + os.path.join(td, f"swh:1:dir:{obj_id.hex()}.git"), + clone_dir, + ] + ) + shutil.rmtree(clone_dir / ".git") + yield clone_dir + + +@pytest.fixture( + scope="module", + params=[ + cook_extract_directory_dircooker, + cook_extract_directory_gitfast, + cook_extract_directory_git_bare, + ], +) +def cook_extract_directory(request): + """A fixture that is instantiated as either cook_extract_directory_dircooker or + cook_extract_directory_git_bare.""" + return request.param + + @contextlib.contextmanager def cook_stream_revision_gitfast(storage, obj_id): """Context manager that cooks a revision and stream its fastexport.""" backend = unittest.mock.MagicMock() backend.storage = storage cooker = RevisionGitfastCooker( "revision_gitfast", obj_id, backend=backend, storage=storage ) 
     cooker.fileobj = io.BytesIO()
     assert cooker.check_exists()
     cooker.prepare_bundle()
     cooker.fileobj.seek(0)
     fastexport_stream = gzip.GzipFile(fileobj=cooker.fileobj)
     yield fastexport_stream
     cooker.storage = None
 
 
 @contextlib.contextmanager
-def cook_extract_revision_gitfast(storage, obj_id):
-    """Context manager that cooks a revision and extract it."""
+def cook_extract_revision_gitfast(storage, obj_id, fsck=True):
+    """Context manager that cooks a revision and extract it,
+    using RevisionGitfastCooker"""
     test_repo = TestRepo()
     with cook_stream_revision_gitfast(storage, obj_id) as stream, test_repo as p:
         processor = dulwich.fastexport.GitImportProcessor(test_repo.repo)
         processor.import_stream(stream)
         yield test_repo, p
 
 
+@contextlib.contextmanager
+def cook_extract_revision_git_bare(storage, obj_id, fsck=True):
+    """Context manager that cooks a revision and extract it,
+    using GitBareCooker"""
+    backend = unittest.mock.MagicMock()
+    backend.storage = storage
+
+    # Cook the object
+    cooker = GitBareCooker("revision", obj_id, backend=backend, storage=storage)
+    cooker.use_fsck = fsck  # Some tests try edge-cases that git-fsck rejects
+    cooker.fileobj = io.BytesIO()
+    assert cooker.check_exists()
+    cooker.prepare_bundle()
+    cooker.fileobj.seek(0)
+
+    # Extract it
+    with tempfile.TemporaryDirectory(prefix="tmp-vault-extract-") as td:
+        with tarfile.open(fileobj=cooker.fileobj, mode="r") as tar:
+            tar.extractall(td)
+
+        # Clone it with the git CLI
+        with tempfile.TemporaryDirectory(prefix="tmp-vault-clone-") as clone_dir:
+            clone_dir = pathlib.Path(clone_dir)
+            subprocess.check_call(
+                [
+                    "git",
+                    "clone",
+                    os.path.join(td, f"swh:1:rev:{obj_id.hex()}.git"),
+                    clone_dir,
+                ]
+            )
+            test_repo = TestRepo(clone_dir)
+            with test_repo:
+                yield test_repo, clone_dir
+
+
+@pytest.fixture(
+    scope="module",
+    params=[cook_extract_revision_gitfast, cook_extract_revision_git_bare],
+)
+def cook_extract_revision(request):
+    """A fixture that is instantiated as either cook_extract_revision_gitfast or
+    cook_extract_revision_git_bare."""
+    return request.param
+
+
 TEST_CONTENT = (
     " test content\n"
     "and unicode \N{BLACK HEART SUIT}\n"
     " and trailing spaces "
 )
 TEST_EXECUTABLE = b"\x42\x40\x00\x00\x05"
 
 
 class TestDirectoryCooker:
-    def test_directory_simple(self, git_loader):
+    def test_directory_simple(self, git_loader, cook_extract_directory):
         repo = TestRepo()
         with repo as rp:
             (rp / "file").write_text(TEST_CONTENT)
             (rp / "executable").write_bytes(TEST_EXECUTABLE)
             (rp / "executable").chmod(0o755)
             (rp / "link").symlink_to("file")
             (rp / "dir1/dir2").mkdir(parents=True)
             (rp / "dir1/dir2/file").write_text(TEST_CONTENT)
             c = repo.commit()
         loader = git_loader(str(rp))
         loader.load()
         obj_id_hex = repo.repo[c].tree.decode()
         obj_id = hashutil.hash_to_bytes(obj_id_hex)
 
         with cook_extract_directory(loader.storage, obj_id) as p:
             assert (p / "file").stat().st_mode == 0o100644
             assert (p / "file").read_text() == TEST_CONTENT
             assert (p / "executable").stat().st_mode == 0o100755
             assert (p / "executable").read_bytes() == TEST_EXECUTABLE
-            assert (p / "link").is_symlink
+            assert (p / "link").is_symlink()
             assert os.readlink(str(p / "link")) == "file"
             assert (p / "dir1/dir2/file").stat().st_mode == 0o100644
             assert (p / "dir1/dir2/file").read_text() == TEST_CONTENT
 
             directory = from_disk.Directory.from_disk(path=bytes(p))
             assert obj_id_hex == hashutil.hash_to_hex(directory.hash)
 
-    def test_directory_filtered_objects(self, git_loader):
+    def test_directory_filtered_objects(self, git_loader, cook_extract_directory):
         repo = TestRepo()
         with repo as rp:
             file_1, id_1 = hash_content(b"test1")
             file_2, id_2 = hash_content(b"test2")
             file_3, id_3 = hash_content(b"test3")
             (rp / "file").write_bytes(file_1)
             (rp / "hidden_file").write_bytes(file_2)
             (rp / "absent_file").write_bytes(file_3)
             c = repo.commit()
         loader = git_loader(str(rp))
         loader.load()
         obj_id_hex = repo.repo[c].tree.decode()
         obj_id = hashutil.hash_to_bytes(obj_id_hex)
 
         # FIXME: storage.content_update() should be changed to allow things
         # like that
         with loader.storage.get_db().transaction() as cur:
             cur.execute(
                 """update content set status = 'visible'
                        where sha1 = %s""",
                 (id_1,),
             )
             cur.execute(
                 """update content set status = 'hidden'
                        where sha1 = %s""",
                 (id_2,),
             )
+
             cur.execute(
-                """update content set status = 'absent'
-                       where sha1 = %s""",
+                """
+                insert into skipped_content
+                    (sha1, sha1_git, sha256, blake2s256, length, reason)
+                select sha1, sha1_git, sha256, blake2s256, length, 'no reason'
+                from content
+                where sha1 = %s
+                """,
                 (id_3,),
             )
+            cur.execute("delete from content where sha1 = %s", (id_3,))
+
         with cook_extract_directory(loader.storage, obj_id) as p:
             assert (p / "file").read_bytes() == b"test1"
             assert (p / "hidden_file").read_bytes() == HIDDEN_MESSAGE
             assert (p / "absent_file").read_bytes() == SKIPPED_MESSAGE
 
-    def test_directory_bogus_perms(self, git_loader):
+    def test_directory_bogus_perms(self, git_loader, cook_extract_directory):
         # Some early git repositories have 664/775 permissions... let's check
         # if all the weird modes are properly normalized in the directory
         # cooker.
         repo = TestRepo()
         with repo as rp:
             (rp / "file").write_text(TEST_CONTENT)
             (rp / "file").chmod(0o664)
             (rp / "executable").write_bytes(TEST_EXECUTABLE)
             (rp / "executable").chmod(0o775)
             (rp / "wat").write_text(TEST_CONTENT)
             (rp / "wat").chmod(0o604)
+
+            # Disable mode cleanup
+            with unittest.mock.patch("dulwich.index.cleanup_mode", lambda mode: mode):
+                c = repo.commit()
+
+            # Make sure Dulwich didn't normalize the permissions itself.
+            # (if it did, then the test can't check the cooker normalized them)
+            tree_id = repo.repo[c].tree
+            assert {entry.mode for entry in repo.repo[tree_id].items()} == {
+                0o100775,
+                0o100664,
+                0o100604,
+            }
+
+            # Disable mode checks
+            with unittest.mock.patch("dulwich.objects.Tree.check", lambda self: None):
+                loader = git_loader(str(rp))
+                loader.load()
+
+        # Make sure swh-loader didn't normalize them either
+        dir_entries = loader.storage.directory_ls(hashutil.bytehex_to_hash(tree_id))
+        assert {entry["perms"] for entry in dir_entries} == {
+            0o100664,
+            0o100775,
+            0o100604,
+        }
+
+        obj_id_hex = repo.repo[c].tree.decode()
+        obj_id = hashutil.hash_to_bytes(obj_id_hex)
+
+        with cook_extract_directory(loader.storage, obj_id) as p:
+            assert (p / "file").stat().st_mode == 0o100644
+            assert (p / "executable").stat().st_mode == 0o100755
+            assert (p / "wat").stat().st_mode == 0o100644
+
+    @pytest.mark.parametrize("direct_objstorage", [True, False])
+    def test_directory_objstorage(
+        self, swh_storage, git_loader, mocker, direct_objstorage
+    ):
+        """Like test_directory_simple, but using swh_objstorage directly, without
+        going through swh_storage.content_get_data()"""
+        repo = TestRepo()
+        with repo as rp:
+            (rp / "file").write_text(TEST_CONTENT)
+            (rp / "executable").write_bytes(TEST_EXECUTABLE)
+            (rp / "executable").chmod(0o755)
+            (rp / "link").symlink_to("file")
+            (rp / "dir1/dir2").mkdir(parents=True)
+            (rp / "dir1/dir2/file").write_text(TEST_CONTENT)
             c = repo.commit()
         loader = git_loader(str(rp))
         loader.load()
         obj_id_hex = repo.repo[c].tree.decode()
         obj_id = hashutil.hash_to_bytes(obj_id_hex)
 
-        with cook_extract_directory(loader.storage, obj_id) as p:
+        # Set up spies
+        storage_content_get_data = mocker.patch.object(
+            swh_storage, "content_get_data", wraps=swh_storage.content_get_data
+        )
+        objstorage_content_batch = mocker.patch.object(
+            swh_storage.objstorage, "get_batch", wraps=swh_storage.objstorage.get_batch
+        )
+
+        with cook_extract_directory_git_bare(
+            loader.storage, obj_id, direct_objstorage=direct_objstorage
+        ) as p:
             assert (p / "file").stat().st_mode == 0o100644
+            assert (p / "file").read_text() == TEST_CONTENT
             assert (p / "executable").stat().st_mode == 0o100755
-            assert (p / "wat").stat().st_mode == 0o100644
+            assert (p / "executable").read_bytes() == TEST_EXECUTABLE
+            assert (p / "link").is_symlink()
+            assert os.readlink(str(p / "link")) == "file"
+            assert (p / "dir1/dir2/file").stat().st_mode == 0o100644
+            assert (p / "dir1/dir2/file").read_text() == TEST_CONTENT
+
+            directory = from_disk.Directory.from_disk(path=bytes(p))
+            assert obj_id_hex == hashutil.hash_to_hex(directory.hash)
+
+        if direct_objstorage:
+            storage_content_get_data.assert_not_called()
+            objstorage_content_batch.assert_called()
+        else:
+            storage_content_get_data.assert_called()
+            objstorage_content_batch.assert_not_called()
 
     def test_directory_revision_data(self, swh_storage):
         target_rev = "0e8a3ad980ec179856012b7eecf4327e99cd44cd"
 
         dir = Directory(
             entries=(
                 DirectoryEntry(
                     name=b"submodule",
                     type="rev",
                     target=hashutil.hash_to_bytes(target_rev),
                     perms=0o100644,
                 ),
             ),
         )
         swh_storage.directory_add([dir])
 
-        with cook_extract_directory(swh_storage, dir.id) as p:
+        with cook_extract_directory_dircooker(swh_storage, dir.id, fsck=False) as p:
             assert (p / "submodule").is_symlink()
             assert os.readlink(str(p / "submodule")) == target_rev
 
 
-class TestRevisionGitfastCooker:
-    def test_revision_simple(self, git_loader):
+class TestRevisionCooker:
+    def test_revision_simple(self, git_loader, cook_extract_revision):
         #
         #     1--2--3--4--5--6--7
         #
         repo = TestRepo()
         with repo as rp:
             (rp / "file1").write_text(TEST_CONTENT)
             repo.commit("add file1")
             (rp / "file2").write_text(TEST_CONTENT)
             repo.commit("add file2")
             (rp / "dir1/dir2").mkdir(parents=True)
             (rp / "dir1/dir2/file").write_text(TEST_CONTENT)
             repo.commit("add dir1/dir2/file")
             (rp / "bin1").write_bytes(TEST_EXECUTABLE)
             (rp / "bin1").chmod(0o755)
             repo.commit("add bin1")
             (rp / "link1").symlink_to("file1")
             repo.commit("link link1 to file1")
             (rp / "file2").unlink()
             repo.commit("remove file2")
             (rp / "bin1").rename(rp / "bin")
             repo.commit("rename bin1 to bin")
 
         loader = git_loader(str(rp))
         loader.load()
         obj_id_hex = repo.repo.refs[b"HEAD"].decode()
         obj_id = hashutil.hash_to_bytes(obj_id_hex)
 
-        with cook_extract_revision_gitfast(loader.storage, obj_id) as (ert, p):
+        with cook_extract_revision(loader.storage, obj_id) as (ert, p):
             ert.checkout(b"HEAD")
             assert (p / "file1").stat().st_mode == 0o100644
             assert (p / "file1").read_text() == TEST_CONTENT
-            assert (p / "link1").is_symlink
+            assert (p / "link1").is_symlink()
             assert os.readlink(str(p / "link1")) == "file1"
             assert (p / "bin").stat().st_mode == 0o100755
             assert (p / "bin").read_bytes() == TEST_EXECUTABLE
             assert (p / "dir1/dir2/file").read_text() == TEST_CONTENT
             assert (p / "dir1/dir2/file").stat().st_mode == 0o100644
             assert ert.repo.refs[b"HEAD"].decode() == obj_id_hex
 
-    def test_revision_two_roots(self, git_loader):
+    def test_revision_two_roots(self, git_loader, cook_extract_revision):
         #
         #    1----3---4
         #        /
         #   2----
         #
         repo = TestRepo()
         with repo as rp:
             (rp / "file1").write_text(TEST_CONTENT)
             c1 = repo.commit("Add file1")
             del repo.repo.refs[b"refs/heads/master"]  # git update-ref -d HEAD
             (rp / "file2").write_text(TEST_CONTENT)
             repo.commit("Add file2")
             repo.merge([c1])
             (rp / "file3").write_text(TEST_CONTENT)
             repo.commit("add file3")
         obj_id_hex = repo.repo.refs[b"HEAD"].decode()
         obj_id = hashutil.hash_to_bytes(obj_id_hex)
         loader = git_loader(str(rp))
         loader.load()
 
-        with cook_extract_revision_gitfast(loader.storage, obj_id) as (ert, p):
+        with cook_extract_revision(loader.storage, obj_id) as (ert, p):
             assert ert.repo.refs[b"HEAD"].decode() == obj_id_hex
 
-    def test_revision_two_double_fork_merge(self, git_loader):
+    def test_revision_two_double_fork_merge(self, git_loader, cook_extract_revision):
         #
         #     2---4---6
         #    /   /   /
         #   1---3---5
         #
         repo = TestRepo()
         with repo as rp:
             (rp / "file1").write_text(TEST_CONTENT)
             c1 = repo.commit("Add file1")
             repo.repo.refs[b"refs/heads/c1"] = c1
             (rp / "file2").write_text(TEST_CONTENT)
             repo.commit("Add file2")
             (rp / "file3").write_text(TEST_CONTENT)
             c3 = repo.commit("Add file3", ref=b"refs/heads/c1")
             repo.repo.refs[b"refs/heads/c3"] = c3
             repo.merge([c3])
             (rp / "file5").write_text(TEST_CONTENT)
             c5 = repo.commit("Add file3", ref=b"refs/heads/c3")
             repo.merge([c5])
         obj_id_hex = repo.repo.refs[b"HEAD"].decode()
         obj_id = hashutil.hash_to_bytes(obj_id_hex)
         loader = git_loader(str(rp))
         loader.load()
 
-        with cook_extract_revision_gitfast(loader.storage, obj_id) as (ert, p):
+        with cook_extract_revision(loader.storage, obj_id) as (ert, p):
             assert ert.repo.refs[b"HEAD"].decode() == obj_id_hex
 
-    def test_revision_triple_merge(self, git_loader):
+    def test_revision_triple_merge(self, git_loader, cook_extract_revision):
         #
         #       .---.---5
         #      /   /   /
         #     2   3   4
         #    /   /   /
         #   1---.---.
         #
         repo = TestRepo()
         with repo as rp:
             (rp / "file1").write_text(TEST_CONTENT)
             c1 = repo.commit("Commit 1")
             repo.repo.refs[b"refs/heads/b1"] = c1
             repo.repo.refs[b"refs/heads/b2"] = c1
             repo.commit("Commit 2")
             c3 = repo.commit("Commit 3", ref=b"refs/heads/b1")
             c4 = repo.commit("Commit 4", ref=b"refs/heads/b2")
             repo.merge([c3, c4])
         obj_id_hex = repo.repo.refs[b"HEAD"].decode()
         obj_id = hashutil.hash_to_bytes(obj_id_hex)
         loader = git_loader(str(rp))
         loader.load()
 
-        with cook_extract_revision_gitfast(loader.storage, obj_id) as (ert, p):
+        with cook_extract_revision(loader.storage, obj_id) as (ert, p):
             assert ert.repo.refs[b"HEAD"].decode() == obj_id_hex
 
-    def test_revision_filtered_objects(self, git_loader):
+    def test_revision_filtered_objects(self, git_loader, cook_extract_revision):
         repo = TestRepo()
         with repo as rp:
             file_1, id_1 = hash_content(b"test1")
             file_2, id_2 = hash_content(b"test2")
             file_3, id_3 = hash_content(b"test3")
             (rp / "file").write_bytes(file_1)
             (rp / "hidden_file").write_bytes(file_2)
             (rp / "absent_file").write_bytes(file_3)
             repo.commit()
         obj_id_hex = repo.repo.refs[b"HEAD"].decode()
         obj_id = hashutil.hash_to_bytes(obj_id_hex)
         loader = git_loader(str(rp))
         loader.load()
 
         # FIXME: storage.content_update() should be changed to allow things
         # like that
         with loader.storage.get_db().transaction() as cur:
             cur.execute(
                 """update content set status = 'visible'
                        where sha1 = %s""",
                 (id_1,),
             )
             cur.execute(
                 """update content set status = 'hidden'
                        where sha1 = %s""",
                 (id_2,),
             )
+
             cur.execute(
-                """update content set status = 'absent'
-                       where sha1 = %s""",
+                """
+                insert into skipped_content
+                    (sha1, sha1_git, sha256, blake2s256, length, reason)
+                select sha1, sha1_git, sha256, blake2s256, length, 'no reason'
+                from content
+                where sha1 = %s
+                """,
                 (id_3,),
             )
-        with cook_extract_revision_gitfast(loader.storage, obj_id) as (ert, p):
+            cur.execute("delete from content where sha1 = %s", (id_3,))
+
+        with cook_extract_revision(loader.storage, obj_id) as (ert, p):
             ert.checkout(b"HEAD")
             assert (p / "file").read_bytes() == b"test1"
             assert (p / "hidden_file").read_bytes() == HIDDEN_MESSAGE
             assert (p / "absent_file").read_bytes() == SKIPPED_MESSAGE
 
-    def test_revision_bogus_perms(self, git_loader):
-        # Some early git repositories have 664/775 permissions... let's check
-        # if all the weird modes are properly normalized in the revision
-        # cooker.
-        repo = TestRepo()
-        with repo as rp:
-            (rp / "file").write_text(TEST_CONTENT)
-            (rp / "file").chmod(0o664)
-            (rp / "executable").write_bytes(TEST_EXECUTABLE)
-            (rp / "executable").chmod(0o775)
-            (rp / "wat").write_text(TEST_CONTENT)
-            (rp / "wat").chmod(0o604)
-            repo.commit("initial commit")
-        loader = git_loader(str(rp))
-        loader.load()
-        obj_id_hex = repo.repo.refs[b"HEAD"].decode()
-        obj_id = hashutil.hash_to_bytes(obj_id_hex)
-
-        with cook_extract_revision_gitfast(loader.storage, obj_id) as (ert, p):
-            ert.checkout(b"HEAD")
-            assert (p / "file").stat().st_mode == 0o100644
-            assert (p / "executable").stat().st_mode == 0o100755
-            assert (p / "wat").stat().st_mode == 0o100644
-
-    def test_revision_null_fields(self, git_loader):
+    def test_revision_null_fields(self, git_loader, cook_extract_revision):
         # Our schema doesn't enforce a lot of non-null revision fields. We need
         # to check these cases don't break the cooker.
         repo = TestRepo()
         with repo as rp:
             (rp / "file").write_text(TEST_CONTENT)
             c = repo.commit("initial commit")
         loader = git_loader(str(rp))
         loader.load()
         repo.repo.refs[b"HEAD"].decode()
         dir_id_hex = repo.repo[c].tree.decode()
         dir_id = hashutil.hash_to_bytes(dir_id_hex)
 
         test_revision = Revision(
             message=b"",
             author=Person(name=None, email=None, fullname=b""),
             date=None,
             committer=Person(name=None, email=None, fullname=b""),
             committer_date=None,
             parents=(),
             type=RevisionType.GIT,
             directory=dir_id,
             metadata={},
             synthetic=True,
         )
 
         storage = loader.storage
         storage.revision_add([test_revision])
 
-        with cook_extract_revision_gitfast(storage, test_revision.id) as (ert, p):
+        with cook_extract_revision(storage, test_revision.id, fsck=False) as (ert, p):
             ert.checkout(b"HEAD")
             assert (p / "file").stat().st_mode == 0o100644
 
     def test_revision_revision_data(self, swh_storage):
         target_rev = "0e8a3ad980ec179856012b7eecf4327e99cd44cd"
 
         dir = Directory(
             entries=(
                 DirectoryEntry(
                     name=b"submodule",
                     type="rev",
                     target=hashutil.hash_to_bytes(target_rev),
                     perms=0o100644,
                 ),
             ),
         )
         swh_storage.directory_add([dir])
 
         rev = Revision(
             message=b"",
             author=Person(name=None, email=None, fullname=b""),
             date=None,
             committer=Person(name=None, email=None, fullname=b""),
             committer_date=None,
             parents=(),
             type=RevisionType.GIT,
             directory=dir.id,
             metadata={},
             synthetic=True,
         )
         swh_storage.revision_add([rev])
 
         with cook_stream_revision_gitfast(swh_storage, rev.id) as stream:
             pattern = "M 160000 {} submodule".format(target_rev).encode()
             assert pattern in stream.read()
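The git-bare bundles exercised by the new file below are tarballs containing a bare repository named after the cooked object's SWHID, which a stock `git clone` can consume directly; this mirrors the extract-then-clone pattern of the `cook_extract_*_git_bare` helpers above. A minimal consumer sketch, with placeholder bundle path, SWHID and directory names:

```python
import subprocess
import tarfile

# Extract a previously fetched bundle (hypothetical path), then clone the
# bare repository it contains with the regular git CLI.
with tarfile.open("bundle.tar") as tf:
    tf.extractall("extracted")
subprocess.check_call(
    ["git", "clone", "extracted/swh:1:rev:" + "0" * 40 + ".git", "workdir"]
)
```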
+""" + +import datetime +import io +import subprocess +import tarfile +import tempfile +import unittest.mock + +import pytest + +from swh.graph.naive_client import NaiveClient as GraphClient +from swh.model.from_disk import DentryPerms +from swh.model.model import ( + Content, + Directory, + DirectoryEntry, + Person, + Revision, + RevisionType, + TimestampWithTimezone, +) +from swh.vault.cookers.git_bare import GitBareCooker +from swh.vault.in_memory_backend import InMemoryVaultBackend + + +def get_objects(last_revision_in_graph): + """ + Build objects:: + + rev1 <------ rev2 + | | + v v + dir1 dir2 + | / | + v / v + cnt1 <----° cnt2 + """ + date = TimestampWithTimezone.from_datetime( + datetime.datetime(2021, 5, 7, 8, 43, 59, tzinfo=datetime.timezone.utc) + ) + author = Person.from_fullname(b"Foo ") + cnt1 = Content.from_data(b"hello") + cnt2 = Content.from_data(b"world") + dir1 = Directory( + entries=( + DirectoryEntry( + name=b"file1", + type="file", + perms=DentryPerms.content, + target=cnt1.sha1_git, + ), + ) + ) + dir2 = Directory( + entries=( + DirectoryEntry( + name=b"file1", + type="file", + perms=DentryPerms.content, + target=cnt1.sha1_git, + ), + DirectoryEntry( + name=b"file2", + type="file", + perms=DentryPerms.content, + target=cnt2.sha1_git, + ), + ) + ) + rev1 = Revision( + message=b"msg1", + date=date, + committer_date=date, + author=author, + committer=author, + directory=dir1.id, + type=RevisionType.GIT, + synthetic=True, + ) + rev2 = Revision( + message=b"msg2", + date=date, + committer_date=date, + author=author, + committer=author, + directory=dir2.id, + parents=(rev1.id,), + type=RevisionType.GIT, + synthetic=True, + ) + + if last_revision_in_graph: + nodes = [str(n.swhid()) for n in [cnt1, cnt2, dir1, dir2, rev1, rev2]] + edges = [ + (str(s.swhid()), str(d.swhid())) + for (s, d) in [ + (dir1, cnt1), + (dir2, cnt1), + (dir2, cnt2), + (rev1, dir1), + (rev2, dir2), + (rev2, rev1), + ] + ] + else: + nodes = [str(n.swhid()) for n in [cnt1, cnt2, dir1, dir2, rev1]] + edges = [ + (str(s.swhid()), str(d.swhid())) + for (s, d) in [(dir1, cnt1), (dir2, cnt1), (dir2, cnt2), (rev1, dir1),] + ] + + return (cnt1, cnt2, dir1, dir2, rev1, rev2, nodes, edges) + + +@pytest.mark.parametrize("last_revision_in_graph", [True, False]) +def test_graph_revisions(swh_storage, last_revision_in_graph): + (cnt1, cnt2, dir1, dir2, rev1, rev2, nodes, edges) = get_objects( + last_revision_in_graph + ) + + # Add all objects to storage + swh_storage.content_add([cnt1, cnt2]) + swh_storage.directory_add([dir1, dir2]) + swh_storage.revision_add([rev1, rev2]) + + # Add spy on swh_storage, to make sure revision_log is not called + # (the graph must be used instead) + swh_storage = unittest.mock.MagicMock(wraps=swh_storage) + + # Add all objects to graph + swh_graph = unittest.mock.Mock(wraps=GraphClient(nodes=nodes, edges=edges)) + + # Cook + backend = InMemoryVaultBackend() + cooker = GitBareCooker( + "revision_gitbare", + rev2.id, + backend=backend, + storage=swh_storage, + graph=swh_graph, + ) + cooker.cook() + + # Get bundle + bundle = backend.fetch("revision_gitbare", rev2.id) + + # Extract bundle and make sure both revisions are in it + with tempfile.TemporaryDirectory("swh-vault-test-bare") as tempdir: + with tarfile.open(fileobj=io.BytesIO(bundle)) as tf: + tf.extractall(tempdir) + + output = subprocess.check_output( + [ + "git", + "-C", + f"{tempdir}/{rev2.swhid()}.git", + "log", + "--format=oneline", + "--decorate=", + ] + ) + + assert output.decode() == f"{rev2.id.hex()} msg2\n{rev1.id.hex()} 
msg1\n" + + # Make sure the graph was used instead of swh_storage.revision_log + swh_graph.visit_nodes.assert_called_once_with(str(rev2.swhid()), edges="rev:rev") + if last_revision_in_graph: + swh_storage.revision_log.assert_not_called() + swh_storage.revision_shortlog.assert_not_called() + else: + swh_storage.revision_log.assert_called()