diff --git a/swh/storage/vault/api/cooking_tasks.py b/swh/storage/vault/api/cooking_tasks.py
index feee6f1..cbe6ef3 100644
--- a/swh/storage/vault/api/cooking_tasks.py
+++ b/swh/storage/vault/api/cooking_tasks.py
@@ -1,24 +1,32 @@
 # Copyright (C) 2016 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from swh.scheduler.task import Task
 from swh.core import hashutil
 from ..cache import VaultCache
 from ..cooker import DirectoryVaultCooker
 from ... import get_storage
 
 
+COOKER_TYPES = {
+    'directory': DirectoryVaultCooker
+}
+
+
 class SWHCookingTask(Task):
     """ Main task that archive a batch of content.
     """
     task_queue = 'swh_storage_vault_cooking'
 
-    def run(self, hex_dir_id, storage_args, cache_args):
+    def run(self, type, hex_dir_id, storage_args, cache_args):
+        # Initialize elements
         storage = get_storage(**storage_args)
         cache = VaultCache(**cache_args)
-        directory_cooker = DirectoryVaultCooker(storage, cache)
-
+        # Initialize cooker
+        vault_cooker_class = COOKER_TYPES[type]
+        cooker = vault_cooker_class(storage, cache)
+        # Perform the cooking
         dir_id = hashutil.hex_to_hash(hex_dir_id)
-        directory_cooker.cook(dir_id)
+        cooker.cook(dir_id)
diff --git a/swh/storage/vault/api/server.py b/swh/storage/vault/api/server.py
index 075c751..ba9e6a6 100644
--- a/swh/storage/vault/api/server.py
+++ b/swh/storage/vault/api/server.py
@@ -1,92 +1,102 @@
 # Copyright (C) 2016 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import click
 from flask import Flask, abort, g
+from werkzeug.routing import BaseConverter
 from swh.core import config
 from swh.objstorage.api.common import encode_data_server as encode_data
 from swh.objstorage.api.common import BytesRequest, error_handler
 from swh.storage import get_storage
 from swh.storage.vault.api import cooking_tasks  # NOQA
 from swh.storage.vault.cache import VaultCache
 from swh.storage.vault.cooker import DirectoryVaultCooker
 from swh.scheduler.celery_backend.config import app as celery_app
-from flask_profile import Profiler
 
 
 cooking_task_name = 'swh.storage.vault.api.cooking_tasks.SWHCookingTask'
 
 
 DEFAULT_CONFIG = {
     'storage': ('dict', {'storage_class': 'local_storage',
                          'storage_args': [
                              'dbname=softwareheritage-dev',
                              '/tmp/objects'
                          ]
                          }),
     'cache': ('dict', {'root': '/tmp/vaultcache'})
 }
 
 
+class RegexConverter(BaseConverter):
+    """URL converter whose pattern is the first converter argument."""
+    def __init__(self, url_map, *items):
+        super().__init__(url_map)
+        self.regex = items[0]
+
+
 app = Flask(__name__)
-Profiler(app)
 app.request_class = BytesRequest
+app.url_map.converters['regex'] = RegexConverter
 
 
 @app.errorhandler(Exception)
 def my_error_handler(exception):
     return error_handler(exception, encode_data)
 
 
 @app.before_request
 def before_request():
     g.cache = VaultCache(**app.config['cache'])
     g.cooker = DirectoryVaultCooker(
         get_storage(**app.config['storage']),
         g.cache
     )
 
 
 @app.route('/')
 def index():
     return 'SWH vault API server'
 
 
-@app.route('/vault/directory/', methods=['GET'])
-def ls_directory():
+@app.route('/vault/<regex("directory|revision|snapshot"):type>/',
+           methods=['GET'])
+def ls_directory(type):
     return encode_data(list(
-        g.cache.ls('directory')
+        g.cache.ls(type)
     ))
 
 
