diff --git a/PKG-INFO b/PKG-INFO index 6aea873..81e2334 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.vault -Version: 0.0.10 +Version: 0.0.11 Summary: Software Heritage vault Home-page: https://forge.softwareheritage.org/diffusion/DVAU/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/debian/changelog b/debian/changelog index 198da53..874ff8b 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,66 +1,66 @@ -swh-vault (0.0.10-1~swh1~bpo9+1) stretch-swh; urgency=medium +swh-vault (0.0.11-1~swh1) unstable-swh; urgency=medium - * Rebuild for stretch-backports. + * version 0.0.11 - -- Antoine Pietri Thu, 15 Feb 2018 16:08:05 +0100 + -- Antoine Pietri Fri, 16 Feb 2018 16:09:10 +0100 swh-vault (0.0.10-1~swh1) unstable-swh; urgency=medium * version 0.0.10 -- Antoine Pietri Thu, 15 Feb 2018 16:08:05 +0100 swh-vault (0.0.9-1~swh1) unstable-swh; urgency=medium * version 0.0.9 -- Antoine Pietri Thu, 01 Feb 2018 18:21:29 +0100 swh-vault (0.0.8-1~swh1) unstable-swh; urgency=medium * version 0.0.8 -- Antoine Pietri Wed, 31 Jan 2018 17:54:55 +0100 swh-vault (0.0.7-1~swh1) unstable-swh; urgency=medium * version 0.0.7 -- Antoine Pietri Tue, 30 Jan 2018 18:21:07 +0100 swh-vault (0.0.6-1~swh1) unstable-swh; urgency=medium * version 0.0.6 -- Antoine Pietri Tue, 09 Jan 2018 16:37:41 +0100 swh-vault (0.0.5-1~swh1) unstable-swh; urgency=medium * version 0.0.5 -- Antoine Pietri Thu, 14 Dec 2017 19:33:01 +0100 swh-vault (0.0.4-1~swh1) unstable-swh; urgency=medium * version 0.0.4 -- Antoine Pietri Fri, 08 Dec 2017 15:33:54 +0100 swh-vault (0.0.3-1~swh1) unstable-swh; urgency=medium * version 0.0.3 -- Antoine Pietri Fri, 01 Dec 2017 15:31:34 +0100 swh-vault (0.0.2-1~swh1) unstable-swh; urgency=medium * version 0.0.2 -- Antoine Pietri Thu, 30 Nov 2017 15:50:43 +0100 swh-vault (0.0.1-1~swh1) unstable-swh; urgency=medium * Initial release * version 0.0.1 -- Antoine Pietri Mon, 13 Nov 2017 16:22:47 +0100 diff --git a/swh.vault.egg-info/PKG-INFO b/swh.vault.egg-info/PKG-INFO index 6aea873..81e2334 100644 --- a/swh.vault.egg-info/PKG-INFO +++ b/swh.vault.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.vault -Version: 0.0.10 +Version: 0.0.11 Summary: Software Heritage vault Home-page: https://forge.softwareheritage.org/diffusion/DVAU/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh/vault/api/server.py b/swh/vault/api/server.py index 89b3439..e2c419a 100644 --- a/swh/vault/api/server.py +++ b/swh/vault/api/server.py @@ -1,182 +1,186 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import aiohttp.web import click from swh.core import config from swh.core.api_async import (SWHRemoteAPI, encode_data_server as encode_data, decode_request) from swh.model import hashutil from swh.vault.cookers import COOKER_TYPES from swh.vault.backend import VaultBackend, NotFoundExc DEFAULT_CONFIG_PATH = 'vault/server' DEFAULT_CONFIG = { 'storage': ('dict', { 'cls': 'local', 'args': { 'db': 'dbname=softwareheritage-dev', 'objstorage': { 'cls': 'pathslicing', 'args': { 'root': '/srv/softwareheritage/objects', 'slicing': '0:2/2:4/4:6', }, }, }, }), 'cache': ('dict', { 'cls': 'pathslicing', 'args': { 'root': '/srv/softwareheritage/vault', 'slicing': '0:1/1:5', }, }), + 'client_max_size': ('int', 1024 ** 3), 'db': ('str', 'dbname=softwareheritage-vault-dev'), 'scheduling_db': ('str', 'dbname=softwareheritage-scheduler-dev'), } @asyncio.coroutine def index(request): return aiohttp.web.Response(body="SWH Vault API server") # Web API endpoints @asyncio.coroutine def vault_fetch(request): obj_type = request.match_info['type'] obj_id = request.match_info['id'] if not request.app['backend'].is_available(obj_type, obj_id): raise aiohttp.web.HTTPNotFound return encode_data(request.app['backend'].fetch(obj_type, obj_id)) def user_info(task_info): return {'id': task_info['id'], 'status': task_info['task_status'], 'progress_message': task_info['progress_msg'], 'obj_type': task_info['type'], 'obj_id': hashutil.hash_to_hex(task_info['object_id'])} @asyncio.coroutine def vault_cook(request): obj_type = request.match_info['type'] obj_id = request.match_info['id'] email = request.query.get('email') sticky = request.query.get('sticky') in ('true', '1') if obj_type not in COOKER_TYPES: raise aiohttp.web.HTTPNotFound try: info = request.app['backend'].cook_request(obj_type, obj_id, email=email, sticky=sticky) except NotFoundExc: raise aiohttp.web.HTTPNotFound # TODO: return 201 status (Created) once the api supports it return encode_data(user_info(info)) @asyncio.coroutine def vault_progress(request): obj_type = request.match_info['type'] obj_id = request.match_info['id'] info = request.app['backend'].task_info(obj_type, obj_id) if not info: raise aiohttp.web.HTTPNotFound return encode_data(user_info(info)) # Cookers endpoints @asyncio.coroutine def set_progress(request): obj_type = request.match_info['type'] obj_id = request.match_info['id'] progress = yield from decode_request(request) request.app['backend'].set_progress(obj_type, obj_id, progress) return encode_data(True) # FIXME: success value? @asyncio.coroutine def set_status(request): obj_type = request.match_info['type'] obj_id = request.match_info['id'] status = yield from decode_request(request) request.app['backend'].set_status(obj_type, obj_id, status) return encode_data(True) # FIXME: success value? @asyncio.coroutine def put_bundle(request): obj_type = request.match_info['type'] obj_id = request.match_info['id'] # TODO: handle streaming properly content = yield from decode_request(request) request.app['backend'].cache.add(obj_type, obj_id, content) return encode_data(True) # FIXME: success value? @asyncio.coroutine def send_notif(request): obj_type = request.match_info['type'] obj_id = request.match_info['id'] request.app['backend'].send_all_notifications(obj_type, obj_id) return encode_data(True) # FIXME: success value? # Web server def make_app(config, **kwargs): - app = SWHRemoteAPI(client_max_size=2 ** 29, **kwargs) + if 'client_max_size' in config: + kwargs['client_max_size'] = config['client_max_size'] + + app = SWHRemoteAPI(**kwargs) app.router.add_route('GET', '/', index) # Endpoints used by the web API app.router.add_route('GET', '/fetch/{type}/{id}', vault_fetch) app.router.add_route('POST', '/cook/{type}/{id}', vault_cook) app.router.add_route('GET', '/progress/{type}/{id}', vault_progress) # Endpoints used by the Cookers app.router.add_route('POST', '/set_progress/{type}/{id}', set_progress) app.router.add_route('POST', '/set_status/{type}/{id}', set_status) app.router.add_route('POST', '/put_bundle/{type}/{id}', put_bundle) app.router.add_route('POST', '/send_notif/{type}/{id}', send_notif) app['backend'] = VaultBackend(config) return app def make_app_from_configfile(config_path=DEFAULT_CONFIG_PATH, **kwargs): cfg = config.load_named_config(config_path, DEFAULT_CONFIG) return make_app(cfg, **kwargs) @click.command() @click.argument('config-path', required=1) @click.option('--host', default='0.0.0.0', help="Host to run the server") @click.option('--port', default=5005, type=click.INT, help="Binding port of the server") @click.option('--debug/--nodebug', default=True, help="Indicates if the server should run in debug mode") def launch(config_path, host, port, debug): app = make_app(config.read(config_path, DEFAULT_CONFIG), debug=bool(debug)) aiohttp.web.run_app(app, host=host, port=int(port)) if __name__ == '__main__': launch() diff --git a/swh/vault/backend.py b/swh/vault/backend.py index 90018fe..4af962a 100644 --- a/swh/vault/backend.py +++ b/swh/vault/backend.py @@ -1,383 +1,384 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import smtplib import psycopg2 import psycopg2.extras from functools import wraps from email.mime.text import MIMEText from swh.model import hashutil from swh.scheduler.backend import SchedulerBackend from swh.scheduler.utils import create_oneshot_task_dict from swh.vault.cache import VaultCache from swh.vault.cookers import get_cooker from swh.vault.cooking_tasks import SWHCookingTask # noqa cooking_task_name = 'swh.vault.cooking_tasks.SWHCookingTask' NOTIF_EMAIL_FROM = ('"Software Heritage Vault" ' '') NOTIF_EMAIL_SUBJECT_SUCCESS = ("Bundle ready: {obj_type} {short_id}") NOTIF_EMAIL_SUBJECT_FAILURE = ("Bundle failed: {obj_type} {short_id}") NOTIF_EMAIL_BODY_SUCCESS = """ You have requested the following bundle from the Software Heritage Vault: Object Type: {obj_type} Object ID: {hex_id} This bundle is now available for download at the following address: {url} Please keep in mind that this link might expire at some point, in which case you will need to request the bundle again. --\x20 The Software Heritage Developers """ NOTIF_EMAIL_BODY_FAILURE = """ You have requested the following bundle from the Software Heritage Vault: Object Type: {obj_type} Object ID: {hex_id} This bundle could not be cooked for the following reason: {progress_msg} We apologize for the inconvenience. --\x20 The Software Heritage Developers """ class NotFoundExc(Exception): """Bundle was not found.""" pass # TODO: Imported from swh.scheduler.backend. Factorization needed. def autocommit(fn): @wraps(fn) def wrapped(self, *args, **kwargs): autocommit = False # TODO: I don't like using None, it's confusing for the user. how about # a NEW_CURSOR object()? if 'cursor' not in kwargs or not kwargs['cursor']: autocommit = True kwargs['cursor'] = self.cursor() try: ret = fn(self, *args, **kwargs) except: if autocommit: self.rollback() raise if autocommit: self.commit() return ret return wrapped # TODO: This has to be factorized with other database base classes and helpers # (swh.scheduler.backend.SchedulerBackend, swh.storage.db.BaseDb, ...) # The three first methods are imported from swh.scheduler.backend. class VaultBackend: """ Backend for the Software Heritage vault. """ def __init__(self, config): self.config = config self.cache = VaultCache(self.config['cache']) self.db = None self.reconnect() self.smtp_server = smtplib.SMTP('localhost', 25) if self.config['scheduling_db'] is not None: self.scheduler = SchedulerBackend( scheduling_db=self.config['scheduling_db']) def reconnect(self): """Reconnect to the database.""" if not self.db or self.db.closed: self.db = psycopg2.connect( dsn=self.config['db'], cursor_factory=psycopg2.extras.RealDictCursor, ) def close(self): """Close the underlying database connection.""" self.db.close() def cursor(self): """Return a fresh cursor on the database, with auto-reconnection in case of failure""" cur = None # Get a fresh cursor and reconnect at most three times tries = 0 while True: tries += 1 try: cur = self.db.cursor() cur.execute('select 1') break except psycopg2.OperationalError: if tries < 3: self.reconnect() else: raise return cur def commit(self): """Commit a transaction""" self.db.commit() def rollback(self): """Rollback a transaction""" self.db.rollback() @autocommit def task_info(self, obj_type, obj_id, cursor=None): """Fetch information from a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' SELECT id, type, object_id, task_id, task_status, sticky, ts_created, ts_done, ts_last_access, progress_msg FROM vault_bundle WHERE type = %s AND object_id = %s''', (obj_type, obj_id)) res = cursor.fetchone() if res: res['object_id'] = bytes(res['object_id']) return res def _send_task(self, args): """Send a cooking task to the celery scheduler""" task = create_oneshot_task_dict('swh-vault-cooking', *args) added_tasks = self.scheduler.create_tasks([task]) return added_tasks[0]['id'] @autocommit def create_task(self, obj_type, obj_id, sticky=False, cursor=None): """Create and send a cooking task""" obj_id = hashutil.hash_to_bytes(obj_id) hex_id = hashutil.hash_to_hex(obj_id) args = [obj_type, hex_id] backend_storage_config = {'storage': self.config['storage']} cooker_class = get_cooker(obj_type) cooker = cooker_class(*args, override_cfg=backend_storage_config) if not cooker.check_exists(): raise NotFoundExc("Object {} was not found.".format(hex_id)) cursor.execute(''' INSERT INTO vault_bundle (type, object_id, sticky) VALUES (%s, %s, %s)''', (obj_type, obj_id, sticky)) self.commit() task_id = self._send_task(args) cursor.execute(''' UPDATE vault_bundle SET task_id = %s WHERE type = %s AND object_id = %s''', (task_id, obj_type, obj_id)) @autocommit def add_notif_email(self, obj_type, obj_id, email, cursor=None): """Add an e-mail address to notify when a given bundle is ready""" obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' INSERT INTO vault_notif_email (email, bundle_id) VALUES (%s, (SELECT id FROM vault_bundle WHERE type = %s AND object_id = %s))''', (email, obj_type, obj_id)) @autocommit def cook_request(self, obj_type, obj_id, *, sticky=False, email=None, cursor=None): """Main entry point for cooking requests. This starts a cooking task if needed, and add the given e-mail to the notify list""" + obj_id = hashutil.hash_to_bytes(obj_id) info = self.task_info(obj_type, obj_id) # If there's a failed bundle entry, delete it first. if info is not None and info['task_status'] == 'failed': cursor.execute('''DELETE FROM vault_bundle WHERE type = %s AND object_id = %s''', (obj_type, obj_id)) self.commit() info = None # If there's no bundle entry, create the task. if info is None: self.create_task(obj_type, obj_id, sticky) if email is not None: # If the task is already done, send the email directly if info is not None and info['task_status'] == 'done': self.send_notification(None, email, obj_type, obj_id) # Else, add it to the notification queue else: self.add_notif_email(obj_type, obj_id, email) info = self.task_info(obj_type, obj_id) return info @autocommit def is_available(self, obj_type, obj_id, cursor=None): """Check whether a bundle is available for retrieval""" info = self.task_info(obj_type, obj_id, cursor=cursor) return (info is not None and info['task_status'] == 'done' and self.cache.is_cached(obj_type, obj_id)) @autocommit def fetch(self, obj_type, obj_id, cursor=None): """Retrieve a bundle from the cache""" if not self.is_available(obj_type, obj_id, cursor=cursor): return None self.update_access_ts(obj_type, obj_id, cursor=cursor) return self.cache.get(obj_type, obj_id) @autocommit def update_access_ts(self, obj_type, obj_id, cursor=None): """Update the last access timestamp of a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' UPDATE vault_bundle SET ts_last_access = NOW() WHERE type = %s AND object_id = %s''', (obj_type, obj_id)) @autocommit def set_status(self, obj_type, obj_id, status, cursor=None): """Set the cooking status of a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) req = (''' UPDATE vault_bundle SET task_status = %s ''' + (''', ts_done = NOW() ''' if status == 'done' else '') + '''WHERE type = %s AND object_id = %s''') cursor.execute(req, (status, obj_type, obj_id)) @autocommit def set_progress(self, obj_type, obj_id, progress, cursor=None): """Set the cooking progress of a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' UPDATE vault_bundle SET progress_msg = %s WHERE type = %s AND object_id = %s''', (progress, obj_type, obj_id)) @autocommit def send_all_notifications(self, obj_type, obj_id, cursor=None): """Send all the e-mails in the notification list of a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' SELECT vault_notif_email.id AS id, email, task_status, progress_msg FROM vault_notif_email INNER JOIN vault_bundle ON bundle_id = vault_bundle.id WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s''', (obj_type, obj_id)) for d in cursor: self.send_notification(d['id'], d['email'], obj_type, obj_id, status=d['task_status'], progress_msg=d['progress_msg']) @autocommit def send_notification(self, n_id, email, obj_type, obj_id, status, progress_msg=None, cursor=None): """Send the notification of a bundle to a specific e-mail""" hex_id = hashutil.hash_to_hex(obj_id) short_id = hex_id[:7] # TODO: instead of hardcoding this, we should probably: # * add a "fetch_url" field in the vault_notif_email table # * generate the url with flask.url_for() on the web-ui side # * send this url as part of the cook request and store it in # the table # * use this url for the notification e-mail url = ('https://archive.softwareheritage.org/api/1/vault/{}/{}/' 'raw'.format(obj_type, hex_id)) if status == 'done': text = NOTIF_EMAIL_BODY_SUCCESS.strip() text = text.format(obj_type=obj_type, hex_id=hex_id, url=url) msg = MIMEText(text) msg['Subject'] = (NOTIF_EMAIL_SUBJECT_SUCCESS .format(obj_type=obj_type, short_id=short_id)) elif status == 'failed': text = NOTIF_EMAIL_BODY_FAILURE.strip() text = text.format(obj_type=obj_type, hex_id=hex_id, progress_msg=progress_msg) msg = MIMEText(text) msg['Subject'] = (NOTIF_EMAIL_SUBJECT_FAILURE .format(obj_type=obj_type, short_id=short_id)) else: raise RuntimeError("send_notification called on a '{}' bundle" .format(status)) msg['From'] = NOTIF_EMAIL_FROM msg['To'] = email self._smtp_send(msg) if n_id is not None: cursor.execute(''' DELETE FROM vault_notif_email WHERE id = %s''', (n_id,)) def _smtp_send(self, msg): # Reconnect if needed try: status = self.smtp_server.noop()[0] except: # smtplib.SMTPServerDisconnected status = -1 if status != 250: self.smtp_server.connect() # Send the message self.smtp_server.send_message(msg) @autocommit def _cache_expire(self, cond, *args, cursor=None): """Low-level expiration method, used by cache_expire_* methods""" # Embedded SELECT query to be able to use ORDER BY and LIMIT cursor.execute(''' DELETE FROM vault_bundle WHERE ctid IN ( SELECT ctid FROM vault_bundle WHERE sticky = false {} ) RETURNING type, object_id '''.format(cond), args) for d in cursor: self.cache.delete(d['type'], bytes(d['object_id'])) @autocommit def cache_expire_oldest(self, n=1, by='last_access', cursor=None): """Expire the `n` oldest bundles""" assert by in ('created', 'done', 'last_access') filter = '''ORDER BY ts_{} LIMIT {}'''.format(by, n) return self._cache_expire(filter) @autocommit def cache_expire_until(self, date, by='last_access', cursor=None): """Expire all the bundles until a certain date""" assert by in ('created', 'done', 'last_access') filter = '''AND ts_{} <= %s'''.format(by) return self._cache_expire(filter, date) diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py index 65fdb19..ecf5277 100644 --- a/swh/vault/cookers/base.py +++ b/swh/vault/cookers/base.py @@ -1,131 +1,131 @@ # Copyright (C) 2016-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import io import logging from swh.core import config from swh.model import hashutil from swh.storage import get_storage from swh.vault.api.client import RemoteVaultClient DEFAULT_CONFIG_PATH = 'vault/cooker' DEFAULT_CONFIG = { 'storage': ('dict', { 'cls': 'remote', 'args': { 'url': 'http://localhost:5002/', }, }), 'vault_url': ('str', 'http://localhost:5005/'), 'max_bundle_size': ('int', 2 ** 29), # 512 MiB } class PolicyError(Exception): """Raised when the bundle violates the cooking policy.""" pass class BundleTooLargeError(PolicyError): """Raised when the bundle is too large to be cooked.""" pass class BytesIOBundleSizeLimit(io.BytesIO): def __init__(self, *args, size_limit=None, **kwargs): super().__init__(*args, **kwargs) self.size_limit = size_limit def write(self, chunk): if ((self.size_limit is not None and self.getbuffer().nbytes + len(chunk) > self.size_limit)): raise BundleTooLargeError( "The requested bundle exceeds the maximum allowed " "size of {} bytes.".format(self.size_limit)) return super().write(chunk) class BaseVaultCooker(metaclass=abc.ABCMeta): """Abstract base class for the vault's bundle creators This class describes a common API for the cookers. To define a new cooker, inherit from this class and override: - CACHE_TYPE_KEY: key to use for the bundle to reference in cache - def cook(): cook the object into a bundle """ CACHE_TYPE_KEY = None def __init__(self, obj_type, obj_id, *, override_cfg=None): """Initialize the cooker. The type of the object represented by the id depends on the concrete class. Very likely, each type of bundle will have its own cooker class. Args: storage: the storage object cache: the cache where to store the bundle obj_id: id of the object to be cooked into a bundle. """ self.config = config.load_named_config(DEFAULT_CONFIG_PATH, DEFAULT_CONFIG) if override_cfg is not None: self.config.update(override_cfg) self.obj_type = obj_type self.obj_id = hashutil.hash_to_bytes(obj_id) self.backend = RemoteVaultClient(self.config['vault_url']) self.storage = get_storage(**self.config['storage']) self.max_bundle_size = self.config['max_bundle_size'] @abc.abstractmethod def check_exists(self): """Checks that the requested object exists and can be cooked. Override this in the cooker implementation. """ raise NotImplemented @abc.abstractmethod def prepare_bundle(self): """Implementation of the cooker. Yields chunks of the bundle bytes. Override this with the cooker implementation. """ raise NotImplemented def write(self, chunk): self.fileobj.write(chunk) def cook(self): """Cook the requested object into a bundle """ self.backend.set_status(self.obj_type, self.obj_id, 'pending') self.backend.set_progress(self.obj_type, self.obj_id, 'Processing...') self.fileobj = BytesIOBundleSizeLimit(size_limit=self.max_bundle_size) try: self.prepare_bundle() bundle = self.fileobj.getvalue() + # TODO: use proper content streaming instead of put_bundle() + self.backend.put_bundle(self.CACHE_TYPE_KEY, self.obj_id, bundle) except PolicyError as e: self.backend.set_status(self.obj_type, self.obj_id, 'failed') self.backend.set_progress(self.obj_type, self.obj_id, str(e)) except Exception as e: self.backend.set_status(self.obj_type, self.obj_id, 'failed') self.backend.set_progress( self.obj_type, self.obj_id, "Internal Server Error. This incident will be reported.") logging.exception("Bundle cooking failed.") else: - # TODO: use proper content streaming instead of put_bundle() - self.backend.put_bundle(self.CACHE_TYPE_KEY, self.obj_id, bundle) self.backend.set_status(self.obj_type, self.obj_id, 'done') self.backend.set_progress(self.obj_type, self.obj_id, None) finally: self.backend.send_notif(self.obj_type, self.obj_id) diff --git a/swh/vault/cookers/directory.py b/swh/vault/cookers/directory.py index 70d9da0..ebcd346 100644 --- a/swh/vault/cookers/directory.py +++ b/swh/vault/cookers/directory.py @@ -1,27 +1,27 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import tarfile import tempfile from swh.model import hashutil from swh.vault.cookers.base import BaseVaultCooker from swh.vault.to_disk import DirectoryBuilder class DirectoryCooker(BaseVaultCooker): """Cooker to create a directory bundle """ CACHE_TYPE_KEY = 'directory' def check_exists(self): return not list(self.storage.directory_missing([self.obj_id])) def prepare_bundle(self): with tempfile.TemporaryDirectory(prefix='tmp-vault-directory-') as td: directory_builder = DirectoryBuilder( self.storage, td.encode(), self.obj_id) directory_builder.build() - tar = tarfile.open(fileobj=self.fileobj, mode='w') - tar.add(td, arcname=hashutil.hash_to_hex(self.obj_id)) + with tarfile.open(fileobj=self.fileobj, mode='w:gz') as tar: + tar.add(td, arcname=hashutil.hash_to_hex(self.obj_id)) diff --git a/swh/vault/cookers/revision_flat.py b/swh/vault/cookers/revision_flat.py index 0fc8579..242c998 100644 --- a/swh/vault/cookers/revision_flat.py +++ b/swh/vault/cookers/revision_flat.py @@ -1,32 +1,32 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import tarfile import tempfile from pathlib import Path from swh.model import hashutil from swh.vault.cookers.base import BaseVaultCooker from swh.vault.to_disk import DirectoryBuilder class RevisionFlatCooker(BaseVaultCooker): """Cooker to create a revision_flat bundle """ CACHE_TYPE_KEY = 'revision_flat' def check_exists(self): return not list(self.storage.revision_missing([self.obj_id])) def prepare_bundle(self): with tempfile.TemporaryDirectory(prefix='tmp-vault-revision-') as td: root = Path(td) for revision in self.storage.revision_log([self.obj_id]): revdir = root / hashutil.hash_to_hex(revision['id']) revdir.mkdir() directory_builder = DirectoryBuilder( self.storage, str(revdir).encode(), revision['directory']) directory_builder.build() - tar = tarfile.open(fileobj=self.fileobj, mode='w') - tar.add(td, arcname=hashutil.hash_to_hex(self.obj_id)) + with tarfile.open(fileobj=self.fileobj, mode='w:gz') as tar: + tar.add(td, arcname=hashutil.hash_to_hex(self.obj_id)) diff --git a/version.txt b/version.txt index 7f2b1d3..0d09eb3 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.10-0-g3c5d89a \ No newline at end of file +v0.0.11-0-g2de4eb0 \ No newline at end of file