diff --git a/setup.py b/setup.py index 0524cc5..d578799 100644 --- a/setup.py +++ b/setup.py @@ -1,73 +1,73 @@ #!/usr/bin/env python3 # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from setuptools import setup, find_packages - -from os import path from io import open +from os import path + +from setuptools import find_packages, setup here = path.abspath(path.dirname(__file__)) # Get the long description from the README file with open(path.join(here, "README.md"), encoding="utf-8") as f: long_description = f.read() def parse_requirements(name=None): if name: reqf = "requirements-%s.txt" % name else: reqf = "requirements.txt" requirements = [] if not path.exists(reqf): return requirements with open(reqf) as f: for line in f.readlines(): line = line.strip() if not line or line.startswith("#"): continue requirements.append(line) return requirements setup( name="swh.vault", description="Software Heritage vault", long_description=long_description, long_description_content_type="text/markdown", python_requires=">=3.7", author="Software Heritage developers", author_email="swh-devel@inria.fr", url="https://forge.softwareheritage.org/diffusion/DVAU/", packages=find_packages(), install_requires=parse_requirements() + parse_requirements("swh"), setup_requires=["setuptools-scm"], use_scm_version=True, extras_require={"testing": parse_requirements("test")}, include_package_data=True, zip_safe=False, entry_points=""" [console_scripts] swh-vault=swh.vault.cli:vault [swh.cli.subcommands] vault=swh.vault.cli:vault """, classifiers=[ "Programming Language :: Python :: 3", "Intended Audience :: Developers", "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", "Operating System :: OS Independent", "Development Status :: 5 - Production/Stable", ], project_urls={ "Bug Reports": "https://forge.softwareheritage.org/maniphest", "Funding": "https://www.softwareheritage.org/donate", "Source": "https://forge.softwareheritage.org/source/swh-vault", "Documentation": "https://docs.softwareheritage.org/devel/swh-vault/", }, ) diff --git a/swh/vault/__init__.py b/swh/vault/__init__.py index 032e997..a39a171 100644 --- a/swh/vault/__init__.py +++ b/swh/vault/__init__.py @@ -1,41 +1,41 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import logging logger = logging.getLogger(__name__) def get_vault(cls="remote", args={}): """ Get a vault object of class `vault_class` with arguments `vault_args`. Args: vault (dict): dictionary with keys: - cls (str): vault's class, either 'remote' - args (dict): dictionary with keys Returns: an instance of VaultBackend (either local or remote) Raises: ValueError if passed an unknown storage class. """ if cls == "remote": from .api.client import RemoteVaultClient as Vault elif cls == "local": from swh.scheduler import get_scheduler from swh.storage import get_storage - from swh.vault.cache import VaultCache from swh.vault.backend import VaultBackend as Vault + from swh.vault.cache import VaultCache args["cache"] = VaultCache(**args["cache"]) args["storage"] = get_storage(**args["storage"]) args["scheduler"] = get_scheduler(**args["scheduler"]) else: raise ValueError("Unknown storage class `%s`" % cls) logger.debug("Instantiating %s with %s" % (Vault, args)) return Vault(**args) diff --git a/swh/vault/api/client.py b/swh/vault/api/client.py index e8f7a01..79a2fd4 100644 --- a/swh/vault/api/client.py +++ b/swh/vault/api/client.py @@ -1,60 +1,59 @@ # Copyright (C) 2016-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from swh.model import hashutil from swh.core.api import RPCClient - +from swh.model import hashutil from swh.vault.exc import NotFoundExc class RemoteVaultClient(RPCClient): """Client to the Software Heritage vault cache.""" reraise_exceptions = [NotFoundExc] # Web API endpoints def fetch(self, obj_type, obj_id): hex_id = hashutil.hash_to_hex(obj_id) return self.get("fetch/{}/{}".format(obj_type, hex_id)) def cook(self, obj_type, obj_id, email=None): hex_id = hashutil.hash_to_hex(obj_id) return self.post( "cook/{}/{}".format(obj_type, hex_id), data={}, params=({"email": email} if email else None), ) def progress(self, obj_type, obj_id): hex_id = hashutil.hash_to_hex(obj_id) return self.get("progress/{}/{}".format(obj_type, hex_id)) # Cookers endpoints def set_progress(self, obj_type, obj_id, progress): hex_id = hashutil.hash_to_hex(obj_id) return self.post("set_progress/{}/{}".format(obj_type, hex_id), data=progress) def set_status(self, obj_type, obj_id, status): hex_id = hashutil.hash_to_hex(obj_id) return self.post("set_status/{}/{}".format(obj_type, hex_id), data=status) # TODO: handle streaming properly def put_bundle(self, obj_type, obj_id, bundle): hex_id = hashutil.hash_to_hex(obj_id) return self.post("put_bundle/{}/{}".format(obj_type, hex_id), data=bundle) def send_notif(self, obj_type, obj_id): hex_id = hashutil.hash_to_hex(obj_id) return self.post("send_notif/{}/{}".format(obj_type, hex_id), data=None) # Batch endpoints def batch_cook(self, batch): return self.post("batch_cook", data=batch) def batch_progress(self, batch_id): return self.get("batch_progress/{}".format(batch_id)) diff --git a/swh/vault/api/server.py b/swh/vault/api/server.py index 1a8fe63..8837fa2 100644 --- a/swh/vault/api/server.py +++ b/swh/vault/api/server.py @@ -1,239 +1,236 @@ # Copyright (C) 2016-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import os -import aiohttp.web import asyncio import collections +import os + +import aiohttp.web from swh.core import config -from swh.core.api.asynchronous import ( - RPCServerApp, - encode_data_server as encode_data, - decode_request, -) +from swh.core.api.asynchronous import RPCServerApp, decode_request +from swh.core.api.asynchronous import encode_data_server as encode_data from swh.model import hashutil from swh.vault import get_vault -from swh.vault.cookers import COOKER_TYPES from swh.vault.backend import NotFoundExc - +from swh.vault.cookers import COOKER_TYPES DEFAULT_CONFIG_PATH = "vault/server" DEFAULT_CONFIG = { "storage": ("dict", {"cls": "remote", "args": {"url": "http://localhost:5002/",},}), "cache": ( "dict", { "cls": "pathslicing", "args": {"root": "/srv/softwareheritage/vault", "slicing": "0:1/1:5",}, }, ), "client_max_size": ("int", 1024 ** 3), "vault": ( "dict", {"cls": "local", "args": {"db": "dbname=softwareheritage-vault-dev",},}, ), "scheduler": ( "dict", {"cls": "remote", "args": {"url": "http://localhost:5008/",}}, ), } @asyncio.coroutine def index(request): return aiohttp.web.Response(body="SWH Vault API server") # Web API endpoints @asyncio.coroutine def vault_fetch(request): obj_type = request.match_info["type"] obj_id = request.match_info["id"] if not request.app["backend"].is_available(obj_type, obj_id): raise NotFoundExc(f"{obj_type} {obj_id} is not available.") return encode_data(request.app["backend"].fetch(obj_type, obj_id)) def user_info(task_info): return { "id": task_info["id"], "status": task_info["task_status"], "progress_message": task_info["progress_msg"], "obj_type": task_info["type"], "obj_id": hashutil.hash_to_hex(task_info["object_id"]), } @asyncio.coroutine def vault_cook(request): obj_type = request.match_info["type"] obj_id = request.match_info["id"] email = request.query.get("email") sticky = request.query.get("sticky") in ("true", "1") if obj_type not in COOKER_TYPES: raise NotFoundExc(f"{obj_type} is an unknown type.") info = request.app["backend"].cook_request( obj_type, obj_id, email=email, sticky=sticky ) # TODO: return 201 status (Created) once the api supports it return encode_data(user_info(info)) @asyncio.coroutine def vault_progress(request): obj_type = request.match_info["type"] obj_id = request.match_info["id"] info = request.app["backend"].task_info(obj_type, obj_id) if not info: raise NotFoundExc(f"{obj_type} {obj_id} was not found.") return encode_data(user_info(info)) # Cookers endpoints @asyncio.coroutine def set_progress(request): obj_type = request.match_info["type"] obj_id = request.match_info["id"] progress = yield from decode_request(request) request.app["backend"].set_progress(obj_type, obj_id, progress) return encode_data(True) # FIXME: success value? @asyncio.coroutine def set_status(request): obj_type = request.match_info["type"] obj_id = request.match_info["id"] status = yield from decode_request(request) request.app["backend"].set_status(obj_type, obj_id, status) return encode_data(True) # FIXME: success value? @asyncio.coroutine def put_bundle(request): obj_type = request.match_info["type"] obj_id = request.match_info["id"] # TODO: handle streaming properly content = yield from decode_request(request) request.app["backend"].cache.add(obj_type, obj_id, content) return encode_data(True) # FIXME: success value? @asyncio.coroutine def send_notif(request): obj_type = request.match_info["type"] obj_id = request.match_info["id"] request.app["backend"].send_all_notifications(obj_type, obj_id) return encode_data(True) # FIXME: success value? # Batch endpoints @asyncio.coroutine def batch_cook(request): batch = yield from decode_request(request) for obj_type, obj_id in batch: if obj_type not in COOKER_TYPES: raise NotFoundExc(f"{obj_type} is an unknown type.") batch_id = request.app["backend"].batch_cook(batch) return encode_data({"id": batch_id}) @asyncio.coroutine def batch_progress(request): batch_id = request.match_info["batch_id"] bundles = request.app["backend"].batch_info(batch_id) if not bundles: raise NotFoundExc(f"Batch {batch_id} does not exist.") bundles = [user_info(bundle) for bundle in bundles] counter = collections.Counter(b["status"] for b in bundles) res = { "bundles": bundles, "total": len(bundles), **{k: 0 for k in ("new", "pending", "done", "failed")}, **dict(counter), } return encode_data(res) # Web server def make_app(backend, **kwargs): app = RPCServerApp(**kwargs) app.router.add_route("GET", "/", index) app.client_exception_classes = (NotFoundExc,) # Endpoints used by the web API app.router.add_route("GET", "/fetch/{type}/{id}", vault_fetch) app.router.add_route("POST", "/cook/{type}/{id}", vault_cook) app.router.add_route("GET", "/progress/{type}/{id}", vault_progress) # Endpoints used by the Cookers app.router.add_route("POST", "/set_progress/{type}/{id}", set_progress) app.router.add_route("POST", "/set_status/{type}/{id}", set_status) app.router.add_route("POST", "/put_bundle/{type}/{id}", put_bundle) app.router.add_route("POST", "/send_notif/{type}/{id}", send_notif) # Endpoints for batch requests app.router.add_route("POST", "/batch_cook", batch_cook) app.router.add_route("GET", "/batch_progress/{batch_id}", batch_progress) app["backend"] = backend return app def get_local_backend(cfg): if "vault" not in cfg: raise ValueError("missing '%vault' configuration") vcfg = cfg["vault"] if vcfg["cls"] != "local": raise EnvironmentError( "The vault backend can only be started with a 'local' " "configuration", err=True, ) args = vcfg["args"] if "cache" not in args: args["cache"] = cfg.get("cache") if "storage" not in args: args["storage"] = cfg.get("storage") if "scheduler" not in args: args["scheduler"] = cfg.get("scheduler") for key in ("cache", "storage", "scheduler"): if not args.get(key): raise ValueError("invalid configuration; missing %s config entry." % key) return get_vault("local", args) def make_app_from_configfile(config_file=None, **kwargs): if config_file is None: config_file = DEFAULT_CONFIG_PATH config_file = os.environ.get("SWH_CONFIG_FILENAME", config_file) if os.path.isfile(config_file): cfg = config.read(config_file, DEFAULT_CONFIG) else: cfg = config.load_named_config(config_file, DEFAULT_CONFIG) vault = get_local_backend(cfg) return make_app(backend=vault, client_max_size=cfg["client_max_size"], **kwargs) if __name__ == "__main__": print("Deprecated. Use swh-vault ") diff --git a/swh/vault/backend.py b/swh/vault/backend.py index 2b2f00b..1974e9e 100644 --- a/swh/vault/backend.py +++ b/swh/vault/backend.py @@ -1,488 +1,487 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import smtplib from email.mime.text import MIMEText +import smtplib import psycopg2.extras import psycopg2.pool from swh.core.db import BaseDb from swh.core.db.common import db_transaction from swh.model import hashutil from swh.scheduler.utils import create_oneshot_task_dict - from swh.vault.cookers import get_cooker_cls from swh.vault.exc import NotFoundExc cooking_task_name = "swh.vault.cooking_tasks.SWHCookingTask" NOTIF_EMAIL_FROM = '"Software Heritage Vault" ' "" NOTIF_EMAIL_SUBJECT_SUCCESS = "Bundle ready: {obj_type} {short_id}" NOTIF_EMAIL_SUBJECT_FAILURE = "Bundle failed: {obj_type} {short_id}" NOTIF_EMAIL_BODY_SUCCESS = """ You have requested the following bundle from the Software Heritage Vault: Object Type: {obj_type} Object ID: {hex_id} This bundle is now available for download at the following address: {url} Please keep in mind that this link might expire at some point, in which case you will need to request the bundle again. --\x20 The Software Heritage Developers """ NOTIF_EMAIL_BODY_FAILURE = """ You have requested the following bundle from the Software Heritage Vault: Object Type: {obj_type} Object ID: {hex_id} This bundle could not be cooked for the following reason: {progress_msg} We apologize for the inconvenience. --\x20 The Software Heritage Developers """ def batch_to_bytes(batch): return [(obj_type, hashutil.hash_to_bytes(obj_id)) for obj_type, obj_id in batch] class VaultBackend: """ Backend for the Software Heritage vault. """ def __init__(self, db, cache, scheduler, storage=None, **config): self.config = config self.cache = cache self.scheduler = scheduler self.storage = storage self.smtp_server = smtplib.SMTP() self._pool = psycopg2.pool.ThreadedConnectionPool( config.get("min_pool_conns", 1), config.get("max_pool_conns", 10), db, cursor_factory=psycopg2.extras.RealDictCursor, ) self._db = None def get_db(self): if self._db: return self._db return BaseDb.from_pool(self._pool) def put_db(self, db): if db is not self._db: db.put_conn() @db_transaction() def task_info(self, obj_type, obj_id, db=None, cur=None): """Fetch information from a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) cur.execute( """ SELECT id, type, object_id, task_id, task_status, sticky, ts_created, ts_done, ts_last_access, progress_msg FROM vault_bundle WHERE type = %s AND object_id = %s""", (obj_type, obj_id), ) res = cur.fetchone() if res: res["object_id"] = bytes(res["object_id"]) return res def _send_task(self, *args): """Send a cooking task to the celery scheduler""" task = create_oneshot_task_dict("cook-vault-bundle", *args) added_tasks = self.scheduler.create_tasks([task]) return added_tasks[0]["id"] @db_transaction() def create_task(self, obj_type, obj_id, sticky=False, db=None, cur=None): """Create and send a cooking task""" obj_id = hashutil.hash_to_bytes(obj_id) hex_id = hashutil.hash_to_hex(obj_id) cooker_class = get_cooker_cls(obj_type) cooker = cooker_class(obj_type, hex_id, backend=self, storage=self.storage) if not cooker.check_exists(): raise NotFoundExc("Object {} was not found.".format(hex_id)) cur.execute( """ INSERT INTO vault_bundle (type, object_id, sticky) VALUES (%s, %s, %s)""", (obj_type, obj_id, sticky), ) db.conn.commit() task_id = self._send_task(obj_type, hex_id) cur.execute( """ UPDATE vault_bundle SET task_id = %s WHERE type = %s AND object_id = %s""", (task_id, obj_type, obj_id), ) @db_transaction() def add_notif_email(self, obj_type, obj_id, email, db=None, cur=None): """Add an e-mail address to notify when a given bundle is ready""" obj_id = hashutil.hash_to_bytes(obj_id) cur.execute( """ INSERT INTO vault_notif_email (email, bundle_id) VALUES (%s, (SELECT id FROM vault_bundle WHERE type = %s AND object_id = %s))""", (email, obj_type, obj_id), ) @db_transaction() def cook_request( self, obj_type, obj_id, *, sticky=False, email=None, db=None, cur=None ): """Main entry point for cooking requests. This starts a cooking task if needed, and add the given e-mail to the notify list""" obj_id = hashutil.hash_to_bytes(obj_id) info = self.task_info(obj_type, obj_id) # If there's a failed bundle entry, delete it first. if info is not None and info["task_status"] == "failed": cur.execute( """DELETE FROM vault_bundle WHERE type = %s AND object_id = %s""", (obj_type, obj_id), ) db.conn.commit() info = None # If there's no bundle entry, create the task. if info is None: self.create_task(obj_type, obj_id, sticky) if email is not None: # If the task is already done, send the email directly if info is not None and info["task_status"] == "done": self.send_notification( None, email, obj_type, obj_id, info["task_status"] ) # Else, add it to the notification queue else: self.add_notif_email(obj_type, obj_id, email) info = self.task_info(obj_type, obj_id) return info @db_transaction() def batch_cook(self, batch, db=None, cur=None): """Cook a batch of bundles and returns the cooking id.""" # Import execute_values at runtime only, because it requires # psycopg2 >= 2.7 (only available on postgresql servers) from psycopg2.extras import execute_values cur.execute( """ INSERT INTO vault_batch (id) VALUES (DEFAULT) RETURNING id""" ) batch_id = cur.fetchone()["id"] batch = batch_to_bytes(batch) # Delete all failed bundles from the batch cur.execute( """ DELETE FROM vault_bundle WHERE task_status = 'failed' AND (type, object_id) IN %s""", (tuple(batch),), ) # Insert all the bundles, return the new ones execute_values( cur, """ INSERT INTO vault_bundle (type, object_id) VALUES %s ON CONFLICT DO NOTHING""", batch, ) # Get the bundle ids and task status cur.execute( """ SELECT id, type, object_id, task_id FROM vault_bundle WHERE (type, object_id) IN %s""", (tuple(batch),), ) bundles = cur.fetchall() # Insert the batch-bundle entries batch_id_bundle_ids = [(batch_id, row["id"]) for row in bundles] execute_values( cur, """ INSERT INTO vault_batch_bundle (batch_id, bundle_id) VALUES %s ON CONFLICT DO NOTHING""", batch_id_bundle_ids, ) db.conn.commit() # Get the tasks to fetch batch_new = [ (row["type"], bytes(row["object_id"])) for row in bundles if row["task_id"] is None ] # Send the tasks args_batch = [ (obj_type, hashutil.hash_to_hex(obj_id)) for obj_type, obj_id in batch_new ] # TODO: change once the scheduler handles priority tasks tasks = [ create_oneshot_task_dict("swh-vault-batch-cooking", *args) for args in args_batch ] added_tasks = self.scheduler.create_tasks(tasks) tasks_ids_bundle_ids = zip([task["id"] for task in added_tasks], batch_new) tasks_ids_bundle_ids = [ (task_id, obj_type, obj_id) for task_id, (obj_type, obj_id) in tasks_ids_bundle_ids ] # Update the task ids execute_values( cur, """ UPDATE vault_bundle SET task_id = s_task_id FROM (VALUES %s) AS sub (s_task_id, s_type, s_object_id) WHERE type = s_type::cook_type AND object_id = s_object_id """, tasks_ids_bundle_ids, ) return batch_id @db_transaction() def batch_info(self, batch_id, db=None, cur=None): """Fetch information from a batch of bundles""" cur.execute( """ SELECT vault_bundle.id as id, type, object_id, task_id, task_status, sticky, ts_created, ts_done, ts_last_access, progress_msg FROM vault_batch_bundle LEFT JOIN vault_bundle ON vault_bundle.id = bundle_id WHERE batch_id = %s""", (batch_id,), ) res = cur.fetchall() if res: for d in res: d["object_id"] = bytes(d["object_id"]) return res @db_transaction() def is_available(self, obj_type, obj_id, db=None, cur=None): """Check whether a bundle is available for retrieval""" info = self.task_info(obj_type, obj_id, cur=cur) return ( info is not None and info["task_status"] == "done" and self.cache.is_cached(obj_type, obj_id) ) @db_transaction() def fetch(self, obj_type, obj_id, db=None, cur=None): """Retrieve a bundle from the cache""" if not self.is_available(obj_type, obj_id, cur=cur): return None self.update_access_ts(obj_type, obj_id, cur=cur) return self.cache.get(obj_type, obj_id) @db_transaction() def update_access_ts(self, obj_type, obj_id, db=None, cur=None): """Update the last access timestamp of a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) cur.execute( """ UPDATE vault_bundle SET ts_last_access = NOW() WHERE type = %s AND object_id = %s""", (obj_type, obj_id), ) @db_transaction() def set_status(self, obj_type, obj_id, status, db=None, cur=None): """Set the cooking status of a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) req = ( """ UPDATE vault_bundle SET task_status = %s """ + (""", ts_done = NOW() """ if status == "done" else "") + """WHERE type = %s AND object_id = %s""" ) cur.execute(req, (status, obj_type, obj_id)) @db_transaction() def set_progress(self, obj_type, obj_id, progress, db=None, cur=None): """Set the cooking progress of a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) cur.execute( """ UPDATE vault_bundle SET progress_msg = %s WHERE type = %s AND object_id = %s""", (progress, obj_type, obj_id), ) @db_transaction() def send_all_notifications(self, obj_type, obj_id, db=None, cur=None): """Send all the e-mails in the notification list of a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) cur.execute( """ SELECT vault_notif_email.id AS id, email, task_status, progress_msg FROM vault_notif_email INNER JOIN vault_bundle ON bundle_id = vault_bundle.id WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s""", (obj_type, obj_id), ) for d in cur: self.send_notification( d["id"], d["email"], obj_type, obj_id, status=d["task_status"], progress_msg=d["progress_msg"], ) @db_transaction() def send_notification( self, n_id, email, obj_type, obj_id, status, progress_msg=None, db=None, cur=None, ): """Send the notification of a bundle to a specific e-mail""" hex_id = hashutil.hash_to_hex(obj_id) short_id = hex_id[:7] # TODO: instead of hardcoding this, we should probably: # * add a "fetch_url" field in the vault_notif_email table # * generate the url with flask.url_for() on the web-ui side # * send this url as part of the cook request and store it in # the table # * use this url for the notification e-mail url = "https://archive.softwareheritage.org/api/1/vault/{}/{}/" "raw".format( obj_type, hex_id ) if status == "done": text = NOTIF_EMAIL_BODY_SUCCESS.strip() text = text.format(obj_type=obj_type, hex_id=hex_id, url=url) msg = MIMEText(text) msg["Subject"] = NOTIF_EMAIL_SUBJECT_SUCCESS.format( obj_type=obj_type, short_id=short_id ) elif status == "failed": text = NOTIF_EMAIL_BODY_FAILURE.strip() text = text.format( obj_type=obj_type, hex_id=hex_id, progress_msg=progress_msg ) msg = MIMEText(text) msg["Subject"] = NOTIF_EMAIL_SUBJECT_FAILURE.format( obj_type=obj_type, short_id=short_id ) else: raise RuntimeError( "send_notification called on a '{}' bundle".format(status) ) msg["From"] = NOTIF_EMAIL_FROM msg["To"] = email self._smtp_send(msg) if n_id is not None: cur.execute( """ DELETE FROM vault_notif_email WHERE id = %s""", (n_id,), ) def _smtp_send(self, msg): # Reconnect if needed try: status = self.smtp_server.noop()[0] except smtplib.SMTPException: status = -1 if status != 250: self.smtp_server.connect("localhost", 25) # Send the message self.smtp_server.send_message(msg) @db_transaction() def _cache_expire(self, cond, *args, db=None, cur=None): """Low-level expiration method, used by cache_expire_* methods""" # Embedded SELECT query to be able to use ORDER BY and LIMIT cur.execute( """ DELETE FROM vault_bundle WHERE ctid IN ( SELECT ctid FROM vault_bundle WHERE sticky = false {} ) RETURNING type, object_id """.format( cond ), args, ) for d in cur: self.cache.delete(d["type"], bytes(d["object_id"])) @db_transaction() def cache_expire_oldest(self, n=1, by="last_access", db=None, cur=None): """Expire the `n` oldest bundles""" assert by in ("created", "done", "last_access") filter = """ORDER BY ts_{} LIMIT {}""".format(by, n) return self._cache_expire(filter) @db_transaction() def cache_expire_until(self, date, by="last_access", db=None, cur=None): """Expire all the bundles until a certain date""" assert by in ("created", "done", "last_access") filter = """AND ts_{} <= %s""".format(by) return self._cache_expire(filter, date) diff --git a/swh/vault/cli.py b/swh/vault/cli.py index ffc6617..0e3f055 100644 --- a/swh/vault/cli.py +++ b/swh/vault/cli.py @@ -1,85 +1,85 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import logging import click from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup - CFG_HELP = """Software Heritage Vault RPC server.""" @click.group(name="vault", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup) @click.pass_context def vault(ctx): """Software Heritage Vault tools.""" pass @vault.command(name="rpc-serve", help=CFG_HELP) @click.option( "--config-file", "-C", default=None, metavar="CONFIGFILE", type=click.Path(exists=True, dir_okay=False,), help="Configuration file.", ) @click.option( "--no-stdout", is_flag=True, default=False, help="Do NOT output logs on the console" ) @click.option( "--host", default="0.0.0.0", metavar="IP", show_default=True, help="Host ip address to bind the server on", ) @click.option( "--port", default=5005, type=click.INT, metavar="PORT", help="Binding port of the server", ) @click.option( "--debug/--no-debug", default=True, help="Indicates if the server should run in debug mode", ) @click.pass_context def serve(ctx, config_file, no_stdout, host, port, debug): import aiohttp + from swh.scheduler.celery_backend.config import setup_log_handler from swh.vault.api.server import make_app_from_configfile ctx.ensure_object(dict) setup_log_handler( loglevel=ctx.obj.get("log_level", logging.INFO), colorize=False, format="[%(levelname)s] %(name)s -- %(message)s", log_console=not no_stdout, ) try: app = make_app_from_configfile(config_file, debug=debug) except EnvironmentError as e: click.echo(e.msg, err=True) ctx.exit(1) aiohttp.web.run_app(app, host=host, port=int(port)) def main(): logging.basicConfig() return serve(auto_envvar_prefix="SWH_VAULT") if __name__ == "__main__": main() diff --git a/swh/vault/cookers/__init__.py b/swh/vault/cookers/__init__.py index 717478e..474eb62 100644 --- a/swh/vault/cookers/__init__.py +++ b/swh/vault/cookers/__init__.py @@ -1,56 +1,57 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os -from swh.core.config import load_named_config, read as read_config +from swh.core.config import load_named_config +from swh.core.config import read as read_config from swh.storage import get_storage from swh.vault import get_vault -from swh.vault.cookers.base import DEFAULT_CONFIG_PATH, DEFAULT_CONFIG +from swh.vault.cookers.base import DEFAULT_CONFIG, DEFAULT_CONFIG_PATH from swh.vault.cookers.directory import DirectoryCooker from swh.vault.cookers.revision_flat import RevisionFlatCooker from swh.vault.cookers.revision_gitfast import RevisionGitfastCooker COOKER_TYPES = { "directory": DirectoryCooker, "revision_flat": RevisionFlatCooker, "revision_gitfast": RevisionGitfastCooker, } def get_cooker_cls(obj_type): return COOKER_TYPES[obj_type] def get_cooker(obj_type, obj_id): if "SWH_CONFIG_FILENAME" in os.environ: cfg = read_config(os.environ["SWH_CONFIG_FILENAME"], DEFAULT_CONFIG) else: cfg = load_named_config(DEFAULT_CONFIG_PATH, DEFAULT_CONFIG) cooker_cls = get_cooker_cls(obj_type) if "vault" not in cfg: raise ValueError("missing '%vault' configuration") vcfg = cfg["vault"] if vcfg["cls"] != "remote": raise EnvironmentError( "This vault backend can only be a 'remote' " "configuration", err=True ) args = vcfg["args"] if "storage" not in args: args["storage"] = cfg.get("storage") if not args.get("storage"): raise ValueError("invalid configuration; missing 'storage' config entry.") storage = get_storage(**args.pop("storage")) backend = get_vault(**vcfg) return cooker_cls( obj_type, obj_id, backend=backend, storage=storage, max_bundle_size=cfg["max_bundle_size"], ) diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py index 5357735..657ddba 100644 --- a/swh/vault/cookers/base.py +++ b/swh/vault/cookers/base.py @@ -1,138 +1,138 @@ # Copyright (C) 2016-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import io import logging +from typing import Optional from psycopg2.extensions import QueryCanceledError -from typing import Optional from swh.model import hashutil MAX_BUNDLE_SIZE = 2 ** 29 # 512 MiB DEFAULT_CONFIG_PATH = "vault/cooker" DEFAULT_CONFIG = { "storage": ("dict", {"cls": "remote", "args": {"url": "http://localhost:5002/",},}), "vault": ("dict", {"cls": "remote", "args": {"url": "http://localhost:5005/",},}), "max_bundle_size": ("int", MAX_BUNDLE_SIZE), } class PolicyError(Exception): """Raised when the bundle violates the cooking policy.""" pass class BundleTooLargeError(PolicyError): """Raised when the bundle is too large to be cooked.""" pass class BytesIOBundleSizeLimit(io.BytesIO): def __init__(self, *args, size_limit=None, **kwargs): super().__init__(*args, **kwargs) self.size_limit = size_limit def write(self, chunk): if ( self.size_limit is not None and self.getbuffer().nbytes + len(chunk) > self.size_limit ): raise BundleTooLargeError( "The requested bundle exceeds the maximum allowed " "size of {} bytes.".format(self.size_limit) ) return super().write(chunk) class BaseVaultCooker(metaclass=abc.ABCMeta): """Abstract base class for the vault's bundle creators This class describes a common API for the cookers. To define a new cooker, inherit from this class and override: - CACHE_TYPE_KEY: key to use for the bundle to reference in cache - def cook(): cook the object into a bundle """ CACHE_TYPE_KEY = None # type: Optional[str] def __init__( self, obj_type, obj_id, backend, storage, max_bundle_size=MAX_BUNDLE_SIZE ): """Initialize the cooker. The type of the object represented by the id depends on the concrete class. Very likely, each type of bundle will have its own cooker class. Args: obj_type: type of the object to be cooked into a bundle (directory, revision_flat or revision_gitfast; see swh.vault.cooker.COOKER_TYPES). obj_id: id of the object to be cooked into a bundle. backend: the vault backend (swh.vault.backend.VaultBackend). """ self.obj_type = obj_type self.obj_id = hashutil.hash_to_bytes(obj_id) self.backend = backend self.storage = storage self.max_bundle_size = max_bundle_size @abc.abstractmethod def check_exists(self): """Checks that the requested object exists and can be cooked. Override this in the cooker implementation. """ raise NotImplementedError @abc.abstractmethod def prepare_bundle(self): """Implementation of the cooker. Yields chunks of the bundle bytes. Override this with the cooker implementation. """ raise NotImplementedError def write(self, chunk): self.fileobj.write(chunk) def cook(self): """Cook the requested object into a bundle """ self.backend.set_status(self.obj_type, self.obj_id, "pending") self.backend.set_progress(self.obj_type, self.obj_id, "Processing...") self.fileobj = BytesIOBundleSizeLimit(size_limit=self.max_bundle_size) try: try: self.prepare_bundle() except QueryCanceledError: raise PolicyError( "Timeout reached while assembling the requested bundle" ) bundle = self.fileobj.getvalue() # TODO: use proper content streaming instead of put_bundle() self.backend.put_bundle(self.CACHE_TYPE_KEY, self.obj_id, bundle) except PolicyError as e: self.backend.set_status(self.obj_type, self.obj_id, "failed") self.backend.set_progress(self.obj_type, self.obj_id, str(e)) except Exception: self.backend.set_status(self.obj_type, self.obj_id, "failed") self.backend.set_progress( self.obj_type, self.obj_id, "Internal Server Error. This incident will be reported.", ) logging.exception("Bundle cooking failed.") else: self.backend.set_status(self.obj_type, self.obj_id, "done") self.backend.set_progress(self.obj_type, self.obj_id, None) finally: self.backend.send_notif(self.obj_type, self.obj_id) diff --git a/swh/vault/cookers/revision_flat.py b/swh/vault/cookers/revision_flat.py index f310404..f66ae77 100644 --- a/swh/vault/cookers/revision_flat.py +++ b/swh/vault/cookers/revision_flat.py @@ -1,35 +1,35 @@ # Copyright (C) 2016-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from pathlib import Path import tarfile import tempfile -from pathlib import Path from swh.model import hashutil from swh.vault.cookers.base import BaseVaultCooker from swh.vault.cookers.utils import revision_log from swh.vault.to_disk import DirectoryBuilder class RevisionFlatCooker(BaseVaultCooker): """Cooker to create a revision_flat bundle """ CACHE_TYPE_KEY = "revision_flat" def check_exists(self): return not list(self.storage.revision_missing([self.obj_id])) def prepare_bundle(self): with tempfile.TemporaryDirectory(prefix="tmp-vault-revision-") as td: root = Path(td) for revision in revision_log(self.storage, self.obj_id): revdir = root / hashutil.hash_to_hex(revision["id"]) revdir.mkdir() directory_builder = DirectoryBuilder( self.storage, str(revdir).encode(), revision["directory"] ) directory_builder.build() with tarfile.open(fileobj=self.fileobj, mode="w:gz") as tar: tar.add(td, arcname=hashutil.hash_to_hex(self.obj_id)) diff --git a/swh/vault/cookers/revision_gitfast.py b/swh/vault/cookers/revision_gitfast.py index 22e2b92..a47d0d8 100644 --- a/swh/vault/cookers/revision_gitfast.py +++ b/swh/vault/cookers/revision_gitfast.py @@ -1,219 +1,219 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools import os import time import zlib from fastimport.commands import ( - CommitCommand, - ResetCommand, BlobCommand, + CommitCommand, FileDeleteCommand, FileModifyCommand, + ResetCommand, ) from swh.model import hashutil -from swh.model.toposort import toposort from swh.model.from_disk import mode_to_perms +from swh.model.toposort import toposort from swh.vault.cookers.base import BaseVaultCooker from swh.vault.cookers.utils import revision_log from swh.vault.to_disk import get_filtered_files_content class RevisionGitfastCooker(BaseVaultCooker): """Cooker to create a git fast-import bundle """ CACHE_TYPE_KEY = "revision_gitfast" def check_exists(self): return not list(self.storage.revision_missing([self.obj_id])) def prepare_bundle(self): self.log = list(toposort(revision_log(self.storage, self.obj_id))) self.gzobj = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16) self.fastexport() self.write(self.gzobj.flush()) def write_cmd(self, cmd): chunk = bytes(cmd) + b"\n" super().write(self.gzobj.compress(chunk)) def fastexport(self): """Generate all the git fast-import commands from a given log. """ self.rev_by_id = {r["id"]: r for r in self.log} self.obj_done = set() self.obj_to_mark = {} self.next_available_mark = 1 last_progress_report = None for i, rev in enumerate(self.log, 1): # Update progress if needed ct = time.time() if last_progress_report is None or last_progress_report + 2 <= ct: last_progress_report = ct pg = "Computing revision {}/{}".format(i, len(self.log)) self.backend.set_progress(self.obj_type, self.obj_id, pg) # Compute the current commit self._compute_commit_command(rev) def mark(self, obj_id): """Get the mark ID as bytes of a git object. If the object has not yet been marked, assign a new ID and add it to the mark dictionary. """ if obj_id not in self.obj_to_mark: self.obj_to_mark[obj_id] = self.next_available_mark self.next_available_mark += 1 return str(self.obj_to_mark[obj_id]).encode() def _compute_blob_command_content(self, file_data): """Compute the blob command of a file entry if it has not been computed yet. """ obj_id = file_data["sha1"] if obj_id in self.obj_done: return contents = list(get_filtered_files_content(self.storage, [file_data])) content = contents[0]["content"] self.write_cmd(BlobCommand(mark=self.mark(obj_id), data=content)) self.obj_done.add(obj_id) def _author_tuple_format(self, author, date): # We never want to have None values here so we replace null entries # by ''. if author is not None: author_tuple = (author.get("name") or b"", author.get("email") or b"") else: author_tuple = (b"", b"") if date is not None: date_tuple = ( date.get("timestamp", {}).get("seconds") or 0, (date.get("offset") or 0) * 60, ) else: date_tuple = (0, 0) return author_tuple + date_tuple def _compute_commit_command(self, rev): """Compute a commit command from a specific revision. """ if "parents" in rev and rev["parents"]: from_ = b":" + self.mark(rev["parents"][0]) merges = [b":" + self.mark(r) for r in rev["parents"][1:]] parent = self.rev_by_id[rev["parents"][0]] else: # We issue a reset command before all the new roots so that they # are not automatically added as children of the current branch. self.write_cmd(ResetCommand(b"refs/heads/master", None)) from_ = None merges = None parent = None # Retrieve the file commands while yielding new blob commands if # needed. files = list(self._compute_file_commands(rev, parent)) # Construct and write the commit command author = self._author_tuple_format(rev["author"], rev["date"]) committer = self._author_tuple_format(rev["committer"], rev["committer_date"]) self.write_cmd( CommitCommand( ref=b"refs/heads/master", mark=self.mark(rev["id"]), author=author, committer=committer, message=rev["message"] or b"", from_=from_, merges=merges, file_iter=files, ) ) @functools.lru_cache(maxsize=4096) def _get_dir_ents(self, dir_id=None): """Get the entities of a directory as a dictionary (name -> entity). This function has a cache to avoid doing multiple requests to retrieve the same entities, as doing a directory_ls() is expensive. """ data = self.storage.directory_ls(dir_id) if dir_id is not None else [] return {f["name"]: f for f in data} def _compute_file_commands(self, rev, parent=None): """Compute all the file commands of a revision. Generate a diff of the files between the revision and its main parent to find the necessary file commands to apply. """ # Initialize the stack with the root of the tree. cur_dir = rev["directory"] parent_dir = parent["directory"] if parent else None stack = [(b"", cur_dir, parent_dir)] while stack: # Retrieve the current directory and the directory of the parent # commit in order to compute the diff of the trees. root, cur_dir_id, prev_dir_id = stack.pop() cur_dir = self._get_dir_ents(cur_dir_id) prev_dir = self._get_dir_ents(prev_dir_id) # Find subtrees to delete: # - Subtrees that are not in the new tree (file or directory # deleted). # - Subtrees that do not have the same type in the new tree # (file -> directory or directory -> file) # After this step, every node remaining in the previous directory # has the same type than the one in the current directory. for fname, f in prev_dir.items(): if fname not in cur_dir or f["type"] != cur_dir[fname]["type"]: yield FileDeleteCommand(path=os.path.join(root, fname)) # Find subtrees to modify: # - Leaves (files) will be added or modified using `filemodify` # - Other subtrees (directories) will be added to the stack and # processed in the next iteration. for fname, f in cur_dir.items(): # A file is added or modified if it was not in the tree, if its # permissions changed or if its content changed. if f["type"] == "file" and ( fname not in prev_dir or f["sha1"] != prev_dir[fname]["sha1"] or f["perms"] != prev_dir[fname]["perms"] ): # Issue a blob command for the new blobs if needed. self._compute_blob_command_content(f) yield FileModifyCommand( path=os.path.join(root, fname), mode=mode_to_perms(f["perms"]).value, dataref=(b":" + self.mark(f["sha1"])), data=None, ) # A revision is added or modified if it was not in the tree or # if its target changed elif f["type"] == "rev" and ( fname not in prev_dir or f["target"] != prev_dir[fname]["target"] ): yield FileModifyCommand( path=os.path.join(root, fname), mode=0o160000, dataref=hashutil.hash_to_hex(f["target"]).encode(), data=None, ) # A directory is added or modified if it was not in the tree or # if its target changed. elif f["type"] == "dir": f_prev_target = None if fname in prev_dir and prev_dir[fname]["type"] == "dir": f_prev_target = prev_dir[fname]["target"] if f_prev_target is None or f["target"] != f_prev_target: stack.append( (os.path.join(root, fname), f["target"], f_prev_target) ) diff --git a/swh/vault/tests/__init__.py b/swh/vault/tests/__init__.py index cd10fe1..5d5c665 100644 --- a/swh/vault/tests/__init__.py +++ b/swh/vault/tests/__init__.py @@ -1,5 +1,5 @@ from os import path -import swh.vault +import swh.vault SQL_DIR = path.join(path.dirname(swh.vault.__file__), "sql") diff --git a/swh/vault/tests/conftest.py b/swh/vault/tests/conftest.py index f68956a..23702ad 100644 --- a/swh/vault/tests/conftest.py +++ b/swh/vault/tests/conftest.py @@ -1,80 +1,80 @@ -import pytest import glob import os + import pkg_resources.extern.packaging.version +import pytest +from pytest_postgresql import factories from swh.core.utils import numfile_sortkey as sortkey +from swh.storage.tests import SQL_DIR as STORAGE_SQL_DIR from swh.vault import get_vault from swh.vault.tests import SQL_DIR -from swh.storage.tests import SQL_DIR as STORAGE_SQL_DIR -from pytest_postgresql import factories - os.environ["LC_ALL"] = "C.UTF-8" pytest_v = pkg_resources.get_distribution("pytest").parsed_version if pytest_v < pkg_resources.extern.packaging.version.parse("3.9"): @pytest.fixture def tmp_path(request): - import tempfile import pathlib + import tempfile with tempfile.TemporaryDirectory() as tmpdir: yield pathlib.Path(tmpdir) def db_url(name, postgresql_proc): return "postgresql://{user}@{host}:{port}/{dbname}".format( host=postgresql_proc.host, port=postgresql_proc.port, user="postgres", dbname=name, ) postgresql2 = factories.postgresql("postgresql_proc", "tests2") @pytest.fixture def swh_vault(request, postgresql_proc, postgresql, postgresql2, tmp_path): for sql_dir, pg in ((SQL_DIR, postgresql), (STORAGE_SQL_DIR, postgresql2)): dump_files = os.path.join(sql_dir, "*.sql") all_dump_files = sorted(glob.glob(dump_files), key=sortkey) cursor = pg.cursor() for fname in all_dump_files: with open(fname) as fobj: # disable concurrent index creation since we run in a # transaction cursor.execute(fobj.read().replace("concurrently", "")) pg.commit() vault_config = { "db": db_url("tests", postgresql_proc), "storage": { "cls": "local", "db": db_url("tests2", postgresql_proc), "objstorage": { "cls": "pathslicing", "args": {"root": str(tmp_path), "slicing": "0:1/1:5",}, }, }, "cache": { "cls": "pathslicing", "args": { "root": str(tmp_path), "slicing": "0:1/1:5", "allow_delete": True, }, }, "scheduler": {"cls": "remote", "args": {"url": "http://swh-scheduler:5008",},}, } return get_vault("local", vault_config) @pytest.fixture def swh_storage(swh_vault): return swh_vault.storage diff --git a/swh/vault/tests/test_backend.py b/swh/vault/tests/test_backend.py index 05d0499..c699e55 100644 --- a/swh/vault/tests/test_backend.py +++ b/swh/vault/tests/test_backend.py @@ -1,336 +1,336 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import contextlib import datetime -import psycopg2 +from unittest.mock import MagicMock, patch -from unittest.mock import patch, MagicMock +import psycopg2 import pytest from swh.model import hashutil from swh.vault.tests.vault_testing import hash_content @contextlib.contextmanager def mock_cooking(vault_backend): with patch.object(vault_backend, "_send_task") as mt: mt.return_value = 42 with patch("swh.vault.backend.get_cooker_cls") as mg: mcc = MagicMock() mc = MagicMock() mg.return_value = mcc mcc.return_value = mc mc.check_exists.return_value = True yield { "_send_task": mt, "get_cooker_cls": mg, "cooker_cls": mcc, "cooker": mc, } def assertTimestampAlmostNow(ts, tolerance_secs=1.0): # noqa now = datetime.datetime.now(datetime.timezone.utc) creation_delta_secs = (ts - now).total_seconds() assert creation_delta_secs < tolerance_secs def fake_cook(backend, obj_type, result_content, sticky=False): content, obj_id = hash_content(result_content) with mock_cooking(backend): backend.create_task(obj_type, obj_id, sticky) backend.cache.add(obj_type, obj_id, b"content") backend.set_status(obj_type, obj_id, "done") return obj_id, content def fail_cook(backend, obj_type, obj_id, failure_reason): with mock_cooking(backend): backend.create_task(obj_type, obj_id) backend.set_status(obj_type, obj_id, "failed") backend.set_progress(obj_type, obj_id, failure_reason) TEST_TYPE = "revision_gitfast" TEST_HEX_ID = "4a4b9771542143cf070386f86b4b92d42966bdbc" TEST_OBJ_ID = hashutil.hash_to_bytes(TEST_HEX_ID) TEST_PROGRESS = ( "Mr. White, You're telling me you're cooking again?" " \N{ASTONISHED FACE} " ) TEST_EMAIL = "ouiche@lorraine.fr" def test_create_task_simple(swh_vault): with mock_cooking(swh_vault) as m: swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) m["get_cooker_cls"].assert_called_once_with(TEST_TYPE) args = m["cooker_cls"].call_args[0] assert args[0] == TEST_TYPE assert args[1] == TEST_HEX_ID assert m["cooker"].check_exists.call_count == 1 assert m["_send_task"].call_count == 1 args = m["_send_task"].call_args[0] assert args[0] == TEST_TYPE assert args[1] == TEST_HEX_ID info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) assert info["object_id"] == TEST_OBJ_ID assert info["type"] == TEST_TYPE assert info["task_status"] == "new" assert info["task_id"] == 42 assertTimestampAlmostNow(info["ts_created"]) assert info["ts_done"] is None assert info["progress_msg"] is None def test_create_fail_duplicate_task(swh_vault): with mock_cooking(swh_vault): swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) with pytest.raises(psycopg2.IntegrityError): swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) def test_create_fail_nonexisting_object(swh_vault): with mock_cooking(swh_vault) as m: m["cooker"].check_exists.side_effect = ValueError("Nothing here.") with pytest.raises(ValueError): swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) def test_create_set_progress(swh_vault): with mock_cooking(swh_vault): swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) assert info["progress_msg"] is None swh_vault.set_progress(TEST_TYPE, TEST_OBJ_ID, TEST_PROGRESS) info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) assert info["progress_msg"] == TEST_PROGRESS def test_create_set_status(swh_vault): with mock_cooking(swh_vault): swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) assert info["task_status"] == "new" assert info["ts_done"] is None swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, "pending") info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) assert info["task_status"] == "pending" assert info["ts_done"] is None swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, "done") info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) assert info["task_status"] == "done" assertTimestampAlmostNow(info["ts_done"]) def test_create_update_access_ts(swh_vault): with mock_cooking(swh_vault): swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) access_ts_1 = info["ts_last_access"] assertTimestampAlmostNow(access_ts_1) swh_vault.update_access_ts(TEST_TYPE, TEST_OBJ_ID) info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) access_ts_2 = info["ts_last_access"] assertTimestampAlmostNow(access_ts_2) swh_vault.update_access_ts(TEST_TYPE, TEST_OBJ_ID) info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) access_ts_3 = info["ts_last_access"] assertTimestampAlmostNow(access_ts_3) assert access_ts_1 < access_ts_2 assert access_ts_2 < access_ts_3 def test_cook_request_idempotent(swh_vault): with mock_cooking(swh_vault): info1 = swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) info2 = swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) info3 = swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) assert info1 == info2 assert info1 == info3 def test_cook_email_pending_done(swh_vault): with mock_cooking(swh_vault), patch.object( swh_vault, "add_notif_email" ) as madd, patch.object(swh_vault, "send_notification") as msend: swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) madd.assert_not_called() msend.assert_not_called() madd.reset_mock() msend.reset_mock() swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, email=TEST_EMAIL) madd.assert_called_once_with(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL) msend.assert_not_called() madd.reset_mock() msend.reset_mock() swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, "done") swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, email=TEST_EMAIL) msend.assert_called_once_with(None, TEST_EMAIL, TEST_TYPE, TEST_OBJ_ID, "done") madd.assert_not_called() def test_send_all_emails(swh_vault): with mock_cooking(swh_vault): emails = ("a@example.com", "billg@example.com", "test+42@example.org") for email in emails: swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, email=email) swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, "done") with patch.object(swh_vault, "smtp_server") as m: swh_vault.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) sent_emails = {k[0][0] for k in m.send_message.call_args_list} assert {k["To"] for k in sent_emails} == set(emails) for e in sent_emails: assert "bot@softwareheritage.org" in e["From"] assert TEST_TYPE in e["Subject"] assert TEST_HEX_ID[:5] in e["Subject"] assert TEST_TYPE in str(e) assert "https://archive.softwareheritage.org/" in str(e) assert TEST_HEX_ID[:5] in str(e) assert "--\x20\n" in str(e) # Well-formated signature!!! # Check that the entries have been deleted and recalling the # function does not re-send the e-mails m.reset_mock() swh_vault.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) m.assert_not_called() def test_available(swh_vault): assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID) with mock_cooking(swh_vault): swh_vault.create_task(TEST_TYPE, TEST_OBJ_ID) assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID) swh_vault.cache.add(TEST_TYPE, TEST_OBJ_ID, b"content") assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID) swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, "done") assert swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID) def test_fetch(swh_vault): assert swh_vault.fetch(TEST_TYPE, TEST_OBJ_ID) is None obj_id, content = fake_cook(swh_vault, TEST_TYPE, b"content") info = swh_vault.task_info(TEST_TYPE, obj_id) access_ts_before = info["ts_last_access"] assert swh_vault.fetch(TEST_TYPE, obj_id) == b"content" info = swh_vault.task_info(TEST_TYPE, obj_id) access_ts_after = info["ts_last_access"] assertTimestampAlmostNow(access_ts_after) assert access_ts_before < access_ts_after def test_cache_expire_oldest(swh_vault): r = range(1, 10) inserted = {} for i in r: sticky = i == 5 content = b"content%s" % str(i).encode() obj_id, content = fake_cook(swh_vault, TEST_TYPE, content, sticky) inserted[i] = (obj_id, content) swh_vault.update_access_ts(TEST_TYPE, inserted[2][0]) swh_vault.update_access_ts(TEST_TYPE, inserted[3][0]) swh_vault.cache_expire_oldest(n=4) should_be_still_here = {2, 3, 5, 8, 9} for i in r: assert swh_vault.is_available(TEST_TYPE, inserted[i][0]) == ( i in should_be_still_here ) def test_cache_expire_until(swh_vault): r = range(1, 10) inserted = {} for i in r: sticky = i == 5 content = b"content%s" % str(i).encode() obj_id, content = fake_cook(swh_vault, TEST_TYPE, content, sticky) inserted[i] = (obj_id, content) if i == 7: cutoff_date = datetime.datetime.now() swh_vault.update_access_ts(TEST_TYPE, inserted[2][0]) swh_vault.update_access_ts(TEST_TYPE, inserted[3][0]) swh_vault.cache_expire_until(date=cutoff_date) should_be_still_here = {2, 3, 5, 8, 9} for i in r: assert swh_vault.is_available(TEST_TYPE, inserted[i][0]) == ( i in should_be_still_here ) def test_fail_cook_simple(swh_vault): fail_cook(swh_vault, TEST_TYPE, TEST_OBJ_ID, "error42") assert not swh_vault.is_available(TEST_TYPE, TEST_OBJ_ID) info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) assert info["progress_msg"] == "error42" def test_send_failure_email(swh_vault): with mock_cooking(swh_vault): swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID, email="a@example.com") swh_vault.set_status(TEST_TYPE, TEST_OBJ_ID, "failed") swh_vault.set_progress(TEST_TYPE, TEST_OBJ_ID, "test error") with patch.object(swh_vault, "smtp_server") as m: swh_vault.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) e = [k[0][0] for k in m.send_message.call_args_list][0] assert e["To"] == "a@example.com" assert "bot@softwareheritage.org" in e["From"] assert TEST_TYPE in e["Subject"] assert TEST_HEX_ID[:5] in e["Subject"] assert "fail" in e["Subject"] assert TEST_TYPE in str(e) assert TEST_HEX_ID[:5] in str(e) assert "test error" in str(e) assert "--\x20\n" in str(e) # Well-formated signature def test_retry_failed_bundle(swh_vault): fail_cook(swh_vault, TEST_TYPE, TEST_OBJ_ID, "error42") info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) assert info["task_status"] == "failed" with mock_cooking(swh_vault): swh_vault.cook_request(TEST_TYPE, TEST_OBJ_ID) info = swh_vault.task_info(TEST_TYPE, TEST_OBJ_ID) assert info["task_status"] == "new" diff --git a/swh/vault/tests/test_cookers.py b/swh/vault/tests/test_cookers.py index 93cb8c6..b05e381 100644 --- a/swh/vault/tests/test_cookers.py +++ b/swh/vault/tests/test_cookers.py @@ -1,560 +1,559 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import contextlib import datetime import gzip import io import os import pathlib import subprocess import tarfile import tempfile import unittest import unittest.mock import dulwich.fastexport import dulwich.index import dulwich.objects import dulwich.porcelain import dulwich.repo from swh.loader.git.from_disk import GitLoaderFromDisk -from swh.model import hashutil -from swh.model import from_disk +from swh.model import from_disk, hashutil from swh.model.model import Directory, DirectoryEntry, Person, Revision, RevisionType from swh.vault.cookers import DirectoryCooker, RevisionGitfastCooker from swh.vault.tests.vault_testing import hash_content -from swh.vault.to_disk import SKIPPED_MESSAGE, HIDDEN_MESSAGE +from swh.vault.to_disk import HIDDEN_MESSAGE, SKIPPED_MESSAGE class TestRepo: """A tiny context manager for a test git repository, with some utility functions to perform basic git stuff. """ def __enter__(self): self.tmp_dir = tempfile.TemporaryDirectory(prefix="tmp-vault-repo-") self.repo_dir = self.tmp_dir.__enter__() self.repo = dulwich.repo.Repo.init(self.repo_dir) self.author_name = b"Test Author" self.author_email = b"test@softwareheritage.org" self.author = b"%s <%s>" % (self.author_name, self.author_email) self.base_date = 258244200 self.counter = 0 return pathlib.Path(self.repo_dir) def __exit__(self, exc, value, tb): self.tmp_dir.__exit__(exc, value, tb) def checkout(self, rev_sha): rev = self.repo[rev_sha] dulwich.index.build_index_from_tree( self.repo_dir, self.repo.index_path(), self.repo.object_store, rev.tree ) def git_shell(self, *cmd, stdout=subprocess.DEVNULL, **kwargs): name = self.author_name email = self.author_email date = "%d +0000" % (self.base_date + self.counter) env = { # Set git commit format "GIT_AUTHOR_NAME": name, "GIT_AUTHOR_EMAIL": email, "GIT_AUTHOR_DATE": date, "GIT_COMMITTER_NAME": name, "GIT_COMMITTER_EMAIL": email, "GIT_COMMITTER_DATE": date, # Ignore all the system-wide and user configurations "GIT_CONFIG_NOSYSTEM": "1", "HOME": str(self.tmp_dir), "XDG_CONFIG_HOME": str(self.tmp_dir), } kwargs.setdefault("env", {}).update(env) subprocess.check_call( ("git", "-C", self.repo_dir) + cmd, stdout=stdout, **kwargs ) def commit(self, message="Commit test\n", ref=b"HEAD"): """Commit the current working tree in a new commit with message on the branch 'ref'. At the end of the commit, the reference should stay the same and the index should be clean. """ self.git_shell("add", ".") message = message.encode() + b"\n" ret = self.repo.do_commit( message=message, committer=self.author, commit_timestamp=self.base_date + self.counter, commit_timezone=0, ref=ref, ) self.counter += 1 # committing on another branch leaves # dangling files in index if ref != b"HEAD": # XXX this should work (but does not) # dulwich.porcelain.reset(self.repo, 'hard') self.git_shell("reset", "--hard", "HEAD") return ret def merge(self, parent_sha_list, message="Merge branches."): self.git_shell( "merge", "--allow-unrelated-histories", "-m", message, *[p.decode() for p in parent_sha_list], ) self.counter += 1 return self.repo.refs[b"HEAD"] def print_debug_graph(self, reflog=False): args = ["log", "--all", "--graph", "--decorate"] if reflog: args.append("--reflog") self.git_shell(*args, stdout=None) def git_loader( storage, repo_path, visit_date=datetime.datetime.now(datetime.timezone.utc) ): """Instantiate a Git Loader using the storage instance as storage. """ loader = GitLoaderFromDisk( "fake_origin", directory=repo_path, visit_date=visit_date ) loader.storage = storage return loader @contextlib.contextmanager def cook_extract_directory(storage, obj_id): """Context manager that cooks a directory and extract it.""" backend = unittest.mock.MagicMock() backend.storage = storage cooker = DirectoryCooker("directory", obj_id, backend=backend, storage=storage) cooker.fileobj = io.BytesIO() assert cooker.check_exists() cooker.prepare_bundle() cooker.fileobj.seek(0) with tempfile.TemporaryDirectory(prefix="tmp-vault-extract-") as td: with tarfile.open(fileobj=cooker.fileobj, mode="r") as tar: tar.extractall(td) yield pathlib.Path(td) / hashutil.hash_to_hex(obj_id) cooker.storage = None @contextlib.contextmanager def cook_stream_revision_gitfast(storage, obj_id): """Context manager that cooks a revision and stream its fastexport.""" backend = unittest.mock.MagicMock() backend.storage = storage cooker = RevisionGitfastCooker( "revision_gitfast", obj_id, backend=backend, storage=storage ) cooker.fileobj = io.BytesIO() assert cooker.check_exists() cooker.prepare_bundle() cooker.fileobj.seek(0) fastexport_stream = gzip.GzipFile(fileobj=cooker.fileobj) yield fastexport_stream cooker.storage = None @contextlib.contextmanager def cook_extract_revision_gitfast(storage, obj_id): """Context manager that cooks a revision and extract it.""" test_repo = TestRepo() with cook_stream_revision_gitfast(storage, obj_id) as stream, test_repo as p: processor = dulwich.fastexport.GitImportProcessor(test_repo.repo) processor.import_stream(stream) yield test_repo, p TEST_CONTENT = ( " test content\n" "and unicode \N{BLACK HEART SUIT}\n" " and trailing spaces " ) TEST_EXECUTABLE = b"\x42\x40\x00\x00\x05" class TestDirectoryCooker: def test_directory_simple(self, swh_storage): repo = TestRepo() with repo as rp: (rp / "file").write_text(TEST_CONTENT) (rp / "executable").write_bytes(TEST_EXECUTABLE) (rp / "executable").chmod(0o755) (rp / "link").symlink_to("file") (rp / "dir1/dir2").mkdir(parents=True) (rp / "dir1/dir2/file").write_text(TEST_CONTENT) c = repo.commit() loader = git_loader(swh_storage, str(rp)) loader.load() obj_id_hex = repo.repo[c].tree.decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) with cook_extract_directory(swh_storage, obj_id) as p: assert (p / "file").stat().st_mode == 0o100644 assert (p / "file").read_text() == TEST_CONTENT assert (p / "executable").stat().st_mode == 0o100755 assert (p / "executable").read_bytes() == TEST_EXECUTABLE assert (p / "link").is_symlink assert os.readlink(str(p / "link")) == "file" assert (p / "dir1/dir2/file").stat().st_mode == 0o100644 assert (p / "dir1/dir2/file").read_text() == TEST_CONTENT directory = from_disk.Directory.from_disk(path=bytes(p)) assert obj_id_hex == hashutil.hash_to_hex(directory.hash) def test_directory_filtered_objects(self, swh_storage): repo = TestRepo() with repo as rp: file_1, id_1 = hash_content(b"test1") file_2, id_2 = hash_content(b"test2") file_3, id_3 = hash_content(b"test3") (rp / "file").write_bytes(file_1) (rp / "hidden_file").write_bytes(file_2) (rp / "absent_file").write_bytes(file_3) c = repo.commit() loader = git_loader(swh_storage, str(rp)) loader.load() obj_id_hex = repo.repo[c].tree.decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) # FIXME: storage.content_update() should be changed to allow things # like that with swh_storage.get_db().transaction() as cur: cur.execute( """update content set status = 'visible' where sha1 = %s""", (id_1,), ) cur.execute( """update content set status = 'hidden' where sha1 = %s""", (id_2,), ) cur.execute( """update content set status = 'absent' where sha1 = %s""", (id_3,), ) with cook_extract_directory(swh_storage, obj_id) as p: assert (p / "file").read_bytes() == b"test1" assert (p / "hidden_file").read_bytes() == HIDDEN_MESSAGE assert (p / "absent_file").read_bytes() == SKIPPED_MESSAGE def test_directory_bogus_perms(self, swh_storage): # Some early git repositories have 664/775 permissions... let's check # if all the weird modes are properly normalized in the directory # cooker. repo = TestRepo() with repo as rp: (rp / "file").write_text(TEST_CONTENT) (rp / "file").chmod(0o664) (rp / "executable").write_bytes(TEST_EXECUTABLE) (rp / "executable").chmod(0o775) (rp / "wat").write_text(TEST_CONTENT) (rp / "wat").chmod(0o604) c = repo.commit() loader = git_loader(swh_storage, str(rp)) loader.load() obj_id_hex = repo.repo[c].tree.decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) with cook_extract_directory(swh_storage, obj_id) as p: assert (p / "file").stat().st_mode == 0o100644 assert (p / "executable").stat().st_mode == 0o100755 assert (p / "wat").stat().st_mode == 0o100644 def test_directory_revision_data(self, swh_storage): target_rev = "0e8a3ad980ec179856012b7eecf4327e99cd44cd" dir = Directory( entries=( DirectoryEntry( name=b"submodule", type="rev", target=hashutil.hash_to_bytes(target_rev), perms=0o100644, ), ), ) swh_storage.directory_add([dir]) with cook_extract_directory(swh_storage, dir.id) as p: assert (p / "submodule").is_symlink() assert os.readlink(str(p / "submodule")) == target_rev class TestRevisionGitfastCooker: def test_revision_simple(self, swh_storage): # # 1--2--3--4--5--6--7 # repo = TestRepo() with repo as rp: (rp / "file1").write_text(TEST_CONTENT) repo.commit("add file1") (rp / "file2").write_text(TEST_CONTENT) repo.commit("add file2") (rp / "dir1/dir2").mkdir(parents=True) (rp / "dir1/dir2/file").write_text(TEST_CONTENT) repo.commit("add dir1/dir2/file") (rp / "bin1").write_bytes(TEST_EXECUTABLE) (rp / "bin1").chmod(0o755) repo.commit("add bin1") (rp / "link1").symlink_to("file1") repo.commit("link link1 to file1") (rp / "file2").unlink() repo.commit("remove file2") (rp / "bin1").rename(rp / "bin") repo.commit("rename bin1 to bin") loader = git_loader(swh_storage, str(rp)) loader.load() obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) with cook_extract_revision_gitfast(swh_storage, obj_id) as (ert, p): ert.checkout(b"HEAD") assert (p / "file1").stat().st_mode == 0o100644 assert (p / "file1").read_text() == TEST_CONTENT assert (p / "link1").is_symlink assert os.readlink(str(p / "link1")) == "file1" assert (p / "bin").stat().st_mode == 0o100755 assert (p / "bin").read_bytes() == TEST_EXECUTABLE assert (p / "dir1/dir2/file").read_text() == TEST_CONTENT assert (p / "dir1/dir2/file").stat().st_mode == 0o100644 assert ert.repo.refs[b"HEAD"].decode() == obj_id_hex def test_revision_two_roots(self, swh_storage): # # 1----3---4 # / # 2---- # repo = TestRepo() with repo as rp: (rp / "file1").write_text(TEST_CONTENT) c1 = repo.commit("Add file1") del repo.repo.refs[b"refs/heads/master"] # git update-ref -d HEAD (rp / "file2").write_text(TEST_CONTENT) repo.commit("Add file2") repo.merge([c1]) (rp / "file3").write_text(TEST_CONTENT) repo.commit("add file3") obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) loader = git_loader(swh_storage, str(rp)) loader.load() with cook_extract_revision_gitfast(swh_storage, obj_id) as (ert, p): assert ert.repo.refs[b"HEAD"].decode() == obj_id_hex def test_revision_two_double_fork_merge(self, swh_storage): # # 2---4---6 # / / / # 1---3---5 # repo = TestRepo() with repo as rp: (rp / "file1").write_text(TEST_CONTENT) c1 = repo.commit("Add file1") repo.repo.refs[b"refs/heads/c1"] = c1 (rp / "file2").write_text(TEST_CONTENT) repo.commit("Add file2") (rp / "file3").write_text(TEST_CONTENT) c3 = repo.commit("Add file3", ref=b"refs/heads/c1") repo.repo.refs[b"refs/heads/c3"] = c3 repo.merge([c3]) (rp / "file5").write_text(TEST_CONTENT) c5 = repo.commit("Add file3", ref=b"refs/heads/c3") repo.merge([c5]) obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) loader = git_loader(swh_storage, str(rp)) loader.load() with cook_extract_revision_gitfast(swh_storage, obj_id) as (ert, p): assert ert.repo.refs[b"HEAD"].decode() == obj_id_hex def test_revision_triple_merge(self, swh_storage): # # .---.---5 # / / / # 2 3 4 # / / / # 1---.---. # repo = TestRepo() with repo as rp: (rp / "file1").write_text(TEST_CONTENT) c1 = repo.commit("Commit 1") repo.repo.refs[b"refs/heads/b1"] = c1 repo.repo.refs[b"refs/heads/b2"] = c1 repo.commit("Commit 2") c3 = repo.commit("Commit 3", ref=b"refs/heads/b1") c4 = repo.commit("Commit 4", ref=b"refs/heads/b2") repo.merge([c3, c4]) obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) loader = git_loader(swh_storage, str(rp)) loader.load() with cook_extract_revision_gitfast(swh_storage, obj_id) as (ert, p): assert ert.repo.refs[b"HEAD"].decode() == obj_id_hex def test_revision_filtered_objects(self, swh_storage): repo = TestRepo() with repo as rp: file_1, id_1 = hash_content(b"test1") file_2, id_2 = hash_content(b"test2") file_3, id_3 = hash_content(b"test3") (rp / "file").write_bytes(file_1) (rp / "hidden_file").write_bytes(file_2) (rp / "absent_file").write_bytes(file_3) repo.commit() obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) loader = git_loader(swh_storage, str(rp)) loader.load() # FIXME: storage.content_update() should be changed to allow things # like that with swh_storage.get_db().transaction() as cur: cur.execute( """update content set status = 'visible' where sha1 = %s""", (id_1,), ) cur.execute( """update content set status = 'hidden' where sha1 = %s""", (id_2,), ) cur.execute( """update content set status = 'absent' where sha1 = %s""", (id_3,), ) with cook_extract_revision_gitfast(swh_storage, obj_id) as (ert, p): ert.checkout(b"HEAD") assert (p / "file").read_bytes() == b"test1" assert (p / "hidden_file").read_bytes() == HIDDEN_MESSAGE assert (p / "absent_file").read_bytes() == SKIPPED_MESSAGE def test_revision_bogus_perms(self, swh_storage): # Some early git repositories have 664/775 permissions... let's check # if all the weird modes are properly normalized in the revision # cooker. repo = TestRepo() with repo as rp: (rp / "file").write_text(TEST_CONTENT) (rp / "file").chmod(0o664) (rp / "executable").write_bytes(TEST_EXECUTABLE) (rp / "executable").chmod(0o775) (rp / "wat").write_text(TEST_CONTENT) (rp / "wat").chmod(0o604) repo.commit("initial commit") loader = git_loader(swh_storage, str(rp)) loader.load() obj_id_hex = repo.repo.refs[b"HEAD"].decode() obj_id = hashutil.hash_to_bytes(obj_id_hex) with cook_extract_revision_gitfast(swh_storage, obj_id) as (ert, p): ert.checkout(b"HEAD") assert (p / "file").stat().st_mode == 0o100644 assert (p / "executable").stat().st_mode == 0o100755 assert (p / "wat").stat().st_mode == 0o100644 def test_revision_null_fields(self, swh_storage): # Our schema doesn't enforce a lot of non-null revision fields. We need # to check these cases don't break the cooker. repo = TestRepo() with repo as rp: (rp / "file").write_text(TEST_CONTENT) c = repo.commit("initial commit") loader = git_loader(swh_storage, str(rp)) loader.load() repo.repo.refs[b"HEAD"].decode() dir_id_hex = repo.repo[c].tree.decode() dir_id = hashutil.hash_to_bytes(dir_id_hex) test_revision = Revision( message=b"", author=Person(name=None, email=None, fullname=b""), date=None, committer=Person(name=None, email=None, fullname=b""), committer_date=None, parents=(), type=RevisionType.GIT, directory=dir_id, metadata={}, synthetic=True, ) swh_storage.revision_add([test_revision]) with cook_extract_revision_gitfast(swh_storage, test_revision.id) as (ert, p): ert.checkout(b"HEAD") assert (p / "file").stat().st_mode == 0o100644 def test_revision_revision_data(self, swh_storage): target_rev = "0e8a3ad980ec179856012b7eecf4327e99cd44cd" dir = Directory( entries=( DirectoryEntry( name=b"submodule", type="rev", target=hashutil.hash_to_bytes(target_rev), perms=0o100644, ), ), ) swh_storage.directory_add([dir]) rev = Revision( message=b"", author=Person(name=None, email=None, fullname=b""), date=None, committer=Person(name=None, email=None, fullname=b""), committer_date=None, parents=(), type=RevisionType.GIT, directory=dir.id, metadata={}, synthetic=True, ) swh_storage.revision_add([rev]) with cook_stream_revision_gitfast(swh_storage, rev.id) as stream: pattern = "M 160000 {} submodule".format(target_rev).encode() assert pattern in stream.read() diff --git a/swh/vault/tests/test_cookers_base.py b/swh/vault/tests/test_cookers_base.py index 5486d75..3eacecf 100644 --- a/swh/vault/tests/test_cookers_base.py +++ b/swh/vault/tests/test_cookers_base.py @@ -1,75 +1,74 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from unittest.mock import MagicMock from swh.model import hashutil from swh.vault.cookers.base import BaseVaultCooker - TEST_BUNDLE_CHUNKS = [b"test content 1\n", b"test content 2\n", b"test content 3\n"] TEST_BUNDLE_CONTENT = b"".join(TEST_BUNDLE_CHUNKS) TEST_OBJ_TYPE = "test_type" TEST_HEX_ID = "17a3e48bce37be5226490e750202ad3a9a1a3fe9" TEST_OBJ_ID = hashutil.hash_to_bytes(TEST_HEX_ID) class BaseVaultCookerMock(BaseVaultCooker): CACHE_TYPE_KEY = TEST_OBJ_TYPE def __init__(self): # we do not call super() here to bypass the building of db objects from # config since we do mock these db objects self.config = {} self.storage = MagicMock() self.backend = MagicMock() self.obj_type = self.CACHE_TYPE_KEY self.obj_id = hashutil.hash_to_bytes(TEST_OBJ_ID) self.max_bundle_size = 1024 def check_exists(self): return True def prepare_bundle(self): for chunk in TEST_BUNDLE_CHUNKS: self.write(chunk) def test_simple_cook(): cooker = BaseVaultCookerMock() cooker.cook() cooker.backend.put_bundle.assert_called_once_with( TEST_OBJ_TYPE, TEST_OBJ_ID, TEST_BUNDLE_CONTENT ) cooker.backend.set_status.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID, "done") cooker.backend.set_progress.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID, None) cooker.backend.send_notif.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID) def test_code_exception_cook(): cooker = BaseVaultCookerMock() cooker.prepare_bundle = MagicMock() cooker.prepare_bundle.side_effect = RuntimeError("Nope") cooker.cook() # Potentially remove this when we have objstorage streaming cooker.backend.put_bundle.assert_not_called() cooker.backend.set_status.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID, "failed") assert "Nope" not in cooker.backend.set_progress.call_args[0][2] cooker.backend.send_notif.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID) def test_policy_exception_cook(): cooker = BaseVaultCookerMock() cooker.max_bundle_size = 8 cooker.cook() # Potentially remove this when we have objstorage streaming cooker.backend.put_bundle.assert_not_called() cooker.backend.set_status.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID, "failed") assert "exceeds" in cooker.backend.set_progress.call_args[0][2] cooker.backend.send_notif.assert_called_with(TEST_OBJ_TYPE, TEST_OBJ_ID) diff --git a/swh/vault/tests/test_server.py b/swh/vault/tests/test_server.py index 507d56a..5bbbf14 100644 --- a/swh/vault/tests/test_server.py +++ b/swh/vault/tests/test_server.py @@ -1,56 +1,55 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest -from swh.core.api.serializers import msgpack_loads, msgpack_dumps - +from swh.core.api.serializers import msgpack_dumps, msgpack_loads from swh.vault.api.server import make_app @pytest.fixture def client(swh_vault, loop, aiohttp_client): app = make_app(backend=swh_vault) return loop.run_until_complete(aiohttp_client(app)) async def test_index(client): resp = await client.get("/") assert resp.status == 200 async def test_cook_notfound(client): resp = await client.post("/cook/directory/000000") assert resp.status == 400 content = msgpack_loads(await resp.content.read()) assert content["exception"]["type"] == "NotFoundExc" assert content["exception"]["args"] == ["Object 000000 was not found."] async def test_progress_notfound(client): resp = await client.get("/progress/directory/000000") assert resp.status == 400 content = msgpack_loads(await resp.content.read()) assert content["exception"]["type"] == "NotFoundExc" assert content["exception"]["args"] == ["directory 000000 was not found."] async def test_batch_cook_invalid_type(client): data = msgpack_dumps([("foobar", [])]) resp = await client.post( "/batch_cook", data=data, headers={"Content-Type": "application/x-msgpack"} ) assert resp.status == 400 content = msgpack_loads(await resp.content.read()) assert content["exception"]["type"] == "NotFoundExc" assert content["exception"]["args"] == ["foobar is an unknown type."] async def test_batch_progress_notfound(client): resp = await client.get("/batch_progress/1") assert resp.status == 400 content = msgpack_loads(await resp.content.read()) assert content["exception"]["type"] == "NotFoundExc" assert content["exception"]["args"] == ["Batch 1 does not exist."] diff --git a/swh/vault/tests/test_to_disk.py b/swh/vault/tests/test_to_disk.py index 0b36879..7596dd7 100644 --- a/swh/vault/tests/test_to_disk.py +++ b/swh/vault/tests/test_to_disk.py @@ -1,73 +1,72 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from swh.model.model import Content, SkippedContent - from swh.vault.to_disk import get_filtered_files_content def test_get_filtered_files_content(swh_storage): content = Content.from_data(b"foo bar") skipped_content = SkippedContent( sha1=None, sha1_git=b"c" * 20, sha256=None, blake2s256=None, length=42, status="absent", reason="for some reason", ) swh_storage.content_add([content]) swh_storage.skipped_content_add([skipped_content]) files_data = [ { "status": "visible", "sha1": content.sha1, "sha1_git": content.sha1_git, "target": content.sha1_git, }, {"status": "absent", "target": skipped_content.sha1_git,}, ] res = list(get_filtered_files_content(swh_storage, files_data)) assert res == [ { "content": content.data, "status": "visible", "sha1": content.sha1, "sha1_git": content.sha1_git, "target": content.sha1_git, }, { "content": ( b"This content has not been retrieved in the " b"Software Heritage archive due to its size." ), "status": "absent", "target": skipped_content.sha1_git, }, ] def test_get_filtered_files_content__unknown_status(swh_storage): content = Content.from_data(b"foo bar") swh_storage.content_add([content]) files_data = [ { "status": "visible", "sha1": content.sha1, "sha1_git": content.sha1_git, "target": content.sha1_git, }, {"status": None, "target": b"c" * 20,}, ] with pytest.raises(AssertionError, match="unexpected status None"): list(get_filtered_files_content(swh_storage, files_data)) diff --git a/swh/vault/to_disk.py b/swh/vault/to_disk.py index aae0a5e..152040d 100644 --- a/swh/vault/to_disk.py +++ b/swh/vault/to_disk.py @@ -1,141 +1,139 @@ # Copyright (C) 2016-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import collections import functools import os - from typing import Any, Dict, Iterator, List from swh.model import hashutil -from swh.model.from_disk import mode_to_perms, DentryPerms +from swh.model.from_disk import DentryPerms, mode_to_perms from swh.storage.algos.dir_iterators import dir_iterator from swh.storage.interface import StorageInterface - SKIPPED_MESSAGE = ( b"This content has not been retrieved in the " b"Software Heritage archive due to its size." ) HIDDEN_MESSAGE = b"This content is hidden." def get_filtered_files_content( storage: StorageInterface, files_data: List[Dict] ) -> Iterator[Dict[str, Any]]: """Retrieve the files specified by files_data and apply filters for skipped and missing contents. Args: storage: the storage from which to retrieve the objects files_data: list of file entries as returned by directory_ls() Yields: The entries given in files_data with a new 'content' key that points to the file content in bytes. The contents can be replaced by a specific message to indicate that they could not be retrieved (either due to privacy policy or because their sizes were too big for us to archive it). """ for file_data in files_data: status = file_data["status"] if status == "absent": content = SKIPPED_MESSAGE elif status == "hidden": content = HIDDEN_MESSAGE elif status == "visible": sha1 = file_data["sha1"] data = storage.content_get_data(sha1) if data is None: content = SKIPPED_MESSAGE else: content = data else: assert False, ( f"unexpected status {status!r} " f"for content {hashutil.hash_to_hex(file_data['target'])}" ) yield {"content": content, **file_data} def apply_chunked(func, input_list, chunk_size): """Apply func on input_list divided in chunks of size chunk_size""" for i in range(0, len(input_list), chunk_size): yield from func(input_list[i : i + chunk_size]) class DirectoryBuilder: """Reconstructs the on-disk representation of a directory in the storage. """ def __init__(self, storage, root, dir_id): """Initialize the directory builder. Args: storage: the storage object root: the path where the directory should be reconstructed dir_id: the identifier of the directory in the storage """ self.storage = storage self.root = root self.dir_id = dir_id def build(self): """Perform the reconstruction of the directory in the given root.""" # Retrieve data from the database. # Split into files, revisions and directory data. entries = collections.defaultdict(list) for entry in dir_iterator(self.storage, self.dir_id): entries[entry["type"]].append(entry) # Recreate the directory's subtree and then the files into it. self._create_tree(entries["dir"]) self._create_files(entries["file"]) self._create_revisions(entries["rev"]) def _create_tree(self, directories): """Create a directory tree from the given paths The tree is created from `root` and each given directory in `directories` will be created. """ # Directories are sorted by depth so they are created in the # right order bsep = os.path.sep.encode() directories = sorted(directories, key=lambda x: len(x["path"].split(bsep))) for dir in directories: os.makedirs(os.path.join(self.root, dir["path"])) def _create_files(self, files_data): """Create the files in the tree and fetch their contents.""" f = functools.partial(get_filtered_files_content, self.storage) files_data = apply_chunked(f, files_data, 1000) for file_data in files_data: path = os.path.join(self.root, file_data["path"]) self._create_file(path, file_data["content"], file_data["perms"]) def _create_revisions(self, revs_data): """Create the revisions in the tree as broken symlinks to the target identifier.""" for file_data in revs_data: path = os.path.join(self.root, file_data["path"]) self._create_file( path, hashutil.hash_to_hex(file_data["target"]), mode=0o120000 ) def _create_file(self, path, content, mode=0o100644): """Create the given file and fill it with content.""" perms = mode_to_perms(mode) if perms == DentryPerms.symlink: os.symlink(content, path) else: with open(path, "wb") as f: f.write(content) os.chmod(path, perms.value)