diff --git a/swh/storage/vault/api/cooking_tasks.py b/swh/storage/vault/api/cooking_tasks.py --- a/swh/storage/vault/api/cooking_tasks.py +++ b/swh/storage/vault/api/cooking_tasks.py @@ -10,15 +10,23 @@ from ... import get_storage +COOKER_TYPES = { + 'directory': DirectoryVaultCooker +} + + class SWHCookingTask(Task): """ Main task that archive a batch of content. """ task_queue = 'swh_storage_vault_cooking' - def run(self, hex_dir_id, storage_args, cache_args): + def run(self, type, hex_dir_id, storage_args, cache_args): + # Initialize elements storage = get_storage(**storage_args) cache = VaultCache(**cache_args) - directory_cooker = DirectoryVaultCooker(storage, cache) - + # Initialize cooker + vault_cooker_class = COOKER_TYPES[type] + cooker = vault_cooker_class(storage, cache) + # Perform the cooking dir_id = hashutil.hex_to_hash(hex_dir_id) - directory_cooker.cook(dir_id) + cooker.cook(dir_id) diff --git a/swh/storage/vault/api/server.py b/swh/storage/vault/api/server.py --- a/swh/storage/vault/api/server.py +++ b/swh/storage/vault/api/server.py @@ -5,6 +5,7 @@ import click from flask import Flask, abort, g +from werkzeug.routing import BaseConverter from swh.core import config from swh.objstorage.api.common import encode_data_server as encode_data from swh.objstorage.api.common import BytesRequest, error_handler @@ -13,7 +14,6 @@ from swh.storage.vault.cache import VaultCache from swh.storage.vault.cooker import DirectoryVaultCooker from swh.scheduler.celery_backend.config import app as celery_app -from flask_profile import Profiler cooking_task_name = 'swh.storage.vault.api.cooking_tasks.SWHCookingTask' @@ -30,9 +30,15 @@ } +class RegexConverter(BaseConverter): + def __init__(self, url_map, *items): + super().__init__(url_map) + self.regex = items[0] + + app = Flask(__name__) -Profiler(app) app.request_class = BytesRequest +app.url_map.converters['regex'] = RegexConverter @app.errorhandler(Exception) @@ -54,26 +60,29 @@ return 'SWH vault API server' -@app.route('/vault/directory/', methods=['GET']) -def ls_directory(): +@app.route('/vault//', + methods=['GET']) +def ls_directory(type): return encode_data(list( - g.cache.ls('directory') + g.cache.ls(type) )) -@app.route('/vault/directory//', methods=['GET']) -def get_cooked_directory(dir_id): - if not g.cache.is_cached('directory', dir_id): +@app.route('/vault///', + methods=['GET']) +def get_cooked_directory(type, id): + if not g.cache.is_cached(type, id): abort(404) - return encode_data(g.cache.get('directory', dir_id).decode()) + return encode_data(g.cache.get(type, id).decode()) -@app.route('/vault/directory//', methods=['POST']) -def cook_request_directory(dir_id): +@app.route('/vault///', + methods=['POST']) +def cook_request_directory(type, id): task = celery_app.tasks[cooking_task_name] - task.delay(dir_id, app.config['storage'], app.config['cache']) + task.delay(type, id, app.config['storage'], app.config['cache']) # Return url to get the content and 201 CREATED - return encode_data('/vault/directory/dir_id/'), 201 + return encode_data('/vault/%s/%s/' % (type, id)), 201 @click.command() diff --git a/swh/storage/vault/cache.py b/swh/storage/vault/cache.py --- a/swh/storage/vault/cache.py +++ b/swh/storage/vault/cache.py @@ -11,6 +11,8 @@ BUNDLE_TYPES = { 'directory': 'd', + 'revision': 'r', + 'snapshot': 's', } @@ -38,7 +40,10 @@ } def __contains__(self, obj_id): - return obj_id in self.storage + for storage in self.storages: + if obj_id in storage: + return True + return False def add(self, obj_type, obj_id, content): storage = self._get_storage(obj_type) diff --git a/swh/storage/vault/cooker.py b/swh/storage/vault/cooker.py --- a/swh/storage/vault/cooker.py +++ b/swh/storage/vault/cooker.py @@ -11,6 +11,7 @@ import itertools from swh.core import hashutil + SKIPPED_MESSAGE = (b'This content have not been retrieved in ' b'Software Heritage archive due to its size') @@ -24,6 +25,10 @@ This class describes a common API for the cookers. """ + def __init__(self, storage, cache): + self.storage = storage + self.cache = cache + @abc.abstractmethod def cook(self, obj_id): """Cook the requested object into a bundle @@ -39,6 +44,16 @@ raise NotImplementedError( 'Vault cookers must implement a `cook` method') + @abc.abstractmethod + def update_cache(self, id, bundle_content): + raise NotImplementedError('Vault cookers must implement a ' + '`update_cache` method') + + @abc.abstractmethod + def notify_bundle_ready(self, notif_data, bundle_id): + raise NotImplementedError( + 'Vault cookers must implement a `notify_bundle_ready` method') + class DirectoryVaultCooker(BaseVaultCooker): """Cooker to create a directory bundle """ @@ -64,27 +79,57 @@ bytes that correspond to the bundle """ - root = bytes(tempfile.mkdtemp(prefix='directory.', suffix='.cook'), - 'utf8') - # Retrieve data from the database + # Create the bytes that corresponds to the compressed + # directory. + directory_cooker = DirectoryCooker(self.storage) + bundle_content = directory_cooker.get_directory_bytes(dir_id) + # Cache the bundle + self._cache_bundle(dir_id, bundle_content) + # Make a notification that the bundle have been cooked + # NOT YET IMPLEMENTED see TODO in function. + self._notify_bundle_ready(dir_id) + + def update_cache(self, dir_id, bundle_content): + self.cache.add('directory', dir_id, bundle_content) + + def notify_bundle_ready(self, bundle_id): + # TODO plug this method with the notification method once + # done. + pass + + +class DirectoryCooker(): + """Creates a cooked directory from its sha1_git in the db. + + Warning: This is NOT a directly accessible cooker, but a low-level + one that effectuates the manipulations. + + """ + def __init__(self, storage): + self.storage = storage + + def get_directory_bytes(self, dir_id): + # Create temporary folder to retrieve the files into. + root = bytes(tempfile.mkdtemp(prefix='directory.', + suffix='.cook'), 'utf8') + # Retrieve data from the database. data = list(self.storage.directory_ls(dir_id, recursive=True)) + # Split into files and directory data. data1, data2 = itertools.tee(data, 2) dir_data = (entry['name'] for entry in data1 if entry['type'] == 'dir') file_data = (entry for entry in data2 if entry['type'] == 'file') - # Recreate the directory + # Recreate the directory's subtree and then the files into it. self._create_tree(root, dir_data) self._create_files(root, file_data) - # Use the created directory to get the bundle datas + # Use the created directory to make a bundle with the data as + # a compressed directory. bundle_content = self._create_bundle_content( root, hashutil.hash_to_hex(dir_id) ) - self._cache_bundle(dir_id, bundle_content) - - # Make a notification that the bundle have been cooked - self._notify_bundle_ready(dir_id) + return bundle_content def _create_tree(self, root, directory_paths): """Create a directory tree from the given paths @@ -104,7 +149,7 @@ os.makedirs(os.path.join(root, dir_name)) def _create_files(self, root, file_datas): - """Iterates over the file datas and delegate to the right method. + """Create the files according to their status. """ # Then create the files @@ -119,15 +164,20 @@ content = self._get_file_content(file_data['sha1']) self._create_file(path, content) - def _get_file_content(self, obj_id): - content = list(self.storage.content_get([obj_id]))[0]['data'] - return content - def _create_file(self, path, content): - """Create the given file and fill it with content.""" + """Create the given file and fill it with content. + + """ with open(path, 'wb') as f: f.write(content) + def _get_file_content(self, obj_id): + """Get the content of the given file. + + """ + content = list(self.storage.content_get([obj_id]))[0]['data'] + return content + def _create_file_absent(self, path): """Create a file that indicates a skipped content @@ -156,18 +206,11 @@ hex_dir_id: hex representation of the directory id Returns: - a path to the newly created archive file. + bytes that represents the compressed directory as a + bundle. """ tar_buffer = io.BytesIO() tar = tarfile.open(fileobj=tar_buffer, mode='w') tar.add(path.decode(), arcname=hex_dir_id) return tar_buffer.getbuffer() - - def _cache_bundle(self, dir_id, bundle_content): - self.cache.add('directory', dir_id, bundle_content) - - def _notify_bundle_ready(self, bundle_id): - # TODO plug this method with the notification method once - # done. - pass