diff --git a/swh/vault/api/client.py b/swh/vault/api/client.py
--- a/swh/vault/api/client.py
+++ b/swh/vault/api/client.py
@@ -14,16 +14,41 @@
     def __init__(self, base_url):
         super().__init__(api_exception=StorageAPIError, url=base_url)
 
+    # Web API endpoints
+
     def fetch(self, obj_type, obj_id):
-        return self.get('fetch/{}/{}'.format(obj_type,
-                                             hashutil.hash_to_hex(obj_id)))
+        hex_id = hashutil.hash_to_hex(obj_id)
+        return self.get('fetch/{}/{}'.format(obj_type, hex_id))
 
     def cook(self, obj_type, obj_id, email=None):
-        return self.post('cook/{}/{}'.format(obj_type,
-                                             hashutil.hash_to_hex(obj_id)),
+        hex_id = hashutil.hash_to_hex(obj_id)
+        return self.post('cook/{}/{}'.format(obj_type, hex_id),
                          data={},
                          params=({'email': email} if email else None))
 
     def progress(self, obj_type, obj_id):
-        return self.get('progress/{}/{}'.format(obj_type,
-                                                hashutil.hash_to_hex(obj_id)))
+        hex_id = hashutil.hash_to_hex(obj_id)
+        return self.get('progress/{}/{}'.format(obj_type, hex_id))
+
+    # Cookers endpoints
+
+    def set_progress(self, obj_type, obj_id, progress):
+        hex_id = hashutil.hash_to_hex(obj_id)
+        return self.post('set_progress/{}/{}'.format(obj_type, hex_id),
+                         data=progress)
+
+    def set_status(self, obj_type, obj_id, status):
+        hex_id = hashutil.hash_to_hex(obj_id)
+        return self.post('set_status/{}/{}'.format(obj_type, hex_id),
+                         data=status)
+
+    # TODO: handle streaming properly
+    def put_bundle(self, obj_type, obj_id, bundle):
+        hex_id = hashutil.hash_to_hex(obj_id)
+        return self.post('put_bundle/{}/{}'.format(obj_type, hex_id),
+                         data=bundle)
+
+    def send_notif(self, obj_type, obj_id):
+        hex_id = hashutil.hash_to_hex(obj_id)
+        return self.post('send_notif/{}/{}'.format(obj_type, hex_id),
+                         data=None)
diff --git a/swh/vault/api/server.py b/swh/vault/api/server.py
--- a/swh/vault/api/server.py
+++ b/swh/vault/api/server.py
@@ -9,7 +9,8 @@
 
 from swh.core import config
 from swh.core.api_async import (SWHRemoteAPI,
-                                encode_data_server as encode_data)
+                                encode_data_server as encode_data,
+                                decode_request)
 from swh.model import hashutil
 from swh.vault.cookers import COOKER_TYPES
 from swh.vault.backend import VaultBackend
@@ -21,8 +22,11 @@
         'args': {
             'db': 'dbname=softwareheritage-dev',
             'objstorage': {
-                'root': '/srv/softwareheritage/objects',
-                'slicing': '0:2/2:4/4:6',
+                'cls': 'pathslicing',
+                'args': {
+                    'root': '/srv/softwareheritage/objects',
+                    'slicing': '0:2/2:4/4:6',
+                },
             },
         },
     }),
@@ -42,6 +46,8 @@
     return aiohttp.web.Response(body="SWH Vault API server")
 
 
+# Web API endpoints
+
 @asyncio.coroutine
 def vault_fetch(request):
     obj_type = request.match_info['type']
@@ -66,13 +72,16 @@
     obj_type = request.match_info['type']
     obj_id = request.match_info['id']
     email = request.query.get('email')
+    sticky = request.query.get('sticky') in ('true', '1')
 
     if obj_type not in COOKER_TYPES:
         raise aiohttp.web.HTTPNotFound
 
-    info = request.app['backend'].cook_request(obj_type, obj_id, email)
+    info = request.app['backend'].cook_request(obj_type, obj_id,
+                                               email=email, sticky=sticky)
 
