diff --git a/swh/storage/vault/api/client.py b/swh/storage/vault/api/client.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/vault/api/client.py
@@ -0,0 +1,105 @@
+# Copyright (C) 2016 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+import pickle
+import requests
+
+from swh.core import hashutil
+from swh.storage.exc import StorageAPIError
+from swh.objstorage.api.common import (decode_response,
+                                       encode_data_client as encode_data)
+
+
+logger = logging.getLogger(__name__)
+
+
+class RemoteVaultCache():
+    """Client to the Software Heritage vault cache.
+
+    Args:
+        base_url: root URL of the vault API server; a trailing slash is
+            appended when it is missing.
+
+    """
+
+    def __init__(self, base_url):
+        self.base_url = base_url if base_url.endswith('/') else base_url + '/'
+        self.session = requests.Session()
+
+    def url(self, endpoint):
+        """Return the full URL of `endpoint`, relative to the base URL."""
+        return '%s%s' % (self.base_url, endpoint)
+
+    def _decode(self, response):
+        """Decode `response`, raising the exception carried by a 400 reply."""
+        # XXX: this breaks language-independence and should be
+        # replaced by proper unserialization
+        # SECURITY: unpickling runs arbitrary code from the payload, so
+        # this client must only talk to a trusted server.
+        if response.status_code == 400:
+            raise pickle.loads(decode_response(response))
+
+        return decode_response(response)
+
+    def post(self, endpoint, data):
+        """POST the encoded `data` to `endpoint` and return the decoded reply.
+
+        Raises:
+            StorageAPIError: if the connection to the server fails.
+
+        """
+        try:
+            response = self.session.post(
+                self.url(endpoint),
+                data=encode_data(data),
+                headers={'content-type': 'application/x-msgpack'},
+            )
+        # requests raises its own ConnectionError, which does not derive
+        # from the builtin one; catch both.
+        except (requests.exceptions.ConnectionError, ConnectionError) as e:
+            logger.error('POST %s failed: %s', endpoint, e)
+            raise StorageAPIError(e) from e
+
+        return self._decode(response)
+
+    def get(self, endpoint, data=None):
+        """GET `endpoint`, passing `data` as query parameters.
+
+        Returns:
+            the decoded reply, or None when the server answers 404.
+
+        Raises:
+            StorageAPIError: if the connection to the server fails.
+
+        """
+        try:
+            response = self.session.get(
+                self.url(endpoint),
+                params=data,
+            )
+        # requests raises its own ConnectionError, which does not derive
+        # from the builtin one; catch both.
+        except (requests.exceptions.ConnectionError, ConnectionError) as e:
+            logger.error('GET %s failed: %s', endpoint, e)
+            raise StorageAPIError(e) from e
+
+        if response.status_code == 404:
+            return None
+
+        return self._decode(response)
+
+    def directory_ls(self):
+        """Return the server's listing of the cached directories."""
+        return self.get('vault/directory/')
+
+    def directory_get(self, obj_id):
+        """Return the cached directory `obj_id`, or None on a 404 reply."""
+        return self.get('vault/directory/%s/' % hashutil.hash_to_hex(obj_id))
+
+    def directory_cook(self, obj_id):
+        """Request the cooking of the directory `obj_id`."""
+        return self.post('vault/directory/%s/' % hashutil.hash_to_hex(obj_id),
+                         data={})
diff --git a/swh/storage/vault/api/cooking_tasks.py b/swh/storage/vault/api/cooking_tasks.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/vault/api/cooking_tasks.py
@@ -0,0 +1,29 @@
+# Copyright (C) 2016 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from swh.scheduler.task import Task
+from ..cache import VaultCache
+from ..cooker import DirectoryVaultCooker
+from ... import get_storage
+
+
+class SWHCookingTask(Task):
+    """Task that cooks a directory with a DirectoryVaultCooker."""
+    task_queue = 'swh_storage_vault_cooking'
+
+    def run(self, dir_id, storage_args, cache_args):
+        """Cook the directory `dir_id`.
+
+        Args:
+            dir_id: identifier of the directory to cook.
+            storage_args: keyword arguments forwarded to get_storage.
+            cache_args: keyword arguments forwarded to VaultCache.
+
+        """
+        storage = get_storage(**storage_args)
+        cache = VaultCache(**cache_args)
+        directory_cooker = DirectoryVaultCooker(storage, cache)
+
+        directory_cooker.cook(dir_id)
diff --git a/swh/storage/vault/api/server.py b/swh/storage/vault/api/server.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/vault/api/server.py
@@ -0,0 +1,102 @@
+# Copyright (C) 2016 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import click
+from flask import Flask, abort, g
+from swh.core import config
+from swh.objstorage.api.common import encode_data_server as encode_data
+from swh.objstorage.api.common import BytesRequest, error_handler
+from swh.storage import get_storage
+from swh.storage.vault.api import cooking_tasks  # NOQA
+from swh.storage.vault.cache import VaultCache
+from swh.storage.vault.cooker import DirectoryVaultCooker
+
+# from swh.scheduler.celery_backend.config import app as celery_app
+
+
+# Name of the cooking task, for the (currently disabled) celery dispatch.
+cooking_task_name = 'swh.storage.vault.api.cooking_tasks.SWHCookingTask'
+
+
+DEFAULT_CONFIG = {
+    'storage': ('dict', {'storage_class': 'local_storage',
+                         'storage_args': [
+                             'dbname=softwareheritage-dev',
+                             '/tmp/objects'
+                         ]
+                         }),
+    'cache': ('dict', {'root': '/tmp/vaultcache'})
+}
+
+
+app = Flask(__name__)
+app.request_class = BytesRequest
+
+
+@app.errorhandler(Exception)
+def my_error_handler(exception):
+    """Delegate any uncaught exception to the shared error_handler."""
+    return error_handler(exception, encode_data)
+
+
+@app.before_request
+def before_request():
+    """Build the cache and the cooker used by the current request."""
+    g.cache = VaultCache(**app.config['cache'])
+    g.cooker = DirectoryVaultCooker(
+        get_storage(**app.config['storage']),
+        g.cache
+    )
+
+
+@app.route('/')
+def index():
+    """Return a plain-text banner identifying the server."""
+    return 'SWH vault API server'
+
+
+@app.route('/vault/directory/', methods=['GET'])
+def ls_directory():
+    """Return the encoded list of the cached directories."""
+    return encode_data(list(
+        g.cache.ls('directory')
+    ))
+
+
+@app.route('/vault/directory/<dir_id>/', methods=['GET'])
+def get_directory(dir_id):
+    """Return the cached directory `dir_id`, or a 404 if it is not cached."""
+    if not g.cache.is_cached('directory', dir_id):
+        abort(404)
+    return encode_data(g.cache.get('directory', dir_id).decode())
+
+
+@app.route('/vault/directory/<dir_id>/', methods=['POST'])
+def cook_request_directory(dir_id):
+    """Cook the directory `dir_id` and return the encoded result."""
+    # TODO: use this code once celery is fully configured; for now,
+    # cook synchronously.
+    # task = celery_app.tasks[cooking_task_name]
+    # task.delay(dir_id, app.config['storage'], app.config['cache'])
+    return encode_data(g.cooker.cook(dir_id))
+
+
+@click.command()
+@click.argument('config-path', required=True)
+@click.option('--host', default='0.0.0.0', help="Host to run the server")
+@click.option('--port', default=5000, type=click.INT,
+              help="Binding port of the server")
+# NOTE(review): debug mode is on by default while the default host
+# listens on every interface; pass --nodebug outside of development.
+@click.option('--debug/--nodebug', default=True,
+              help="Indicates if the server should run in debug mode")
+def launch(config_path, host, port, debug):
+    """Run the vault API server configured by CONFIG_PATH."""
+    app.config.update(config.read(config_path, DEFAULT_CONFIG))
+    app.run(host, port=int(port), debug=bool(debug))
+
+
+if __name__ == '__main__':
+    launch()