diff --git a/swh/storage/vault/api/client.py b/swh/storage/vault/api/client.py index c158245b2..0ea2d79b8 100644 --- a/swh/storage/vault/api/client.py +++ b/swh/storage/vault/api/client.py @@ -1,35 +1,27 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.core import hashutil from swh.core.api import SWHRemoteAPI from swh.storage.exc import StorageAPIError class RemoteVaultCache(SWHRemoteAPI): """Client to the Software Heritage vault cache.""" def __init__(self, base_url): super().__init__(api_exception=StorageAPIError, url=base_url) - def directory_ls(self): - return self.get('vault/directory/') + def ls(self, obj_type): + return self.get('vault/{}/'.format(obj_type)) - def directory_get(self, obj_id): - return self.get('vault/directory/%s/' % (hashutil.hash_to_hex(obj_id))) + def get(self, obj_type, obj_id): + return self.get('vault/{}/{}/'.format(obj_type, + hashutil.hash_to_hex(obj_id))) - def directory_cook(self, obj_id): - return self.post('vault/directory/%s/' % hashutil.hash_to_hex(obj_id), - data={}) - - def revision_ls(self): - return self.get('vault/revision/') - - def revision_get(self, obj_id): - return self.get('vault/revision/%s/' % (hashutil.hash_to_hex(obj_id))) - - def revision_cook(self, obj_id): - return self.post('vault/revision/%s/' % hashutil.hash_to_hex(obj_id), + def cook(self, obj_type, obj_id): + return self.post('vault/{}/{}/'.format(obj_type, + hashutil.hash_to_hex(obj_id)), data={}) diff --git a/swh/storage/vault/api/cooking_tasks.py b/swh/storage/vault/api/cooking_tasks.py index ac08f7475..4a6ccc685 100644 --- a/swh/storage/vault/api/cooking_tasks.py +++ b/swh/storage/vault/api/cooking_tasks.py @@ -1,32 +1,26 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.scheduler.task import Task from swh.core import hashutil from ..cache import VaultCache -from ..cooker import DirectoryVaultCooker, RevisionVaultCooker +from ..cooker import COOKER_TYPES from ... import get_storage -COOKER_TYPES = { - 'directory': DirectoryVaultCooker, - 'revision': RevisionVaultCooker, -} - - class SWHCookingTask(Task): """Main task which archives a contents batch. """ task_queue = 'swh_storage_vault_cooking' def run(self, type, hex_id, storage_args, cache_args): # Initialize elements storage = get_storage(**storage_args) cache = VaultCache(**cache_args) # Initialize cooker cooker = COOKER_TYPES[type](storage, cache) # Perform the cooking cooker.cook(obj_id=hashutil.hex_to_hash(hex_id)) diff --git a/swh/storage/vault/api/server.py b/swh/storage/vault/api/server.py index 6ec9843df..dd572fe87 100644 --- a/swh/storage/vault/api/server.py +++ b/swh/storage/vault/api/server.py @@ -1,104 +1,101 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click +import re from flask import abort, g from werkzeug.routing import BaseConverter from swh.core import config from swh.core.api import (SWHServerAPIApp, error_handler, encode_data_server as encode_data) -from swh.storage import get_storage -from swh.storage.vault.api import cooking_tasks # NOQA +from swh.scheduler.utils import get_task +from swh.storage.vault.api.cooking_tasks import COOKER_TYPES +from swh.storage.vault.api.cooking_tasks import SWHCookingTask # noqa from swh.storage.vault.cache import VaultCache -from swh.storage.vault.cooker import DirectoryVaultCooker -from swh.scheduler.celery_backend.config import app as celery_app cooking_task_name = 'swh.storage.vault.api.cooking_tasks.SWHCookingTask' DEFAULT_CONFIG = { 'storage': ('dict', { 'cls': 'local', 'args': { 'db': 'dbname=softwareheritage-dev', 'objstorage': { 'root': '/tmp/objects', 'slicing': '0:2/2:4/4:6', }, }, }), 'cache': ('dict', {'root': '/tmp/vaultcache'}) } -class RegexConverter(BaseConverter): +class CookerConverter(BaseConverter): def __init__(self, url_map, *items): super().__init__(url_map) - self.regex = items[0] + types = [re.escape(c) for c in COOKER_TYPES] + self.regex = '({})'.format('|'.join(types)) app = SWHServerAPIApp(__name__) -app.url_map.converters['regex'] = RegexConverter +app.url_map.converters['cooker'] = CookerConverter @app.errorhandler(Exception) def my_error_handler(exception): return error_handler(exception, encode_data) @app.before_request def before_request(): g.cache = VaultCache(**app.config['cache']) - g.cooker = DirectoryVaultCooker( - get_storage(**app.config['storage']), - g.cache - ) @app.route('/') def index(): return 'SWH vault API server' -@app.route('/vault//', +@app.route('/vault//', methods=['GET']) def ls_directory(type): return encode_data(list( g.cache.ls(type) )) -@app.route('/vault///', +@app.route('/vault///', methods=['GET']) def get_cooked_directory(type, id): if not g.cache.is_cached(type, id): abort(404) return encode_data(g.cache.get(type, id).decode()) -@app.route('/vault///', +@app.route('/vault///', methods=['POST']) def cook_request_directory(type, id): - task = celery_app.tasks[cooking_task_name] + task = get_task(cooking_task_name) task.delay(type, id, app.config['storage'], app.config['cache']) # Return url to get the content and 201 CREATED return encode_data('/vault/%s/%s/' % (type, id)), 201 @click.command() @click.argument('config-path', required=1) @click.option('--host', default='0.0.0.0', help="Host to run the server") @click.option('--port', default=5005, type=click.INT, help="Binding port of the server") @click.option('--debug/--nodebug', default=True, help="Indicates if the server should run in debug mode") def launch(config_path, host, port, debug): app.config.update(config.read(config_path, DEFAULT_CONFIG)) app.run(host, port=int(port), debug=bool(debug)) if __name__ == '__main__': launch() diff --git a/swh/storage/vault/cache.py b/swh/storage/vault/cache.py index cbc662c8a..b555fd6ca 100644 --- a/swh/storage/vault/cache.py +++ b/swh/storage/vault/cache.py @@ -1,69 +1,53 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os from swh.core import hashutil from swh.objstorage import get_objstorage from swh.objstorage.objstorage_pathslicing import DIR_MODE -BUNDLE_TYPES = { - 'directory': 'd', - 'revision': 'r', - 'snapshot': 's', -} - class VaultCache(): """The vault cache is an object storage that stores bundles The current implementation uses a PathSlicingObjStorage to store the bundles. The id of a content if prefixed to specify its type and store different types of bundle in different folders. """ def __init__(self, root): - for subdir in BUNDLE_TYPES.values(): - fp = os.path.join(root, subdir) - if not os.path.isdir(fp): - os.makedirs(fp, DIR_MODE, exist_ok=True) - - self.storages = { - type: get_objstorage( - 'pathslicing', {'root': os.path.join(root, subdir), - 'slicing': '0:1/0:5'} - ) - for type, subdir in BUNDLE_TYPES.items() - } - - def __contains__(self, obj_id): - for storage in self.storages: - if obj_id in storage: - return True - return False + self.root = root + self.storages = {} def add(self, obj_type, obj_id, content): storage = self._get_storage(obj_type) return storage.add(content, obj_id) def get(self, obj_type, obj_id): storage = self._get_storage(obj_type) return storage.get(hashutil.hex_to_hash(obj_id)) def is_cached(self, obj_type, obj_id): storage = self._get_storage(obj_type) return hashutil.hex_to_hash(obj_id) in storage def ls(self, obj_type): storage = self._get_storage(obj_type) yield from storage def _get_storage(self, obj_type): """Get the storage that corresponds to the object type""" - try: - return self.storages[obj_type] - except: - raise ValueError('Wrong bundle type: ' + obj_type) + + if obj_type not in self.storages: + fp = os.path.join(self.root, obj_type) + if not os.path.isdir(fp): + os.makedirs(fp, DIR_MODE, exist_ok=True) + + self.storages[obj_type] = get_objstorage( + 'pathslicing', {'root': fp, 'slicing': '0:1/0:5'}) + + return self.storages[obj_type] diff --git a/swh/storage/vault/cooker.py b/swh/storage/vault/cooker.py index 8210db934..a84e19843 100644 --- a/swh/storage/vault/cooker.py +++ b/swh/storage/vault/cooker.py @@ -1,296 +1,280 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import io import itertools import logging import os import tarfile import tempfile from pathlib import Path from swh.core import hashutil SKIPPED_MESSAGE = (b'This content have not been retrieved in ' b'Software Heritage archive due to its size') HIDDEN_MESSAGE = (b'This content is hidden') def get_tar_bytes(path, arcname=None): path = Path(path) if not arcname: arcname = path.name tar_buffer = io.BytesIO() tar = tarfile.open(fileobj=tar_buffer, mode='w') tar.add(str(path), arcname=arcname) return tar_buffer.getbuffer() class BaseVaultCooker(metaclass=abc.ABCMeta): """Abstract base class for the vault's bundle creators This class describes a common API for the cookers. To define a new cooker, inherit from this class and override: - CACHE_TYPE_KEY: key to use for the bundle to reference in cache - def cook(obj_id): cook the object into a bundle - def notify_bundle_ready(notif_data, bundle_id): notify the bundle is ready. """ CACHE_TYPE_KEY = None def __init__(self, storage, cache): self.storage = storage self.cache = cache @abc.abstractmethod def cook(self, obj_id): """Cook the requested object into a bundle The type of the object represented by the id depends on the concrete class. Very likely, each type of bundle will have its own cooker class. Args: obj_id: id of the object to be cooked into a bundle. """ pass def update_cache(self, id, bundle_content): """Update the cache with id and bundle_content. """ self.cache.add(self.CACHE_TYPE_KEY, id, bundle_content) @abc.abstractmethod def notify_bundle_ready(self, notif_data, bundle_id): """Notify the bundle bundle_id is ready. """ pass -class DirectoryVaultCooker(BaseVaultCooker): +class DirectoryCooker(BaseVaultCooker): """Cooker to create a directory bundle """ CACHE_TYPE_KEY = 'directory' - def __init__(self, storage, cache): - """Initialize a cooker that create directory bundles - - Args: - storage: source storage where content are retrieved. - cache: destination storage where the cooked bundle are stored. - - """ - self.storage = storage - self.cache = cache - def cook(self, obj_id): """Cook the requested directory into a Bundle Args: obj_id (bytes): the id of the directory to be cooked. Returns: bytes that correspond to the bundle """ # Create the bytes that corresponds to the compressed # directory. - directory_cooker = DirectoryCooker(self.storage) + directory_cooker = DirectoryBuilder(self.storage) bundle_content = directory_cooker.get_directory_bytes(obj_id) # Cache the bundle self.update_cache(obj_id, bundle_content) # Make a notification that the bundle have been cooked # NOT YET IMPLEMENTED see TODO in function. self.notify_bundle_ready( notif_data='Bundle %s ready' % hashutil.hash_to_hex(obj_id), bundle_id=obj_id) def notify_bundle_ready(self, notif_data, bundle_id): # TODO plug this method with the notification method once # done. pass -class RevisionVaultCooker(BaseVaultCooker): +class RevisionFlatCooker(BaseVaultCooker): """Cooker to create a directory bundle """ - CACHE_TYPE_KEY = 'revision' - - def __init__(self, storage, cache): - """Initialize a cooker that create revision bundles - - Args: - storage: source storage where content are retrieved. - cache: destination storage where the cooked bundle are stored. - - """ - self.storage = storage - self.cache = cache + CACHE_TYPE_KEY = 'revision_flat' def cook(self, obj_id): """Cook the requested revision into a Bundle Args: obj_id (bytes): the id of the revision to be cooked. Returns: bytes that correspond to the bundle """ - directory_cooker = DirectoryCooker(self.storage) + directory_cooker = DirectoryBuilder(self.storage) with tempfile.TemporaryDirectory(suffix='.cook') as root_tmp: root = Path(root_tmp) for revision in self.storage.revision_log([obj_id]): revdir = root / hashutil.hash_to_hex(revision['id']) revdir.mkdir() directory_cooker.build_directory(revision['directory'], str(revdir).encode()) bundle_content = get_tar_bytes(root_tmp, hashutil.hash_to_hex(obj_id)) # Cache the bundle self.update_cache(obj_id, bundle_content) # Make a notification that the bundle have been cooked # NOT YET IMPLEMENTED see TODO in function. self.notify_bundle_ready( notif_data='Bundle %s ready' % hashutil.hash_to_hex(obj_id), bundle_id=obj_id) def notify_bundle_ready(self, notif_data, bundle_id): # TODO plug this method with the notification method once # done. pass -class DirectoryCooker(): +class DirectoryBuilder: """Creates a cooked directory from its sha1_git in the db. Warning: This is NOT a directly accessible cooker, but a low-level one that executes the manipulations. """ def __init__(self, storage): self.storage = storage def get_directory_bytes(self, dir_id): # Create temporary folder to retrieve the files into. root = bytes(tempfile.mkdtemp(prefix='directory.', suffix='.cook'), 'utf8') self.build_directory(dir_id, root) # Use the created directory to make a bundle with the data as # a compressed directory. bundle_content = self._create_bundle_content( root, hashutil.hash_to_hex(dir_id)) return bundle_content def build_directory(self, dir_id, root): # Retrieve data from the database. data = self.storage.directory_ls(dir_id, recursive=True) # Split into files and directory data. # TODO(seirl): also handle revision data. data1, data2 = itertools.tee(data, 2) dir_data = (entry['name'] for entry in data1 if entry['type'] == 'dir') file_data = (entry for entry in data2 if entry['type'] == 'file') # Recreate the directory's subtree and then the files into it. self._create_tree(root, dir_data) self._create_files(root, file_data) def _create_tree(self, root, directory_paths): """Create a directory tree from the given paths The tree is created from `root` and each given path in `directory_paths` will be created. """ # Directories are sorted by depth so they are created in the # right order bsep = bytes(os.path.sep, 'utf8') dir_names = sorted( directory_paths, key=lambda x: len(x.split(bsep))) for dir_name in dir_names: os.makedirs(os.path.join(root, dir_name)) def _create_files(self, root, file_datas): """Create the files according to their status. """ # Then create the files for file_data in file_datas: path = os.path.join(root, file_data['name']) status = file_data['status'] perms = file_data['perms'] if status == 'absent': self._create_file_absent(path) elif status == 'hidden': self._create_file_hidden(path) else: content = self._get_file_content(file_data['sha1']) self._create_file(path, content, perms) def _create_file(self, path, content, perms=0o100644): """Create the given file and fill it with content. """ if perms not in (0o100644, 0o100755, 0o120000): logging.warning('File {} has invalid permission {}, ' 'defaulting to 644.'.format(path, perms)) if perms == 0o120000: # Symbolic link os.symlink(content, path) else: with open(path, 'wb') as f: f.write(content) os.chmod(path, perms & 0o777) def _get_file_content(self, obj_id): """Get the content of the given file. """ content = list(self.storage.content_get([obj_id]))[0]['data'] return content def _create_file_absent(self, path): """Create a file that indicates a skipped content Create the given file but fill it with a specific content to indicate that the content have not been retrieved by the software heritage archive due to its size. """ self._create_file(self, SKIPPED_MESSAGE) def _create_file_hidden(self, path): """Create a file that indicates an hidden content Create the given file but fill it with a specific content to indicate that the content could not be retrieved due to privacy policy. """ self._create_file(self, HIDDEN_MESSAGE) def _create_bundle_content(self, path, hex_dir_id): """Create a bundle from the given directory Args: path: location of the directory to package. hex_dir_id: hex representation of the directory id Returns: bytes that represent the compressed directory as a bundle. """ return get_tar_bytes(path.decode(), hex_dir_id) + + +COOKER_TYPES = { + 'directory': DirectoryCooker, + 'revision_flat': RevisionFlatCooker, +}