-    return encode_data(user_info(info), status=201)
+    # TODO: return 201 status (Created) once the api supports it
+    return encode_data(user_info(info))
 
 
 @asyncio.coroutine
@@ -87,12 +96,62 @@
     return encode_data(user_info(info))
 
 
+# Cookers endpoints
+
+@asyncio.coroutine
+def set_progress(request):
+    obj_type = request.match_info['type']
+    obj_id = request.match_info['id']
+    progress = yield from decode_request(request)
+    request.app['backend'].set_progress(obj_type, obj_id, progress)
+    return encode_data(True)  # FIXME: success value?
+
+
+@asyncio.coroutine
+def set_status(request):
+    obj_type = request.match_info['type']
+    obj_id = request.match_info['id']
+    status = yield from decode_request(request)
+    request.app['backend'].set_status(obj_type, obj_id, status)
+    return encode_data(True)  # FIXME: success value?
+
+
+@asyncio.coroutine
+def put_bundle(request):
+    obj_type = request.match_info['type']
+    obj_id = request.match_info['id']
+
+    # TODO: handle streaming properly
+    content = yield from decode_request(request)
+    request.app['backend'].cache.add(obj_type, obj_id, content)
+    return encode_data(True)  # FIXME: success value?
+
+
+@asyncio.coroutine
+def send_notif(request):
+    obj_type = request.match_info['type']
+    obj_id = request.match_info['id']
+    request.app['backend'].send_all_notifications(obj_type, obj_id)
+    return encode_data(True)  # FIXME: success value?
+
+
+# Web server
+
 def make_app(config, **kwargs):
     app = SWHRemoteAPI(**kwargs)
     app.router.add_route('GET', '/', index)
+
+    # Endpoints used by the web API
     app.router.add_route('GET', '/fetch/{type}/{id}', vault_fetch)
     app.router.add_route('POST', '/cook/{type}/{id}', vault_cook)
    app.router.add_route('GET', '/progress/{type}/{id}', vault_progress)
+
+    # Endpoints used by the Cookers
+    app.router.add_route('POST', '/set_progress/{type}/{id}', set_progress)
+    app.router.add_route('POST', '/set_status/{type}/{id}', set_status)
+    app.router.add_route('POST', '/put_bundle/{type}/{id}', put_bundle)
+    app.router.add_route('POST', '/send_notif/{type}/{id}', send_notif)
+
     app['backend'] = VaultBackend(config)
     return app
 
diff --git a/swh/vault/backend.py b/swh/vault/backend.py
--- a/swh/vault/backend.py
+++ b/swh/vault/backend.py
@@ -145,7 +145,7 @@
     def create_task(self, obj_type, obj_id, sticky=False, cursor=None):
         """Create and send a cooking task"""
         obj_id = hashutil.hash_to_bytes(obj_id)
-        args = [self.config, obj_type, obj_id]
+        args = [obj_type, obj_id]
         cooker_class = get_cooker(obj_type)
         cooker = cooker_class(*args)
         cooker.check_exists()
diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py
--- a/swh/vault/cookers/base.py
+++ b/swh/vault/cookers/base.py
@@ -6,15 +6,28 @@
 import abc
 import io
 import itertools
-import logging
 import os
 import tarfile
 import tempfile
 
 from pathlib import Path
 
+from swh.core import config
 from swh.model import hashutil
+from swh.model.from_disk import mode_to_perms, DentryPerms
 from swh.storage import get_storage
+from swh.vault.api.client import RemoteVaultClient
+
+
+DEFAULT_CONFIG = {
+    'storage': ('dict', {
+        'cls': 'remote',
+        'args': {
+            'url': 'http://localhost:5002/',
+        },
+    }),
+    'vault_url': ('str', 'http://localhost:5005/')
+}
 
 
 class BaseVaultCooker(metaclass=abc.ABCMeta):
