diff --git a/swh/vault/api/client.py b/swh/vault/api/client.py
index fdf393d..1cbe705 100644
--- a/swh/vault/api/client.py
+++ b/swh/vault/api/client.py
@@ -1,29 +1,54 @@
 # Copyright (C) 2016-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from swh.model import hashutil
 from swh.core.api import SWHRemoteAPI
 from swh.storage.exc import StorageAPIError


 class RemoteVaultClient(SWHRemoteAPI):
     """Client to the Software Heritage vault cache."""

     def __init__(self, base_url):
         super().__init__(api_exception=StorageAPIError, url=base_url)

+    # Web API endpoints
+
     def fetch(self, obj_type, obj_id):
-        return self.get('fetch/{}/{}'.format(obj_type,
-                                             hashutil.hash_to_hex(obj_id)))
+        hex_id = hashutil.hash_to_hex(obj_id)
+        return self.get('fetch/{}/{}'.format(obj_type, hex_id))

     def cook(self, obj_type, obj_id, email=None):
-        return self.post('cook/{}/{}'.format(obj_type,
-                                             hashutil.hash_to_hex(obj_id)),
+        hex_id = hashutil.hash_to_hex(obj_id)
+        return self.post('cook/{}/{}'.format(obj_type, hex_id),
                          data={},
                          params=({'email': email} if email else None))

     def progress(self, obj_type, obj_id):
-        return self.get('progress/{}/{}'.format(obj_type,
-                                                hashutil.hash_to_hex(obj_id)))
+        hex_id = hashutil.hash_to_hex(obj_id)
+        return self.get('progress/{}/{}'.format(obj_type, hex_id))
+
+    # Cookers endpoints
+
+    def set_progress(self, obj_type, obj_id, progress):
+        hex_id = hashutil.hash_to_hex(obj_id)
+        return self.post('set_progress/{}/{}'.format(obj_type, hex_id),
+                         data=progress)
+
+    def set_status(self, obj_type, obj_id, status):
+        hex_id = hashutil.hash_to_hex(obj_id)
+        return self.post('set_status/{}/{}'.format(obj_type, hex_id),
+                         data=status)
+
+    # TODO: handle streaming properly
+    def put_bundle(self, obj_type, obj_id, bundle):
+        hex_id = hashutil.hash_to_hex(obj_id)
+        return self.post('put_bundle/{}/{}'.format(obj_type, hex_id),
+                         data=bundle)
+
+    def send_notif(self, obj_type, obj_id):
+        hex_id = hashutil.hash_to_hex(obj_id)
+        return self.post('send_notif/{}/{}'.format(obj_type, hex_id),
+                         data=None)
diff --git a/swh/vault/api/server.py b/swh/vault/api/server.py
index 300d17a..88b7079 100644
--- a/swh/vault/api/server.py
+++ b/swh/vault/api/server.py
@@ -1,113 +1,172 @@
 # Copyright (C) 2016 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import asyncio
 import aiohttp.web
 import click

 from swh.core import config
 from swh.core.api_async import (SWHRemoteAPI,
-                                encode_data_server as encode_data)
+                                encode_data_server as encode_data,
+                                decode_request)
 from swh.model import hashutil
 from swh.vault.cookers import COOKER_TYPES
 from swh.vault.backend import VaultBackend


 DEFAULT_CONFIG = {
     'storage': ('dict', {
         'cls': 'local',
         'args': {
             'db': 'dbname=softwareheritage-dev',
             'objstorage': {
-                'root': '/srv/softwareheritage/objects',
-                'slicing': '0:2/2:4/4:6',
+                'cls': 'pathslicing',
+                'args': {
+                    'root': '/srv/softwareheritage/objects',
+                    'slicing': '0:2/2:4/4:6',
+                },
             },
         },
     }),
     'cache': ('dict', {
         'cls': 'pathslicing',
         'args': {
             'root': '/srv/softwareheritage/vault',
             'slicing': '0:1/1:5',
         },
     }),
     'db': ('str', 'dbname=swh-vault')
 }


 @asyncio.coroutine
 def index(request):
     return aiohttp.web.Response(body="SWH Vault API server")


+# Web API endpoints
+
 @asyncio.coroutine
 def vault_fetch(request):
     obj_type = request.match_info['type']
     obj_id = request.match_info['id']

     if not request.app['backend'].is_available(obj_type, obj_id):
         raise aiohttp.web.HTTPNotFound

     return encode_data(request.app['backend'].fetch(obj_type, obj_id))


 def user_info(task_info):
     return {'task_uuid': str(task_info['task_uuid']),
             'status': task_info['task_status'],
             'progress_message': task_info['progress_msg'],
             'obj_type': task_info['type'],
             'obj_id': hashutil.hash_to_hex(task_info['object_id'])}


 @asyncio.coroutine
 def vault_cook(request):
     obj_type = request.match_info['type']
     obj_id = request.match_info['id']
     email = request.query.get('email')
+    sticky = request.query.get('sticky') in ('true', '1')

     if obj_type not in COOKER_TYPES:
         raise aiohttp.web.HTTPNotFound

-    info = request.app['backend'].cook_request(obj_type, obj_id, email)
+    info = request.app['backend'].cook_request(obj_type, obj_id,
+                                               email=email, sticky=sticky)

-    return encode_data(user_info(info), status=201)
+    # TODO: return 201 status (Created) once the api supports it
+    return encode_data(user_info(info))


 @asyncio.coroutine
 def vault_progress(request):
     obj_type = request.match_info['type']
     obj_id = request.match_info['id']

     info = request.app['backend'].task_info(obj_type, obj_id)
     if not info:
         raise aiohttp.web.HTTPNotFound

     return encode_data(user_info(info))


+# Cookers endpoints
+
+@asyncio.coroutine
+def set_progress(request):
+    obj_type = request.match_info['type']
+    obj_id = request.match_info['id']
+    progress = yield from decode_request(request)
+    request.app['backend'].set_progress(obj_type, obj_id, progress)
+    return encode_data(True)  # FIXME: success value?
+
+
+@asyncio.coroutine
+def set_status(request):
+    obj_type = request.match_info['type']
+    obj_id = request.match_info['id']
+    status = yield from decode_request(request)
+    request.app['backend'].set_status(obj_type, obj_id, status)
+    return encode_data(True)  # FIXME: success value?
+
+
+@asyncio.coroutine
+def put_bundle(request):
+    obj_type = request.match_info['type']
+    obj_id = request.match_info['id']
+
+    # TODO: handle streaming properly
+    content = yield from decode_request(request)
+    request.app['backend'].cache.add(obj_type, obj_id, content)
+    return encode_data(True)  # FIXME: success value?
+
+
+@asyncio.coroutine
+def send_notif(request):
+    obj_type = request.match_info['type']
+    obj_id = request.match_info['id']
+    request.app['backend'].send_all_notifications(obj_type, obj_id)
+    return encode_data(True)  # FIXME: success value?
+
+
+# Web server
+
 def make_app(config, **kwargs):
     app = SWHRemoteAPI(**kwargs)
     app.router.add_route('GET', '/', index)
+
+    # Endpoints used by the web API
     app.router.add_route('GET', '/fetch/{type}/{id}', vault_fetch)
     app.router.add_route('POST', '/cook/{type}/{id}', vault_cook)
     app.router.add_route('GET', '/progress/{type}/{id}', vault_progress)
+
+    # Endpoints used by the Cookers
+    app.router.add_route('POST', '/set_progress/{type}/{id}', set_progress)
+    app.router.add_route('POST', '/set_status/{type}/{id}', set_status)
+    app.router.add_route('POST', '/put_bundle/{type}/{id}', put_bundle)
+    app.router.add_route('POST', '/send_notif/{type}/{id}', send_notif)
+
     app['backend'] = VaultBackend(config)
     return app


 @click.command()
 @click.argument('config-path', required=1)
 @click.option('--host', default='0.0.0.0', help="Host to run the server")
 @click.option('--port', default=5005, type=click.INT,
               help="Binding port of the server")
 @click.option('--debug/--nodebug', default=True,
               help="Indicates if the server should run in debug mode")
 def launch(config_path, host, port, debug):
     app = make_app(config.read(config_path, DEFAULT_CONFIG),
                    debug=bool(debug))
     aiohttp.web.run_app(app, host=host, port=int(port))


 if __name__ == '__main__':
     launch()
diff --git a/swh/vault/backend.py b/swh/vault/backend.py
index f306075..c01e0ca 100644
--- a/swh/vault/backend.py
+++ b/swh/vault/backend.py
@@ -1,308 +1,308 @@
 # Copyright (C) 2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import smtplib
 import celery
 import psycopg2
 import psycopg2.extras

 from functools import wraps
 from email.mime.text import MIMEText

 from swh.model import hashutil
 from swh.scheduler.utils import get_task
 from swh.vault.cache import VaultCache
 from swh.vault.cookers import get_cooker
 from swh.vault.cooking_tasks import SWHCookingTask  # noqa

 cooking_task_name = 'swh.vault.cooking_tasks.SWHCookingTask'

 NOTIF_EMAIL_FROM = ('"Software Heritage Vault" '
                     '<info@softwareheritage.org>')
 NOTIF_EMAIL_SUBJECT = ("Bundle ready: {obj_type} {short_id}")
 NOTIF_EMAIL_BODY = """
 You have requested the following bundle from the Software Heritage Vault:

 Object Type: {obj_type}
 Object ID: {hex_id}

 This bundle is now available for download at the following address:

 {url}

 Please keep in mind that this link might expire at some point, in which case
 you will need to request the bundle again.

 --\x20
 The Software Heritage Developers
 """


 # TODO: Imported from swh.scheduler.backend. Factorization needed.
 def autocommit(fn):
     @wraps(fn)
     def wrapped(self, *args, **kwargs):
         autocommit = False
         # TODO: I don't like using None, it's confusing for the user. How
         # about a NEW_CURSOR object()?
         if 'cursor' not in kwargs or not kwargs['cursor']:
             autocommit = True
             kwargs['cursor'] = self.cursor()

         try:
             ret = fn(self, *args, **kwargs)
         except:
             if autocommit:
                 self.rollback()
             raise

         if autocommit:
             self.commit()

         return ret

     return wrapped


 # TODO: This has to be factorized with other database base classes and helpers
 # (swh.scheduler.backend.SchedulerBackend, swh.storage.db.BaseDb, ...)
 # The three first methods are imported from swh.scheduler.backend.
 class VaultBackend:
     """
     Backend for the Software Heritage vault.
""" def __init__(self, config): self.config = config self.cache = VaultCache(self.config['cache']) self.db = None self.reconnect() self.smtp_server = smtplib.SMTP('localhost') def reconnect(self): """Reconnect to the database.""" if not self.db or self.db.closed: self.db = psycopg2.connect( dsn=self.config['db'], cursor_factory=psycopg2.extras.RealDictCursor, ) def close(self): """Close the underlying database connection.""" self.db.close() def cursor(self): """Return a fresh cursor on the database, with auto-reconnection in case of failure""" cur = None # Get a fresh cursor and reconnect at most three times tries = 0 while True: tries += 1 try: cur = self.db.cursor() cur.execute('select 1') break except psycopg2.OperationalError: if tries < 3: self.reconnect() else: raise return cur def commit(self): """Commit a transaction""" self.db.commit() def rollback(self): """Rollback a transaction""" self.db.rollback() @autocommit def task_info(self, obj_type, obj_id, cursor=None): """Fetch information from a bundle""" obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' SELECT id, type, object_id, task_uuid, task_status, sticky, ts_created, ts_done, ts_last_access, progress_msg FROM vault_bundle WHERE type = %s AND object_id = %s''', (obj_type, obj_id)) res = cursor.fetchone() if res: res['object_id'] = bytes(res['object_id']) return res @staticmethod def _send_task(task_uuid, args): """Send a cooking task to the celery scheduler""" task = get_task(cooking_task_name) task.apply_async(args, task_id=task_uuid) @autocommit def create_task(self, obj_type, obj_id, sticky=False, cursor=None): """Create and send a cooking task""" obj_id = hashutil.hash_to_bytes(obj_id) - args = [self.config, obj_type, obj_id] + args = [obj_type, obj_id] cooker_class = get_cooker(obj_type) cooker = cooker_class(*args) cooker.check_exists() task_uuid = celery.uuid() cursor.execute(''' INSERT INTO vault_bundle (type, object_id, task_uuid, sticky) VALUES (%s, %s, %s, %s)''', (obj_type, obj_id, task_uuid, sticky)) self.commit() self._send_task(task_uuid, args) @autocommit def add_notif_email(self, obj_type, obj_id, email, cursor=None): """Add an e-mail address to notify when a given bundle is ready""" obj_id = hashutil.hash_to_bytes(obj_id) cursor.execute(''' INSERT INTO vault_notif_email (email, bundle_id) VALUES (%s, (SELECT id FROM vault_bundle WHERE type = %s AND object_id = %s))''', (email, obj_type, obj_id)) @autocommit def cook_request(self, obj_type, obj_id, *, sticky=False, email=None, cursor=None): """Main entry point for cooking requests. 
         This starts a cooking task if needed, and adds the given e-mail to
         the notify list"""
         info = self.task_info(obj_type, obj_id)
         if info is None:
             self.create_task(obj_type, obj_id, sticky)

         if email is not None:
             if info is not None and info['task_status'] == 'done':
                 self.send_notification(None, email, obj_type, obj_id)
             else:
                 self.add_notif_email(obj_type, obj_id, email)

         info = self.task_info(obj_type, obj_id)
         return info

     @autocommit
     def is_available(self, obj_type, obj_id, cursor=None):
         """Check whether a bundle is available for retrieval"""
         info = self.task_info(obj_type, obj_id, cursor=cursor)
         return (info is not None
                 and info['task_status'] == 'done'
                 and self.cache.is_cached(obj_type, obj_id))

     @autocommit
     def fetch(self, obj_type, obj_id, cursor=None):
         """Retrieve a bundle from the cache"""
         if not self.is_available(obj_type, obj_id, cursor=cursor):
             return None
         self.update_access_ts(obj_type, obj_id, cursor=cursor)
         return self.cache.get(obj_type, obj_id)

     @autocommit
     def update_access_ts(self, obj_type, obj_id, cursor=None):
         """Update the last access timestamp of a bundle"""
         obj_id = hashutil.hash_to_bytes(obj_id)
         cursor.execute('''
             UPDATE vault_bundle
             SET ts_last_access = NOW()
             WHERE type = %s AND object_id = %s''',
                        (obj_type, obj_id))

     @autocommit
     def set_status(self, obj_type, obj_id, status, cursor=None):
         """Set the cooking status of a bundle"""
         obj_id = hashutil.hash_to_bytes(obj_id)
         req = ('''
             UPDATE vault_bundle
             SET task_status = %s '''
                + (''', ts_done = NOW() ''' if status == 'done' else '')
                + '''WHERE type = %s AND object_id = %s''')
         cursor.execute(req, (status, obj_type, obj_id))

     @autocommit
     def set_progress(self, obj_type, obj_id, progress, cursor=None):
         """Set the cooking progress of a bundle"""
         obj_id = hashutil.hash_to_bytes(obj_id)
         cursor.execute('''
             UPDATE vault_bundle
             SET progress_msg = %s
             WHERE type = %s AND object_id = %s''',
                        (progress, obj_type, obj_id))

     @autocommit
     def send_all_notifications(self, obj_type, obj_id, cursor=None):
         """Send all the e-mails in the notification list of a bundle"""
         obj_id = hashutil.hash_to_bytes(obj_id)
         cursor.execute('''
             SELECT vault_notif_email.id AS id, email
             FROM vault_notif_email
             INNER JOIN vault_bundle ON bundle_id = vault_bundle.id
             WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s''',
                        (obj_type, obj_id))
         for d in cursor:
             self.send_notification(d['id'], d['email'], obj_type, obj_id)

     @autocommit
     def send_notification(self, n_id, email, obj_type, obj_id, cursor=None):
         """Send the notification of a bundle to a specific e-mail"""
         hex_id = hashutil.hash_to_hex(obj_id)
         short_id = hex_id[:7]

         # TODO: instead of hardcoding this, we should probably:
         # * add a "fetch_url" field in the vault_notif_email table
         # * generate the url with flask.url_for() on the web-ui side
         # * send this url as part of the cook request and store it in
         #   the table
         # * use this url for the notification e-mail
         url = ('https://archive.softwareheritage.org/api/1/vault/{}/{}/'
                'raw'.format(obj_type, hex_id))

         text = NOTIF_EMAIL_BODY.strip()
         text = text.format(obj_type=obj_type, hex_id=hex_id, url=url)
         msg = MIMEText(text)
         msg['Subject'] = (NOTIF_EMAIL_SUBJECT
                           .format(obj_type=obj_type, short_id=short_id))
         msg['From'] = NOTIF_EMAIL_FROM
         msg['To'] = email

         self.smtp_server.send_message(msg)

         if n_id is not None:
             cursor.execute('''
                 DELETE FROM vault_notif_email
                 WHERE id = %s''', (n_id,))

     @autocommit
     def _cache_expire(self, cond, *args, cursor=None):
         """Low-level expiration method, used by cache_expire_* methods"""

         # Embedded SELECT query to be able to use ORDER BY and LIMIT
         cursor.execute('''
             DELETE FROM vault_bundle
             WHERE ctid IN (
                 SELECT ctid
                 FROM vault_bundle
                 WHERE sticky = false
                 {}
             )
             RETURNING type, object_id
             '''.format(cond), args)

         for d in cursor:
             self.cache.delete(d['type'], bytes(d['object_id']))

     @autocommit
     def cache_expire_oldest(self, n=1, by='last_access', cursor=None):
         """Expire the `n` oldest bundles"""
         assert by in ('created', 'done', 'last_access')
         filter = '''ORDER BY ts_{} LIMIT {}'''.format(by, n)
         return self._cache_expire(filter)

     @autocommit
     def cache_expire_until(self, date, by='last_access', cursor=None):
         """Expire all the bundles until a certain date"""
         assert by in ('created', 'done', 'last_access')
         filter = '''AND ts_{} <= %s'''.format(by)
         return self._cache_expire(filter, date)
diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py
index f83c9bb..19de810 100644
--- a/swh/vault/cookers/base.py
+++ b/swh/vault/cookers/base.py
@@ -1,234 +1,232 @@
 # Copyright (C) 2016-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import abc
 import io
 import itertools
 import logging
 import os
 import tarfile
 import tempfile

 from pathlib import Path

+from swh.core import config
 from swh.model import hashutil
+from swh.model.from_disk import mode_to_perms, DentryPerms
 from swh.storage import get_storage
+from swh.vault.api.client import RemoteVaultClient
+
+
+DEFAULT_CONFIG = {
+    'storage': ('dict', {
+        'cls': 'remote',
+        'args': {
+            'url': 'http://localhost:5002/',
+        },
+    }),
+    'vault_url': ('str', 'http://localhost:5005/')
+}


 class BaseVaultCooker(metaclass=abc.ABCMeta):
     """Abstract base class for the vault's bundle creators

     This class describes a common API for the cookers.

     To define a new cooker, inherit from this class and override:

     - CACHE_TYPE_KEY: key to use for the bundle to reference in cache
     - def cook(): cook the object into a bundle
     """
     CACHE_TYPE_KEY = None

-    def __init__(self, config, obj_type, obj_id):
+    def __init__(self, obj_type, obj_id):
         """Initialize the cooker.

         The type of the object represented by the id depends on the
         concrete class. Very likely, each type of bundle will have its
         own cooker class.

         Args:
             storage: the storage object
             cache: the cache where to store the bundle
             obj_id: id of the object to be cooked into a bundle.
         """
-        self.config = config
+        self.config = config.load_named_config('vault-cooker',
+                                               DEFAULT_CONFIG)
         self.obj_type = obj_type
         self.obj_id = hashutil.hash_to_bytes(obj_id)
-
-    def __enter__(self):
-        # Imported here to avoid circular dependency
-        from swh.vault.backend import VaultBackend
-        self.backend = VaultBackend(self.config)
+        self.backend = RemoteVaultClient(self.config['vault_url'])
         self.storage = get_storage(**self.config['storage'])
-        return self
-
-    def __exit__(self, *_):
-        self.backend.close()

     @abc.abstractmethod
     def check_exists(self):
         """Checks that the requested object exists and can be cooked.

         Override this in the cooker implementation.
         """
         raise NotImplementedError

     @abc.abstractmethod
     def prepare_bundle(self):
         """Implementation of the cooker. Yields chunks of the bundle bytes.

         Override this with the cooker implementation.
""" raise NotImplemented def cook(self): """Cook the requested object into a bundle """ self.backend.set_status(self.obj_type, self.obj_id, 'pending') self.backend.set_progress(self.obj_type, self.obj_id, 'Processing...') content_iter = self.prepare_bundle() - self.update_cache(content_iter) + # TODO: use proper content streaming + bundle = b''.join(content_iter) + self.backend.put_bundle(self.CACHE_TYPE_KEY, self.obj_id, bundle) + self.backend.set_status(self.obj_type, self.obj_id, 'done') self.backend.set_progress(self.obj_type, self.obj_id, None) - - self.notify_bundle_ready() - - def update_cache(self, content_iter): - """Update the cache with id and bundle_content. - - """ - self.backend.cache.add_stream(self.CACHE_TYPE_KEY, - self.obj_id, content_iter) - - def notify_bundle_ready(self): - self.backend.send_all_notifications(self.obj_type, self.obj_id) + self.backend.send_notif(self.obj_type, self.obj_id) SKIPPED_MESSAGE = (b'This content has not been retrieved in the ' b'Software Heritage archive due to its size.') HIDDEN_MESSAGE = (b'This content is hidden.') def get_filtered_file_content(storage, file_data): """Retrieve the file specified by file_data and apply filters for skipped and missing contents. Args: storage: the storage from which to retrieve the object file_data: file entry descriptor as returned by directory_ls() Returns: Bytes containing the specified content. The content will be replaced by a specific message to indicate that the content could not be retrieved (either due to privacy policy or because its size was too big for us to archive it). """ assert file_data['type'] == 'file' if file_data['status'] == 'absent': return SKIPPED_MESSAGE elif file_data['status'] == 'hidden': return HIDDEN_MESSAGE else: return list(storage.content_get([file_data['sha1']]))[0]['data'] def get_tar_bytes(path, arcname=None): path = Path(path) if not arcname: arcname = path.name tar_buffer = io.BytesIO() tar = tarfile.open(fileobj=tar_buffer, mode='w') tar.add(str(path), arcname=arcname) return tar_buffer.getbuffer() class DirectoryBuilder: """Creates a cooked directory from its sha1_git in the db. Warning: This is NOT a directly accessible cooker, but a low-level one that executes the manipulations. """ def __init__(self, storage): self.storage = storage def get_directory_bytes(self, dir_id): # Create temporary folder to retrieve the files into. root = bytes(tempfile.mkdtemp(prefix='directory.', suffix='.cook'), 'utf8') self.build_directory(dir_id, root) # Use the created directory to make a bundle with the data as # a compressed directory. bundle_content = self._create_bundle_content( root, hashutil.hash_to_hex(dir_id)) return bundle_content def build_directory(self, dir_id, root): # Retrieve data from the database. data = self.storage.directory_ls(dir_id, recursive=True) # Split into files and directory data. # TODO(seirl): also handle revision data. data1, data2 = itertools.tee(data, 2) dir_data = (entry['name'] for entry in data1 if entry['type'] == 'dir') file_data = (entry for entry in data2 if entry['type'] == 'file') # Recreate the directory's subtree and then the files into it. self._create_tree(root, dir_data) self._create_files(root, file_data) def _create_tree(self, root, directory_paths): """Create a directory tree from the given paths The tree is created from `root` and each given path in `directory_paths` will be created. 
""" # Directories are sorted by depth so they are created in the # right order bsep = bytes(os.path.sep, 'utf8') dir_names = sorted( directory_paths, key=lambda x: len(x.split(bsep))) for dir_name in dir_names: os.makedirs(os.path.join(root, dir_name)) def _create_files(self, root, file_datas): """Create the files according to their status. """ # Then create the files for file_data in file_datas: path = os.path.join(root, file_data['name']) content = get_filtered_file_content(self.storage, file_data) self._create_file(path, content, file_data['perms']) def _create_file(self, path, content, perms=0o100644): """Create the given file and fill it with content. """ if perms not in (0o100644, 0o100755, 0o120000): logging.warning('File {} has invalid permission {}, ' 'defaulting to 644.'.format(path, perms)) perms = 0o100644 if perms == 0o120000: # Symbolic link os.symlink(content, path) else: with open(path, 'wb') as f: f.write(content) os.chmod(path, perms & 0o777) def _get_file_content(self, obj_id): """Get the content of the given file. """ content = list(self.storage.content_get([obj_id]))[0]['data'] return content def _create_bundle_content(self, path, hex_dir_id): """Create a bundle from the given directory Args: path: location of the directory to package. hex_dir_id: hex representation of the directory id Returns: bytes that represent the compressed directory as a bundle. """ return get_tar_bytes(path.decode(), hex_dir_id) diff --git a/swh/vault/cookers/revision_gitfast.py b/swh/vault/cookers/revision_gitfast.py index e25c865..da3e615 100644 --- a/swh/vault/cookers/revision_gitfast.py +++ b/swh/vault/cookers/revision_gitfast.py @@ -1,225 +1,222 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import collections import fastimport.commands import functools import os import time import zlib from .base import BaseVaultCooker, get_filtered_file_content from swh.model import hashutil class RevisionGitfastCooker(BaseVaultCooker): """Cooker to create a git fast-import bundle """ CACHE_TYPE_KEY = 'revision_gitfast' def check_exists(self): if list(self.storage.revision_missing([self.obj_id])): raise ValueError("Revision {} not found." .format(hashutil.hash_to_hex(self.obj_id))) def prepare_bundle(self): log = self.storage.revision_log([self.obj_id]) commands = self.fastexport(log) compressobj = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16) for command in commands: yield compressobj.compress(bytes(command) + b'\n') yield compressobj.flush() def fastexport(self, log): """Generate all the git fast-import commands from a given log. """ self.rev_by_id = {r['id']: r for r in log} self.rev_sorted = list(self._toposort(self.rev_by_id)) self.obj_done = set() self.obj_to_mark = {} self.next_available_mark = 1 last_progress_report = None - # We want a single transaction for the whole export, so we store a - # cursor and use it during the process. 
-        with self.storage.db.transaction() as self.cursor:
-            for i, rev in enumerate(self.rev_sorted, 1):
-                # Update progress if needed
-                ct = time.time()
-                if (last_progress_report is None
-                        or last_progress_report + 2 <= ct):
-                    last_progress_report = ct
-                    pg = ('Computing revision {}/{}'
-                          .format(i, len(self.rev_sorted)))
-                    self.backend.set_progress(self.obj_type, self.obj_id, pg)
-
-                # Compute the current commit
-                yield from self._compute_commit_command(rev)
+        for i, rev in enumerate(self.rev_sorted, 1):
+            # Update progress if needed
+            ct = time.time()
+            if (last_progress_report is None
+                    or last_progress_report + 2 <= ct):
+                last_progress_report = ct
+                pg = ('Computing revision {}/{}'
+                      .format(i, len(self.rev_sorted)))
+                self.backend.set_progress(self.obj_type, self.obj_id, pg)
+
+            # Compute the current commit
+            yield from self._compute_commit_command(rev)

     def _toposort(self, rev_by_id):
         """Perform a topological sort on the revision graph.
         """
         children = collections.defaultdict(list)  # rev -> children
         in_degree = {}  # rev -> numbers of parents left to compute

         # Compute the in_degrees and the parents of all the revisions.
         # Add the roots to the processing queue.
         queue = collections.deque()
         for rev_id, rev in rev_by_id.items():
             in_degree[rev_id] = len(rev['parents'])
             if not rev['parents']:
                 queue.append(rev_id)
             for parent in rev['parents']:
                 children[parent].append(rev_id)

         # Topological sort: yield the 'ready' nodes, decrease the in degree
         # of their children and add the 'ready' ones to the queue.
         while queue:
             rev_id = queue.popleft()
             yield rev_by_id[rev_id]
             for child in children[rev_id]:
                 in_degree[child] -= 1
                 if in_degree[child] == 0:
                     queue.append(child)

     def mark(self, obj_id):
         """Get the mark ID as bytes of a git object.

         If the object has not yet been marked, assign a new ID and add it to
         the mark dictionary.
         """
         if obj_id not in self.obj_to_mark:
             self.obj_to_mark[obj_id] = self.next_available_mark
             self.next_available_mark += 1
         return str(self.obj_to_mark[obj_id]).encode()

     def _compute_blob_command_content(self, file_data):
         """Compute the blob command of a file entry if it has not been
         computed yet.
         """
         obj_id = file_data['sha1']
         if obj_id in self.obj_done:
             return
         content = get_filtered_file_content(self.storage, file_data)
         yield fastimport.commands.BlobCommand(
             mark=self.mark(obj_id),
             data=content,
         )
         self.obj_done.add(obj_id)

     def _compute_commit_command(self, rev):
         """Compute a commit command from a specific revision.
         """
         if 'parents' in rev and rev['parents']:
             from_ = b':' + self.mark(rev['parents'][0])
             merges = [b':' + self.mark(r) for r in rev['parents'][1:]]
             parent = self.rev_by_id[rev['parents'][0]]
         else:
             # We issue a reset command before all the new roots so that they
             # are not automatically added as children of the current branch.
             yield fastimport.commands.ResetCommand(b'refs/heads/master',
                                                    None)
             from_ = None
             merges = None
             parent = None

         # Retrieve the file commands while yielding new blob commands if
         # needed.
         files = yield from self._compute_file_commands(rev, parent)

         # Construct and yield the commit command
         author = (rev['author']['name'],
                   rev['author']['email'],
                   rev['date']['timestamp']['seconds'],
                   rev['date']['offset'] * 60)
         committer = (rev['committer']['name'],
                      rev['committer']['email'],
                      rev['committer_date']['timestamp']['seconds'],
                      rev['committer_date']['offset'] * 60)
         yield fastimport.commands.CommitCommand(
             ref=b'refs/heads/master',
             mark=self.mark(rev['id']),
             author=author,
             committer=committer,
             message=rev['message'],
             from_=from_,
             merges=merges,
             file_iter=files,
         )

     @functools.lru_cache(maxsize=4096)
     def _get_dir_ents(self, dir_id=None):
         """Get the entities of a directory as a dictionary (name -> entity).

         This function has a cache to avoid doing multiple requests to
         retrieve the same entities, as doing a directory_ls() is expensive.
         """
-        data = (self.storage.directory_ls(dir_id, cur=self.cursor)
+        data = (self.storage.directory_ls(dir_id)
                 if dir_id is not None else [])
         return {f['name']: f for f in data}

     def _compute_file_commands(self, rev, parent=None):
         """Compute all the file commands of a revision.

         Generate a diff of the files between the revision and its main
         parent to find the necessary file commands to apply.
         """
         commands = []

         # Initialize the stack with the root of the tree.
         cur_dir = rev['directory']
         parent_dir = parent['directory'] if parent else None
         stack = [(b'', cur_dir, parent_dir)]

         while stack:
             # Retrieve the current directory and the directory of the parent
             # commit in order to compute the diff of the trees.
             root, cur_dir_id, prev_dir_id = stack.pop()
             cur_dir = self._get_dir_ents(cur_dir_id)
             prev_dir = self._get_dir_ents(prev_dir_id)

             # Find subtrees to delete:
             #  - Subtrees that are not in the new tree (file or directory
             #    deleted).
             #  - Subtrees that do not have the same type in the new tree
             #    (file -> directory or directory -> file)
             # After this step, every node remaining in the previous directory
             # has the same type as the one in the current directory.
             for fname, f in prev_dir.items():
                 if ((fname not in cur_dir
                      or f['type'] != cur_dir[fname]['type'])):
                     commands.append(fastimport.commands.FileDeleteCommand(
                         path=os.path.join(root, fname)
                     ))

             # Find subtrees to modify:
             #  - Leaves (files) will be added or modified using `filemodify`
             #  - Other subtrees (directories) will be added to the stack and
             #    processed in the next iteration.
             for fname, f in cur_dir.items():
                 # A file is added or modified if it was not in the tree, if
                 # its permissions changed or if its content changed.
                 if (f['type'] == 'file'
                         and (fname not in prev_dir
                              or f['sha1'] != prev_dir[fname]['sha1']
                              or f['perms'] != prev_dir[fname]['perms'])):
                     # Issue a blob command for the new blobs if needed.
                     yield from self._compute_blob_command_content(f)
                     commands.append(fastimport.commands.FileModifyCommand(
                         path=os.path.join(root, fname),
                         mode=f['perms'],
                         dataref=(b':' + self.mark(f['sha1'])),
                         data=None,
                     ))

                 # A directory is added or modified if it was not in the tree
                 # or if its target changed.
                 elif f['type'] == 'dir':
                     f_prev_target = None
                     if fname in prev_dir and prev_dir[fname]['type'] == 'dir':
                         f_prev_target = prev_dir[fname]['target']
                     if f_prev_target is None or f['target'] != f_prev_target:
                         stack.append((os.path.join(root, fname),
                                       f['target'], f_prev_target))
         return commands
diff --git a/swh/vault/cooking_tasks.py b/swh/vault/cooking_tasks.py
index 33b0cec..cdca9bd 100644
--- a/swh/vault/cooking_tasks.py
+++ b/swh/vault/cooking_tasks.py
@@ -1,17 +1,17 @@
 # Copyright (C) 2016-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from swh.scheduler.task import Task

 from swh.vault.cookers import get_cooker


 class SWHCookingTask(Task):
     """Main task to cook a bundle."""
     task_queue = 'swh_vault_cooking'

-    def run_task(self, config, obj_type, obj_id):
-        with get_cooker(obj_type)(config, obj_type, obj_id) as cooker:
-            cooker.cook()
+    def run_task(self, obj_type, obj_id):
+        cooker = get_cooker(obj_type)(obj_type, obj_id)
+        cooker.cook()
diff --git a/swh/vault/tests/test_backend.py b/swh/vault/tests/test_backend.py
index d8c1882..4d8a7d7 100644
--- a/swh/vault/tests/test_backend.py
+++ b/swh/vault/tests/test_backend.py
@@ -1,286 +1,284 @@
 # Copyright (C) 2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import contextlib
 import datetime
 import psycopg2
 import unittest
 from unittest.mock import patch

 from swh.core.tests.db_testing import DbTestFixture
 from swh.model import hashutil
 from swh.storage.tests.storage_testing import StorageTestFixture
 from swh.vault.tests.vault_testing import VaultTestFixture, hash_content


 class BaseTestBackend(VaultTestFixture, StorageTestFixture, DbTestFixture):
     @contextlib.contextmanager
     def mock_cooking(self):
         with patch.object(self.vault_backend, '_send_task') as mt:
             with patch('swh.vault.backend.get_cooker') as mg:
                 mcc = unittest.mock.MagicMock()
                 mc = unittest.mock.MagicMock()
                 mg.return_value = mcc
                 mcc.return_value = mc
                 mc.check_exists.return_value = True

                 yield {'send_task': mt,
                        'get_cooker': mg,
                        'cooker_cls': mcc,
                        'cooker': mc}

     def assertTimestampAlmostNow(self, ts, tolerance_secs=1.0):  # noqa
         now = datetime.datetime.now(datetime.timezone.utc)
         creation_delta_secs = (ts - now).total_seconds()
         self.assertLess(creation_delta_secs, tolerance_secs)

     def fake_cook(self, obj_type, result_content, sticky=False):
         content, obj_id = hash_content(result_content)
         with self.mock_cooking():
             self.vault_backend.create_task(obj_type, obj_id, sticky)
         self.vault_backend.cache.add(obj_type, obj_id, b'content')
         self.vault_backend.set_status(obj_type, obj_id, 'done')
         return obj_id, content


 TEST_TYPE = 'revision_gitfast'
 TEST_HEX_ID = '4a4b9771542143cf070386f86b4b92d42966bdbc'
 TEST_OBJ_ID = hashutil.hash_to_bytes(TEST_HEX_ID)
 TEST_PROGRESS = ("Mr. White, You're telling me you're cooking again?"
" \N{ASTONISHED FACE} ") TEST_EMAIL = 'ouiche@example.com' class TestBackend(BaseTestBackend, unittest.TestCase): def test_create_task_simple(self): with self.mock_cooking() as m: self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) m['get_cooker'].assert_called_once_with(TEST_TYPE) args = m['cooker_cls'].call_args[0] - self.assertEqual(args[0], self.vault_backend.config) - self.assertEqual(args[1], TEST_TYPE) - self.assertEqual(args[2], TEST_OBJ_ID) + self.assertEqual(args[0], TEST_TYPE) + self.assertEqual(args[1], TEST_OBJ_ID) self.assertEqual(m['cooker'].check_exists.call_count, 1) self.assertEqual(m['send_task'].call_count, 1) args = m['send_task'].call_args[0][1] - self.assertEqual(args[0], self.vault_backend.config) - self.assertEqual(args[1], TEST_TYPE) - self.assertEqual(args[2], TEST_OBJ_ID) + self.assertEqual(args[0], TEST_TYPE) + self.assertEqual(args[1], TEST_OBJ_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['object_id'], TEST_OBJ_ID) self.assertEqual(info['type'], TEST_TYPE) self.assertEqual(str(info['task_uuid']), m['send_task'].call_args[0][0]) self.assertEqual(info['task_status'], 'new') self.assertTimestampAlmostNow(info['ts_created']) self.assertEqual(info['ts_done'], None) self.assertEqual(info['progress_msg'], None) def test_create_fail_duplicate_task(self): with self.mock_cooking(): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) with self.assertRaises(psycopg2.IntegrityError): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) def test_create_fail_nonexisting_object(self): with self.mock_cooking() as m: m['cooker'].check_exists.side_effect = ValueError('Nothing here.') with self.assertRaises(ValueError): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) def test_create_set_progress(self): with self.mock_cooking(): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['progress_msg'], None) self.vault_backend.set_progress(TEST_TYPE, TEST_OBJ_ID, TEST_PROGRESS) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['progress_msg'], TEST_PROGRESS) def test_create_set_status(self): with self.mock_cooking(): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['task_status'], 'new') self.assertEqual(info['ts_done'], None) self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'pending') info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['task_status'], 'pending') self.assertEqual(info['ts_done'], None) self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done') info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) self.assertEqual(info['task_status'], 'done') self.assertTimestampAlmostNow(info['ts_done']) def test_create_update_access_ts(self): with self.mock_cooking(): self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) access_ts_1 = info['ts_last_access'] self.assertTimestampAlmostNow(access_ts_1) self.vault_backend.update_access_ts(TEST_TYPE, TEST_OBJ_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) access_ts_2 = info['ts_last_access'] self.assertTimestampAlmostNow(access_ts_2) self.vault_backend.update_access_ts(TEST_TYPE, TEST_OBJ_ID) info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID) access_ts_3 = info['ts_last_access'] self.assertTimestampAlmostNow(access_ts_3) self.assertLess(access_ts_1, 
                         access_ts_2)
         self.assertLess(access_ts_2,
                         access_ts_3)

     def test_cook_request_idempotent(self):
         with self.mock_cooking():
             info1 = self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID)
             info2 = self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID)
             info3 = self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID)
             self.assertEqual(info1, info2)
             self.assertEqual(info1, info3)

     def test_cook_email_pending_done(self):
         with self.mock_cooking(), \
                 patch.object(self.vault_backend, 'add_notif_email') as madd, \
                 patch.object(self.vault_backend, 'send_notification') as msend:

             self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID)
             madd.assert_not_called()
             msend.assert_not_called()

             madd.reset_mock()
             msend.reset_mock()

             self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID,
                                             email=TEST_EMAIL)
             madd.assert_called_once_with(TEST_TYPE, TEST_OBJ_ID, TEST_EMAIL)
             msend.assert_not_called()

             madd.reset_mock()
             msend.reset_mock()

             self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done')
             self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID,
                                             email=TEST_EMAIL)
             msend.assert_called_once_with(None, TEST_EMAIL,
                                           TEST_TYPE, TEST_OBJ_ID)
             madd.assert_not_called()

     def test_send_all_emails(self):
         with self.mock_cooking():
             emails = ('a@example.com',
                       'billg@example.com',
                       'test+42@example.org')
             for email in emails:
                 self.vault_backend.cook_request(TEST_TYPE, TEST_OBJ_ID,
                                                 email=email)

             self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done')

         with patch.object(self.vault_backend, 'smtp_server') as m:
             self.vault_backend.send_all_notifications(TEST_TYPE, TEST_OBJ_ID)

             sent_emails = {k[0][0] for k in m.send_message.call_args_list}
             self.assertEqual({k['To'] for k in sent_emails}, set(emails))

             for e in sent_emails:
                 self.assertIn('info@softwareheritage.org', e['From'])
                 self.assertIn(TEST_TYPE, e['Subject'])
                 self.assertIn(TEST_HEX_ID[:5], e['Subject'])
                 self.assertIn(TEST_TYPE, str(e))
                 self.assertIn('https://archive.softwareheritage.org/', str(e))
                 self.assertIn(TEST_HEX_ID[:5], str(e))
                 self.assertIn('--\x20\n', str(e))  # Well-formatted signature!!!
             # Check that the entries have been deleted and recalling the
             # function does not re-send the e-mails
             m.reset_mock()
             self.vault_backend.send_all_notifications(TEST_TYPE, TEST_OBJ_ID)
             m.assert_not_called()

     def test_available(self):
         self.assertFalse(self.vault_backend.is_available(TEST_TYPE,
                                                          TEST_OBJ_ID))

         with self.mock_cooking():
             self.vault_backend.create_task(TEST_TYPE, TEST_OBJ_ID)
         self.assertFalse(self.vault_backend.is_available(TEST_TYPE,
                                                          TEST_OBJ_ID))

         self.vault_backend.cache.add(TEST_TYPE, TEST_OBJ_ID, b'content')
         self.assertFalse(self.vault_backend.is_available(TEST_TYPE,
                                                          TEST_OBJ_ID))

         self.vault_backend.set_status(TEST_TYPE, TEST_OBJ_ID, 'done')
         self.assertTrue(self.vault_backend.is_available(TEST_TYPE,
                                                         TEST_OBJ_ID))

     def test_fetch(self):
         self.assertEqual(self.vault_backend.fetch(TEST_TYPE, TEST_OBJ_ID),
                          None)
         obj_id, content = self.fake_cook(TEST_TYPE, b'content')

         info = self.vault_backend.task_info(TEST_TYPE, obj_id)
         access_ts_before = info['ts_last_access']

         self.assertEqual(self.vault_backend.fetch(TEST_TYPE, obj_id),
                          b'content')

         info = self.vault_backend.task_info(TEST_TYPE, obj_id)
         access_ts_after = info['ts_last_access']

         self.assertTimestampAlmostNow(access_ts_after)
         self.assertLess(access_ts_before, access_ts_after)

     def test_cache_expire_oldest(self):
         r = range(1, 10)
         inserted = {}
         for i in r:
             sticky = (i == 5)
             content = b'content%s' % str(i).encode()
             obj_id, content = self.fake_cook(TEST_TYPE, content, sticky)
             inserted[i] = (obj_id, content)

         self.vault_backend.update_access_ts(TEST_TYPE, inserted[2][0])
         self.vault_backend.update_access_ts(TEST_TYPE, inserted[3][0])
         self.vault_backend.cache_expire_oldest(n=4)

         should_be_still_here = {2, 3, 5, 8, 9}
         for i in r:
             self.assertEqual(self.vault_backend.is_available(
                 TEST_TYPE, inserted[i][0]), i in should_be_still_here)

     def test_cache_expire_until(self):
         r = range(1, 10)
         inserted = {}
         for i in r:
             sticky = (i == 5)
             content = b'content%s' % str(i).encode()
             obj_id, content = self.fake_cook(TEST_TYPE, content, sticky)
             inserted[i] = (obj_id, content)

             if i == 7:
                 cutoff_date = datetime.datetime.now()

         self.vault_backend.update_access_ts(TEST_TYPE, inserted[2][0])
         self.vault_backend.update_access_ts(TEST_TYPE, inserted[3][0])
         self.vault_backend.cache_expire_until(date=cutoff_date)

         should_be_still_here = {2, 3, 5, 8, 9}
         for i in r:
             self.assertEqual(self.vault_backend.is_available(
                 TEST_TYPE, inserted[i][0]), i in should_be_still_here)
diff --git a/swh/vault/tests/test_cookers.py b/swh/vault/tests/test_cookers.py
index aaa604d..1e14dd7 100644
--- a/swh/vault/tests/test_cookers.py
+++ b/swh/vault/tests/test_cookers.py
@@ -1,339 +1,341 @@
 # Copyright (C) 2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import contextlib
 import datetime
 import gzip
 import io
 import os
 import pathlib
 import subprocess
 import tarfile
 import tempfile
 import unittest
+import unittest.mock

 import dulwich.fastexport
 import dulwich.index
 import dulwich.objects
 import dulwich.porcelain
 import dulwich.repo

 from swh.core.tests.db_testing import DbTestFixture
 from swh.loader.git.loader import GitLoader
 from swh.model import hashutil
 from swh.model.from_disk import Directory
 from swh.storage.tests.storage_testing import StorageTestFixture
 from swh.vault.cookers import DirectoryCooker, RevisionGitfastCooker
 from swh.vault.cookers.base import SKIPPED_MESSAGE, HIDDEN_MESSAGE
 from swh.vault.tests.vault_testing import VaultTestFixture, hash_content


 class TestRepo:
     """A tiny context manager for a test git repository, with some utility
     functions to perform basic git stuff.
     """
     def __enter__(self):
         self.tmp_dir = tempfile.TemporaryDirectory(prefix='tmp-vault-repo-')
         self.repo_dir = self.tmp_dir.__enter__()
         self.repo = dulwich.repo.Repo.init(self.repo_dir)
         self.author = '"Test Author" '.encode()
         return pathlib.Path(self.repo_dir)

     def __exit__(self, exc, value, tb):
         self.tmp_dir.__exit__(exc, value, tb)

     def checkout(self, rev_sha):
         rev = self.repo[rev_sha]
         dulwich.index.build_index_from_tree(self.repo_dir,
                                             self.repo.index_path(),
                                             self.repo.object_store,
                                             rev.tree)

     def git_shell(self, *cmd, stdout=subprocess.DEVNULL, **kwargs):
         subprocess.check_call(('git', '-C', self.repo_dir) + cmd,
                               stdout=stdout, **kwargs)

     def commit(self, message='Commit test\n', ref=b'HEAD'):
         self.git_shell('add', '.')
         message = message.encode() + b'\n'
         return self.repo.do_commit(message=message, committer=self.author,
                                    ref=ref)

     def merge(self, parent_sha_list, message='Merge branches.'):
         self.git_shell('merge', '--allow-unrelated-histories',
                        '-m', message, *[p.decode() for p in parent_sha_list])
         return self.repo.refs[b'HEAD']

     def print_debug_graph(self, reflog=False):
         args = ['log', '--all', '--graph', '--decorate']
         if reflog:
             args.append('--reflog')
         self.git_shell(*args, stdout=None)


 class BaseTestCookers(VaultTestFixture, StorageTestFixture, DbTestFixture):
     """Base class of cookers unit tests"""
     def setUp(self):
         super().setUp()
         self.loader = GitLoader()
         self.loader.storage = self.storage

     def load(self, repo_path):
         """Load a repository in the test storage"""
         self.loader.load('fake_origin', repo_path, datetime.datetime.now())

     @contextlib.contextmanager
     def cook_extract_directory(self, obj_id):
         """Context manager that cooks a directory and extracts it."""
-        cooker = DirectoryCooker(self.vault_config, 'directory', obj_id)
-        with cooker:
-            cooker.check_exists()  # Raises if false
-            tarball = b''.join(cooker.prepare_bundle())
+        cooker = DirectoryCooker('directory', obj_id)
+        cooker.storage = self.storage
+        cooker.backend = unittest.mock.MagicMock()
+        cooker.check_exists()  # Raises if false
+        tarball = b''.join(cooker.prepare_bundle())
         with tempfile.TemporaryDirectory('tmp-vault-extract-') as td:
             fobj = io.BytesIO(tarball)
             with tarfile.open(fileobj=fobj, mode='r') as tar:
                 tar.extractall(td)
             p = pathlib.Path(td) / hashutil.hash_to_hex(obj_id)
             yield p

     @contextlib.contextmanager
     def cook_extract_revision_gitfast(self, obj_id):
         """Context manager that cooks a revision and extracts it."""
-        cooker = RevisionGitfastCooker(self.vault_config, 'revision_gitfast',
-                                       obj_id)
-        with cooker:
-            cooker.check_exists()  # Raises if false
-            fastexport = b''.join(cooker.prepare_bundle())
+        cooker = RevisionGitfastCooker('revision_gitfast', obj_id)
+        cooker.storage = self.storage
+        cooker.backend = unittest.mock.MagicMock()
+        cooker.check_exists()  # Raises if false
+        fastexport = b''.join(cooker.prepare_bundle())
         fastexport_stream = gzip.GzipFile(fileobj=io.BytesIO(fastexport))
         test_repo = TestRepo()
         with test_repo as p:
             processor = dulwich.fastexport.GitImportProcessor(test_repo.repo)
             processor.import_stream(fastexport_stream)
             yield test_repo, p


 TEST_CONTENT = ("   test content\n"
                 "and unicode \N{BLACK HEART SUIT}\n"
                 " and trailing spaces  ")
 TEST_EXECUTABLE = b'\x42\x40\x00\x00\x05'


 class TestDirectoryCooker(BaseTestCookers, unittest.TestCase):
     def test_directory_simple(self):
         repo = TestRepo()
         with repo as rp:
             (rp / 'file').write_text(TEST_CONTENT)
             (rp / 'executable').write_bytes(TEST_EXECUTABLE)
             (rp / 'executable').chmod(0o755)
             (rp / 'link').symlink_to('file')
             (rp / 'dir1/dir2').mkdir(parents=True)
             (rp / 'dir1/dir2/file').write_text(TEST_CONTENT)
             c = repo.commit()
         self.load(str(rp))

         obj_id_hex = repo.repo[c].tree.decode()
         obj_id = hashutil.hash_to_bytes(obj_id_hex)

         with self.cook_extract_directory(obj_id) as p:
             self.assertEqual((p / 'file').stat().st_mode, 0o100644)
             self.assertEqual((p / 'file').read_text(), TEST_CONTENT)
             self.assertEqual((p / 'executable').stat().st_mode, 0o100755)
             self.assertEqual((p / 'executable').read_bytes(),
                              TEST_EXECUTABLE)
             self.assertTrue((p / 'link').is_symlink)
             self.assertEqual(os.readlink(str(p / 'link')), 'file')
             self.assertEqual((p / 'dir1/dir2/file').stat().st_mode, 0o100644)
             self.assertEqual((p / 'dir1/dir2/file').read_text(),
                              TEST_CONTENT)

             directory = Directory.from_disk(path=bytes(p))
             self.assertEqual(obj_id_hex,
                              hashutil.hash_to_hex(directory.hash))

     def test_filtered_objects(self):
         repo = TestRepo()
         with repo as rp:
             file_1, id_1 = hash_content(b'test1')
             file_2, id_2 = hash_content(b'test2')
             file_3, id_3 = hash_content(b'test3')

             (rp / 'file').write_bytes(file_1)
             (rp / 'hidden_file').write_bytes(file_2)
             (rp / 'absent_file').write_bytes(file_3)
             c = repo.commit()
         self.load(str(rp))

         obj_id_hex = repo.repo[c].tree.decode()
         obj_id = hashutil.hash_to_bytes(obj_id_hex)

         # FIXME: storage.content_update() should be changed to allow things
         # like that
         cur = self.storage.db._cursor(None)
         cur.execute("""update content set status = 'visible'
                        where sha1 = %s""", (id_1,))
         cur.execute("""update content set status = 'hidden'
                        where sha1 = %s""", (id_2,))
         cur.execute("""update content set status = 'absent'
                        where sha1 = %s""", (id_3,))
         cur.close()

         with self.cook_extract_directory(obj_id) as p:
             self.assertEqual((p / 'file').read_bytes(), b'test1')
             self.assertEqual((p / 'hidden_file').read_bytes(),
                              HIDDEN_MESSAGE)
             self.assertEqual((p / 'absent_file').read_bytes(),
                              SKIPPED_MESSAGE)


 class TestRevisionGitfastCooker(BaseTestCookers, unittest.TestCase):
     def test_revision_simple(self):
         #
         #     1--2--3--4--5--6--7
         #
         repo = TestRepo()
         with repo as rp:
             (rp / 'file1').write_text(TEST_CONTENT)
             repo.commit('add file1')
             (rp / 'file2').write_text(TEST_CONTENT)
             repo.commit('add file2')
             (rp / 'dir1/dir2').mkdir(parents=True)
             (rp / 'dir1/dir2/file').write_text(TEST_CONTENT)
             repo.commit('add dir1/dir2/file')
             (rp / 'bin1').write_bytes(TEST_EXECUTABLE)
             (rp / 'bin1').chmod(0o755)
             repo.commit('add bin1')
             (rp / 'link1').symlink_to('file1')
             repo.commit('link link1 to file1')
             (rp / 'file2').unlink()
             repo.commit('remove file2')
             (rp / 'bin1').rename(rp / 'bin')
             repo.commit('rename bin1 to bin')
         self.load(str(rp))
         obj_id_hex = repo.repo.refs[b'HEAD'].decode()
         obj_id = hashutil.hash_to_bytes(obj_id_hex)

         with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
             ert.checkout(b'HEAD')
             self.assertEqual((p / 'file1').stat().st_mode, 0o100644)
             self.assertEqual((p / 'file1').read_text(), TEST_CONTENT)
             self.assertTrue((p / 'link1').is_symlink)
             self.assertEqual(os.readlink(str(p / 'link1')), 'file1')
             self.assertEqual((p / 'bin').stat().st_mode, 0o100755)
             self.assertEqual((p / 'bin').read_bytes(), TEST_EXECUTABLE)
             self.assertEqual((p / 'dir1/dir2/file').read_text(),
                              TEST_CONTENT)
             self.assertEqual((p / 'dir1/dir2/file').stat().st_mode,
                              0o100644)
             self.assertEqual(ert.repo.refs[b'HEAD'].decode(), obj_id_hex)

     def test_revision_two_roots(self):
         #
         #    1----3---4
         #        /
         #   2----
         #
         repo = TestRepo()
         with repo as rp:
             (rp / 'file1').write_text(TEST_CONTENT)
             c1 = repo.commit('Add file1')
             del repo.repo.refs[b'refs/heads/master']  # git update-ref -d HEAD
             (rp / 'file2').write_text(TEST_CONTENT)
             repo.commit('Add file2')
             repo.merge([c1])
             (rp / 'file3').write_text(TEST_CONTENT)
             repo.commit('add file3')
         obj_id_hex = repo.repo.refs[b'HEAD'].decode()
         obj_id = hashutil.hash_to_bytes(obj_id_hex)
         self.load(str(rp))

         with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
             self.assertEqual(ert.repo.refs[b'HEAD'].decode(), obj_id_hex)

     def test_revision_two_double_fork_merge(self):
         #
         #     2---4---6
         #    /   /   /
         #   1---3---5
         #
         repo = TestRepo()
         with repo as rp:
             (rp / 'file1').write_text(TEST_CONTENT)
             c1 = repo.commit('Add file1')
             repo.repo.refs[b'refs/heads/c1'] = c1

             (rp / 'file2').write_text(TEST_CONTENT)
             repo.commit('Add file2')

             (rp / 'file3').write_text(TEST_CONTENT)
             c3 = repo.commit('Add file3', ref=b'refs/heads/c1')
             repo.repo.refs[b'refs/heads/c3'] = c3

             repo.merge([c3])

             (rp / 'file5').write_text(TEST_CONTENT)
             c5 = repo.commit('Add file3', ref=b'refs/heads/c3')
             repo.merge([c5])

         obj_id_hex = repo.repo.refs[b'HEAD'].decode()
         obj_id = hashutil.hash_to_bytes(obj_id_hex)
         self.load(str(rp))

         with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
             self.assertEqual(ert.repo.refs[b'HEAD'].decode(), obj_id_hex)

     def test_revision_triple_merge(self):
         #
         #       .---.---5
         #      /   /   /
         #     2   3   4
         #    /   /   /
         #   1---.---.
         #
         repo = TestRepo()
         with repo as rp:
             (rp / 'file1').write_text(TEST_CONTENT)
             c1 = repo.commit('Commit 1')
             repo.repo.refs[b'refs/heads/b1'] = c1
             repo.repo.refs[b'refs/heads/b2'] = c1
             repo.commit('Commit 2')
             c3 = repo.commit('Commit 3', ref=b'refs/heads/b1')
             c4 = repo.commit('Commit 4', ref=b'refs/heads/b2')
             repo.merge([c3, c4])
         obj_id_hex = repo.repo.refs[b'HEAD'].decode()
         obj_id = hashutil.hash_to_bytes(obj_id_hex)
         self.load(str(rp))

         with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
             self.assertEqual(ert.repo.refs[b'HEAD'].decode(), obj_id_hex)

     def test_filtered_objects(self):
         repo = TestRepo()
         with repo as rp:
             file_1, id_1 = hash_content(b'test1')
             file_2, id_2 = hash_content(b'test2')
             file_3, id_3 = hash_content(b'test3')

             (rp / 'file').write_bytes(file_1)
             (rp / 'hidden_file').write_bytes(file_2)
             (rp / 'absent_file').write_bytes(file_3)
             repo.commit()
         obj_id_hex = repo.repo.refs[b'HEAD'].decode()
         obj_id = hashutil.hash_to_bytes(obj_id_hex)
         self.load(str(rp))

         # FIXME: storage.content_update() should be changed to allow things
         # like that
         cur = self.storage.db._cursor(None)
         cur.execute("""update content set status = 'visible'
                        where sha1 = %s""", (id_1,))
         cur.execute("""update content set status = 'hidden'
                        where sha1 = %s""", (id_2,))
         cur.execute("""update content set status = 'absent'
                        where sha1 = %s""", (id_3,))
         cur.close()

         with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
             ert.checkout(b'HEAD')
             self.assertEqual((p / 'file').read_bytes(), b'test1')
             self.assertEqual((p / 'hidden_file').read_bytes(),
                              HIDDEN_MESSAGE)
             self.assertEqual((p / 'absent_file').read_bytes(),
                              SKIPPED_MESSAGE)
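
Usage sketch (not part of the patch): this change moves the cookers off
direct database access and onto the vault HTTP API, so a cooker only needs
the endpoints defined in swh/vault/api/client.py. A minimal round trip under
that assumption, using the DEFAULT_CONFIG vault address
(http://localhost:5005/) and a hypothetical object id borrowed from the
tests:

    from swh.vault.api.client import RemoteVaultClient

    client = RemoteVaultClient('http://localhost:5005/')
    obj_type = 'revision_gitfast'
    obj_id = '4a4b9771542143cf070386f86b4b92d42966bdbc'  # hypothetical id

    # Cooker side, mirroring what BaseVaultCooker.cook() now does:
    client.set_status(obj_type, obj_id, 'pending')
    client.set_progress(obj_type, obj_id, 'Processing...')
    client.put_bundle(obj_type, obj_id, b'bundle bytes')  # whole bundle; streaming is a TODO
    client.set_status(obj_type, obj_id, 'done')
    client.set_progress(obj_type, obj_id, None)
    client.send_notif(obj_type, obj_id)

    # Web API side:
    info = client.progress(obj_type, obj_id)  # task status and progress message
    bundle = client.fetch(obj_type, obj_id)   # the cooked bundle, once available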