-@app.route('/vault/directory/<dir_id>/', methods=['GET'])
-def get_cooked_directory(dir_id):
-    if not g.cache.is_cached('directory', dir_id):
+@app.route('/vault/<regex("directory|revision|snapshot"):type>/<id>/',
+           methods=['GET'])
+def get_cooked_directory(type, id):
+    if not g.cache.is_cached(type, id):
         abort(404)
-    return encode_data(g.cache.get('directory', dir_id).decode())
+    return encode_data(g.cache.get(type, id).decode())
 
 
-@app.route('/vault/directory/<dir_id>/', methods=['POST'])
-def cook_request_directory(dir_id):
+@app.route('/vault/<regex("directory|revision|snapshot"):type>/<id>/',
+           methods=['POST'])
+def cook_request_directory(type, id):
     task = celery_app.tasks[cooking_task_name]
-    task.delay(dir_id, app.config['storage'], app.config['cache'])
+    task.delay(type, id, app.config['storage'], app.config['cache'])
     # Return url to get the content and 201 CREATED
-    return encode_data('/vault/directory/dir_id/'), 201
+    return encode_data('/vault/%s/%s/' % (type, id)), 201
 
 
 @click.command()
 @click.argument('config-path', required=1)
 @click.option('--host', default='0.0.0.0', help="Host to run the server")
 @click.option('--port', default=5000, type=click.INT,
               help="Binding port of the server")
 @click.option('--debug/--nodebug', default=True,
               help="Indicates if the server should run in debug mode")
 def launch(config_path, host, port, debug):
     app.config.update(config.read(config_path, DEFAULT_CONFIG))
     app.run(host, port=int(port), debug=bool(debug))
 
 
 if __name__ == '__main__':
     launch()
diff --git a/swh/storage/vault/cache.py b/swh/storage/vault/cache.py
index 588682a..cbc662c 100644
--- a/swh/storage/vault/cache.py
+++ b/swh/storage/vault/cache.py
@@ -1,64 +1,69 @@
 # Copyright (C) 2016 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import os
 
 from swh.core import hashutil
 from swh.objstorage import get_objstorage
 from swh.objstorage.objstorage_pathslicing import DIR_MODE
 
 
 BUNDLE_TYPES = {
     'directory': 'd',
+    'revision': 'r',
+    'snapshot': 's',
 }
 
 
 class VaultCache():
     """The vault cache is an object storage that stores bundles
 
     The current implementation uses a PathSlicingObjStorage to store
     the bundles. The id of a content if prefixed to specify its type
     and store different types of bundle in different folders.
 
     """
     def __init__(self, root):
         for subdir in BUNDLE_TYPES.values():
             fp = os.path.join(root, subdir)
             if not os.path.isdir(fp):
                 os.makedirs(fp, DIR_MODE, exist_ok=True)
 
         self.storages = {
             type: get_objstorage(
                 'pathslicing', {'root': os.path.join(root, subdir),
                                 'slicing': '0:1/0:5'}
             )
             for type, subdir in BUNDLE_TYPES.items()
         }
 
     def __contains__(self, obj_id):
-        return obj_id in self.storage
+        for storage in self.storages.values():
+            if obj_id in storage:
+                return True
+        return False
 
     def add(self, obj_type, obj_id, content):
         storage = self._get_storage(obj_type)
         return storage.add(content, obj_id)
 
     def get(self, obj_type, obj_id):
         storage = self._get_storage(obj_type)
         return storage.get(hashutil.hex_to_hash(obj_id))
 
     def is_cached(self, obj_type, obj_id):
         storage = self._get_storage(obj_type)
         return hashutil.hex_to_hash(obj_id) in storage
 
     def ls(self, obj_type):
         storage = self._get_storage(obj_type)
         yield from storage
 
     def _get_storage(self, obj_type):
         """Get the storage that corresponds to the object type"""
         try:
             return self.storages[obj_type]
         except:
             raise ValueError('Wrong bundle type: ' + obj_type)