@@ -28,7 +41,7 @@
     """
     CACHE_TYPE_KEY = None
 
-    def __init__(self, config, obj_type, obj_id):
+    def __init__(self, obj_type, obj_id):
         """Initialize the cooker.
 
         The type of the object represented by the id depends on the
@@ -40,19 +53,11 @@
             cache: the cache where to store the bundle
             obj_id: id of the object to be cooked into a bundle.
         """
-        self.config = config
+        self.config = config.load_named_config('vault-cooker', DEFAULT_CONFIG)
         self.obj_type = obj_type
         self.obj_id = hashutil.hash_to_bytes(obj_id)
-
-    def __enter__(self):
-        # Imported here to avoid circular dependency
-        from swh.vault.backend import VaultBackend
-        self.backend = VaultBackend(self.config)
+        self.backend = RemoteVaultClient(self.config['vault_url'])
         self.storage = get_storage(**self.config['storage'])
-        return self
-
-    def __exit__(self, *_):
-        self.backend.close()
 
     @abc.abstractmethod
     def check_exists(self):
@@ -77,21 +82,13 @@
         self.backend.set_progress(self.obj_type, self.obj_id, 'Processing...')
 
         content_iter = self.prepare_bundle()
 
-        self.update_cache(content_iter)
+        # TODO: use proper content streaming
+        bundle = b''.join(content_iter)
+        self.backend.put_bundle(self.CACHE_TYPE_KEY, self.obj_id, bundle)
+        self.backend.set_status(self.obj_type, self.obj_id, 'done')
         self.backend.set_progress(self.obj_type, self.obj_id, None)
-
-        self.notify_bundle_ready()
-
-    def update_cache(self, content_iter):
-        """Update the cache with id and bundle_content.
-
-        """
-        self.backend.cache.add_stream(self.CACHE_TYPE_KEY,
-                                      self.obj_id, content_iter)
-
-    def notify_bundle_ready(self):
-        self.backend.send_all_notifications(self.obj_type, self.obj_id)
+        self.backend.send_notif(self.obj_type, self.obj_id)
 
 
 SKIPPED_MESSAGE = (b'This content has not been retrieved in the '
diff --git a/swh/vault/cookers/revision_gitfast.py b/swh/vault/cookers/revision_gitfast.py
--- a/swh/vault/cookers/revision_gitfast.py
+++ b/swh/vault/cookers/revision_gitfast.py
@@ -43,21 +43,18 @@
 
         last_progress_report = None
 
-        # We want a single transaction for the whole export, so we store a
-        # cursor and use it during the process.
-        with self.storage.db.transaction() as self.cursor:
-            for i, rev in enumerate(self.rev_sorted, 1):
-                # Update progress if needed
-                ct = time.time()
-                if (last_progress_report is None
-                        or last_progress_report + 2 <= ct):
-                    last_progress_report = ct
-                    pg = ('Computing revision {}/{}'
-                          .format(i, len(self.rev_sorted)))
-                    self.backend.set_progress(self.obj_type, self.obj_id, pg)
-
-                # Compute the current commit
-                yield from self._compute_commit_command(rev)
+        for i, rev in enumerate(self.rev_sorted, 1):
+            # Update progress if needed
+            ct = time.time()
+            if (last_progress_report is None
+                    or last_progress_report + 2 <= ct):
+                last_progress_report = ct
+                pg = ('Computing revision {}/{}'
+                      .format(i, len(self.rev_sorted)))
+                self.backend.set_progress(self.obj_type, self.obj_id, pg)
+
+            # Compute the current commit
+            yield from self._compute_commit_command(rev)
 
     def _toposort(self, rev_by_id):
         """Perform a topological sort on the revision graph.
