diff --git a/PKG-INFO b/PKG-INFO
index b47355d..d138773 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,10 +1,10 @@
 Metadata-Version: 1.0
 Name: swh.vault
-Version: 0.0.6
+Version: 0.0.7
 Summary: Software Heritage vault
 Home-page: https://forge.softwareheritage.org/diffusion/DVAU/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Description: UNKNOWN
 Platform: UNKNOWN
diff --git a/docs/api.rst b/docs/api.rst
new file mode 100644
index 0000000..ade78b4
--- /dev/null
+++ b/docs/api.rst
@@ -0,0 +1,178 @@
+.. _vault-api-ref:
+
+Vault API Reference
+===================
+
+Software source code **objects**---e.g., individual files, directories,
+commits, tagged releases, etc.---are stored in the Software Heritage (SWH)
+Archive in fully deduplicated form. That allows direct access to individual
+artifacts, but requires some preparation ("cooking") when fast access to a
+large set of related objects (e.g., an entire repository) is required.
+
+The **Software Heritage Vault** takes care of that preparation by
+asynchronously assembling **bundles** of related source code objects, caching
+them, and garbage collecting them as needed.
+
+The Vault is accessible via a RESTful API documented below.
+
+All endpoints are mounted at the API root, which is currently at
+https://archive.softwareheritage.org/api/1/.
+
+Unless otherwise stated, API endpoints respond to the HTTP GET method.
+
+
+Object identification
+---------------------
+
+The vault stores bundles corresponding to different kinds of objects (see
+:ref:`data-model`). The following object kinds are currently supported by the
+Vault:
+
+- directories
+- revisions
+- snapshots
+
+The URL fragment ``:objectkind/:objectid`` is used throughout the vault API to
+identify vault objects. The syntax and meaning of ``:objectid`` for the
+different object kinds is detailed below.
+
+Optionally, a third parameter, ``:format``, can be used (when multiple formats
+are supported) to specify the format of the resulting bundle. The URL fragment
+then becomes ``:objectkind/:objectid/:format``.
+
+
+Directories
+~~~~~~~~~~~
+
+- object kind: ``directory``
+- URL fragment: ``directory/:dir_id``
+
+where ``:dir_id`` is a :py:func:`directory identifier
+<swh.model.identifiers.directory_identifier>`.
+
+The only format available for a directory export is a gzip-compressed
+tarball. You can extract the resulting bundle using:
+
+.. code:: shell
+
+    tar xaf bundle.tar.gz
+
+
+Revisions
+~~~~~~~~~
+
+- object kind: ``revision``
+- URL fragment: ``revision/:rev_id/:format``
+
+where ``:rev_id`` is a :py:func:`revision identifier
+<swh.model.identifiers.revision_identifier>` and ``:format`` is the export
+format.
+
+The only format available for a revision export is ``gitfast``: a
+gzip-compressed `git fast-export
+<https://git-scm.com/docs/git-fast-export>`_ stream. You can extract the
+resulting bundle using:
+
+.. code:: shell
+
+    git init
+    zcat bundle.gitfast.gz | git fast-import
+    git checkout HEAD
+
+
+Repository snapshots
+~~~~~~~~~~~~~~~~~~~~
+
+.. TODO
+**(NOT AVAILABLE YET)**
+
+- object kind: ``snapshot``
+- URL fragment: ``snapshot/:snp_id``
+
+where ``:snp_id`` is a :py:func:`snapshot identifier
+<swh.model.identifiers.snapshot_identifier>`.
+
+
+Cooking and status checking
+---------------------------
+
+Vault bundles might be ready for retrieval or not. When they are not, they
+will need to be **cooked** before they can be retrieved. A cooked bundle will
+remain around until it expires; after expiration, it will need to be cooked
+again before it can be retrieved. Cooking is idempotent: requesting the
+cooking of a bundle that has already been cooked and has not yet expired is a
+no-op.
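The typical client workflow against the endpoints documented below is thus:
one POST to start the cooking, GET polling until the status reaches ``done``
(or ``failed``), then a final GET on the returned ``fetch_url``. As a minimal
sketch of that loop (not part of this changeset), assuming the third-party
``requests`` library and the public API root above; the ``cook_and_wait``
helper name is hypothetical:

.. code:: python

    import time

    import requests

    API = 'https://archive.softwareheritage.org/api/1'

    def cook_and_wait(obj_kind, obj_id, poll_interval=5):
        # Start (or re-trigger) the cooking; the response body is the same
        # JSON status document returned by subsequent GET requests.
        url = '{}/vault/{}/{}/'.format(API, obj_kind, obj_id)
        info = requests.post(url).json()
        while info['status'] in ('new', 'pending'):
            time.sleep(poll_interval)
            info = requests.get(url).json()
        if info['status'] == 'failed':
            raise RuntimeError(info['progress_message'])
        # e.g. '/api/1/vault/directory/<dir_id>/raw/'
        return info['fetch_url']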
+.. http:post:: /vault/:objectkind/:objectid/:format
+.. http:get:: /vault/:objectkind/:objectid/:format
+
+   **Request body**: optionally, an ``email`` POST parameter containing an
+   e-mail address to notify when the bundle cooking has ended.
+
+   **Allowed HTTP Methods:**
+
+   - :http:method:`post` to **request** a bundle cooking
+   - :http:method:`get` to check the progress and status of the cooking
+   - :http:method:`head`
+   - :http:method:`options`
+
+   **Response:**
+
+   :statuscode 200: bundle available for cooking; the response reports the
+       status of the cooking
+   :statuscode 400: malformed identifier hash or format
+   :statuscode 404: unavailable bundle or object not found
+
+   .. sourcecode:: http
+
+      HTTP/1.1 200 OK
+      Content-Type: application/json
+
+      {
+          "id": 42,
+          "fetch_url": "/api/1/vault/directory/:dir_id/raw/",
+          "obj_id": ":dir_id",
+          "obj_type": "directory",
+          "progress_message": "Creating tarball...",
+          "status": "pending"
+      }
+
+   After a cooking request has been started, all subsequent GET and POST
+   requests to the cooking URL return some JSON data containing information
+   about the progress of the bundle creation. The JSON contains the
+   following keys:
+
+   - ``id``: the ID of the cooking request
+
+   - ``fetch_url``: the URL that can be used for the retrieval of the bundle
+
+   - ``obj_type``: an internal identifier uniquely representing the object
+     kind and the format of the required bundle
+
+   - ``obj_id``: the identifier of the requested bundle
+
+   - ``progress_message``: a string describing the current progress of the
+     cooking. If the cooking failed, ``progress_message`` will contain the
+     reason for the failure.
+
+   - ``status``: one of the following values:
+
+     - ``new``: the bundle request was created
+     - ``pending``: the bundle is being cooked
+     - ``done``: the bundle has been cooked and is ready for retrieval
+     - ``failed``: the bundle cooking failed and can be retried
+
+Retrieval
+---------
+
+Retrieve a specific bundle from the vault with:
+
+.. http:get:: /vault/:objectkind/:objectid/:format/raw
+
+   Where ``:format`` is optional, depending on the object kind.
+
+   **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`,
+   :http:method:`options`
+
+   **Response**:
+
+   :statuscode 200: bundle available; the response body is the bundle.
+   :statuscode 404: unavailable bundle; the client should request its cooking.
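To illustrate the retrieval endpoint, here is a small sketch (again not part
of this changeset) that streams a cooked bundle to a local file. It assumes
the third-party ``requests`` library; the ``fetch_bundle`` helper name is
hypothetical:

.. code:: python

    import requests

    def fetch_bundle(fetch_url, dest='bundle.tar.gz'):
        # fetch_url is the relative URL returned by the cooking endpoint,
        # e.g. '/api/1/vault/directory/:dir_id/raw/'.
        url = 'https://archive.softwareheritage.org' + fetch_url
        response = requests.get(url, stream=True)
        response.raise_for_status()  # a 404 means the bundle needs cooking
        with open(dest, 'wb') as f:
            # Stream the body to disk, as bundles can be large.
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        return dest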
diff --git a/docs/getting-started.rst b/docs/getting-started.rst
new file mode 100644
index 0000000..9239d96
--- /dev/null
+++ b/docs/getting-started.rst
@@ -0,0 +1,62 @@
+.. _vault-primer:
+
+Getting started
+===============
+
+The Vault is a service in charge of reconstructing parts of the archive as
+self-contained bundles that can then be imported locally, for instance into a
+Git repository. In effect, this is how you can do a ``git clone`` of a
+repository stored in Software Heritage.
+
+The Vault is asynchronous: you first make a request to prepare the bundle you
+need, then a second request to fetch the bundle once the Vault has finished
+reconstituting it.
+
+Example: retrieving a directory
+-------------------------------
+
+First, ask the Vault to prepare your bundle:
+
+.. code:: shell
+
+    curl -X POST https://archive.softwareheritage.org/api/1/vault/directory/:dir_id/
+
+where ``:dir_id`` is a :py:func:`directory identifier
+<swh.model.identifiers.directory_identifier>`. This initial request and all
+subsequent requests to this endpoint will return some JSON data containing
+information about the progress of bundle creation:
+
+.. code:: json
+
+    {
+        "id": 42,
+        "fetch_url": "/api/1/vault/directory/:dir_id/raw/",
+        "obj_id": ":dir_id",
+        "obj_type": "directory",
+        "progress_message": "Creating tarball...",
+        "status": "pending"
+    }
+
+Once the status is ``done``, you can fetch the bundle at the address given in
+the ``fetch_url`` field:
+
+.. code:: shell
+
+    curl -o bundle.tar.gz https://archive.softwareheritage.org/api/1/vault/directory/:dir_id/raw
+    tar xaf bundle.tar.gz
+
+E-mail notifications
+--------------------
+
+You can also ask to be notified by e-mail once the bundle you requested is
+ready, by giving an ``email`` POST parameter:
+
+.. code:: shell
+
+    curl -X POST -d 'email=example@example.com' \
+        https://archive.softwareheritage.org/api/1/vault/directory/:dir_id/
+
+API reference
+-------------
+
+For a more exhaustive overview of the Vault API, see the :ref:`vault-api-ref`.
diff --git a/docs/index.rst b/docs/index.rst
index d57ac37..1151a5f 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -1,17 +1,19 @@
 .. _swh-vault:
 
 Software Heritage - Development Documentation
 =============================================
 
 .. toctree::
    :maxdepth: 2
    :caption: Contents:
 
+   getting-started.rst
+   api.rst
 
 Indices and tables
 ==================
 
 * :ref:`genindex`
 * :ref:`modindex`
 * :ref:`search`
diff --git a/docs/vault-blueprint.md b/docs/vault-blueprint.md
deleted file mode 100644
index 993eb32..0000000
--- a/docs/vault-blueprint.md
+++ /dev/null
@@ -1,139 +0,0 @@
-Software Heritage Vault
-=======================
-
-Software source code **objects**---e.g., individual source code files,
-tarballs, commits, tagged releases, etc.---are stored in the Software Heritage
-(SWH) Archive in fully deduplicated form. That allows direct access to
-individual artifacts but require some preparation, usually in the form of
-collecting and assembling multiple artifacts in a single **bundle**, when fast
-access to a set of related artifacts (e.g., the snapshot of a VCS repository,
-the archive corresponding to a Git commit, or a specific software release as a
-zip archive) is required.
-
-The **Software Heritage Vault** is a cache of pre-built source code bundles
-which are assembled opportunistically retrieving objects from the Software
-Heritage Archive, can be accessed efficiently, and might be garbage collected
-after a long period of non-use.
-
-
-Requirements
-------------
-
-* **Shared cache**
-
-  The vault is a cache shared among the various origins that the SWH archive
-  tracks. If the same bundle, originally coming from different origins, is
-  requested, a single entry for it in the cache shall exist.
-
-* **Efficient retrieval**
-
-  Where supported by the desired access protocol (e.g., HTTP) it should be
-  possible for the vault to serve bundles efficiently (e.g., as static files
-  served via HTTP, possibly further proxied/cached at that level). In
-  particular, this rules out building bundles on the fly from the archive DB.
-
-
-API
----
-
-All URLs below are meant to be mounted at API root, which is currently at
-<https://archive.softwareheritage.org/api/1>. Unless otherwise stated, all API
-endpoints respond on HTTP GET method.
-
-
-## Object identification
-
-The vault stores bundles corresponding to different kinds of objects. The
-following object kinds are supported:
-
-* directories
-* revisions
-* repository snapshots
-
-The URL fragment `:objectkind/:objectid` is used throughout the vault API to
-fully identify vault objects. The syntax and meaning of :objectid for the
-different object kinds is detailed below.
- - -### Directories - -* object kind: directory -* URL fragment: directory/:sha1git - -where :sha1git is the directory ID in the SWH data model. - -### Revisions - -* object kind: revision -* URL fragment: revision/:sha1git - -where :sha1git is the revision ID in the SWH data model. - -### Repository snapshots - -* object kind: snapshot -* URL fragment: snapshot/:sha1git - -where :sha1git is the snapshot ID in the SWH data model. (**TODO** repository -snapshots don't exist yet as first-class citizens in the SWH data model; see -References below.) - - -## Cooking - -Bundles in the vault might be ready for retrieval or not. When they are not, -they will need to be **cooked** before they can be retrieved. A cooked bundle -will remain around until it expires; at that point it will need to be cooked -again before it can be retrieved. Cooking is idempotent, and a no-op in between -a previous cooking operation and expiration. - -To cook a bundle: - -* POST /vault/:objectkind/:objectid - - Request body: **TODO** something here in a JSON payload that would allow - notifying the user when the bundle is ready. - - Response: 201 Created - - -## Retrieval - -* GET /vault/:objectkind - - (paginated) list of all bundles of a given kind available in the vault; see - Pagination. Note that, due to cache expiration, objects might disappear - between listing and subsequent actions on them. - - Examples: - - * GET /vault/directory - * GET /vault/revision - -* GET /vault/:objectkind/:objectid - - Retrieve a specific bundle from the vault. - - Response: - - * 200 OK: bundle available; response body is the bundle - * 404 Not Found: missing bundle; client should request its preparation (see Cooking) - - -References ----------- - -* [Repository snapshot objects](https://wiki.softwareheritage.org/index.php?title=User:StefanoZacchiroli/Repository_snapshot_objects) -* Amazon Web Services, - [API Reference for Amazon Glacier](http://docs.aws.amazon.com/amazonglacier/latest/dev/amazon-glacier-api.html); - specifically - [Job Operations](http://docs.aws.amazon.com/amazonglacier/latest/dev/job-operations.html) - - -TODO -==== - -* **TODO** pagination using HATEOAS -* **TODO** authorization: the cooking API should be somehow controlled to avoid - obvious abuses (e.g., let's cache everything) -* **TODO** finalize repository snapshot proposal diff --git a/sql/swh-vault-schema.sql b/sql/swh-vault-schema.sql index 5265f7a..3f5640e 100644 --- a/sql/swh-vault-schema.sql +++ b/sql/swh-vault-schema.sql @@ -1,42 +1,42 @@ create table dbversion ( version int primary key, release timestamptz not null, description text not null ); comment on table dbversion is 'Schema update tracking'; insert into dbversion (version, release, description) values (1, now(), 'Initial version'); create domain obj_hash as bytea; create type cook_type as enum ('directory', 'revision_gitfast'); comment on type cook_type is 'Type of the requested bundle'; -create type cook_status as enum ('new', 'pending', 'done'); +create type cook_status as enum ('new', 'pending', 'done', 'failed'); comment on type cook_status is 'Status of the cooking'; create table vault_bundle ( id bigserial primary key, type cook_type not null, -- requested cooking type object_id obj_hash not null, -- requested object ID task_id integer, -- scheduler task id task_status cook_status not null default 'new', -- status of the task sticky boolean not null default false, -- bundle cannot expire ts_created timestamptz not null default now(), -- timestamp of creation ts_done timestamptz, -- 
timestamp of the cooking result ts_last_access timestamptz not null default now(), -- last access progress_msg text, -- progress message unique(type, object_id) ); create table vault_notif_email ( id bigserial primary key, email text not null, -- e-mail to notify bundle_id bigint not null references vault_bundle(id) ); diff --git a/swh.vault.egg-info/PKG-INFO b/swh.vault.egg-info/PKG-INFO index b47355d..d138773 100644 --- a/swh.vault.egg-info/PKG-INFO +++ b/swh.vault.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.vault -Version: 0.0.6 +Version: 0.0.7 Summary: Software Heritage vault Home-page: https://forge.softwareheritage.org/diffusion/DVAU/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh.vault.egg-info/SOURCES.txt b/swh.vault.egg-info/SOURCES.txt index e5dee1f..df4a40e 100644 --- a/swh.vault.egg-info/SOURCES.txt +++ b/swh.vault.egg-info/SOURCES.txt @@ -1,46 +1,49 @@ .gitignore AUTHORS LICENSE MANIFEST.in Makefile requirements-swh.txt requirements.txt setup.py version.txt debian/changelog debian/compat debian/control debian/copyright debian/rules debian/source/format docs/.gitignore docs/Makefile +docs/api.rst docs/conf.py +docs/getting-started.rst docs/index.rst -docs/vault-blueprint.md docs/_static/.placeholder docs/_templates/.placeholder sql/swh-vault-schema.sql swh/__init__.py swh.vault.egg-info/PKG-INFO swh.vault.egg-info/SOURCES.txt swh.vault.egg-info/dependency_links.txt swh.vault.egg-info/not-zip-safe swh.vault.egg-info/requires.txt swh.vault.egg-info/top_level.txt swh/vault/__init__.py swh/vault/backend.py swh/vault/cache.py swh/vault/cooking_tasks.py +swh/vault/to_disk.py swh/vault/api/__init__.py swh/vault/api/client.py swh/vault/api/server.py swh/vault/cookers/__init__.py swh/vault/cookers/base.py swh/vault/cookers/directory.py swh/vault/cookers/revision_flat.py swh/vault/cookers/revision_gitfast.py swh/vault/tests/test_backend.py swh/vault/tests/test_cache.py swh/vault/tests/test_cookers.py +swh/vault/tests/test_cookers_base.py swh/vault/tests/vault_testing.py \ No newline at end of file diff --git a/swh/vault/backend.py b/swh/vault/backend.py index 19f53db..3b402c9 100644 --- a/swh/vault/backend.py +++ b/swh/vault/backend.py @@ -1,333 +1,382 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import smtplib import psycopg2 import psycopg2.extras from functools import wraps from email.mime.text import MIMEText from swh.model import hashutil from swh.scheduler.backend import SchedulerBackend from swh.scheduler.utils import create_oneshot_task_dict from swh.vault.cache import VaultCache from swh.vault.cookers import get_cooker from swh.vault.cooking_tasks import SWHCookingTask # noqa cooking_task_name = 'swh.vault.cooking_tasks.SWHCookingTask' NOTIF_EMAIL_FROM = ('"Software Heritage Vault" ' '') -NOTIF_EMAIL_SUBJECT = ("Bundle ready: {obj_type} {short_id}") -NOTIF_EMAIL_BODY = """ +NOTIF_EMAIL_SUBJECT_SUCCESS = ("Bundle ready: {obj_type} {short_id}") +NOTIF_EMAIL_SUBJECT_FAILURE = ("Bundle failed: {obj_type} {short_id}") + +NOTIF_EMAIL_BODY_SUCCESS = """ You have requested the following bundle from the Software Heritage Vault: Object Type: {obj_type} Object ID: {hex_id} This bundle is now available for download at the following address: {url} Please keep in mind that 
this link might expire at some point, in which case you will need to request the bundle again. --\x20 The Software Heritage Developers """ +NOTIF_EMAIL_BODY_FAILURE = """ +You have requested the following bundle from the Software Heritage +Vault: + +Object Type: {obj_type} +Object ID: {hex_id} + +This bundle could not be cooked for the following reason: + +{progress_msg} + +We apologize for the inconvenience. + +--\x20 +The Software Heritage Developers +""" + class NotFoundExc(Exception): """Bundle was not found.""" pass # TODO: Imported from swh.scheduler.backend. Factorization needed. def autocommit(fn): @wraps(fn) def wrapped(self, *args, **kwargs): autocommit = False # TODO: I don't like using None, it's confusing for the user. how about # a NEW_CURSOR object()? if 'cursor' not in kwargs or not kwargs['cursor']: autocommit = True kwargs['cursor'] = self.cursor() try: ret = fn(self, *args, **kwargs) except: if autocommit: self.rollback() raise if autocommit: self.commit() return ret return wrapped # TODO: This has to be factorized with other database base classes and helpers # (swh.scheduler.backend.SchedulerBackend, swh.storage.db.BaseDb, ...) # The three first methods are imported from swh.scheduler.backend. class VaultBackend: """ Backend for the Software Heritage vault. """ def __init__(self, config): self.config = config self.cache = VaultCache(self.config['cache']) self.db = None self.reconnect() self.smtp_server = smtplib.SMTP('localhost', 25) - self.scheduler = SchedulerBackend( - scheduling_db=self.config['scheduling_db']) + if self.config['scheduling_db'] is not None: + self.scheduler = SchedulerBackend( + scheduling_db=self.config['scheduling_db']) def reconnect(self): """Reconnect to the database.""" if not self.db or self.db.closed: self.db = psycopg2.connect( dsn=self.config['db'], cursor_factory=psycopg2.extras.RealDictCursor, ) def close(self): """Close the underlying database connection.""" self.db.close() def cursor(self): """Return a fresh cursor on the database, with auto-reconnection in case of failure""" cur = None # Get a fresh cursor and reconnect at most three times tries = 0 while True: tries += 1 try: cur = self.db.cursor() cur.execute('select 1') break except psycopg2.OperationalError: if tries < 3: self.reconnect() else: raise return cur def commit(self): """Commit a transaction""" self.db.commit() def rollback(self): """Rollback a transaction""" self.db.rollback() @autocommit def task_info(self, obj_type, obj_id, cursor=None): """Fetch information from a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' SELECT id, type, object_id, task_id, task_status, sticky, ts_created, ts_done, ts_last_access, progress_msg FROM vault_bundle WHERE type = %s AND object_id = %s''', (obj_type, obj_id)) res = cursor.fetchone() if res: res['object_id'] = bytes(res['object_id']) return res def _send_task(self, args): """Send a cooking task to the celery scheduler""" task = create_oneshot_task_dict('swh-vault-cooking', *args) added_tasks = self.scheduler.create_tasks([task]) return added_tasks[0]['id'] @autocommit def create_task(self, obj_type, obj_id, sticky=False, cursor=None): """Create and send a cooking task""" obj_id = hashutil.hash_to_bytes(obj_id) hex_id = hashutil.hash_to_hex(obj_id) args = [obj_type, hex_id] cooker_class = get_cooker(obj_type) cooker = cooker_class(*args) if not cooker.check_exists(): raise NotFoundExc("Object {} was not found.".format(hex_id)) cursor.execute(''' INSERT INTO vault_bundle (type, object_id, sticky) VALUES (%s, %s, 
%s)''', (obj_type, obj_id, sticky)) self.commit() task_id = self._send_task(args) cursor.execute(''' UPDATE vault_bundle SET task_id = %s WHERE type = %s AND object_id = %s''', (task_id, obj_type, obj_id)) @autocommit def add_notif_email(self, obj_type, obj_id, email, cursor=None): """Add an e-mail address to notify when a given bundle is ready""" obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' INSERT INTO vault_notif_email (email, bundle_id) VALUES (%s, (SELECT id FROM vault_bundle WHERE type = %s AND object_id = %s))''', (email, obj_type, obj_id)) @autocommit def cook_request(self, obj_type, obj_id, *, sticky=False, email=None, cursor=None): """Main entry point for cooking requests. This starts a cooking task if needed, and add the given e-mail to the notify list""" info = self.task_info(obj_type, obj_id) + + # If there's a failed bundle entry, delete it first. + if info is not None and info['task_status'] == 'failed': + cursor.execute('''DELETE FROM vault_bundle + WHERE type = %s AND object_id = %s''', + (obj_type, obj_id)) + self.commit() + info = None + + # If there's no bundle entry, create the task. if info is None: self.create_task(obj_type, obj_id, sticky) + if email is not None: + # If the task is already done, send the email directly if info is not None and info['task_status'] == 'done': self.send_notification(None, email, obj_type, obj_id) + # Else, add it to the notification queue else: self.add_notif_email(obj_type, obj_id, email) + info = self.task_info(obj_type, obj_id) return info @autocommit def is_available(self, obj_type, obj_id, cursor=None): """Check whether a bundle is available for retrieval""" info = self.task_info(obj_type, obj_id, cursor=cursor) return (info is not None and info['task_status'] == 'done' and self.cache.is_cached(obj_type, obj_id)) @autocommit def fetch(self, obj_type, obj_id, cursor=None): """Retrieve a bundle from the cache""" if not self.is_available(obj_type, obj_id, cursor=cursor): return None self.update_access_ts(obj_type, obj_id, cursor=cursor) return self.cache.get(obj_type, obj_id) @autocommit def update_access_ts(self, obj_type, obj_id, cursor=None): """Update the last access timestamp of a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' UPDATE vault_bundle SET ts_last_access = NOW() WHERE type = %s AND object_id = %s''', (obj_type, obj_id)) @autocommit def set_status(self, obj_type, obj_id, status, cursor=None): """Set the cooking status of a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) req = (''' UPDATE vault_bundle SET task_status = %s ''' + (''', ts_done = NOW() ''' if status == 'done' else '') + '''WHERE type = %s AND object_id = %s''') cursor.execute(req, (status, obj_type, obj_id)) @autocommit def set_progress(self, obj_type, obj_id, progress, cursor=None): """Set the cooking progress of a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' UPDATE vault_bundle SET progress_msg = %s WHERE type = %s AND object_id = %s''', (progress, obj_type, obj_id)) @autocommit def send_all_notifications(self, obj_type, obj_id, cursor=None): """Send all the e-mails in the notification list of a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' - SELECT vault_notif_email.id AS id, email + SELECT vault_notif_email.id AS id, email, task_status, progress_msg FROM vault_notif_email INNER JOIN vault_bundle ON bundle_id = vault_bundle.id WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s''', (obj_type, obj_id)) for d in cursor: - self.send_notification(d['id'], d['email'], 
obj_type, obj_id) + self.send_notification(d['id'], d['email'], obj_type, obj_id, + status=d['task_status'], + progress_msg=d['progress_msg']) @autocommit - def send_notification(self, n_id, email, obj_type, obj_id, cursor=None): + def send_notification(self, n_id, email, obj_type, obj_id, status, + progress_msg=None, cursor=None): """Send the notification of a bundle to a specific e-mail""" hex_id = hashutil.hash_to_hex(obj_id) short_id = hex_id[:7] # TODO: instead of hardcoding this, we should probably: # * add a "fetch_url" field in the vault_notif_email table # * generate the url with flask.url_for() on the web-ui side # * send this url as part of the cook request and store it in # the table # * use this url for the notification e-mail url = ('https://archive.softwareheritage.org/api/1/vault/{}/{}/' 'raw'.format(obj_type, hex_id)) - text = NOTIF_EMAIL_BODY.strip() - text = text.format(obj_type=obj_type, hex_id=hex_id, url=url) - msg = MIMEText(text) - msg['Subject'] = (NOTIF_EMAIL_SUBJECT - .format(obj_type=obj_type, short_id=short_id)) + if status == 'done': + text = NOTIF_EMAIL_BODY_SUCCESS.strip() + text = text.format(obj_type=obj_type, hex_id=hex_id, url=url) + msg = MIMEText(text) + msg['Subject'] = (NOTIF_EMAIL_SUBJECT_SUCCESS + .format(obj_type=obj_type, short_id=short_id)) + elif status == 'failed': + text = NOTIF_EMAIL_BODY_FAILURE.strip() + text = text.format(obj_type=obj_type, hex_id=hex_id, + progress_msg=progress_msg) + msg = MIMEText(text) + msg['Subject'] = (NOTIF_EMAIL_SUBJECT_FAILURE + .format(obj_type=obj_type, short_id=short_id)) + else: + raise RuntimeError("send_notification called on a '{}' bundle" + .format(status)) + msg['From'] = NOTIF_EMAIL_FROM msg['To'] = email self._smtp_send(msg) if n_id is not None: cursor.execute(''' DELETE FROM vault_notif_email WHERE id = %s''', (n_id,)) def _smtp_send(self, msg): # Reconnect if needed try: status = self.smtp_server.noop()[0] except: # smtplib.SMTPServerDisconnected status = -1 if status != 250: self.smtp_server.connect() # Send the message self.smtp_server.send_message(msg) @autocommit def _cache_expire(self, cond, *args, cursor=None): """Low-level expiration method, used by cache_expire_* methods""" # Embedded SELECT query to be able to use ORDER BY and LIMIT cursor.execute(''' DELETE FROM vault_bundle WHERE ctid IN ( SELECT ctid FROM vault_bundle WHERE sticky = false {} ) RETURNING type, object_id '''.format(cond), args) for d in cursor: self.cache.delete(d['type'], bytes(d['object_id'])) @autocommit def cache_expire_oldest(self, n=1, by='last_access', cursor=None): """Expire the `n` oldest bundles""" assert by in ('created', 'done', 'last_access') filter = '''ORDER BY ts_{} LIMIT {}'''.format(by, n) return self._cache_expire(filter) @autocommit def cache_expire_until(self, date, by='last_access', cursor=None): """Expire all the bundles until a certain date""" assert by in ('created', 'done', 'last_access') filter = '''AND ts_{} <= %s'''.format(by) return self._cache_expire(filter, date) diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py index 7fc6d6b..a4ffdac 100644 --- a/swh/vault/cookers/base.py +++ b/swh/vault/cookers/base.py @@ -1,229 +1,128 @@ -# Copyright (C) 2016-2017 The Software Heritage developers +# Copyright (C) 2016-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import io -import itertools -import 
os -import tarfile -import tempfile - -from pathlib import Path +import logging from swh.core import config from swh.model import hashutil -from swh.model.from_disk import mode_to_perms, DentryPerms from swh.storage import get_storage from swh.vault.api.client import RemoteVaultClient DEFAULT_CONFIG_PATH = 'vault/cooker' DEFAULT_CONFIG = { 'storage': ('dict', { 'cls': 'remote', 'args': { 'url': 'http://localhost:5002/', }, }), - 'vault_url': ('str', 'http://localhost:5005/') + 'vault_url': ('str', 'http://localhost:5005/'), + 'max_bundle_size': ('int', 2 ** 29), # 512 MiB } +class PolicyError(Exception): + """Raised when the bundle violates the cooking policy.""" + pass + + +class BundleTooLargeError(PolicyError): + """Raised when the bundle is too large to be cooked.""" + pass + + +class BytesIOBundleSizeLimit(io.BytesIO): + def __init__(self, *args, size_limit=None, **kwargs): + super().__init__(*args, **kwargs) + self.size_limit = size_limit + + def write(self, chunk): + if ((self.size_limit is not None + and self.getbuffer().nbytes + len(chunk) > self.size_limit)): + raise BundleTooLargeError( + "The requested bundle exceeds the maximum allowed " + "size of {} bytes.".format(self.size_limit)) + return super().write(chunk) + + class BaseVaultCooker(metaclass=abc.ABCMeta): """Abstract base class for the vault's bundle creators This class describes a common API for the cookers. To define a new cooker, inherit from this class and override: - CACHE_TYPE_KEY: key to use for the bundle to reference in cache - def cook(): cook the object into a bundle """ CACHE_TYPE_KEY = None def __init__(self, obj_type, obj_id): """Initialize the cooker. The type of the object represented by the id depends on the concrete class. Very likely, each type of bundle will have its own cooker class. Args: storage: the storage object cache: the cache where to store the bundle obj_id: id of the object to be cooked into a bundle. """ self.config = config.load_named_config(DEFAULT_CONFIG_PATH, DEFAULT_CONFIG) self.obj_type = obj_type self.obj_id = hashutil.hash_to_bytes(obj_id) self.backend = RemoteVaultClient(self.config['vault_url']) self.storage = get_storage(**self.config['storage']) + self.max_bundle_size = self.config['max_bundle_size'] @abc.abstractmethod def check_exists(self): """Checks that the requested object exists and can be cooked. Override this in the cooker implementation. """ raise NotImplemented @abc.abstractmethod def prepare_bundle(self): """Implementation of the cooker. Yields chunks of the bundle bytes. Override this with the cooker implementation. """ raise NotImplemented + def write(self, chunk): + self.fileobj.write(chunk) + def cook(self): """Cook the requested object into a bundle """ self.backend.set_status(self.obj_type, self.obj_id, 'pending') self.backend.set_progress(self.obj_type, self.obj_id, 'Processing...') - content_iter = self.prepare_bundle() - - # TODO: use proper content streaming - bundle = b''.join(content_iter) - self.backend.put_bundle(self.CACHE_TYPE_KEY, self.obj_id, bundle) - - self.backend.set_status(self.obj_type, self.obj_id, 'done') - self.backend.set_progress(self.obj_type, self.obj_id, None) - self.backend.send_notif(self.obj_type, self.obj_id) - - -SKIPPED_MESSAGE = (b'This content has not been retrieved in the ' - b'Software Heritage archive due to its size.') - -HIDDEN_MESSAGE = (b'This content is hidden.') - - -def get_filtered_file_content(storage, file_data): - """Retrieve the file specified by file_data and apply filters for skipped - and missing contents. 
- - Args: - storage: the storage from which to retrieve the object - file_data: file entry descriptor as returned by directory_ls() - - Returns: - Bytes containing the specified content. The content will be replaced by - a specific message to indicate that the content could not be retrieved - (either due to privacy policy or because its size was too big for us to - archive it). - """ - - assert file_data['type'] == 'file' - - if file_data['status'] == 'absent': - return SKIPPED_MESSAGE - elif file_data['status'] == 'hidden': - return HIDDEN_MESSAGE - else: - return list(storage.content_get([file_data['sha1']]))[0]['data'] - -def get_tar_bytes(path, arcname=None): - path = Path(path) - if not arcname: - arcname = path.name - tar_buffer = io.BytesIO() - tar = tarfile.open(fileobj=tar_buffer, mode='w') - tar.add(str(path), arcname=arcname) - return tar_buffer.getbuffer() - - -class DirectoryBuilder: - """Creates a cooked directory from its sha1_git in the db. - - Warning: This is NOT a directly accessible cooker, but a low-level - one that executes the manipulations. - - """ - def __init__(self, storage): - self.storage = storage - - def get_directory_bytes(self, dir_id): - # Create temporary folder to retrieve the files into. - root = bytes(tempfile.mkdtemp(prefix='directory.', - suffix='.cook'), 'utf8') - self.build_directory(dir_id, root) - # Use the created directory to make a bundle with the data as - # a compressed directory. - bundle_content = self._create_bundle_content( - root, - hashutil.hash_to_hex(dir_id)) - return bundle_content - - def build_directory(self, dir_id, root): - # Retrieve data from the database. - data = self.storage.directory_ls(dir_id, recursive=True) - - # Split into files and directory data. - # TODO(seirl): also handle revision data. - data1, data2 = itertools.tee(data, 2) - dir_data = (entry['name'] for entry in data1 if entry['type'] == 'dir') - file_data = (entry for entry in data2 if entry['type'] == 'file') - - # Recreate the directory's subtree and then the files into it. - self._create_tree(root, dir_data) - self._create_files(root, file_data) - - def _create_tree(self, root, directory_paths): - """Create a directory tree from the given paths - - The tree is created from `root` and each given path in - `directory_paths` will be created. - - """ - # Directories are sorted by depth so they are created in the - # right order - bsep = bytes(os.path.sep, 'utf8') - dir_names = sorted( - directory_paths, - key=lambda x: len(x.split(bsep))) - for dir_name in dir_names: - os.makedirs(os.path.join(root, dir_name)) - - def _create_files(self, root, file_datas): - """Create the files according to their status. - - """ - # Then create the files - for file_data in file_datas: - path = os.path.join(root, file_data['name']) - content = get_filtered_file_content(self.storage, file_data) - self._create_file(path, content, file_data['perms']) - - def _create_file(self, path, content, mode=0o100644): - """Create the given file and fill it with content. 
- - """ - perms = mode_to_perms(mode) - if perms == DentryPerms.symlink: - os.symlink(content, path) + self.fileobj = BytesIOBundleSizeLimit(size_limit=self.max_bundle_size) + try: + self.prepare_bundle() + bundle = self.fileobj.getvalue() + except PolicyError as e: + self.backend.set_status(self.obj_type, self.obj_id, 'failed') + self.backend.set_progress(self.obj_type, self.obj_id, str(e)) + except Exception as e: + self.backend.set_status(self.obj_type, self.obj_id, 'failed') + self.backend.set_progress( + self.obj_type, self.obj_id, + "Internal Server Error. This incident will be reported.") + logging.exception("Bundle cooking failed.") else: - with open(path, 'wb') as f: - f.write(content) - os.chmod(path, perms.value) - - def _get_file_content(self, obj_id): - """Get the content of the given file. - - """ - content = list(self.storage.content_get([obj_id]))[0]['data'] - return content - - def _create_bundle_content(self, path, hex_dir_id): - """Create a bundle from the given directory - - Args: - path: location of the directory to package. - hex_dir_id: hex representation of the directory id - - Returns: - bytes that represent the compressed directory as a bundle. - - """ - return get_tar_bytes(path.decode(), hex_dir_id) + # TODO: use proper content streaming instead of put_bundle() + self.backend.put_bundle(self.CACHE_TYPE_KEY, self.obj_id, bundle) + self.backend.set_status(self.obj_type, self.obj_id, 'done') + self.backend.set_progress(self.obj_type, self.obj_id, None) + finally: + self.backend.send_notif(self.obj_type, self.obj_id) diff --git a/swh/vault/cookers/directory.py b/swh/vault/cookers/directory.py index 46e4819..70d9da0 100644 --- a/swh/vault/cookers/directory.py +++ b/swh/vault/cookers/directory.py @@ -1,18 +1,27 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from swh.vault.cookers.base import BaseVaultCooker, DirectoryBuilder +import tarfile +import tempfile + +from swh.model import hashutil +from swh.vault.cookers.base import BaseVaultCooker +from swh.vault.to_disk import DirectoryBuilder class DirectoryCooker(BaseVaultCooker): """Cooker to create a directory bundle """ CACHE_TYPE_KEY = 'directory' def check_exists(self): return not list(self.storage.directory_missing([self.obj_id])) def prepare_bundle(self): - directory_builder = DirectoryBuilder(self.storage) - yield directory_builder.get_directory_bytes(self.obj_id) + with tempfile.TemporaryDirectory(prefix='tmp-vault-directory-') as td: + directory_builder = DirectoryBuilder( + self.storage, td.encode(), self.obj_id) + directory_builder.build() + tar = tarfile.open(fileobj=self.fileobj, mode='w') + tar.add(td, arcname=hashutil.hash_to_hex(self.obj_id)) diff --git a/swh/vault/cookers/revision_flat.py b/swh/vault/cookers/revision_flat.py index 2d0431b..0fc8579 100644 --- a/swh/vault/cookers/revision_flat.py +++ b/swh/vault/cookers/revision_flat.py @@ -1,37 +1,32 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import tarfile import tempfile from pathlib import Path from swh.model import hashutil - -from .base import BaseVaultCooker, DirectoryBuilder, get_tar_bytes +from swh.vault.cookers.base import BaseVaultCooker 
+from swh.vault.to_disk import DirectoryBuilder class RevisionFlatCooker(BaseVaultCooker): - """Cooker to create a directory bundle """ + """Cooker to create a revision_flat bundle """ CACHE_TYPE_KEY = 'revision_flat' def check_exists(self): return not list(self.storage.revision_missing([self.obj_id])) def prepare_bundle(self): - """Cook the requested revision into a Bundle - - Returns: - bytes that correspond to the bundle - - """ - directory_builder = DirectoryBuilder(self.storage) - with tempfile.TemporaryDirectory(suffix='.cook') as root_tmp: - root = Path(root_tmp) + with tempfile.TemporaryDirectory(prefix='tmp-vault-revision-') as td: + root = Path(td) for revision in self.storage.revision_log([self.obj_id]): revdir = root / hashutil.hash_to_hex(revision['id']) revdir.mkdir() - directory_builder.build_directory(revision['directory'], - str(revdir).encode()) - # FIXME: stream the bytes! this tarball can be HUUUUUGE - yield get_tar_bytes(root_tmp, hashutil.hash_to_hex(self.obj_id)) + directory_builder = DirectoryBuilder( + self.storage, str(revdir).encode(), revision['directory']) + directory_builder.build() + tar = tarfile.open(fileobj=self.fileobj, mode='w') + tar.add(td, arcname=hashutil.hash_to_hex(self.obj_id)) diff --git a/swh/vault/cookers/revision_gitfast.py b/swh/vault/cookers/revision_gitfast.py index d92870b..a5f74ac 100644 --- a/swh/vault/cookers/revision_gitfast.py +++ b/swh/vault/cookers/revision_gitfast.py @@ -1,230 +1,207 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import collections -import fastimport.commands import functools import os import time import zlib -from .base import BaseVaultCooker, get_filtered_file_content +from fastimport.commands import (CommitCommand, ResetCommand, BlobCommand, + FileDeleteCommand, FileModifyCommand) + +from swh.model import hashutil +from swh.model.toposort import toposort from swh.model.from_disk import mode_to_perms +from swh.vault.cookers.base import BaseVaultCooker +from swh.vault.to_disk import get_filtered_file_content class RevisionGitfastCooker(BaseVaultCooker): """Cooker to create a git fast-import bundle """ CACHE_TYPE_KEY = 'revision_gitfast' def check_exists(self): return not list(self.storage.revision_missing([self.obj_id])) def prepare_bundle(self): - log = self.storage.revision_log([self.obj_id]) - commands = self.fastexport(log) + self.log = list(toposort(self.storage.revision_log([self.obj_id]))) + self.gzobj = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16) + self.fastexport() + self.write(self.gzobj.flush()) - compressobj = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16) - for command in commands: - yield compressobj.compress(bytes(command) + b'\n') - yield compressobj.flush() + def write_cmd(self, cmd): + chunk = bytes(cmd) + b'\n' + super().write(self.gzobj.compress(chunk)) - def fastexport(self, log): + def fastexport(self): """Generate all the git fast-import commands from a given log. 
""" - self.rev_by_id = {r['id']: r for r in log} - self.rev_sorted = list(self._toposort(self.rev_by_id)) + self.rev_by_id = {r['id']: r for r in self.log} self.obj_done = set() self.obj_to_mark = {} self.next_available_mark = 1 last_progress_report = None - for i, rev in enumerate(self.rev_sorted, 1): + for i, rev in enumerate(self.log, 1): # Update progress if needed ct = time.time() if (last_progress_report is None or last_progress_report + 2 <= ct): last_progress_report = ct - pg = ('Computing revision {}/{}' - .format(i, len(self.rev_sorted))) + pg = ('Computing revision {}/{}'.format(i, len(self.log))) self.backend.set_progress(self.obj_type, self.obj_id, pg) # Compute the current commit - yield from self._compute_commit_command(rev) - - def _toposort(self, rev_by_id): - """Perform a topological sort on the revision graph. - """ - children = collections.defaultdict(list) # rev -> children - in_degree = {} # rev -> numbers of parents left to compute - - # Compute the in_degrees and the parents of all the revisions. - # Add the roots to the processing queue. - queue = collections.deque() - for rev_id, rev in rev_by_id.items(): - in_degree[rev_id] = len(rev['parents']) - if not rev['parents']: - queue.append(rev_id) - for parent in rev['parents']: - children[parent].append(rev_id) - - # Topological sort: yield the 'ready' nodes, decrease the in degree of - # their children and add the 'ready' ones to the queue. - while queue: - rev_id = queue.popleft() - yield rev_by_id[rev_id] - for child in children[rev_id]: - in_degree[child] -= 1 - if in_degree[child] == 0: - queue.append(child) + self._compute_commit_command(rev) def mark(self, obj_id): """Get the mark ID as bytes of a git object. If the object has not yet been marked, assign a new ID and add it to the mark dictionary. """ if obj_id not in self.obj_to_mark: self.obj_to_mark[obj_id] = self.next_available_mark self.next_available_mark += 1 return str(self.obj_to_mark[obj_id]).encode() def _compute_blob_command_content(self, file_data): """Compute the blob command of a file entry if it has not been computed yet. """ obj_id = file_data['sha1'] if obj_id in self.obj_done: return content = get_filtered_file_content(self.storage, file_data) - yield fastimport.commands.BlobCommand( - mark=self.mark(obj_id), - data=content, - ) + self.write_cmd(BlobCommand(mark=self.mark(obj_id), data=content)) self.obj_done.add(obj_id) def _author_tuple_format(self, author, date): # We never want to have None values here so we replace null entries # by ''. if author is not None: author_tuple = (author.get('name') or b'', author.get('email') or b'') else: author_tuple = (b'', b'') if date is not None: date_tuple = (date.get('timestamp', {}).get('seconds') or 0, (date.get('offset') or 0) * 60) else: date_tuple = (0, 0) return author_tuple + date_tuple def _compute_commit_command(self, rev): """Compute a commit command from a specific revision. """ if 'parents' in rev and rev['parents']: from_ = b':' + self.mark(rev['parents'][0]) merges = [b':' + self.mark(r) for r in rev['parents'][1:]] parent = self.rev_by_id[rev['parents'][0]] else: # We issue a reset command before all the new roots so that they # are not automatically added as children of the current branch. - yield fastimport.commands.ResetCommand(b'refs/heads/master', None) + self.write_cmd(ResetCommand(b'refs/heads/master', None)) from_ = None merges = None parent = None # Retrieve the file commands while yielding new blob commands if # needed. 
- files = yield from self._compute_file_commands(rev, parent) + files = list(self._compute_file_commands(rev, parent)) - # Construct and yield the commit command + # Construct and write the commit command author = self._author_tuple_format(rev['author'], rev['date']) committer = self._author_tuple_format(rev['committer'], rev['committer_date']) - yield fastimport.commands.CommitCommand( + self.write_cmd(CommitCommand( ref=b'refs/heads/master', mark=self.mark(rev['id']), author=author, committer=committer, message=rev['message'] or b'', from_=from_, merges=merges, - file_iter=files, - ) + file_iter=files)) @functools.lru_cache(maxsize=4096) def _get_dir_ents(self, dir_id=None): """Get the entities of a directory as a dictionary (name -> entity). This function has a cache to avoid doing multiple requests to retrieve the same entities, as doing a directory_ls() is expensive. """ data = (self.storage.directory_ls(dir_id) if dir_id is not None else []) return {f['name']: f for f in data} def _compute_file_commands(self, rev, parent=None): """Compute all the file commands of a revision. Generate a diff of the files between the revision and its main parent to find the necessary file commands to apply. """ - commands = [] - # Initialize the stack with the root of the tree. cur_dir = rev['directory'] parent_dir = parent['directory'] if parent else None stack = [(b'', cur_dir, parent_dir)] while stack: # Retrieve the current directory and the directory of the parent # commit in order to compute the diff of the trees. root, cur_dir_id, prev_dir_id = stack.pop() cur_dir = self._get_dir_ents(cur_dir_id) prev_dir = self._get_dir_ents(prev_dir_id) # Find subtrees to delete: # - Subtrees that are not in the new tree (file or directory # deleted). # - Subtrees that do not have the same type in the new tree # (file -> directory or directory -> file) # After this step, every node remaining in the previous directory # has the same type than the one in the current directory. for fname, f in prev_dir.items(): if ((fname not in cur_dir or f['type'] != cur_dir[fname]['type'])): - commands.append(fastimport.commands.FileDeleteCommand( - path=os.path.join(root, fname) - )) + yield FileDeleteCommand(path=os.path.join(root, fname)) # Find subtrees to modify: # - Leaves (files) will be added or modified using `filemodify` # - Other subtrees (directories) will be added to the stack and # processed in the next iteration. for fname, f in cur_dir.items(): # A file is added or modified if it was not in the tree, if its # permissions changed or if its content changed. if (f['type'] == 'file' and (fname not in prev_dir or f['sha1'] != prev_dir[fname]['sha1'] or f['perms'] != prev_dir[fname]['perms'])): # Issue a blob command for the new blobs if needed. - yield from self._compute_blob_command_content(f) - commands.append(fastimport.commands.FileModifyCommand( + self._compute_blob_command_content(f) + yield FileModifyCommand( path=os.path.join(root, fname), mode=mode_to_perms(f['perms']).value, dataref=(b':' + self.mark(f['sha1'])), - data=None, - )) + data=None) + # A revision is added or modified if it was not in the tree or + # if its target changed + elif (f['type'] == 'rev' + and (fname not in prev_dir + or f['target'] != prev_dir[fname]['target'])): + yield FileModifyCommand( + path=os.path.join(root, fname), + mode=0o160000, + dataref=hashutil.hash_to_hex(f['target']).encode(), + data=None) # A directory is added or modified if it was not in the tree or # if its target changed. 
elif f['type'] == 'dir': f_prev_target = None if fname in prev_dir and prev_dir[fname]['type'] == 'dir': f_prev_target = prev_dir[fname]['target'] if f_prev_target is None or f['target'] != f_prev_target: stack.append((os.path.join(root, fname), f['target'], f_prev_target)) - return commands diff --git a/swh/vault/tests/test_backend.py b/swh/vault/tests/test_backend.py index 5ba8425..8c4724e 100644 --- a/swh/vault/tests/test_backend.py +++ b/swh/vault/tests/test_backend.py @@ -1,284 +1,329 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import contextlib import datetime import psycopg2 import unittest from unittest.mock import patch from swh.core.tests.db_testing import DbTestFixture from swh.model import hashutil from swh.storage.tests.storage_testing import StorageTestFixture from swh.vault.tests.vault_testing import VaultTestFixture, hash_content class BaseTestBackend(VaultTestFixture, StorageTestFixture, DbTestFixture): @contextlib.contextmanager def mock_cooking(self): with patch.object(self.vault_backend, '_send_task') as mt: mt.return_value = 42 with patch('swh.vault.backend.get_cooker') as mg: mcc = unittest.mock.MagicMock() mc = unittest.mock.MagicMock() mg.return_value = mcc mcc.return_value = mc mc.check_exists.return_value = True yield {'send_task': mt, 'get_cooker': mg, 'cooker_cls': mcc, 'cooker': mc} def assertTimestampAlmostNow(self, ts, tolerance_secs=1.0): # noqa now = datetime.datetime.now(datetime.timezone.utc) creation_delta_secs = (ts - now).total_seconds() self.assertLess(creation_delta_secs, tolerance_secs) def fake_cook(self, obj_type, result_content, sticky=False): content, obj_id = hash_content(result_content) with self.mock_cooking(): self.vault_backend.create_task(obj_type, obj_id, sticky) self.vault_backend.cache.add(obj_type, obj_id, b'content') self.vault_backend.set_status(obj_type, obj_id, 'done') return obj_id, content + def fail_cook(self, obj_type, obj_id, failure_reason): + with self.mock_cooking(): + self.vault_backend.create_task(obj_type, obj_id) + self.vault_backend.set_status(obj_type, obj_id, 'failed') + self.vault_backend.set_progress(obj_type, obj_id, failure_reason) + TEST_TYPE = 'revision_gitfast' TEST_HEX_ID = '4a4b9771542143cf070386f86b4b92d42966bdbc' TEST_OBJ_ID = hashutil.hash_to_bytes(TEST_HEX_ID) TEST_PROGRESS = ("Mr. White, You're telling me you're cooking again?" 
" \N{ASTONISHED FACE} ") TEST_EMAIL = 'ouiche@example.com' class TestBackend(BaseTestBackend, unittest.TestCase): def test_create_task_simple(self): with self.mock_cooking() as m: self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) m['get_cooker'].assert_called_once_with(TEST_TYPE) args = m['cooker_cls'].call_args[0] self.assertEqual(args[0], TEST_TYPE) self.assertEqual(args[1], TEST_HEX_ID) self.assertEqual(m['cooker'].check_exists.call_count, 1) self.assertEqual(m['send_task'].call_count, 1) args = m['send_task'].call_args[0][0] self.assertEqual(args[0], TEST_TYPE) self.assertEqual(args[1], TEST_HEX_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['object_id'], TEST_OBJ_ID) self.assertEqual(info['type'], TEST_TYPE) self.assertEqual(info['task_status'], 'new') self.assertEqual(info['task_id'], 42) self.assertTimestampAlmostNow(info['ts_created']) self.assertEqual(info['ts_done'], None) self.assertEqual(info['progress_msg'], None) def test_create_fail_duplicate_task(self): with self.mock_cooking(): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) with self.assertRaises(psycopg2.IntegrityError): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) def test_create_fail_nonexisting_object(self): with self.mock_cooking() as m: m['cooker'].check_exists.side_effect = ValueError('Nothing here.') with self.assertRaises(ValueError): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) def test_create_set_progress(self): with self.mock_cooking(): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['progress_msg'], None) self.vault_backend.set_progress(TEST_TYPE, TEST_OBJ_ID, TEST_PROGRESS) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['progress_msg'], TEST_PROGRESS) def test_create_set_status(self): with self.mock_cooking(): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['task_status'], 'new') self.assertEqual(info['ts_done'], None) self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'pending') info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['task_status'], 'pending') self.assertEqual(info['ts_done'], None) self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['task_status'], 'done') self.assertTimestampAlmostNow(info['ts_done']) def test_create_update_access_ts(self): with self.mock_cooking(): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) access_ts_1 = info['ts_last_access'] self.assertTimestampAlmostNow(access_ts_1) self.vault_backend.update_access_ts(TEST_TYPE, TEST_OBJ_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) access_ts_2 = info['ts_last_access'] self.assertTimestampAlmostNow(access_ts_2) self.vault_backend.update_access_ts(TEST_TYPE, TEST_OBJ_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) access_ts_3 = info['ts_last_access'] self.assertTimestampAlmostNow(access_ts_3) self.assertLess(access_ts_1, access_ts_2) self.assertLess(access_ts_2, access_ts_3) def test_cook_request_idempotent(self): with self.mock_cooking(): info1 = self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID) info2 = self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID) info3 = self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID) 
self.assertEqual(info1, info2) self.assertEqual(info1, info3) def test_cook_email_pending_done(self): with self.mock_cooking(), \ patch.object(self.vault_backend, 'add_notif_email') as madd, \ patch.object(self.vault_backend, 'send_notification') as msend: self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID) madd.assert_not_called() msend.assert_not_called() madd.reset_mock() msend.reset_mock() self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, email=TEST_EMAIL) madd.assert_called_once_with(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL) msend.assert_not_called() madd.reset_mock() msend.reset_mock() self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, email=TEST_EMAIL) msend.assert_called_once_with(None, TEST_EMAIL, TEST_TYPE, TEST_OBJ_ID) madd.assert_not_called() def test_send_all_emails(self): with self.mock_cooking(): emails = ('a@example.com', 'billg@example.com', 'test+42@example.org') for email in emails: self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID, email=email) self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') with patch.object(self.vault_backend, 'smtp_server') as m: self.vault_backend.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) sent_emails = {k[0][0] for k in m.send_message.call_args_list} self.assertEqual({k['To'] for k in sent_emails}, set(emails)) for e in sent_emails: self.assertIn('info@softwareheritage.org', e['From']) self.assertIn(TEST_TYPE, e['Subject']) self.assertIn(TEST_HEX_ID[:5], e['Subject']) self.assertIn(TEST_TYPE, str(e)) self.assertIn('https://archive.softwareheritage.org/', str(e)) self.assertIn(TEST_HEX_ID[:5], str(e)) self.assertIn('--\x20\n', str(e)) # Well-formated signature!!! # Check that the entries have been deleted and recalling the # function does not re-send the e-mails m.reset_mock() self.vault_backend.send_all_notifications(TEST_TYPE, TEST_OBJ_ID) m.assert_not_called() def test_available(self): self.assertFalse(self.vault_backend.is_available(TEST_TYPE, TEST_OBJ_ID)) with self.mock_cooking(): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) self.assertFalse(self.vault_backend.is_available(TEST_TYPE, TEST_OBJ_ID)) self.vault_backend.cache.add(TEST_TYPE, TEST_OBJ_ID, b'content') self.assertFalse(self.vault_backend.is_available(TEST_TYPE, TEST_OBJ_ID)) self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') self.assertTrue(self.vault_backend.is_available(TEST_TYPE, TEST_OBJ_ID)) def test_fetch(self): self.assertEqual(self.vault_backend.fetch(TEST_TYPE, TEST_OBJ_ID), None) obj_id, content = self.fake_cook(TEST_TYPE, b'content') info = self.vault_backend.task_info(TEST_TYPE, obj_id) access_ts_before = info['ts_last_access'] self.assertEqual(self.vault_backend.fetch(TEST_TYPE, obj_id), b'content') info = self.vault_backend.task_info(TEST_TYPE, obj_id) access_ts_after = info['ts_last_access'] self.assertTimestampAlmostNow(access_ts_after) self.assertLess(access_ts_before, access_ts_after) def test_cache_expire_oldest(self): r = range(1, 10) inserted = {} for i in r: sticky = (i == 5) content = b'content%s' % str(i).encode() obj_id, content = self.fake_cook(TEST_TYPE, content, sticky) inserted[i] = (obj_id, content) self.vault_backend.update_access_ts(TEST_TYPE, inserted[2][0]) self.vault_backend.update_access_ts(TEST_TYPE, inserted[3][0]) self.vault_backend.cache_expire_oldest(n=4) should_be_still_here = {2, 3, 5, 8, 9} for i in r: self.assertEqual(self.vault_backend.is_available( TEST_TYPE, inserted[i][0]), i in should_be_still_here) def 
    def test_cache_expire_until(self):
        r = range(1, 10)
        inserted = {}
        for i in r:
            sticky = (i == 5)
            content = b'content%s' % str(i).encode()
            obj_id, content = self.fake_cook(TEST_TYPE, content, sticky)
            inserted[i] = (obj_id, content)

            if i == 7:
                cutoff_date = datetime.datetime.now()

        self.vault_backend.update_access_ts(TEST_TYPE, inserted[2][0])
        self.vault_backend.update_access_ts(TEST_TYPE, inserted[3][0])
        self.vault_backend.cache_expire_until(date=cutoff_date)

        should_be_still_here = {2, 3, 5, 8, 9}
        for i in r:
            self.assertEqual(self.vault_backend.is_available(
                TEST_TYPE, inserted[i][0]), i in should_be_still_here)
+
+    def test_fail_cook_simple(self):
+        self.fail_cook(TEST_TYPE, TEST_OBJ_ID, 'error42')
+        self.assertFalse(self.vault_backend.is_available(TEST_TYPE,
+                                                         TEST_OBJ_ID))
+        info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
+        self.assertEqual(info['progress_msg'], 'error42')
+
+    def test_send_failure_email(self):
+        with self.mock_cooking():
+            self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID,
+                                            email='a@example.com')
+
+        self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'failed')
+        self.vault_backend.set_progress(TEST_TYPE, TEST_OBJ_ID, 'test error')
+
+        with patch.object(self.vault_backend, 'smtp_server') as m:
+            self.vault_backend.send_all_notifications(TEST_TYPE, TEST_OBJ_ID)
+
+            e = [k[0][0] for k in m.send_message.call_args_list][0]
+            self.assertEqual(e['To'], 'a@example.com')
+
+            self.assertIn('info@softwareheritage.org', e['From'])
+            self.assertIn(TEST_TYPE, e['Subject'])
+            self.assertIn(TEST_HEX_ID[:5], e['Subject'])
+            self.assertIn('fail', e['Subject'])
+            self.assertIn(TEST_TYPE, str(e))
+            self.assertIn(TEST_HEX_ID[:5], str(e))
+            self.assertIn('test error', str(e))
+            self.assertIn('--\x20\n', str(e))  # Well-formatted signature
+
+    def test_retry_failed_bundle(self):
+        self.fail_cook(TEST_TYPE, TEST_OBJ_ID, 'error42')
+        info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
+        self.assertEqual(info['task_status'], 'failed')
+        with self.mock_cooking():
+            self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID)
+        info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
+        self.assertEqual(info['task_status'], 'new')
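The tests above cover the whole lifecycle of a cooking request in ``VaultBackend``:
creation, progress and status updates, notification e-mails, caching and expiry. For
orientation, here is a minimal sketch of how a client of the backend might drive that
same lifecycle; the polling loop, interval, and object type are illustrative
assumptions, not part of this patch.

.. code:: python

    import time

    # `backend` is a configured swh.vault.backend.VaultBackend instance
    # (see the vault_config dict in vault_testing.py below); `obj_id` is
    # the bytes identifier of the object to cook.
    info = backend.cook_request('directory', obj_id, email='user@example.com')

    # cook_request() is idempotent: re-submitting while a task is in
    # flight simply returns the current task info.
    while info['task_status'] not in ('done', 'failed'):
        time.sleep(1)  # arbitrary polling interval, for illustration only
        info = backend.task_info('directory', obj_id)

    if info['task_status'] == 'done':
        bundle = backend.fetch('directory', obj_id)  # bytes of the bundle
    else:
        print('cooking failed:', info['progress_msg'])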
""" def __enter__(self): self.tmp_dir = tempfile.TemporaryDirectory(prefix='tmp-vault-repo-') self.repo_dir = self.tmp_dir.__enter__() self.repo = dulwich.repo.Repo.init(self.repo_dir) self.author = '"Test Author" '.encode() return pathlib.Path(self.repo_dir) def __exit__(self, exc, value, tb): self.tmp_dir.__exit__(exc, value, tb) def checkout(self, rev_sha): rev = self.repo[rev_sha] dulwich.index.build_index_from_tree(self.repo_dir, self.repo.index_path(), self.repo.object_store, rev.tree) def git_shell(self, *cmd, stdout=subprocess.DEVNULL, **kwargs): subprocess.check_call(('git', '-C', self.repo_dir) + cmd, stdout=stdout, **kwargs) def commit(self, message='Commit test\n', ref=b'HEAD'): self.git_shell('add', '.') message = message.encode() + b'\n' return self.repo.do_commit(message=message, committer=self.author, ref=ref) def merge(self, parent_sha_list, message='Merge branches.'): self.git_shell('merge', '--allow-unrelated-histories', '-m', message, *[p.decode() for p in parent_sha_list]) return self.repo.refs[b'HEAD'] def print_debug_graph(self, reflog=False): args = ['log', '--all', '--graph', '--decorate'] if reflog: args.append('--reflog') self.git_shell(*args, stdout=None) class BaseTestCookers(VaultTestFixture, StorageTestFixture, DbTestFixture): """Base class of cookers unit tests""" def setUp(self): super().setUp() self.loader = GitLoader() self.loader.storage = self.storage def load(self, repo_path): """Load a repository in the test storage""" self.loader.load('fake_origin', repo_path, datetime.datetime.now()) @contextlib.contextmanager def cook_extract_directory(self, obj_id): """Context manager that cooks a directory and extract it.""" cooker = DirectoryCooker('directory', obj_id) cooker.storage = self.storage cooker.backend = unittest.mock.MagicMock() - cooker.check_exists() # Raises if false - tarball = b''.join(cooker.prepare_bundle()) - with tempfile.TemporaryDirectory('tmp-vault-extract-') as td: - fobj = io.BytesIO(tarball) - with tarfile.open(fileobj=fobj, mode='r') as tar: + cooker.fileobj = io.BytesIO() + assert cooker.check_exists() + cooker.prepare_bundle() + cooker.fileobj.seek(0) + with tempfile.TemporaryDirectory(prefix='tmp-vault-extract-') as td: + with tarfile.open(fileobj=cooker.fileobj, mode='r') as tar: tar.extractall(td) - p = pathlib.Path(td) / hashutil.hash_to_hex(obj_id) - yield p + yield pathlib.Path(td) / hashutil.hash_to_hex(obj_id) @contextlib.contextmanager - def cook_extract_revision_gitfast(self, obj_id): - """Context manager that cooks a revision and extract it.""" + def cook_stream_revision_gitfast(self, obj_id): + """Context manager that cooks a revision and stream its fastexport.""" cooker = RevisionGitfastCooker('revision_gitfast', obj_id) cooker.storage = self.storage cooker.backend = unittest.mock.MagicMock() - cooker.check_exists() # Raises if false - fastexport = b''.join(cooker.prepare_bundle()) - fastexport_stream = gzip.GzipFile(fileobj=io.BytesIO(fastexport)) + cooker.fileobj = io.BytesIO() + assert cooker.check_exists() + cooker.prepare_bundle() + cooker.fileobj.seek(0) + fastexport_stream = gzip.GzipFile(fileobj=cooker.fileobj) + yield fastexport_stream + + @contextlib.contextmanager + def cook_extract_revision_gitfast(self, obj_id): + """Context manager that cooks a revision and extract it.""" test_repo = TestRepo() - with test_repo as p: + with self.cook_stream_revision_gitfast(obj_id) as stream, \ + test_repo as p: processor = dulwich.fastexport.GitImportProcessor(test_repo.repo) - processor.import_stream(fastexport_stream) + 
TEST_CONTENT = (" test content\n"
                "and unicode \N{BLACK HEART SUIT}\n"
                " and trailing spaces ")
TEST_EXECUTABLE = b'\x42\x40\x00\x00\x05'


class TestDirectoryCooker(BaseTestCookers, unittest.TestCase):
    def test_directory_simple(self):
        repo = TestRepo()
        with repo as rp:
            (rp / 'file').write_text(TEST_CONTENT)
            (rp / 'executable').write_bytes(TEST_EXECUTABLE)
            (rp / 'executable').chmod(0o755)
            (rp / 'link').symlink_to('file')
            (rp / 'dir1/dir2').mkdir(parents=True)
            (rp / 'dir1/dir2/file').write_text(TEST_CONTENT)
            c = repo.commit()
            self.load(str(rp))

            obj_id_hex = repo.repo[c].tree.decode()
            obj_id = hashutil.hash_to_bytes(obj_id_hex)

        with self.cook_extract_directory(obj_id) as p:
            self.assertEqual((p / 'file').stat().st_mode, 0o100644)
            self.assertEqual((p / 'file').read_text(), TEST_CONTENT)
            self.assertEqual((p / 'executable').stat().st_mode, 0o100755)
            self.assertEqual((p / 'executable').read_bytes(),
                             TEST_EXECUTABLE)
            self.assertTrue((p / 'link').is_symlink())
            self.assertEqual(os.readlink(str(p / 'link')), 'file')
            self.assertEqual((p / 'dir1/dir2/file').stat().st_mode, 0o100644)
            self.assertEqual((p / 'dir1/dir2/file').read_text(),
                             TEST_CONTENT)

            directory = Directory.from_disk(path=bytes(p))
            self.assertEqual(obj_id_hex,
                             hashutil.hash_to_hex(directory.hash))

    def test_directory_filtered_objects(self):
        repo = TestRepo()
        with repo as rp:
            file_1, id_1 = hash_content(b'test1')
            file_2, id_2 = hash_content(b'test2')
            file_3, id_3 = hash_content(b'test3')

            (rp / 'file').write_bytes(file_1)
            (rp / 'hidden_file').write_bytes(file_2)
            (rp / 'absent_file').write_bytes(file_3)

            c = repo.commit()
            self.load(str(rp))

            obj_id_hex = repo.repo[c].tree.decode()
            obj_id = hashutil.hash_to_bytes(obj_id_hex)

        # FIXME: storage.content_update() should be changed to allow things
        # like that
        cur = self.storage.db._cursor(None)
        cur.execute("""update content set status = 'visible'
                       where sha1 = %s""", (id_1,))
        cur.execute("""update content set status = 'hidden'
                       where sha1 = %s""", (id_2,))
        cur.execute("""update content set status = 'absent'
                       where sha1 = %s""", (id_3,))
        cur.close()

        with self.cook_extract_directory(obj_id) as p:
            self.assertEqual((p / 'file').read_bytes(), b'test1')
            self.assertEqual((p / 'hidden_file').read_bytes(),
                             HIDDEN_MESSAGE)
            self.assertEqual((p / 'absent_file').read_bytes(),
                             SKIPPED_MESSAGE)

    def test_directory_bogus_perms(self):
        # Some early git repositories have 664/775 permissions... let's check
        # if all the weird modes are properly normalized in the directory
        # cooker.
        repo = TestRepo()
        with repo as rp:
            (rp / 'file').write_text(TEST_CONTENT)
            (rp / 'file').chmod(0o664)
            (rp / 'executable').write_bytes(TEST_EXECUTABLE)
            (rp / 'executable').chmod(0o775)
            (rp / 'wat').write_text(TEST_CONTENT)
            (rp / 'wat').chmod(0o604)
            c = repo.commit()
            self.load(str(rp))

            obj_id_hex = repo.repo[c].tree.decode()
            obj_id = hashutil.hash_to_bytes(obj_id_hex)

        with self.cook_extract_directory(obj_id) as p:
            self.assertEqual((p / 'file').stat().st_mode, 0o100644)
            self.assertEqual((p / 'executable').stat().st_mode, 0o100755)
            self.assertEqual((p / 'wat').stat().st_mode, 0o100644)

+    def test_directory_revision_data(self):
+        target_rev = '0e8a3ad980ec179856012b7eecf4327e99cd44cd'
+        d = hashutil.hash_to_bytes('17a3e48bce37be5226490e750202ad3a9a1a3fe9')
+
+        dir = {
+            'id': d,
+            'entries': [
+                {
+                    'name': b'submodule',
+                    'type': 'rev',
+                    'target': hashutil.hash_to_bytes(target_rev),
+                    'perms': 0o100644,
+                }
+            ],
+        }
+        self.storage.directory_add([dir])
+
+        with self.cook_extract_directory(d) as p:
+            self.assertTrue((p / 'submodule').is_symlink())
+            self.assertEqual(os.readlink(str(p / 'submodule')), target_rev)
+

class TestRevisionGitfastCooker(BaseTestCookers, unittest.TestCase):
    def test_revision_simple(self):
        #
        #     1--2--3--4--5--6--7
        #
        repo = TestRepo()
        with repo as rp:
            (rp / 'file1').write_text(TEST_CONTENT)
            repo.commit('add file1')
            (rp / 'file2').write_text(TEST_CONTENT)
            repo.commit('add file2')
            (rp / 'dir1/dir2').mkdir(parents=True)
            (rp / 'dir1/dir2/file').write_text(TEST_CONTENT)
            repo.commit('add dir1/dir2/file')
            (rp / 'bin1').write_bytes(TEST_EXECUTABLE)
            (rp / 'bin1').chmod(0o755)
            repo.commit('add bin1')
            (rp / 'link1').symlink_to('file1')
            repo.commit('link link1 to file1')
            (rp / 'file2').unlink()
            repo.commit('remove file2')
            (rp / 'bin1').rename(rp / 'bin')
            repo.commit('rename bin1 to bin')
            self.load(str(rp))
            obj_id_hex = repo.repo.refs[b'HEAD'].decode()
            obj_id = hashutil.hash_to_bytes(obj_id_hex)

        with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
            ert.checkout(b'HEAD')
            self.assertEqual((p / 'file1').stat().st_mode, 0o100644)
            self.assertEqual((p / 'file1').read_text(), TEST_CONTENT)
            self.assertTrue((p / 'link1').is_symlink())
            self.assertEqual(os.readlink(str(p / 'link1')), 'file1')
            self.assertEqual((p / 'bin').stat().st_mode, 0o100755)
            self.assertEqual((p / 'bin').read_bytes(), TEST_EXECUTABLE)
            self.assertEqual((p / 'dir1/dir2/file').read_text(),
                             TEST_CONTENT)
            self.assertEqual((p / 'dir1/dir2/file').stat().st_mode,
                             0o100644)
            self.assertEqual(ert.repo.refs[b'HEAD'].decode(), obj_id_hex)

    def test_revision_two_roots(self):
        #
        #    1----3---4
        #        /
        #   2----
        #
        repo = TestRepo()
        with repo as rp:
            (rp / 'file1').write_text(TEST_CONTENT)
            c1 = repo.commit('Add file1')
            del repo.repo.refs[b'refs/heads/master']  # git update-ref -d HEAD
            (rp / 'file2').write_text(TEST_CONTENT)
            repo.commit('Add file2')
            repo.merge([c1])
            (rp / 'file3').write_text(TEST_CONTENT)
            repo.commit('add file3')
            obj_id_hex = repo.repo.refs[b'HEAD'].decode()
            obj_id = hashutil.hash_to_bytes(obj_id_hex)
        self.load(str(rp))

        with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
            self.assertEqual(ert.repo.refs[b'HEAD'].decode(), obj_id_hex)

    def test_revision_two_double_fork_merge(self):
        #
        #     2---4---6
        #    /   /   /
        #   1---3---5
        #
        repo = TestRepo()
        with repo as rp:
            (rp / 'file1').write_text(TEST_CONTENT)
            c1 = repo.commit('Add file1')
            repo.repo.refs[b'refs/heads/c1'] = c1

            (rp / 'file2').write_text(TEST_CONTENT)
            repo.commit('Add file2')

            (rp / 'file3').write_text(TEST_CONTENT)
            c3 = repo.commit('Add file3', ref=b'refs/heads/c1')
            repo.repo.refs[b'refs/heads/c3'] = c3
            repo.merge([c3])

            (rp / 'file5').write_text(TEST_CONTENT)
            c5 = repo.commit('Add file5', ref=b'refs/heads/c3')
            repo.merge([c5])

            obj_id_hex = repo.repo.refs[b'HEAD'].decode()
            obj_id = hashutil.hash_to_bytes(obj_id_hex)
        self.load(str(rp))

        with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
            self.assertEqual(ert.repo.refs[b'HEAD'].decode(), obj_id_hex)

    def test_revision_triple_merge(self):
        #
        #       .---.---5
        #      /   /   /
        #     2   3   4
        #    /   /   /
        #   1---.---.
        #
        repo = TestRepo()
        with repo as rp:
            (rp / 'file1').write_text(TEST_CONTENT)
            c1 = repo.commit('Commit 1')
            repo.repo.refs[b'refs/heads/b1'] = c1
            repo.repo.refs[b'refs/heads/b2'] = c1
            repo.commit('Commit 2')
            c3 = repo.commit('Commit 3', ref=b'refs/heads/b1')
            c4 = repo.commit('Commit 4', ref=b'refs/heads/b2')
            repo.merge([c3, c4])

            obj_id_hex = repo.repo.refs[b'HEAD'].decode()
            obj_id = hashutil.hash_to_bytes(obj_id_hex)
        self.load(str(rp))

        with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
            self.assertEqual(ert.repo.refs[b'HEAD'].decode(), obj_id_hex)

    def test_revision_filtered_objects(self):
        repo = TestRepo()
        with repo as rp:
            file_1, id_1 = hash_content(b'test1')
            file_2, id_2 = hash_content(b'test2')
            file_3, id_3 = hash_content(b'test3')

            (rp / 'file').write_bytes(file_1)
            (rp / 'hidden_file').write_bytes(file_2)
            (rp / 'absent_file').write_bytes(file_3)

            repo.commit()
            obj_id_hex = repo.repo.refs[b'HEAD'].decode()
            obj_id = hashutil.hash_to_bytes(obj_id_hex)
        self.load(str(rp))

        # FIXME: storage.content_update() should be changed to allow things
        # like that
        cur = self.storage.db._cursor(None)
        cur.execute("""update content set status = 'visible'
                       where sha1 = %s""", (id_1,))
        cur.execute("""update content set status = 'hidden'
                       where sha1 = %s""", (id_2,))
        cur.execute("""update content set status = 'absent'
                       where sha1 = %s""", (id_3,))
        cur.close()

        with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
            ert.checkout(b'HEAD')
            self.assertEqual((p / 'file').read_bytes(), b'test1')
            self.assertEqual((p / 'hidden_file').read_bytes(),
                             HIDDEN_MESSAGE)
            self.assertEqual((p / 'absent_file').read_bytes(),
                             SKIPPED_MESSAGE)

    def test_revision_bogus_perms(self):
        # Some early git repositories have 664/775 permissions... let's check
        # if all the weird modes are properly normalized in the revision
        # cooker.
        repo = TestRepo()
        with repo as rp:
            (rp / 'file').write_text(TEST_CONTENT)
            (rp / 'file').chmod(0o664)
            (rp / 'executable').write_bytes(TEST_EXECUTABLE)
            (rp / 'executable').chmod(0o775)
            (rp / 'wat').write_text(TEST_CONTENT)
            (rp / 'wat').chmod(0o604)
            repo.commit('initial commit')
            self.load(str(rp))
            obj_id_hex = repo.repo.refs[b'HEAD'].decode()
            obj_id = hashutil.hash_to_bytes(obj_id_hex)

        with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
            ert.checkout(b'HEAD')
            self.assertEqual((p / 'file').stat().st_mode, 0o100644)
            self.assertEqual((p / 'executable').stat().st_mode, 0o100755)
            self.assertEqual((p / 'wat').stat().st_mode, 0o100644)

    def test_revision_null_fields(self):
        # Our schema doesn't enforce a lot of non-null revision fields. We
        # need to check these cases don't break the cooker.
        repo = TestRepo()
        with repo as rp:
            (rp / 'file').write_text(TEST_CONTENT)
            c = repo.commit('initial commit')
            self.load(str(rp))
            repo.repo.refs[b'HEAD'].decode()
            dir_id_hex = repo.repo[c].tree.decode()
            dir_id = hashutil.hash_to_bytes(dir_id_hex)

        test_id = b'56789012345678901234'
        test_revision = {
            'id': test_id,
            'message': None,
            'author': {'name': None, 'email': None, 'fullname': ''},
            'date': None,
            'committer': {'name': None, 'email': None, 'fullname': ''},
            'committer_date': None,
            'parents': [],
            'type': 'git',
            'directory': dir_id,
            'metadata': {},
            'synthetic': True
        }
        self.storage.revision_add([test_revision])

        with self.cook_extract_revision_gitfast(test_id) as (ert, p):
            ert.checkout(b'HEAD')
            self.assertEqual((p / 'file').stat().st_mode, 0o100644)
+
+    def test_revision_revision_data(self):
+        target_rev = '0e8a3ad980ec179856012b7eecf4327e99cd44cd'
+        d = hashutil.hash_to_bytes('17a3e48bce37be5226490e750202ad3a9a1a3fe9')
+        r = hashutil.hash_to_bytes('1ecc9270c4fc61cfddbc65a774e91ef5c425a6f0')
+
+        dir = {
+            'id': d,
+            'entries': [
+                {
+                    'name': b'submodule',
+                    'type': 'rev',
+                    'target': hashutil.hash_to_bytes(target_rev),
+                    'perms': 0o100644,
+                }
+            ],
+        }
+        self.storage.directory_add([dir])
+
+        rev = {
+            'id': r,
+            'message': None,
+            'author': {'name': None, 'email': None, 'fullname': ''},
+            'date': None,
+            'committer': {'name': None, 'email': None, 'fullname': ''},
+            'committer_date': None,
+            'parents': [],
+            'type': 'git',
+            'directory': d,
+            'metadata': {},
+            'synthetic': True
+        }
+        self.storage.revision_add([rev])
+
+        with self.cook_stream_revision_gitfast(r) as stream:
+            pattern = 'M 160000 {} submodule'.format(target_rev).encode()
+            self.assertIn(pattern, stream.read())
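The two ``*_revision_data`` tests added above pin down how a ``rev`` (submodule)
directory entry is rendered in each bundle flavor: the directory cooker materializes
it as a symbolic link pointing at the target revision id, while the gitfast cooker
emits a filemodify command with git's gitlink mode. Concretely, the fast-export line
matched by the last assertion reads:

.. code::

    M 160000 0e8a3ad980ec179856012b7eecf4327e99cd44cd submodule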
diff --git a/swh/vault/tests/test_cookers_base.py b/swh/vault/tests/test_cookers_base.py
new file mode 100644
index 0000000..55586b6
--- /dev/null
+++ b/swh/vault/tests/test_cookers_base.py
@@ -0,0 +1,78 @@
+# Copyright (C) 2018 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import unittest
+from unittest.mock import MagicMock
+
+from swh.model import hashutil
+from swh.vault.cookers.base import BaseVaultCooker
+
+
+TEST_BUNDLE_CHUNKS = [b"test content 1\n",
+                      b"test content 2\n",
+                      b"test content 3\n"]
+TEST_BUNDLE_CONTENT = b''.join(TEST_BUNDLE_CHUNKS)
+TEST_OBJ_TYPE = 'test_type'
+TEST_HEX_ID = '17a3e48bce37be5226490e750202ad3a9a1a3fe9'
+TEST_OBJ_ID = hashutil.hash_to_bytes(TEST_HEX_ID)
+
+
+class BaseVaultCookerMock(BaseVaultCooker):
+    CACHE_TYPE_KEY = TEST_OBJ_TYPE
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(self.CACHE_TYPE_KEY, TEST_OBJ_ID, *args, **kwargs)
+        self.storage = MagicMock()
+        self.backend = MagicMock()
+
+    def check_exists(self):
+        return True
+
+    def prepare_bundle(self):
+        for chunk in TEST_BUNDLE_CHUNKS:
+            self.write(chunk)
+
+
+class TestBaseVaultCooker(unittest.TestCase):
+    def test_simple_cook(self):
+        cooker = BaseVaultCookerMock()
+        cooker.cook()
+        cooker.backend.put_bundle.assert_called_once_with(
+            TEST_OBJ_TYPE, TEST_OBJ_ID, TEST_BUNDLE_CONTENT)
+        cooker.backend.set_status.assert_called_with(
+            TEST_OBJ_TYPE, TEST_OBJ_ID, 'done')
+        cooker.backend.set_progress.assert_called_with(
+            TEST_OBJ_TYPE, TEST_OBJ_ID, None)
+        cooker.backend.send_notif.assert_called_with(
+            TEST_OBJ_TYPE, TEST_OBJ_ID)
+
+    def test_code_exception_cook(self):
+        cooker = BaseVaultCookerMock()
+        cooker.prepare_bundle = MagicMock()
+        cooker.prepare_bundle.side_effect = RuntimeError("Nope")
+        cooker.cook()
+
+        # Potentially remove this when we have objstorage streaming
+        cooker.backend.put_bundle.assert_not_called()
+
+        cooker.backend.set_status.assert_called_with(
+            TEST_OBJ_TYPE, TEST_OBJ_ID, 'failed')
+        self.assertNotIn("Nope", cooker.backend.set_progress.call_args[0][2])
+        cooker.backend.send_notif.assert_called_with(
+            TEST_OBJ_TYPE, TEST_OBJ_ID)
+
+    def test_policy_exception_cook(self):
+        cooker = BaseVaultCookerMock()
+        cooker.max_bundle_size = 8
+        cooker.cook()
+
+        # Potentially remove this when we have objstorage streaming
+        cooker.backend.put_bundle.assert_not_called()
+
+        cooker.backend.set_status.assert_called_with(
+            TEST_OBJ_TYPE, TEST_OBJ_ID, 'failed')
+        self.assertIn("exceeds", cooker.backend.set_progress.call_args[0][2])
+        cooker.backend.send_notif.assert_called_with(
+            TEST_OBJ_TYPE, TEST_OBJ_ID)
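``BaseVaultCookerMock`` doubles as a template for real cookers: subclass
``BaseVaultCooker``, set ``CACHE_TYPE_KEY``, and implement ``check_exists()`` and
``prepare_bundle()`` in terms of ``self.write()``. A sketch following the same shape
(the class and its chunk source are hypothetical, not part of this patch):

.. code:: python

    from swh.vault.cookers.base import BaseVaultCooker


    class ExampleCooker(BaseVaultCooker):
        """Hypothetical cooker mirroring the mock above, for illustration."""
        CACHE_TYPE_KEY = 'example'

        def check_exists(self):
            # A real cooker would look the object up in storage and return
            # a boolean (cf. `assert cooker.check_exists()` in
            # test_cookers.py); returning True here is a placeholder.
            return True

        def prepare_bundle(self):
            # Stream the bundle through self.write(); as the tests above
            # show, cook() then calls put_bundle(), set_status(),
            # set_progress() and send_notif() on the backend.
            for chunk in (b'part 1\n', b'part 2\n'):
                self.write(chunk)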
""" TEST_VAULT_DB_NAME = 'softwareheritage-test-vault' @classmethod def setUpClass(cls): if not hasattr(cls, 'DB_TEST_FIXTURE_IMPORTED'): raise RuntimeError("VaultTestFixture needs to be followed by " "DbTestFixture in the inheritance list.") test_dir = pathlib.Path(__file__).absolute().parent test_db_dump = test_dir / '../../../sql/swh-vault-schema.sql' test_db_dump = test_db_dump.absolute() cls.add_db(cls.TEST_VAULT_DB_NAME, str(test_db_dump), 'psql') super().setUpClass() def setUp(self): super().setUp() self.cache_root = tempfile.TemporaryDirectory('vault-cache-') self.vault_config = { 'storage': self.storage_config, - 'db': 'postgresql:///' + self.TEST_VAULT_DB_NAME, 'cache': { 'cls': 'pathslicing', 'args': { 'root': self.cache_root.name, 'slicing': '0:1/1:5', 'allow_delete': True, } - } + }, + 'db': 'postgresql:///' + self.TEST_VAULT_DB_NAME, + 'scheduling_db': None, } self.vault_backend = VaultBackend(self.vault_config) def tearDown(self): self.cache_root.cleanup() self.vault_backend.close() self.reset_storage_tables() self.reset_vault_tables() super().tearDown() def reset_vault_tables(self): excluded = {'dbversion'} self.reset_db_tables(self.TEST_VAULT_DB_NAME, excluded=excluded) def hash_content(content): obj_id = hashutil.hash_data(content)['sha1'] return content, obj_id diff --git a/swh/vault/to_disk.py b/swh/vault/to_disk.py new file mode 100644 index 0000000..6c6fc68 --- /dev/null +++ b/swh/vault/to_disk.py @@ -0,0 +1,113 @@ +# Copyright (C) 2016-2018 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import itertools +import os + +from swh.model import hashutil +from swh.model.from_disk import mode_to_perms, DentryPerms + +SKIPPED_MESSAGE = (b'This content has not been retrieved in the ' + b'Software Heritage archive due to its size.') + +HIDDEN_MESSAGE = (b'This content is hidden.') + + +def get_filtered_file_content(storage, file_data): + """Retrieve the file specified by file_data and apply filters for skipped + and missing contents. + + Args: + storage: the storage from which to retrieve the object + file_data: file entry descriptor as returned by directory_ls() + + Returns: + Bytes containing the specified content. The content will be replaced by + a specific message to indicate that the content could not be retrieved + (either due to privacy policy or because its size was too big for us to + archive it). + """ + assert file_data['type'] == 'file' + + if file_data['status'] == 'absent': + return SKIPPED_MESSAGE + elif file_data['status'] == 'hidden': + return HIDDEN_MESSAGE + else: + return list(storage.content_get([file_data['sha1']]))[0]['data'] + + +class DirectoryBuilder: + """Reconstructs the on-disk representation of a directory in the storage. + """ + + def __init__(self, storage, root, dir_id): + """Initialize the directory builder. + + Args: + storage: the storage object + root: the path where the directory should be reconstructed + dir_id: the identifier of the directory in the storage + """ + self.storage = storage + self.root = root + self.dir_id = dir_id + + def build(self): + """Perform the reconstruction of the directory in the given root.""" + # Retrieve data from the database. + data = self.storage.directory_ls(self.dir_id, recursive=True) + + # Split into files and directory data. 
+        data1, data2 = itertools.tee(data, 2)
+        dir_data = (entry['name'] for entry in data1
+                    if entry['type'] == 'dir')
+        file_data = (entry for entry in data2 if entry['type'] != 'dir')
+
+        # Recreate the directory's subtree and then the files inside it.
+        self._create_tree(dir_data)
+        self._create_files(file_data)
+
+    def _create_tree(self, directory_paths):
+        """Create a directory tree from the given paths
+
+        The tree is created from `root` and each given path in
+        `directory_paths` will be created.
+
+        """
+        # Directories are sorted by depth so they are created in the
+        # right order
+        bsep = bytes(os.path.sep, 'utf8')
+        dir_names = sorted(
+            directory_paths,
+            key=lambda x: len(x.split(bsep)))
+        for dir_name in dir_names:
+            os.makedirs(os.path.join(self.root, dir_name))
+
+    def _create_files(self, file_datas):
+        """Create the files according to their status."""
+        for file_data in file_datas:
+            path = os.path.join(self.root, file_data['name'])
+            if file_data['type'] == 'file':
+                content = get_filtered_file_content(self.storage, file_data)
+                self._create_file(path, content, file_data['perms'])
+            elif file_data['type'] == 'rev':
+                self._create_file(path,
+                                  hashutil.hash_to_hex(file_data['target']),
+                                  0o120000)
+
+    def _create_file(self, path, content, mode=0o100644):
+        """Create the given file and fill it with content."""
+        perms = mode_to_perms(mode)
+        if perms == DentryPerms.symlink:
+            os.symlink(content, path)
+        else:
+            with open(path, 'wb') as f:
+                f.write(content)
+            os.chmod(path, perms.value)
+
+    def _get_file_content(self, obj_id):
+        """Get the content of the given file."""
+        content = list(self.storage.content_get([obj_id]))[0]['data']
+        return content
diff --git a/version.txt b/version.txt
index 0b9a192..04ea868 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
-v0.0.6-0-gd95bf6d
\ No newline at end of file
+v0.0.7-0-ga396ceb
\ No newline at end of file
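Finally, a minimal usage sketch for the new ``swh.vault.to_disk.DirectoryBuilder``.
Note that the builder joins byte strings when recreating paths (``entry['name']`` is
``bytes``, and ``_create_tree`` splits on ``bytes(os.path.sep, 'utf8')``), so the
root should be passed as bytes; ``storage`` and ``dir_id`` are assumed to be
supplied by the caller, not shown in this patch.

.. code:: python

    import tempfile

    from swh.vault.to_disk import DirectoryBuilder

    # `storage` is a swh.storage instance; `dir_id` is the bytes identifier
    # of a directory stored in it.
    with tempfile.TemporaryDirectory() as root:
        builder = DirectoryBuilder(storage, root.encode(), dir_id)
        builder.build()  # recreates the subtree, then the files, under root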