diff --git a/swh/storage/vault/cooker.py b/swh/storage/vault/cooker.py
index b48c4a8..e4ae53a 100644
--- a/swh/storage/vault/cooker.py
+++ b/swh/storage/vault/cooker.py
@@ -1,173 +1,230 @@
 # Copyright (C) 2016 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import abc
 import io
 import os
 import tarfile
 import tempfile
 import itertools
 
 from swh.core import hashutil
 
+
 SKIPPED_MESSAGE = (b'This content have not been retrieved in '
                    b'Software Heritage archive due to its size')
 HIDDEN_MESSAGE = (b'This content is hidden')
 
 
 class BaseVaultCooker(metaclass=abc.ABCMeta):
     """Abstract base class for the vault's bundle creators
 
     This class describes a common API for the cookers.
 
     """
 
+    def __init__(self, storage, cache):
+        self.storage = storage
+        self.cache = cache
+
     @abc.abstractmethod
     def cook(self, obj_id):
         """Cook the requested object into a bundle
 
         The type of the object represented by the id depends on the
         concrete class. Very likely, each type of bundle will have its
         own cooker class.
 
         Args:
             obj_id: id of the object to be cooked into a bundle.
 
         """
         raise NotImplementedError(
             'Vault cookers must implement a `cook` method')
 
+    @abc.abstractmethod
+    def update_cache(self, obj_id, bundle_content):
+        """Store the cooked bundle of `obj_id` into the cache."""
+        raise NotImplementedError('Vault cookers must implement a '
+                                  '`update_cache` method')
+
+    @abc.abstractmethod
+    def notify_bundle_ready(self, bundle_id):
+        """Notify that the bundle `bundle_id` has been cooked."""
+        raise NotImplementedError(
+            'Vault cookers must implement a `notify_bundle_ready` method')
+
 
 class DirectoryVaultCooker(BaseVaultCooker):
     """Cooker to create a directory bundle
     """
     def __init__(self, storage, cache):
         """Initialize a cooker that create directory bundles
 
         Args:
             storage: source storage where content are retrieved.
             cache: destination storage where the cooked bundle are stored.
 
         """
         self.storage = storage
         self.cache = cache
 
     def cook(self, dir_id):
         """Cook the requested directory into a Bundle
 
         Args:
             dir_id (bytes): the id of the directory to be cooked.
 
         Returns:
             bytes that correspond to the bundle
 
         """
-        root = bytes(tempfile.mkdtemp(prefix='directory.', suffix='.cook'),
-                     'utf8')
-        # Retrieve data from the database
+        # Create the bytes that corresponds to the compressed
+        # directory.
+        directory_cooker = DirectoryCooker(self.storage)
+        bundle_content = directory_cooker.get_directory_bytes(dir_id)
+        # Cache the bundle
+        self.update_cache(dir_id, bundle_content)
+        # Make a notification that the bundle have been cooked
+        # NOT YET IMPLEMENTED see TODO in function.
+        self.notify_bundle_ready(dir_id)
+
+    def update_cache(self, dir_id, bundle_content):
+        self.cache.add('directory', dir_id, bundle_content)
+
+    def notify_bundle_ready(self, bundle_id):
+        # TODO plug this method with the notification method once
+        # done.
+        pass
+
+
+class DirectoryCooker():
+    """Creates a cooked directory from its sha1_git in the db.
+
+    Warning: This is NOT a directly accessible cooker, but a low-level
+    one that effectuates the manipulations.
+
+    """
+    def __init__(self, storage):
+        self.storage = storage
+
+    def get_directory_bytes(self, dir_id):
+        """Build the bundle content of the directory `dir_id`.
+
+        Args:
+            dir_id (bytes): the id of the directory to be cooked.
+
+        Returns:
+            the bundle content, as returned by
+            `_create_bundle_content`.
+
+        """
+        # Create temporary folder to retrieve the files into.
+        root = bytes(tempfile.mkdtemp(prefix='directory.',
+                                      suffix='.cook'), 'utf8')
+        # Retrieve data from the database.
         data = list(self.storage.directory_ls(dir_id, recursive=True))