@@ -156,7 +153,7 @@
         This function has a cache to avoid doing multiple requests to
         retrieve the same entities, as doing a directory_ls() is expensive.
         """
-        data = (self.storage.directory_ls(dir_id, cur=self.cursor)
+        data = (self.storage.directory_ls(dir_id)
                 if dir_id is not None else [])
         return {f['name']: f for f in data}
 
diff --git a/swh/vault/cooking_tasks.py b/swh/vault/cooking_tasks.py
--- a/swh/vault/cooking_tasks.py
+++ b/swh/vault/cooking_tasks.py
@@ -12,6 +12,6 @@
 
     task_queue = 'swh_vault_cooking'
 
-    def run_task(self, config, obj_type, obj_id):
-        with get_cooker(obj_type)(config, obj_type, obj_id) as cooker:
-            cooker.cook()
+    def run_task(self, obj_type, obj_id):
+        cooker = get_cooker(obj_type)(obj_type, obj_id)
+        cooker.cook()
diff --git a/swh/vault/tests/test_backend.py b/swh/vault/tests/test_backend.py
--- a/swh/vault/tests/test_backend.py
+++ b/swh/vault/tests/test_backend.py
@@ -62,17 +62,15 @@
         m['get_cooker'].assert_called_once_with(TEST_TYPE)
 
         args = m['cooker_cls'].call_args[0]
-        self.assertEqual(args[0], self.vault_backend.config)
-        self.assertEqual(args[1], TEST_TYPE)
-        self.assertEqual(args[2], TEST_OBJ_ID)
+        self.assertEqual(args[0], TEST_TYPE)
+        self.assertEqual(args[1], TEST_OBJ_ID)
 
         self.assertEqual(m['cooker'].check_exists.call_count, 1)
         self.assertEqual(m['send_task'].call_count, 1)
 
         args = m['send_task'].call_args[0][1]
-        self.assertEqual(args[0], self.vault_backend.config)
-        self.assertEqual(args[1], TEST_TYPE)
-        self.assertEqual(args[2], TEST_OBJ_ID)
+        self.assertEqual(args[0], TEST_TYPE)
+        self.assertEqual(args[1], TEST_OBJ_ID)
 
         info = self.vault_backend.task_info(TEST_TYPE, TEST_OBJ_ID)
         self.assertEqual(info['object_id'], TEST_OBJ_ID)
diff --git a/swh/vault/tests/test_cookers.py b/swh/vault/tests/test_cookers.py
--- a/swh/vault/tests/test_cookers.py
+++ b/swh/vault/tests/test_cookers.py
@@ -13,6 +13,7 @@
 import tarfile
 import tempfile
 import unittest
+import unittest.mock
 
 import dulwich.fastexport
 import dulwich.index
@@ -87,10 +88,11 @@
     @contextlib.contextmanager
     def cook_extract_directory(self, obj_id):
         """Context manager that cooks a directory and extract it."""
-        cooker = DirectoryCooker(self.vault_config, 'directory', obj_id)
-        with cooker:
-            cooker.check_exists()  # Raises if false
-            tarball = b''.join(cooker.prepare_bundle())
+        cooker = DirectoryCooker('directory', obj_id)
+        cooker.storage = self.storage
+        cooker.backend = unittest.mock.MagicMock()
+        cooker.check_exists()  # Raises if false
+        tarball = b''.join(cooker.prepare_bundle())
         with tempfile.TemporaryDirectory('tmp-vault-extract-') as td:
             fobj = io.BytesIO(tarball)
             with tarfile.open(fileobj=fobj, mode='r') as tar:
@@ -101,11 +103,11 @@
     @contextlib.contextmanager
    def cook_extract_revision_gitfast(self, obj_id):
         """Context manager that cooks a revision and extract it."""
-        cooker = RevisionGitfastCooker(self.vault_config, 'revision_gitfast',
-                                       obj_id)
-        with cooker:
-            cooker.check_exists()  # Raises if false
-            fastexport = b''.join(cooker.prepare_bundle())
+        cooker = RevisionGitfastCooker('revision_gitfast', obj_id)
+        cooker.storage = self.storage
+        cooker.backend = unittest.mock.MagicMock()
+        cooker.check_exists()  # Raises if false
+        fastexport = b''.join(cooker.prepare_bundle())
         fastexport_stream = gzip.GzipFile(fileobj=io.BytesIO(fastexport))
         test_repo = TestRepo()
         with test_repo as p: