diff --git a/sql/swh-schema.sql b/sql/swh-schema.sql
--- a/sql/swh-schema.sql
+++ b/sql/swh-schema.sql
@@ -14,7 +14,7 @@
 );
 
 insert into dbversion(version, release, description)
-  values(68, now(), 'Work In Progress');
+  values(69, now(), 'Work In Progress');
 
 -- a SHA1 checksum (not necessarily originating from Git)
 create domain sha1 as bytea check (length(value) = 20);
@@ -426,3 +426,29 @@
 );
 
 create index on release(target, target_type);
+
+
+-- In order to archive the content of the object storage, add
+-- some tables to keep track of what has already been archived.
+
+create domain archive_id as text;
+
+create table archives (
+  id   archive_id primary key,
+  url  text
+);
+
+create type archive_status as enum (
+  'missing',
+  'ongoing',
+  'present'
+);
+
+create table content_archive (
+  content_id  sha1 references content(sha1),
+  archive_id  archive_id references archives(id),
+  status      archive_status,
+  mtime       timestamptz,
+  primary key (content_id, archive_id)
+);
+
diff --git a/sql/upgrades/069.sql b/sql/upgrades/069.sql
new file mode 100644
--- /dev/null
+++ b/sql/upgrades/069.sql
@@ -0,0 +1,28 @@
+-- SWH DB schema upgrade
+-- from_version: 68
+-- to_version: 69
+-- description: add tables for the archiver.
+
+insert into dbversion(version, release, description)
+  values(69, now(), 'Work In Progress');
+
+create domain archive_id as text;
+
+create table archives (
+  id   archive_id primary key,
+  url  text
+);
+
+create type archive_status as enum (
+  'missing',
+  'ongoing',
+  'present'
+);
+
+create table content_archive (
+  content_id  sha1 references content(sha1),
+  archive_id  archive_id references archives(id),
+  status      archive_status,
+  mtime       timestamptz,
+  primary key (content_id, archive_id)
+);
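
To make the intended use of these tables concrete, here is a hypothetical sketch of the archiver-side update once a copy has succeeded. The helper name `mark_present`, the psycopg2 connection and the 'banco' archive id are all invented for illustration; the patch itself does not yet write to content_archive (see the WARNING in the load-balancing client below).

import psycopg2


def mark_present(conn, content_id, archive_id):
    """Flip a content_archive row to 'present' after a successful copy.

    conn is a psycopg2 connection to the softwareheritage database;
    content_id is the raw sha1 (bytes) of the archived object.
    """
    with conn.cursor() as cur:
        cur.execute(
            """update content_archive
               set status = 'present', mtime = now()
               where content_id = %s and archive_id = %s""",
            (psycopg2.Binary(content_id), archive_id))
    conn.commit()


# e.g.: mark_present(conn, obj_id, 'banco')
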
""" + def __init__(self, base_url): + self.base_url = base_url + self.session = requests.Session() + + def url(self, endpoint): + return '%s%s' % (self.base_url, endpoint) + + def post(self, endpoint, data): + try: + response = self.session.post( + self.url(endpoint), + data=encode_data(data), + headers={'content-type': 'application/x-msgpack'}, + ) + except ConnectionError as e: + print(str(e)) + raise StorageAPIError(e) + + # XXX: this breaks language-independence and should be + # replaced by proper unserialization + if response.status_code == 400: + raise pickle.loads(decode_response(response)) + + return decode_response(response) + + def content_add(self, bytes, obj_id=None): + """ Add a new object to the object storage. + + Args: + bytes: content of the object to be added to the storage. + obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When + given, obj_id will be trusted to match bytes. If missing, + obj_id will be computed on the fly. + + """ + return self.post('content/add', {'bytes': bytes, 'obj_id': obj_id}) + + def content_get(self, obj_id): + """ Retrieve the content of a given object. + + Args: + obj_id: The id of the object. + + Returns: + The content of the requested objects as bytes. + + Raises: + ObjNotFoundError: if the requested object is missing + """ + return self.post('content/get', {'obj_id': obj_id}) + + def content_check(self, obj_id): + """ Integrity check for a given object + + verify that the file object is in place, and that the gzipped content + matches the object id + + Args: + obj_id: The id of the object. + + Raises: + ObjNotFoundError: if the requested object is missing + Error: if the requested object is corrupt + """ + self.post('content/check', {'obj_id': obj_id}) + + +if __name__ == '__main__': + c = RemoteArchive('http://localhost:8080/') + id = c.content_add(bytes('ceci est un contenu de test', 'utf8')) + c.content_check(id) + content = c.content_get(id) diff --git a/swh/storage/objstorage/api/load_balancing_client.py b/swh/storage/objstorage/api/load_balancing_client.py new file mode 100644 --- /dev/null +++ b/swh/storage/objstorage/api/load_balancing_client.py @@ -0,0 +1,132 @@ +# Copyright (C) 2015 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +from . import client + +from swh.core import hashutil +from swh.storage.exc import ObjNotFoundError + + +class LoadBalancingClient(): + """ A proxy for multiple remote object storages. + + This proxy client does some load balancing between the remote + backup servers. + WARNING This is a work in progress and does not yet update + the archive database ('content_archive' table) + + Attributes: + db: the database which contains data about where the content is + saved. + server_queue (ClientBalancer): a container of remote storages that + allows to get the best suited remote storage. + """ + class ClientBalancer(): + """ Container of remote objstorage that selects the best one. + + Actually this queue does not take care of servers capacities + and only return the server that have been unused for the + largest amount of time. + + Attributes: + objstorages: The list of object storages, as a dict that + associates an archive id to the corresponding remote + objstorage. + """ + def __init__(self, *servers): + """ Initializes the remote storage servers queue. 
diff --git a/swh/storage/objstorage/api/load_balancing_client.py b/swh/storage/objstorage/api/load_balancing_client.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/objstorage/api/load_balancing_client.py
@@ -0,0 +1,132 @@
+# Copyright (C) 2015  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+from . import client
+
+from swh.core import hashutil
+from swh.storage.exc import ObjNotFoundError
+
+
+class LoadBalancingClient:
+    """A proxy for multiple remote object storages.
+
+    This proxy client does some load balancing between the remote
+    backup servers.
+    WARNING: this is a work in progress and does not yet update
+    the archive database (the 'content_archive' table).
+
+    Attributes:
+        db: the database which contains data about where the content is
+            saved.
+        balancer (ClientBalancer): a container of remote storages that
+            allows to get the best suited remote storage.
+    """
+    class ClientBalancer:
+        """Container of remote objstorages that selects the best one.
+
+        Currently this balancer does not take server capacity into
+        account; it simply returns the server that has been unused for
+        the longest time.
+
+        Attributes:
+            objstorages: the list of object storages, as (archive id,
+                remote objstorage) pairs.
+        """
+        def __init__(self, *servers):
+            """Initialize the remote storage servers queue.
+
+            Args:
+                *servers: tuples (server_id, server_url) that represent
+                    the servers.
+            """
+            storages = []
+            for server_id, server_url in servers:
+                storages.append((server_id, client.RemoteArchive(server_url)))
+            self.objstorages = storages
+
+        def get_any(self):
+            """Return the least recently used storage, round-robin style."""
+            _id, storage = self.objstorages.pop(0)
+            self.objstorages.append((_id, storage))
+            return storage
+
+        def get_all_in(self, storage_list):
+            """Return all the storages whose id is in storage_list."""
+            return [remote for _id, remote in self.objstorages
+                    if _id in storage_list]
+
+        def get_one_of(self, allowed_list):
+            """Return one storage among those whose id is in allowed_list.
+
+            Raises:
+                ValueError: if no storage matches.
+            """
+            candidates = self.get_all_in(allowed_list)
+            if not candidates:
+                raise ValueError('No storage available among %s'
+                                 % allowed_list)
+            return candidates[0]
+
+    def __init__(self, db, *servers):
+        self.db = db
+        self.balancer = self.ClientBalancer(*servers)
+
+    def __get_storages_for(self, content_id):
+        """Get the ids of the storages that contain content_id.
+
+        Returns:
+            The ids of all the storages with a content matching the
+            given id.
+        """
+        with self.db.transaction() as cur:
+            cur.execute("""SELECT archive_id
+                           FROM content_archive
+                           WHERE content_id=%s AND status='present'
+                        """, (content_id,))
+            return [archive_id for (archive_id,) in cur.fetchall()]
+
+    def content_add(self, bytes, obj_id=None):
+        """Add a new object to the object storage.
+
+        Args:
+            bytes: content of the object to be added to the storage.
+            obj_id: checksum of `bytes` as computed by ID_HASH_ALGO. When
+                given, obj_id will be trusted to match bytes. If missing,
+                obj_id will be computed on the fly.
+
+        """
+        return self.balancer.get_any().content_add(bytes, obj_id)
+
+    def content_get(self, obj_id):
+        """Retrieve the content of a given object.
+
+        Args:
+            obj_id: The id of the object.
+
+        Returns:
+            The content of the requested object, as bytes.
+
+        Raises:
+            ObjNotFoundError: if the requested object is missing
+        """
+        # Get a server where the content is present.
+        content_id = r'\x' + hashutil.hash_to_hex(obj_id)
+        res = self.__get_storages_for(content_id)
+        if not res:
+            raise ObjNotFoundError(content_id)
+        return self.balancer.get_one_of(res).content_get(obj_id)
+
+    def content_check(self, obj_id):
+        """Integrity check for a given object.
+
+        Verify that the file object is in place, and that the gzipped
+        content matches the object id.
+
+        Args:
+            obj_id: The id of the object.
+
+        Raises:
+            ObjNotFoundError: if the requested object is missing
+            Error: if the requested object is corrupt
+        """
+        # Get the servers that contain the content.
+        content_id = r'\x' + hashutil.hash_to_hex(obj_id)
+        res = self.__get_storages_for(content_id)
+        # And do the check on each of them.
+        # TODO: this is a good place to do some self-healing,
+        # because we have access to all the storages.
+        for remote in self.balancer.get_all_in(res):
+            remote.content_check(obj_id)
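
To make the intended wiring concrete, a rough usage sketch. Here `make_db` and the two backend entries are invented; the only real requirement, per __get_storages_for, is that db.transaction() is a context manager yielding a cursor:

from swh.storage.objstorage.api.load_balancing_client import (
    LoadBalancingClient
)

db = make_db('service=softwareheritage')  # hypothetical db-handle helper
client = LoadBalancingClient(
    db,
    ('banco', 'http://banco.example.org:8080/'),
    ('azure', 'http://azure.example.org:8080/'),
)
obj_id = client.content_add(b'some content')  # round-robins to one backend
client.content_check(obj_id)  # checks every backend marked 'present' in db
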
diff --git a/swh/storage/objstorage/api/server.py b/swh/storage/objstorage/api/server.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/objstorage/api/server.py
@@ -0,0 +1,96 @@
+# Copyright (C) 2015  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import pickle
+import json
+
+from flask import Flask, Request, Response, g, request
+
+from swh.core import config
+from swh.core.serializers import msgpack_dumps, msgpack_loads, SWHJSONDecoder
+from swh.storage.objstorage import ObjStorage
+
+DEFAULT_CONFIG = {
+    'storage_base': ('str', '/tmp/swh/backup/'),
+    'storage_depth': ('int', 3),
+}
+
+
+class BytesRequest(Request):
+    """Request with proper escaping of arbitrary byte sequences."""
+    encoding = 'utf-8'
+    encoding_errors = 'surrogateescape'
+
+
+app = Flask(__name__)
+app.request_class = BytesRequest
+
+
+def encode_data(data):
+    return Response(
+        msgpack_dumps(data),
+        mimetype='application/x-msgpack',
+    )
+
+
+def decode_request(request):
+    content_type = request.mimetype
+    data = request.get_data()
+
+    if content_type == 'application/x-msgpack':
+        r = msgpack_loads(data)
+    elif content_type == 'application/json':
+        r = json.loads(data, cls=SWHJSONDecoder)
+    else:
+        raise ValueError('Wrong content type `%s` for API request'
+                         % content_type)
+
+    return r
+
+
+@app.errorhandler(Exception)
+def error_handler(exception):
+    # XXX: this breaks language-independence and should be
+    # replaced by proper serialization of errors
+    response = encode_data(pickle.dumps(exception))
+    response.status_code = 400
+    return response
+
+
+@app.before_request
+def before_request():
+    g.objstorage = ObjStorage(app.config['storage_base'],
+                              app.config['storage_depth'])
+
+
+@app.route('/')
+def index():
+    return 'Hello world!'
+
+
+@app.route('/content')
+def content():
+    return str(list(g.objstorage))
+
+
+@app.route('/content/add', methods=['POST'])
+def add_bytes():
+    return encode_data(g.objstorage.add_bytes(**decode_request(request)))
+
+
+@app.route('/content/get', methods=['POST'])
+def get_bytes():
+    return encode_data(g.objstorage.get_bytes(**decode_request(request)))
+
+
+@app.route('/content/check', methods=['POST'])
+def check():
+    # TODO: verify that an error on this content is properly intercepted
+    # by @app.errorhandler and that the pickled error reaches the client.
+    return encode_data(g.objstorage.check(**decode_request(request)))
+
+
+if __name__ == '__main__':
+    app.config.update(config.read(None, DEFAULT_CONFIG))
+    app.run(host='0.0.0.0', port=8080, debug=True)
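
For debugging the msgpack layer, the endpoints can also be exercised without RemoteArchive. A small sketch, assuming the server above is running locally on port 8080 as in its __main__ block:

import requests

from swh.core.serializers import msgpack_dumps, msgpack_loads

# POST a content the same way RemoteArchive.post does.
resp = requests.post(
    'http://localhost:8080/content/add',
    data=msgpack_dumps({'bytes': b'hello', 'obj_id': None}),
    headers={'content-type': 'application/x-msgpack'},
)
obj_id = msgpack_loads(resp.content)  # the sha1 computed server-side
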
diff --git a/swh/storage/objstorage.py b/swh/storage/objstorage/objstorage.py
rename from swh/storage/objstorage.py
rename to swh/storage/objstorage/objstorage.py
--- a/swh/storage/objstorage.py
+++ b/swh/storage/objstorage/objstorage.py
@@ -10,7 +10,7 @@
 
 from contextlib import contextmanager
 
-from .exc import ObjNotFoundError, Error
+from swh.storage.exc import ObjNotFoundError, Error
 
 from swh.core import hashutil
 
diff --git a/swh/storage/tests/test_objstorage_api.py b/swh/storage/tests/test_objstorage_api.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_objstorage_api.py
@@ -0,0 +1,128 @@
+# Copyright (C) 2015  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import multiprocessing
+import socket
+import time
+import tempfile
+import unittest
+
+from urllib.request import urlopen
+
+from nose.tools import istest
+from nose.plugins.attrib import attr
+
+from swh.core import hashutil
+from swh.storage.exc import ObjNotFoundError, Error
+from swh.storage.objstorage.objstorage import _obj_path
+from swh.storage.objstorage.api.client import RemoteArchive
+from swh.storage.objstorage.api.server import app
+
+
+@attr('!db')
+class TestRemoteArchive(unittest.TestCase):
+    """Test the remote archive API."""
+
+    def setUp(self):
+        self.start_server()
+        self.objstorage = RemoteArchive(self.url())
+
+    def tearDown(self):
+        self.stop_server()
+
+    def url(self):
+        return 'http://127.0.0.1:%d/' % self.port
+
+    def start_server(self):
+        """Spawn the API server using multiprocessing."""
+        self.process = None
+
+        # WSGI app configuration
+        self.app = app
+        self.app.config['storage_base'] = tempfile.mkdtemp()
+        self.app.config['storage_depth'] = 3
+        # Get an available port number
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        sock.bind(('127.0.0.1', 0))
+        self.port = sock.getsockname()[1]
+        sock.close()
+
+        # Worker function for multiprocessing
+        def worker(app, port):
+            return app.run(port=port, use_reloader=False)
+
+        self.process = multiprocessing.Process(
+            target=worker, args=(self.app, self.port)
+        )
+        self.process.start()
+
+        # Wait at most 5 seconds for the server to spawn
+        i = 0
+        while i < 20:
+            try:
+                urlopen(self.url())
+            except Exception:
+                i += 1
+                time.sleep(0.25)
+            else:
+                return
+        self.fail('API server did not start within 5 seconds')
+
+    def stop_server(self):
+        """Terminate the API server's process."""
+        if self.process:
+            self.process.terminate()
+ """ + if self.process: + self.process.terminate() + + @istest + def content_add(self): + content = bytes('Test content', 'utf8') + id = self.objstorage.content_add(content) + self.assertEquals(self.objstorage.content_get(id), content) + + @istest + def content_get_present(self): + content = bytes('content_get_present', 'utf8') + content_hash = hashutil.hashdata(content) + id = self.objstorage.content_add(content) + self.assertEquals(content_hash['sha1'], id) + + @istest + def content_get_missing(self): + content = bytes('content_get_missing', 'utf8') + content_hash = hashutil.hashdata(content) + with self.assertRaises(ObjNotFoundError): + self.objstorage.content_get(content_hash['sha1']) + + @istest + def content_check_invalid(self): + content = bytes('content_check_invalid', 'utf8') + id = self.objstorage.content_add(content) + path = _obj_path(hashutil.hash_to_hex(id), + self.app.config['storage_base'], + self.app.config['storage_depth']) + content = list(content) + with open(path, 'bw') as f: + content[0] = (content[0] + 1) % 128 + f.write(bytes(content)) + with self.assertRaises(Error): + self.objstorage.content_check(id) + + @istest + def content_check_valid(self): + content = bytes('content_check_valid', 'utf8') + id = self.objstorage.content_add(content) + try: + self.objstorage.content_check(id) + except: + self.fail('Integrity check failed') + + @istest + def content_check_missing(self): + content = bytes('content_check_valid', 'utf8') + content_hash = hashutil.hashdata(content) + with self.assertRaises(ObjNotFoundError): + self.objstorage.content_check(content_hash['sha1'])