+        # Split into files and directory data.
         data1, data2 = itertools.tee(data, 2)
         dir_data = (entry['name'] for entry in data1 if entry['type'] == 'dir')
         file_data = (entry for entry in data2 if entry['type'] == 'file')
 
-        # Recreate the directory
+        # Recreate the directory's subtree and then the files into it.
         self._create_tree(root, dir_data)
         self._create_files(root, file_data)
 
-        # Use the created directory to get the bundle datas
+        # Use the created directory to make a bundle with the data as
+        # a compressed directory.
         bundle_content = self._create_bundle_content(
             root,
             hashutil.hash_to_hex(dir_id)
         )
-        self._cache_bundle(dir_id, bundle_content)
-
-        # Make a notification that the bundle have been cooked
-        self._notify_bundle_ready(dir_id)
+        return bundle_content
 
     def _create_tree(self, root, directory_paths):
         """Create a directory tree from the given paths
 
         The tree is created from `root` and each given path in
         `directory_paths` will be created.
 
         """
         # Directories are sorted by depth so they are created in the
         # right order
         bsep = bytes(os.path.sep, 'utf8')
         dir_names = sorted(
             directory_paths,
             key=lambda x: len(x.split(bsep))
         )
         for dir_name in dir_names:
             os.makedirs(os.path.join(root, dir_name))
 
     def _create_files(self, root, file_datas):
-        """Iterates over the file datas and delegate to the right method.
+        """Create the files according to their status.
 
         """
         # Then create the files
         for file_data in file_datas:
             path = os.path.join(root, file_data['name'])
             status = file_data['status']
             if status == 'absent':
                 self._create_file_absent(path)
             elif status == 'hidden':
                 self._create_file_hidden(path)
             else:
                 content = self._get_file_content(file_data['sha1'])
                 self._create_file(path, content)
 
-    def _get_file_content(self, obj_id):
-        content = list(self.storage.content_get([obj_id]))[0]['data']
-        return content
-
     def _create_file(self, path, content):
-        """Create the given file and fill it with content."""
+        """Create the given file and fill it with content.
+
+        """
         with open(path, 'wb') as f:
             f.write(content)
 
+    def _get_file_content(self, obj_id):
+        """Get the content of the given file.
+
+        """
+        content = list(self.storage.content_get([obj_id]))[0]['data']
+        return content
+
     def _create_file_absent(self, path):
         """Create a file that indicates a skipped content
 
         Create the given file but fill it with a specific content to
         indicates that the content have not been retrieved by the
         software heritage archive due to its size.
 
         """
-        self._create_file(self, SKIPPED_MESSAGE)
+        self._create_file(path, SKIPPED_MESSAGE)
 
     def _create_file_hidden(self, path):
         """Create a file that indicates an hidden content
 
         Create the given file but fill it with a specific content to
         indicates that the content could not be retrieved due to
         privacy policy.
 
         """
-        self._create_file(self, HIDDEN_MESSAGE)
+        self._create_file(path, HIDDEN_MESSAGE)
 
     def _create_bundle_content(self, path, hex_dir_id):
         """Create a bundle from the given directory
 
         Args:
             path: location of the directory to package.
             hex_dir_id: hex representation of the directory id
 
         Returns:
-            a path to the newly created archive file.
+            bytes that represents the compressed directory as a
+            bundle.
 
         """
         tar_buffer = io.BytesIO()
-        tar = tarfile.open(fileobj=tar_buffer, mode='w')
-        tar.add(path.decode(), arcname=hex_dir_id)
+        # Close the archive before reading the buffer so that the
+        # end-of-archive blocks are written.
+        with tarfile.open(fileobj=tar_buffer, mode='w') as tar:
+            tar.add(path.decode(), arcname=hex_dir_id)
         return tar_buffer.getbuffer()
-
-    def _cache_bundle(self, dir_id, bundle_content):
-        self.cache.add('directory', dir_id, bundle_content)
-
-    def _notify_bundle_ready(self, bundle_id):
-        # TODO plug this method with the notification method once
-        # done.
-        pass