diff --git a/bin/swh-objstorage-add-dir b/bin/swh-objstorage-add-dir new file mode 100755 --- /dev/null +++ b/bin/swh-objstorage-add-dir @@ -0,0 +1,37 @@ +#!/usr/bin/python3 + +# Copyright (C) 2015 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging +import os +import sys + +from swh.storage import objstorage + +if __name__ == '__main__': + try: + root_dir = sys.argv[1] + dirname = sys.argv[2] + except IndexError: + print("Usage: swh-objstorage-add-dir OBJ_STORAGE_DIR DATA_DIR") + sys.exit(1) + + logging.basicConfig(level=logging.INFO) + + objs = objstorage.ObjStorage(root_dir) + + dups = 0 + for root, _dirs, files in os.walk(dirname): + for name in files: + path = os.path.join(root, name) + with open(path, 'rb') as f: + try: + objs.add(f.read()) + except objstorage.DuplicateObjError: + dups += 1 + + if dups: + logging.info('skipped %d duplicate(s) file(s)' % dups) diff --git a/bin/swh-objstorage-fsck b/bin/swh-objstorage-fsck new file mode 100755 --- /dev/null +++ b/bin/swh-objstorage-fsck @@ -0,0 +1,28 @@ +#!/usr/bin/python3 + +# Copyright (C) 2015 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging +import sys + +from swh.storage import objstorage + +if __name__ == '__main__': + try: + root_dir = sys.argv[1] + except IndexError: + print("Usage: swh-objstorage-fsck OBJ_STORAGE_DIR") + sys.exit(1) + + logging.basicConfig(level=logging.INFO) + + objs = objstorage.ObjStorage(root_dir) + + for obj_id in objs: + try: + objs.check(obj_id) + except objstorage.Error as err: + logging.error(err) diff --git a/debian/changelog b/debian/changelog --- a/debian/changelog +++ b/debian/changelog 
@@ -1,4 +1,4 @@ - (0.0.1-1) unstable; urgency=low +swh-objstorage (0.0.1-1) unstable; urgency=low * diff --git a/debian/control b/debian/control --- a/debian/control +++ b/debian/control @@ -1,19 +1,21 @@ -Source: +Source: swh-objstorage Maintainer: Software Heritage developers Section: python Priority: optional Build-Depends: debhelper (>= 9), dh-python, python3-all, + python3-flask, python3-nose, + python3-requests, python3-setuptools, python3-swh.core, - python3-swh.storage, + python3-click, python3-vcversioner Standards-Version: 3.9.6 -Homepage: https://forge.softwareheritage.org/diffusion// +Homepage: https://forge.softwareheritage.org/diffusion/DOBJS/ -Package: python3- +Package: python3-swh.objstorage Architecture: all Depends: ${misc:Depends}, ${python3:Depends} -Description: Software Heritage +Description: Software Heritage Object Storage diff --git a/debian/rules b/debian/rules --- a/debian/rules +++ b/debian/rules @@ -1,6 +1,6 @@ #!/usr/bin/make -f -export PYBUILD_NAME= +export PYBUILD_NAME=swh-objstorage %: dh $@ --with python3 --buildsystem=pybuild diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,14 @@ # should match https://pypi.python.org/pypi names. 
For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html vcversioner + +# remote storage API client +requests + +# remote storage API server +flask + +# Internal dependencies +swh.core >= 0.0.20 + +click diff --git a/setup.py b/setup.py --- a/setup.py +++ b/setup.py @@ -16,13 +16,21 @@ # Edit this part to match your module # full sample: https://forge.softwareheritage.org/diffusion/DCORE/browse/master/setup.py setup( - name='swh.', - description='Software Heritage ', + name='swh.objstorage', + description='Software Heritage Object Storage', author='Software Heritage developers', author_email='swh-devel@inria.fr', - url='https://forge.softwareheritage.org/diffusion/', - packages=[], # packages's modules - scripts=[], # scripts to package + url='https://forge.softwareheritage.org/diffusion/DOBJS', + packages=[ + 'swh.objstorage', + 'swh.objstorage.api', + 'swh.objstorage.multiplexer', + 'swh.objstorage.multiplexer.filter' + ], # packages's modules + scripts=[ + 'bin/swh-objstorage-add-dir', + 'bin/swh-objstorage-fsck' + ], # scripts to package install_requires=parse_requirements(), setup_requires=['vcversioner'], vcversioner={}, diff --git a/swh/foo/bar.py b/swh/foo/bar.py deleted file mode 100644 --- a/swh/foo/bar.py +++ /dev/null @@ -1,4 +0,0 @@ -# Copyright (C) 2015 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information diff --git a/swh/objstorage/__init__.py b/swh/objstorage/__init__.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/__init__.py @@ -0,0 +1,4 @@ +from .objstorage import ObjStorage +from .objstorage_pathslicing import PathSlicingObjStorage + +__all__ = [ObjStorage, PathSlicingObjStorage] diff --git a/swh/objstorage/api/__init__.py b/swh/objstorage/api/__init__.py new file mode 100644 diff --git a/swh/objstorage/api/client.py 
b/swh/objstorage/api/client.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/api/client.py @@ -0,0 +1,103 @@ +# Copyright (C) 2015 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +import pickle + +import requests + +from requests.exceptions import ConnectionError +from ..exc import ObjStorageAPIError +from .common import (decode_response, + encode_data_client as encode_data) + + +class RemoteObjStorage(): + """ Proxy to a remote object storage. + + This class allows to connect to an object storage server via + http protocol. + + Attributes: + base_url (string): The url of the server to connect. Must end + with a '/' + session: The session to send requests. + """ + def __init__(self, base_url): + self.base_url = base_url + self.session = requests.Session() + + def url(self, endpoint): + return '%s%s' % (self.base_url, endpoint) + + def post(self, endpoint, data): + try: + response = self.session.post( + self.url(endpoint), + data=encode_data(data), + headers={'content-type': 'application/x-msgpack'}, + ) + except ConnectionError as e: + print(str(e)) + raise ObjStorageAPIError(e) + + # XXX: this breaks language-independence and should be + # replaced by proper unserialization + if response.status_code == 400: + raise pickle.loads(decode_response(response)) + + return decode_response(response) + + def content_add(self, bytes, obj_id=None): + """ Add a new object to the object storage. + + Args: + bytes: content of the object to be added to the storage. + obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When + given, obj_id will be trusted to match bytes. If missing, + obj_id will be computed on the fly. + + """ + return self.post('content/add', {'bytes': bytes, 'obj_id': obj_id}) + + def content_get(self, obj_id): + """ Retrieve the content of a given object. 
+ + Args: + obj_id: The id of the object. + + Returns: + The content of the requested objects as bytes. + + Raises: + ObjNotFoundError: if the requested object is missing + """ + return self.post('content/get', {'obj_id': obj_id}) + + def content_get_random(self, batch_size): + """ Retrieve a random sample of existing content. + + Args: + batch_size: Number of content requested. + + Returns: + A list of random ids that represents existing contents. + """ + return self.post('content/get/random', {'batch_size': batch_size}) + + def content_check(self, obj_id): + """ Integrity check for a given object + + verify that the file object is in place, and that the gzipped content + matches the object id + + Args: + obj_id: The id of the object. + + Raises: + ObjNotFoundError: if the requested object is missing + Error: if the requested object is corrupt + """ + self.post('content/check', {'obj_id': obj_id}) diff --git a/swh/objstorage/api/common.py b/swh/objstorage/api/common.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/api/common.py @@ -0,0 +1,69 @@ +# Copyright (C) 2015 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import json +import pickle + +from flask import Request, Response + +from swh.core.serializers import msgpack_dumps, msgpack_loads, SWHJSONDecoder + + +class BytesRequest(Request): + """Request with proper escaping of arbitrary byte sequences.""" + encoding = 'utf-8' + encoding_errors = 'surrogateescape' + + +def encode_data_server(data): + return Response( + msgpack_dumps(data), + mimetype='application/x-msgpack', + ) + + +def encode_data_client(data): + try: + return msgpack_dumps(data) + except OverflowError as e: + raise ValueError('Limits were reached. 
Please, check your input.\n' + + str(e)) + + +def decode_request(request): + content_type = request.mimetype + data = request.get_data() + + if content_type == 'application/x-msgpack': + r = msgpack_loads(data) + elif content_type == 'application/json': + r = json.loads(data, cls=SWHJSONDecoder) + else: + raise ValueError('Wrong content type `%s` for API request' + % content_type) + + return r + + +def decode_response(response): + content_type = response.headers['content-type'] + + if content_type.startswith('application/x-msgpack'): + r = msgpack_loads(response.content) + elif content_type.startswith('application/json'): + r = response.json(cls=SWHJSONDecoder) + else: + raise ValueError('Wrong content type `%s` for API response' + % content_type) + + return r + + +def error_handler(exception, encoder): + # XXX: this breaks language-independence and should be + # replaced by proper serialization of errors + response = encoder(pickle.dumps(exception)) + response.status_code = 400 + return response diff --git a/swh/objstorage/api/server.py b/swh/objstorage/api/server.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/api/server.py @@ -0,0 +1,97 @@ +# Copyright (C) 2015 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import click +import logging + +from flask import Flask, g, request + +from swh.core import config +from swh.objstorage import PathSlicingObjStorage + +from swh.objstorage.api.common import (BytesRequest, decode_request, + error_handler, + encode_data_server as encode_data) + +DEFAULT_CONFIG = { + 'storage_base': ('str', '/tmp/swh-storage/objects/'), + 'storage_slicing': ('str', '0:2/2:4/4:6') +} + +app = Flask(__name__) +app.request_class = BytesRequest + + +@app.errorhandler(Exception) +def my_error_handler(exception): + return error_handler(exception, encode_data) + 
+ +@app.before_request +def before_request(): + g.objstorage = PathSlicingObjStorage(app.config['storage_base'], + app.config['storage_slicing']) + + +@app.route('/') +def index(): + return "SWH Objstorage API server" + + +@app.route('/content') +def content(): + return str(list(g.objstorage)) + + +@app.route('/content/add', methods=['POST']) +def add_bytes(): + return encode_data(g.objstorage.add(**decode_request(request))) + + +@app.route('/content/get', methods=['POST']) +def get_bytes(): + return encode_data(g.objstorage.get(**decode_request(request))) + + +@app.route('/content/get/random', methods=['POST']) +def get_random_contents(): + return encode_data( + g.objstorage.get_random(**decode_request(request)) + ) + + +@app.route('/content/check', methods=['POST']) +def check(): + return encode_data(g.objstorage.check(**decode_request(request))) + + +def run_from_webserver(environ, start_response): + """Run the WSGI app from the webserver, loading the configuration. + + """ + config_path = '/etc/softwareheritage/storage/objstorage.ini' + + app.config.update(config.read(config_path, DEFAULT_CONFIG)) + + handler = logging.StreamHandler() + app.logger.addHandler(handler) + + return app(environ, start_response) + + +@click.command() +@click.argument('config-path', required=1) +@click.option('--host', default='0.0.0.0', help="Host to run the server") +@click.option('--port', default=5000, type=click.INT, + help="Binding port of the server") +@click.option('--debug/--nodebug', default=True, + help="Indicates if the server should run in debug mode") +def launch(config_path, host, port, debug): + app.config.update(config.read(config_path, DEFAULT_CONFIG)) + app.run(host, port=int(port), debug=bool(debug)) + + +if __name__ == '__main__': + launch() diff --git a/swh/objstorage/checker.py b/swh/objstorage/checker.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/checker.py @@ -0,0 +1,172 @@ +# Copyright (C) 2015-2016 The Software Heritage developers +# See the 
AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import click +import logging + +from swh.core import config, hashutil +from swh.storage import get_storage + +from .objstorage_pathslicing import PathSlicingObjStorage +from .exc import ObjNotFoundError, Error + +DEFAULT_CONFIG = { + 'storage_path': ('str', '/srv/softwareheritage/objects'), + 'storage_depth': ('int', 3), + 'backup_url': ('str', 'http://uffizi:5002/'), + + 'batch_size': ('int', 1000), +} + + +class ContentChecker(): + """ Content integrity checker that will check local objstorage content. + + The checker will check the data of an object storage in order to verify + that no file have been corrupted. + + Attributes: + config: dictionary that contains this + checker configuration + objstorage (ObjStorage): Local object storage that will be checked. + master_storage (RemoteStorage): A distant storage that will be used to + restore corrupted content. + """ + + def __init__(self, config, root, slicing, backup_urls): + """ Create a checker that ensure the objstorage have no corrupted file. + + Args: + config (dict): Dictionary that contains the following keys : + batch_size: Number of content that should be tested each + time the content checker runs. + root (string): Path to the objstorage directory + depth (int): Depth of the object storage. + backup_urls: List of url that can be contacted in order to + get a content. 
+ """ + self.config = config + self.objstorage = PathSlicingObjStorage(root, slicing) + self.backup_storages = [get_storage('remote_storage', [backup_url]) + for backup_url in backup_urls] + + def run(self): + """ Start the check routine + """ + corrupted_contents = [] + batch_size = self.config['batch_size'] + + for content_id in self.get_content_to_check(batch_size): + if not self.check_content(content_id): + corrupted_contents.append(content_id) + logging.error('The content', content_id, 'have been corrupted') + + self.repair_contents(corrupted_contents) + + def run_as_daemon(self): + """ Start the check routine and perform it forever. + + Use this method to run the checker when it's done as a daemon that + will iterate over the content forever in background. + """ + while True: + try: + self.run() + except Exception as e: + logging.error('An error occured while verifing the content: %s' + % e) + + def get_content_to_check(self, batch_size): + """ Get the content that should be verified. + + Returns: + An iterable of the content's id that need to be checked. + """ + contents = self.objstorage.get_random_contents(batch_size) + yield from contents + + def check_content(self, content_id): + """ Check the validity of the given content. + + Returns: + True if the content was valid, false if it was corrupted. + """ + try: + self.objstorage.check(content_id) + except (ObjNotFoundError, Error) as e: + logging.warning(e) + return False + else: + return True + + def repair_contents(self, content_ids): + """ Try to restore the given contents. + + Ask the backup storages for the contents that are corrupted on + the local object storage. + If the first storage does not contain the missing contents, send + a request to the second one with only the content that couldn't be + retrieved, and so on until there is no remaining content or servers. + + If a content couldn't be retrieved on all the servers, then log it as + an error. 
+ """ + contents_to_get = set(content_ids) + # Iterates over the backup storages. + for backup_storage in self.backup_storages: + # Try to get all the contents that still need to be retrieved. + contents = backup_storage.content_get(list(contents_to_get)) + for content in contents: + if content: + hash = content['sha1'] + data = content['data'] + # When a content is retrieved, remove it from the set + # of needed contents. + contents_to_get.discard(hash) + self.objstorage.restore(data) + + # Contents still in contents_to_get couldn't be retrieved. + if contents_to_get: + logging.error( + "Some corrupted contents could not be retrieved : %s" + % [hashutil.hash_to_hex(id) for id in contents_to_get] + ) + + +@click.command() +@click.argument('config-path', required=1) +@click.option('--storage-path', default=DEFAULT_CONFIG['storage_path'][1], + help='Path to the storage to verify') +@click.option('--depth', default=DEFAULT_CONFIG['storage_depth'][1], + type=click.INT, help='Depth of the object storage') +@click.option('--backup-url', default=DEFAULT_CONFIG['backup_url'][1], + help='Url of a remote storage to retrieve corrupted content') +@click.option('--daemon/--nodaemon', default=True, + help='Indicates if the checker should run forever ' + 'or on a single batch of content') +def launch(config_path, storage_path, depth, backup_url, is_daemon): + # The configuration have following priority : + # command line > file config > default config + cl_config = { + 'storage_path': storage_path, + 'storage_depth': depth, + 'backup_url': backup_url + } + conf = config.read(config_path, DEFAULT_CONFIG) + conf.update(cl_config) + # Create the checker and run + checker = ContentChecker( + {'batch_size': conf['batch_size']}, + conf['storage_path'], + conf['storage_depth'], + map(lambda x: x.strip(), conf['backup_url'].split(',')) + ) + if is_daemon: + checker.run_as_daemon() + else: + checker.run() + +if __name__ == '__main__': + launch() diff --git a/swh/objstorage/exc.py 
b/swh/objstorage/exc.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/exc.py @@ -0,0 +1,25 @@ +# Copyright (C) 2015 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +class Error(Exception): + + def __str__(self): + return 'storage error on object: %s' % self.args + + +class ObjNotFoundError(Error): + + def __str__(self): + return 'object not found: %s' % self.args + + +class ObjStorageAPIError(Exception): + """ Specific internal exception of an object storage (mainly connection). + """ + + def __str__(self): + args = self.args + return 'An unexpected error occurred in the api backend: %s' % args diff --git a/swh/objstorage/multiplexer/__init__.py b/swh/objstorage/multiplexer/__init__.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/multiplexer/__init__.py @@ -0,0 +1,4 @@ +from .multiplexer_objstorage import MultiplexerObjStorage + + +__all__ = ['MultiplexerObjStorage'] diff --git a/swh/objstorage/multiplexer/filter/__init__.py b/swh/objstorage/multiplexer/filter/__init__.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/multiplexer/filter/__init__.py @@ -0,0 +1,98 @@ +import functools + +from .read_write_filter import ReadObjStorageFilter +from .id_filter import RegexIdObjStorageFilter, PrefixIdObjStorageFilter + + +_FILTERS_CLASSES = { + 'readonly': ReadObjStorageFilter, + 'regex': RegexIdObjStorageFilter, + 'prefix': PrefixIdObjStorageFilter +} + + +_FILTERS_PRIORITY = { + 'readonly': 0, + 'prefix': 1, + 'regex': 2 +} + + +def read_only(): + return {'type': 'readonly'} + + +def id_prefix(prefix): + return {'type': 'prefix', 'prefix': prefix} + + +def id_regex(regex): + return {'type': 'regex', 'regex': regex} + + +def _filter_priority(filter_type): + """ Get the priority of this filter. 
+ + Priority is a value that indicates if the operation of the + filter is time-consuming (smaller values means quick execution), + or very likely to be almost always the same value (False being small, + and True high). + + In case the filters are chained, they will be ordered in a way that + small priorities (quick execution or instantly break the chain) are + executed first. + + Default value is 1. Value 0 is recommended for storages that change + behavior only by disabling some operations (making the method return + None). + """ + return _FILTERS_PRIORITY.get(filter_type, 1) + + +def add_filter(storage, filter_conf): + """ Add a filter to the given storage. + + Args: + storage (ObjStorage): storage which will be filtered. + filter_conf (dict): configuration of an ObjStorageFilter, given as + a dictionnary that contains the keys: + - type: which represent the type of filter, one of the keys + of FILTERS + - Every arguments that this type of filter require. + + Returns: + A filtered storage that perform only the valid operations. + """ + type = filter_conf['type'] + args = {k: v for k, v in filter_conf.items() if k != 'type'} + filter = _FILTERS_CLASSES[type](storage=storage, **args) + return filter + + +def add_filters(storage, *filter_confs): + """ Add multiple filters to the given storage. + + (See filter.add_filter) + + Args: + storage (ObjStorage): storage which will be filtered. + filter_confs (list): any number of filter conf, as a dict with: + - type: which represent the type of filter, one of the keys of + FILTERS. + - Every arguments that this type of filter require. + + Returns: + A filtered storage that fulfill the requirement of all the given + filters. + """ + # Reverse sorting in order to put the filter with biggest priority first. 
+ filter_confs = sorted(filter_confs, reverse=True, + key=lambda conf: _filter_priority(conf['type'])) + + # Add the bigest filter to the storage, and reduce it to accumulate filters + # on top of it, until the smallest (fastest, see filter.filter_priority) is + # added. + return functools.reduce( + lambda stor, conf: add_filter(stor, conf), + [storage] + filter_confs + ) diff --git a/swh/objstorage/multiplexer/filter/filter.py b/swh/objstorage/multiplexer/filter/filter.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/multiplexer/filter/filter.py @@ -0,0 +1,48 @@ +# Copyright (C) 2015-2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from ...objstorage import ObjStorage + + +class ObjStorageFilter(ObjStorage): + """ Base implementation of a filter that allow inputs on ObjStorage or not + + This class copy the API of ...objstorage in order to filter the inputs + of this class. + If the operation is allowed, return the result of this operation + applied to the destination implementation. Otherwise, just return + without any operation. + + This class is an abstract base class for a classic read/write storage. + Filters can inherit from it and only redefine some methods in order + to change behavior. 
+ """ + + def __init__(self, storage): + self.storage = storage + + def __contains__(self, *args, **kwargs): + return self.storage.__contains__(*args, **kwargs) + + def __iter__(self): + return self.storage.__iter__() + + def __len__(self): + return self.storage.__len__() + + def add(self, *args, **kwargs): + return self.storage.add(*args, **kwargs) + + def restore(self, *args, **kwargs): + return self.storage.restore(*args, **kwargs) + + def get(self, *args, **kwargs): + return self.storage.get(*args, **kwargs) + + def check(self, *args, **kwargs): + return self.storage.check(*args, **kwargs) + + def get_random(self, *args, **kwargs): + return self.storage.get_random(*args, **kwargs) diff --git a/swh/objstorage/multiplexer/filter/id_filter.py b/swh/objstorage/multiplexer/filter/id_filter.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/multiplexer/filter/id_filter.py @@ -0,0 +1,99 @@ +# Copyright (C) 2015-2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import re + +from swh.core import hashutil + +from .filter import ObjStorageFilter +from ...objstorage import ID_HASH_ALGO +from ...exc import ObjNotFoundError + + +def compute_hash(bytes): + """ Compute the hash of the given content. + """ + # Checksum is missing, compute it on the fly. + h = hashutil._new_hash(ID_HASH_ALGO, len(bytes)) + h.update(bytes) + return h.digest() + + +class IdObjStorageFilter(ObjStorageFilter): + """ Filter that only allow operations if the object id match a requirement. + + Even for read operations, check before if the id match the requirements. + This may prevent for unnecesary disk access. + """ + + def is_valid(self, obj_id): + """ Indicates if the given id is valid. 
+ """ + raise NotImplementedError('Implementations of an IdObjStorageFilter ' + 'must have a "is_valid" method') + + def __contains__(self, obj_id, *args, **kwargs): + if self.is_valid(obj_id): + return self.storage.__contains__(*args, obj_id=obj_id, **kwargs) + return False + + def __len__(self): + return sum(1 for i in [id for id in self.storage if self.is_valid(id)]) + + def __iter__(self): + yield from filter(lambda id: self.is_valid(id), iter(self.storage)) + + def add(self, content, obj_id=None, check_presence=True, *args, **kwargs): + if obj_id is None: + obj_id = compute_hash(content) + if self.is_valid(obj_id): + return self.storage.add(content, *args, obj_id=obj_id, **kwargs) + + def restore(self, content, obj_id=None, *args, **kwargs): + if obj_id is None: + obj_id = compute_hash(content) + if self.is_valid(obj_id): + return self.storage.restore(content, *args, + obj_id=obj_id, **kwargs) + + def get(self, obj_id, *args, **kwargs): + if self.is_valid(obj_id): + return self.storage.get(*args, obj_id=obj_id, **kwargs) + raise ObjNotFoundError(obj_id) + + def check(self, obj_id, *args, **kwargs): + if self.is_valid(obj_id): + return self.storage.check(*args, obj_id=obj_id, **kwargs) + raise ObjNotFoundError(obj_id) + + def get_random(self, *args, **kwargs): + yield from filter(lambda id: self.is_valid(id), + self.storage.get_random(*args, **kwargs)) + + +class RegexIdObjStorageFilter(IdObjStorageFilter): + """ Filter that allow operations if the content's id as hex match a regex. + """ + + def __init__(self, storage, regex): + super().__init__(storage) + self.regex = re.compile(regex) + + def is_valid(self, obj_id): + hex_obj_id = hashutil.hash_to_hex(obj_id) + return self.regex.match(hex_obj_id) is not None + + +class PrefixIdObjStorageFilter(IdObjStorageFilter): + """ Filter that allow operations if the hexlified id have a given prefix. 
+ """ + + def __init__(self, storage, prefix): + super().__init__(storage) + self.prefix = str(prefix) + + def is_valid(self, obj_id): + hex_obj_id = hashutil.hash_to_hex(obj_id) + return str(hex_obj_id).startswith(self.prefix) diff --git a/swh/objstorage/multiplexer/filter/read_write_filter.py b/swh/objstorage/multiplexer/filter/read_write_filter.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/multiplexer/filter/read_write_filter.py @@ -0,0 +1,17 @@ +# Copyright (C) 2015-2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from .filter import ObjStorageFilter + + +class ReadObjStorageFilter(ObjStorageFilter): + """ Filter that disable write operation of the storage. + """ + + def add(self, *args, **kwargs): + return + + def restore(self, *args, **kwargs): + return diff --git a/swh/objstorage/multiplexer/multiplexer_objstorage.py b/swh/objstorage/multiplexer/multiplexer_objstorage.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/multiplexer/multiplexer_objstorage.py @@ -0,0 +1,194 @@ +# Copyright (C) 2015-2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import random + +from ..objstorage import ObjStorage +from ..exc import ObjNotFoundError + + +class MultiplexerObjStorage(ObjStorage): + """ Implementation of ObjStorage that distribute between multiple storages + + The multiplexer object storage allows an input to be demultiplexed + among multiple storages that will or will not accept it by themselves + (see .filter package). + + As the ids can be differents, no pre-computed ids should be submitted. 
+ Also, there are no guarantees that the returned ids can be used directly + into the storages that the multiplexer manage. + + + Use case examples could be: + Example 1: + storage_v1 = filter.read_only(PathSlicingObjStorage('/dir1', + '0:2/2:4/4:6')) + storage_v2 = PathSlicingObjStorage('/dir2', '0:1/0:5') + storage = MultiplexerObjStorage([storage_v1, storage_v2]) + + When using 'storage', all the new contents will only be added to the v2 + storage, while it will be retrievable from both. + + Example 2: + storage_v1 = filter.id_regex( + PathSlicingObjStorage('/dir1', '0:2/2:4/4:6'), + r'[^012].*' + ) + storage_v2 = filter.if_regex( + PathSlicingObjStorage('/dir2', '0:1/0:5'), + r'[012]/*' + ) + storage = MultiplexerObjStorage([storage_v1, storage_v2]) + + When using this storage, the contents with a sha1 starting with 0, 1 or + 2 will be redirected (read AND write) to the storage_v2, while the + others will be redirected to the storage_v1. + If a content starting with 0, 1 or 2 is present in the storage_v1, it + would be ignored anyway. + """ + + def __init__(self, storages): + self.storages = storages + + def __contains__(self, obj_id): + for storage in self.storages: + if obj_id in storage: + return True + return False + + def __iter__(self): + for storage in self.storages: + yield from storage + + def __len__(self): + """ Returns the number of files in the storage. + + Warning: Multiple files may represent the same content, so this method + does not indicate how many different contents are in the storage. + """ + return sum(map(len, self.storages)) + + def add(self, content, obj_id=None, check_presence=True): + """ Add a new object to the object storage. + + If the adding step works in all the storages that accept this content, + this is a success. Otherwise, the full adding step is an error even if + it succeed in some of the storages. + + Args: + content: content of the object to be added to the storage. 
+ obj_id: checksum of [bytes] using [ID_HASH_ALGO] algorithm. When + given, obj_id will be trusted to match the bytes. If missing, + obj_id will be computed on the fly. + check_presence: indicate if the presence of the content should be + verified before adding the file. + + Returns: + an id of the object into the storage. As the write-storages are + always readable as well, any id will be valid to retrieve a + content. + """ + return [storage.add(content, obj_id, check_presence) + for storage in self.storages].pop() + + def restore(self, content, obj_id=None): + """ Restore a content that have been corrupted. + + This function is identical to add_bytes but does not check if + the object id is already in the file system. + + (see "add" method) + + Args: + content: content of the object to be added to the storage + obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When + given, obj_id will be trusted to match bytes. If missing, + obj_id will be computed on the fly. + + Returns: + an id of the object into the storage. As the write-storages are + always readable as well, any id will be valid to retrieve a + content. + """ + return [storage.restore(content, obj_id) + for storage in self.storages].pop() + + def get(self, obj_id): + """ Retrieve the content of a given object. + + Args: + obj_id: object id. + + Returns: + the content of the requested object as bytes. + + Raises: + ObjNotFoundError: if the requested object is missing. + """ + for storage in self.storages: + try: + return storage.get(obj_id) + except ObjNotFoundError: + continue + # If no storage contains this content, raise the error + raise ObjNotFoundError(obj_id) + + def check(self, obj_id): + """ Perform an integrity check for a given object. + + Verify that the file object is in place and that the gziped content + matches the object id. + + Args: + obj_id: object id. + + Raises: + ObjNotFoundError: if the requested object is missing. + Error: if the request object is corrupted. 
+ """ + nb_present = 0 + for storage in self.storages: + try: + storage.check(obj_id) + except ObjNotFoundError: + continue + else: + nb_present += 1 + # If there is an Error because of a corrupted file, then let it pass. + + # Raise the ObjNotFoundError only if the content coulnd't be found in + # all the storages. + if nb_present == 0: + raise ObjNotFoundError(obj_id) + + def get_random(self, batch_size): + """ Get random ids of existing contents + + This method is used in order to get random ids to perform + content integrity verifications on random contents. + + Attributes: + batch_size (int): Number of ids that will be given + + Yields: + An iterable of ids of contents that are in the current object + storage. + """ + storages_set = [storage for storage in self.storages + if len(storage) > 0] + if len(storages_set) <= 0: + return [] + + while storages_set: + storage = random.choice(storages_set) + try: + return storage.get_random(batch_size) + except NotImplementedError: + storages_set.remove(storage) + # There is no storage that allow the get_random operation + raise NotImplementedError( + "There is no storage implementation into the multiplexer that " + "support the 'get_random' operation" + ) diff --git a/swh/objstorage/objstorage.py b/swh/objstorage/objstorage.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/objstorage.py @@ -0,0 +1,119 @@ +# Copyright (C) 2015-2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + + +ID_HASH_ALGO = 'sha1' +ID_HASH_LENGTH = 40 # Size in bytes of the hash hexadecimal representation. + + +class ObjStorage(): + """ High-level API to manipulate the Software Heritage object storage. 
+ + Conceptually, the object storage offers 5 methods: + + - __contains__() check if an object is present, by object id + - add() add a new object, returning an object id + - restore() same as add() but erase an already existed content + - get() retrieve the content of an object, by object id + - check() check the integrity of an object, by object id + + And some management methods: + + - get_random() get random object id of existing contents (used for the + content integrity checker). + + Each implementation of this interface can have a different behavior and + its own way to store the contents. + """ + + def __contains__(self, *args, **kwargs): + raise NotImplementedError( + "Implementations of ObjStorage must have a '__contains__' method" + ) + + def add(self, content, obj_id=None, check_presence=True, *args, **kwargs): + """ Add a new object to the object storage. + + Args: + content: content of the object to be added to the storage. + obj_id: checksum of [bytes] using [ID_HASH_ALGO] algorithm. When + given, obj_id will be trusted to match the bytes. If missing, + obj_id will be computed on the fly. + check_presence: indicate if the presence of the content should be + verified before adding the file. + + Returns: + the id of the object into the storage. + """ + raise NotImplementedError( + "Implementations of ObjStorage must have a 'add' method" + ) + + def restore(self, content, obj_id=None, *args, **kwargs): + """ Restore a content that have been corrupted. + + This function is identical to add_bytes but does not check if + the object id is already in the file system. + + Args: + content: content of the object to be added to the storage + obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When + given, obj_id will be trusted to match bytes. If missing, + obj_id will be computed on the fly. 
+ """ + raise NotImplemented( + "Implementations of ObjStorage must have a 'restore' method" + ) + + def get(self, obj_id, *args, **kwargs): + """ Retrieve the content of a given object. + + Args: + obj_id: object id. + + Returns: + the content of the requested object as bytes. + + Raises: + ObjNotFoundError: if the requested object is missing. + """ + raise NotImplementedError( + "Implementations of ObjStorage must have a 'get' method" + ) + + def check(self, obj_id, *args, **kwargs): + """ Perform an integrity check for a given object. + + Verify that the file object is in place and that the gziped content + matches the object id. + + Args: + obj_id: object id. + + Raises: + ObjNotFoundError: if the requested object is missing. + Error: if the request object is corrupted. + """ + raise NotImplementedError( + "Implementations of ObjStorage must have a 'check' method" + ) + + def get_random(self, batch_size, *args, **kwargs): + """ Get random ids of existing contents + + This method is used in order to get random ids to perform + content integrity verifications on random contents. + + Attributes: + batch_size (int): Number of ids that will be given + + Yields: + An iterable of ids of contents that are in the current object + storage. 
+ """ + raise NotImplementedError( + "The current implementation of ObjStorage does not support " + "'get_random' operation" + ) diff --git a/swh/objstorage/objstorage_pathslicing.py b/swh/objstorage/objstorage_pathslicing.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/objstorage_pathslicing.py @@ -0,0 +1,347 @@ +# Copyright (C) 2015-2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import os +import gzip +import tempfile +import random + +from contextlib import contextmanager + +from swh.core import hashutil + +from .objstorage import ObjStorage, ID_HASH_ALGO, ID_HASH_LENGTH +from .exc import ObjNotFoundError, Error + + +GZIP_BUFSIZ = 1048576 + +DIR_MODE = 0o755 +FILE_MODE = 0o644 + + +@contextmanager +def _write_obj_file(hex_obj_id, objstorage): + """ Context manager for writing object files to the object storage. + + During writing, data are written to a temporary file, which is atomically + renamed to the right file name after closing. This context manager also + takes care of (gzip) compressing the data on the fly. + + Usage sample: + with _write_obj_file(hex_obj_id, objstorage): + f.write(obj_data) + + Yields: + a file-like object open for writing bytes. + """ + # Get the final paths and create the directory if absent. + dir = objstorage._obj_dir(hex_obj_id) + if not os.path.isdir(dir): + os.makedirs(dir, DIR_MODE, exist_ok=True) + path = os.path.join(dir, hex_obj_id) + + # Create a temporary file. + (tmp, tmp_path) = tempfile.mkstemp(suffix='.tmp', prefix='hex_obj_id.', + dir=dir) + + # Open the file and yield it for writing. + tmp_f = os.fdopen(tmp, 'wb') + with gzip.GzipFile(filename=tmp_path, fileobj=tmp_f) as f: + yield f + + # Then close the temporary file and move it to the right directory. 
+ tmp_f.close() + os.chmod(tmp_path, FILE_MODE) + os.rename(tmp_path, path) + + +@contextmanager +def _read_obj_file(hex_obj_id, objstorage): + """ Context manager for reading object file in the object storage. + + Usage sample: + with _read_obj_file(hex_obj_id, objstorage) as f: + b = f.read() + + Yields: + a file-like object open for reading bytes. + """ + path = objstorage._obj_path(hex_obj_id) + with gzip.GzipFile(path, 'rb') as f: + yield f + + +class PathSlicingObjStorage(ObjStorage): + """ Implementation of the ObjStorage API based on the hash of the content. + + On disk, an object storage is a directory tree containing files named after + their object IDs. An object ID is a checksum of its content, depending on + the value of the ID_HASH_ALGO constant (see hashutil for its meaning). + + To avoid directories that contain too many files, the object storage has a + given slicing. Each slicing correspond to a directory that is named + according to the hash of its content. + + So for instance a file with SHA1 34973274ccef6ab4dfaaf86599792fa9c3fe4689 + will be stored in the given object storages : + + - 0:2/2:4/4:6 : 34/97/32/34973274ccef6ab4dfaaf86599792fa9c3fe4689 + - 0:1/0:5/ : 3/34973/34973274ccef6ab4dfaaf86599792fa9c3fe4689 + + The files in the storage are stored in gzipped compressed format. + + Attributes: + root (string): path to the root directory of the storage on the disk. + bounds: list of tuples that indicates the beginning and the end of + each subdirectory for a content. + """ + + def __init__(self, root, slicing): + """ Create an object to access a hash-slicing based object storage. + + Args: + root (string): path to the root directory of the storage on + the disk. + slicing (string): string that indicates the slicing to perform + on the hash of the content to know the path where it should + be stored. 
+ """ + if not os.path.isdir(root): + raise ValueError( + 'PathSlicingObjStorage root "%s" is not a directory' % root + ) + + self.root = root + # Make a list of tuples where each tuple contains the beginning + # and the end of each slicing. + self.bounds = [ + slice(*map(int, sbounds.split(':'))) + for sbounds in slicing.split('/') + if sbounds + ] + + max_endchar = max(map(lambda bound: bound.stop, self.bounds)) + if ID_HASH_LENGTH < max_endchar: + raise ValueError( + 'Algorithm %s has too short hash for slicing to char %d' + % (ID_HASH_ALGO, max_endchar) + ) + + def __contains__(self, obj_id): + """ Check whether the given object is present in the storage or not. + + Returns: + True iff the object is present in the storage. + """ + hex_obj_id = hashutil.hash_to_hex(obj_id) + return os.path.exists(self._obj_path(hex_obj_id)) + + def __iter__(self): + """iterate over the object identifiers currently available in the storage + + Warning: with the current implementation of the object storage, this + method will walk the filesystem to list objects, meaning that listing + all objects will be very slow for large storages. You almost certainly + don't want to use this method in production. + + Return: + iterator over object IDs + """ + def obj_iterator(): + # XXX hackish: it does not verify that the depth of found files + # matches the slicing depth of the storage + for root, _dirs, files in os.walk(self.root): + for f in files: + yield bytes.fromhex(f) + + return obj_iterator() + + def __len__(self): + """compute the number of objects available in the storage + + Warning: this currently uses `__iter__`, its warning about bad + performances applies + + Return: + number of objects contained in the storage + + """ + return sum(1 for i in self) + + def _obj_dir(self, hex_obj_id): + """ Compute the storage directory of an object. + + See also: PathSlicingObjStorage::_obj_path + + Args: + hex_obj_id: object id as hexlified string. 
+ + Returns: + Path to the directory that contains the required object. + """ + slices = [hex_obj_id[bound] for bound in self.bounds] + return os.path.join(self.root, *slices) + + def _obj_path(self, hex_obj_id): + """ Compute the full path to an object into the current storage. + + See also: PathSlicingObjStorage::_obj_dir + + Args: + hex_obj_id: object id as hexlified string. + + Returns: + Path to the actual object corresponding to the given id. + """ + return os.path.join(self._obj_dir(hex_obj_id), hex_obj_id) + + def add(self, bytes, obj_id=None, check_presence=True): + """ Add a new object to the object storage. + + Args: + bytes: content of the object to be added to the storage. + obj_id: checksum of [bytes] using [ID_HASH_ALGO] algorithm. When + given, obj_id will be trusted to match the bytes. If missing, + obj_id will be computed on the fly. + check_presence: indicate if the presence of the content should be + verified before adding the file. + + Returns: + the id of the object into the storage. + """ + if obj_id is None: + # Checksum is missing, compute it on the fly. + h = hashutil._new_hash(ID_HASH_ALGO, len(bytes)) + h.update(bytes) + obj_id = h.digest() + + if check_presence and obj_id in self: + # If the object is already present, return immediatly. + return obj_id + + hex_obj_id = hashutil.hash_to_hex(obj_id) + with _write_obj_file(hex_obj_id, self) as f: + f.write(bytes) + + return obj_id + + def restore(self, bytes, obj_id=None): + """ Restore a content that have been corrupted. + + This function is identical to add_bytes but does not check if + the object id is already in the file system. + + Args: + bytes: content of the object to be added to the storage + obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When + given, obj_id will be trusted to match bytes. If missing, + obj_id will be computed on the fly. 
+ """ + return self.add(bytes, obj_id, check_presence=False) + + def get(self, obj_id): + """ Retrieve the content of a given object. + + Args: + obj_id: object id. + + Returns: + the content of the requested object as bytes. + + Raises: + ObjNotFoundError: if the requested object is missing. + """ + if obj_id not in self: + raise ObjNotFoundError(obj_id) + + # Open the file and return its content as bytes + hex_obj_id = hashutil.hash_to_hex(obj_id) + with _read_obj_file(hex_obj_id, self) as f: + return f.read() + + def check(self, obj_id): + """ Perform an integrity check for a given object. + + Verify that the file object is in place and that the gziped content + matches the object id. + + Args: + obj_id: object id. + + Raises: + ObjNotFoundError: if the requested object is missing. + Error: if the request object is corrupted. + """ + if obj_id not in self: + raise ObjNotFoundError(obj_id) + + hex_obj_id = hashutil.hash_to_hex(obj_id) + + try: + with gzip.open(self._obj_path(hex_obj_id)) as f: + length = None + if ID_HASH_ALGO.endswith('_git'): + # if the hashing algorithm is git-like, we need to know the + # content size to hash on the fly. Do a first pass here to + # compute the size + length = 0 + while True: + chunk = f.read(GZIP_BUFSIZ) + length += len(chunk) + if not chunk: + break + f.rewind() + + checksums = hashutil._hash_file_obj(f, length, + algorithms=[ID_HASH_ALGO]) + actual_obj_id = checksums[ID_HASH_ALGO] + if obj_id != actual_obj_id: + raise Error( + 'Corrupt object %s should have id %s' + % (hashutil.hash_to_hex(obj_id), + hashutil.hash_to_hex(actual_obj_id)) + ) + except (OSError, IOError): + # IOError is for compatibility with older python versions + raise Error('Corrupt object %s is not a gzip file' % obj_id) + + def get_random(self, batch_size): + """ Get random ids of existing contents + + This method is used in order to get random ids to perform + content integrity verifications on random contents. 
+ + Attributes: + batch_size (int): Number of ids that will be given + + Yields: + An iterable of ids of contents that are in the current object + storage. + """ + def get_random_content(self, batch_size): + """ Get a batch of content inside a single directory. + + Returns: + a tuple (batch size, batch). + """ + dirs = [] + for level in range(len(self.bounds)): + path = os.path.join(self.root, *dirs) + dir_list = next(os.walk(path))[1] + if 'tmp' in dir_list: + dir_list.remove('tmp') + dirs.append(random.choice(dir_list)) + + path = os.path.join(self.root, *dirs) + content_list = next(os.walk(path))[2] + length = min(batch_size, len(content_list)) + return length, map(hashutil.hex_to_hash, + random.sample(content_list, length)) + + while batch_size: + length, it = get_random_content(self, batch_size) + batch_size = batch_size - length + yield from it diff --git a/swh/objstorage/tests/objstorage_testing.py b/swh/objstorage/tests/objstorage_testing.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/tests/objstorage_testing.py @@ -0,0 +1,70 @@ +# Copyright (C) 2015-2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from nose.tools import istest + +from swh.core import hashutil +from swh.objstorage import exc + + +class ObjStorageTestFixture(): + + def setUp(self): + super().setUp() + + def hash_content(self, content): + obj_id = hashutil.hashdata(content)['sha1'] + return content, obj_id + + def assertContentMatch(self, obj_id, expected_content): + content = self.storage.get(obj_id) + self.assertEqual(content, expected_content) + + @istest + def add_get_w_id(self): + content, obj_id = self.hash_content(b'add_get_w_id') + r = self.storage.add(content, obj_id=obj_id) + self.assertEqual(obj_id, r) + self.assertContentMatch(obj_id, content) + + @istest + def add_get_wo_id(self): + 
content, obj_id = self.hash_content(b'add_get_wo_id') + r = self.storage.add(content) + self.assertEqual(obj_id, r) + self.assertContentMatch(obj_id, content) + + @istest + def restore_content(self): + valid_content, valid_obj_id = self.hash_content(b'restore_content') + invalid_content = b'unexpected content' + id_adding = self.storage.add(invalid_content, valid_obj_id) + id_restore = self.storage.restore(valid_content) + # Adding a false content then restore it to the right one and + # then perform a verification should result in a successful check. + self.assertEqual(id_adding, valid_obj_id) + self.assertEqual(id_restore, valid_obj_id) + self.assertContentMatch(valid_obj_id, valid_content) + + @istest + def get_missing(self): + content, obj_id = self.hash_content(b'get_missing') + with self.assertRaises(exc.Error): + self.storage.get(obj_id) + + @istest + def check_missing(self): + content, obj_id = self.hash_content(b'check_missing') + with self.assertRaises(exc.Error): + self.storage.check(obj_id) + + @istest + def check_present(self): + content, obj_id = self.hash_content(b'check_missing') + self.storage.add(content) + try: + self.storage.check(obj_id) + except: + self.fail('Integrity check failed') diff --git a/swh/objstorage/tests/server_testing.py b/swh/objstorage/tests/server_testing.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/tests/server_testing.py @@ -0,0 +1,80 @@ +# Copyright (C) 2015 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import multiprocessing +import socket +import time + +from urllib.request import urlopen + + +class ServerTestFixture(): + """ Base class for http client/server testing. + + Mix this in a test class in order to have access to an http flask + server running in background. 
+ + Note that the subclass should define a dictionary in self.config + that contains the flask server config. + And a flask application in self.app that corresponds to the type of + server the tested client needs. + + To ensure test isolation, each test will run in a different server + and a different repertory. + + In order to correctly work, the subclass must call the parents class's + setUp() and tearDown() methods. + """ + + def setUp(self): + super().setUp() + self.start_server() + + def tearDown(self): + self.stop_server() + super().tearDown() + + def url(self): + return 'http://127.0.0.1:%d/' % self.port + + def start_server(self): + """ Spawn the API server using multiprocessing. + """ + self.process = None + + # WSGI app configuration + for key, value in self.config.items(): + self.app.config[key] = value + # Get an available port number + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.bind(('127.0.0.1', 0)) + self.port = sock.getsockname()[1] + sock.close() + + # Worker function for multiprocessing + def worker(app, port): + return app.run(port=port, use_reloader=False) + + self.process = multiprocessing.Process( + target=worker, args=(self.app, self.port) + ) + self.process.start() + + # Wait max 5 seconds for server to spawn + i = 0 + while i < 20: + try: + urlopen(self.url()) + except Exception: + i += 1 + time.sleep(0.25) + else: + return + + def stop_server(self): + """ Terminate the API server's process. 
+ """ + if self.process: + self.process.terminate() diff --git a/swh/objstorage/tests/test_checker.py b/swh/objstorage/tests/test_checker.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/tests/test_checker.py @@ -0,0 +1,128 @@ +# Copyright (C) 2015-2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import gzip +import tempfile +import unittest + +from nose.tools import istest +from nose.plugins.attrib import attr + +from swh.core import hashutil +from swh.objstorage.checker import ContentChecker + + +class MockBackupStorage(): + + def __init__(self): + self.values = {} + + def content_add(self, id, value): + self.values[id] = value + + def content_get(self, ids): + for id in ids: + try: + data = self.values[id] + except KeyError: + yield None + continue + + yield {'sha1': id, 'data': data} + + +@attr('fs') +class TestChecker(unittest.TestCase): + """ Test the content integrity checker + """ + + def setUp(self): + super().setUp() + # Connect to an objstorage + config = {'batch_size': 10} + path = tempfile.mkdtemp() + slicing = '0:2/2:4/4:6' + self.checker = ContentChecker(config, path, slicing, 'http://None') + self.checker.backup_storages = [MockBackupStorage(), + MockBackupStorage()] + + def corrupt_content(self, id): + """ Make the given content invalid. + """ + hex_id = hashutil.hash_to_hex(id) + file_path = self.checker.objstorage._obj_path(hex_id) + with gzip.open(file_path, 'wb') as f: + f.write(b'Unexpected content') + + @istest + def check_valid_content(self): + # Check that a valid content is valid. + content = b'check_valid_content' + id = self.checker.objstorage.add(content) + self.assertTrue(self.checker.check_content(id)) + + @istest + def check_invalid_content(self): + # Check that an invalid content is noticed. 
+ content = b'check_invalid_content' + id = self.checker.objstorage.add(content) + self.corrupt_content(id) + self.assertFalse(self.checker.check_content(id)) + + @istest + def repair_content_present_first(self): + # Try to repair a content that is in the backup storage. + content = b'repair_content_present_first' + id = self.checker.objstorage.add(content) + # Add a content to the mock + self.checker.backup_storages[0].content_add(id, content) + # Corrupt and repair it. + self.corrupt_content(id) + self.assertFalse(self.checker.check_content(id)) + self.checker.repair_contents([id]) + self.assertTrue(self.checker.check_content(id)) + + @istest + def repair_content_present_second(self): + # Try to repair a content that is not in the first backup storage. + content = b'repair_content_present_second' + id = self.checker.objstorage.add(content) + # Add a content to the mock + self.checker.backup_storages[1].content_add(id, content) + # Corrupt and repair it. + self.corrupt_content(id) + self.assertFalse(self.checker.check_content(id)) + self.checker.repair_contents([id]) + self.assertTrue(self.checker.check_content(id)) + + @istest + def repair_content_present_distributed(self): + # Try to repair two contents that are in separate backup storages. + content1 = b'repair_content_present_distributed_2' + content2 = b'repair_content_present_distributed_1' + id1 = self.checker.objstorage.add(content1) + id2 = self.checker.objstorage.add(content2) + # Add content to the mock. 
+ self.checker.backup_storages[0].content_add(id1, content1) + self.checker.backup_storages[0].content_add(id2, content2) + # Corrupt and repair it + self.corrupt_content(id1) + self.corrupt_content(id2) + self.assertFalse(self.checker.check_content(id1)) + self.assertFalse(self.checker.check_content(id2)) + self.checker.repair_contents([id1, id2]) + self.assertTrue(self.checker.check_content(id1)) + self.assertTrue(self.checker.check_content(id2)) + + @istest + def repair_content_missing(self): + # Try to repair a content that is NOT in the backup storage. + content = b'repair_content_present' + id = self.checker.objstorage.add(content) + # Corrupt and repair it. + self.corrupt_content(id) + self.assertFalse(self.checker.check_content(id)) + self.checker.repair_contents([id]) + self.assertFalse(self.checker.check_content(id)) diff --git a/swh/objstorage/tests/test_multiplexer_filter.py b/swh/objstorage/tests/test_multiplexer_filter.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/tests/test_multiplexer_filter.py @@ -0,0 +1,373 @@ +# Copyright (C) 2015-2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import random +import unittest + +from string import ascii_lowercase + +from nose.tools import istest +from nose.plugins.attrib import attr + +from swh.core import hashutil +from swh.objstorage.exc import ObjNotFoundError, Error +from swh.objstorage import ObjStorage +from swh.objstorage.multiplexer.filter import (add_filter, read_only, + id_prefix, id_regex) + + +def get_random_content(): + return bytes(''.join(random.sample(ascii_lowercase, 10)), 'utf8') + + +class MockObjStorage(ObjStorage): + """ Mock an object storage for testing the filters. 
+ """ + def __init__(self): + self.objects = {} + + def __contains__(self, obj_id): + return obj_id in self.objects + + def __len__(self): + return len(self.objects) + + def __iter__(self): + return iter(self.objects) + + def id(self, content): + # Id is the content itself for easily choose the id of + # a content for filtering. + return hashutil.hashdata(content)['sha1'] + + def add(self, content, obj_id=None, check_presence=True): + if obj_id is None: + obj_id = self.id(content) + + if check_presence and obj_id in self.objects: + return obj_id + + self.objects[obj_id] = content + return obj_id + + def restore(self, content, obj_id=None): + return self.add(content, obj_id, check_presence=False) + + def get(self, obj_id): + if obj_id not in self: + raise ObjNotFoundError(obj_id) + return self.objects[obj_id] + + def check(self, obj_id): + if obj_id not in self: + raise ObjNotFoundError(obj_id) + if obj_id != self.id(self.objects[obj_id]): + raise Error(obj_id) + + def get_random(self, batch_size): + batch_size = min(len(self), batch_size) + return random.sample(list(self.objects), batch_size) + + +@attr('!db') +class MixinTestReadFilter(unittest.TestCase): + # Read only filter should not allow writing + + def setUp(self): + super().setUp() + storage = MockObjStorage() + + self.valid_content = b'pre-existing content' + self.invalid_content = b'invalid_content' + self.true_invalid_content = b'Anything that is not correct' + self.absent_content = b'non-existent content' + # Create a valid content. + self.valid_id = storage.add(self.valid_content) + # Create an invalid id and add a content with it. + self.invalid_id = storage.id(self.true_invalid_content) + storage.add(self.invalid_content, obj_id=self.invalid_id) + # Compute an id for a non-existing content. 
+ self.absent_id = storage.id(self.absent_content) + + self.storage = add_filter(storage, read_only()) + + @istest + def can_contains(self): + self.assertTrue(self.valid_id in self.storage) + self.assertTrue(self.invalid_id in self.storage) + self.assertFalse(self.absent_id in self.storage) + + @istest + def can_iter(self): + self.assertIn(self.valid_id, iter(self.storage)) + self.assertIn(self.invalid_id, iter(self.storage)) + + @istest + def can_len(self): + self.assertEqual(2, len(self.storage)) + + @istest + def can_get(self): + self.assertEqual(self.valid_content, self.storage.get(self.valid_id)) + self.assertEqual(self.invalid_content, + self.storage.get(self.invalid_id)) + + @istest + def can_check(self): + with self.assertRaises(ObjNotFoundError): + self.storage.check(self.absent_id) + with self.assertRaises(Error): + self.storage.check(self.invalid_id) + self.storage.check(self.valid_id) + + @istest + def can_get_random(self): + self.assertEqual(1, len(self.storage.get_random(1))) + self.assertEqual(len(self.storage), len(self.storage.get_random(1000))) + + @istest + def cannot_add(self): + new_id = self.storage.add(b'New content') + result = self.storage.add(self.valid_content, self.valid_id) + self.assertNotIn(new_id, self.storage) + self.assertIsNone(result) + + @istest + def cannot_restore(self): + new_id = self.storage.restore(b'New content') + result = self.storage.restore(self.valid_content, self.valid_id) + self.assertNotIn(new_id, self.storage) + self.assertIsNone(result) + + +class MixinTestIdFilter(): + """ Mixin class that tests the filters based on filter.IdFilter + + Methods "make_valid", "make_invalid" and "filter_storage" must be + implemented by subclasses. + """ + + def setUp(self): + super().setUp() + # Use a hack here : as the mock uses the content as id, it is easy to + # create contents that are filtered or not. 
+ self.prefix = '71' + storage = MockObjStorage() + + # Make the storage filtered + self.base_storage = storage + self.storage = self.filter_storage(storage) + + # Present content with valid id + self.present_valid_content = self.ensure_valid(b'yroqdtotji') + self.present_valid_id = storage.id(self.present_valid_content) + + # Present content with invalid id + self.present_invalid_content = self.ensure_invalid(b'glxddlmmzb') + self.present_invalid_id = storage.id(self.present_invalid_content) + + # Missing content with valid id + self.missing_valid_content = self.ensure_valid(b'rmzkdclkez') + self.missing_valid_id = storage.id(self.missing_valid_content) + + # Missing content with invalid id + self.missing_invalid_content = self.ensure_invalid(b'hlejfuginh') + self.missing_invalid_id = storage.id(self.missing_invalid_content) + + # Present corrupted content with valid id + self.present_corrupted_valid_content = self.ensure_valid(b'cdsjwnpaij') + self.true_present_corrupted_valid_content = self.ensure_valid( + b'mgsdpawcrr') + self.present_corrupted_valid_id = storage.id( + self.true_present_corrupted_valid_content) + + # Present corrupted content with invalid id + self.present_corrupted_invalid_content = self.ensure_invalid( + b'pspjljnrco') + self.true_present_corrupted_invalid_content = self.ensure_invalid( + b'rjocbnnbso') + self.present_corrupted_invalid_id = storage.id( + self.true_present_corrupted_invalid_content) + + # Missing (potentially) corrupted content with valid id + self.missing_corrupted_valid_content = self.ensure_valid( + b'zxkokfgtou') + self.true_missing_corrupted_valid_content = self.ensure_valid( + b'royoncooqa') + self.missing_corrupted_valid_id = storage.id( + self.true_missing_corrupted_valid_content) + + # Missing (potentially) corrupted content with invalid id + self.missing_corrupted_invalid_content = self.ensure_invalid( + b'hxaxnrmnyk') + self.true_missing_corrupted_invalid_content = self.ensure_invalid( + b'qhbolyuifr') + 
self.missing_corrupted_invalid_id = storage.id( + self.true_missing_corrupted_invalid_content) + + # Add the content that are supposed to be present + storage.add(self.present_valid_content) + storage.add(self.present_invalid_content) + storage.add(self.present_corrupted_valid_content, + obj_id=self.present_corrupted_valid_id) + storage.add(self.present_corrupted_invalid_content, + obj_id=self.present_corrupted_invalid_id) + + def filter_storage(self, storage): + raise NotImplementedError( + 'Id_filter test class must have a filter_storage method') + + def ensure_valid(self, content=None): + if content is None: + content = get_random_content() + while not self.storage.is_valid(self.base_storage.id(content)): + content = get_random_content() + return content + + def ensure_invalid(self, content=None): + if content is None: + content = get_random_content() + while self.storage.is_valid(self.base_storage.id(content)): + content = get_random_content() + return content + + @istest + def contains(self): + # Both contents are present, but the invalid one should be ignored. 
+ self.assertTrue(self.present_valid_id in self.storage) + self.assertFalse(self.present_invalid_id in self.storage) + self.assertFalse(self.missing_valid_id in self.storage) + self.assertFalse(self.missing_invalid_id in self.storage) + self.assertTrue(self.present_corrupted_valid_id in self.storage) + self.assertFalse(self.present_corrupted_invalid_id in self.storage) + self.assertFalse(self.missing_corrupted_valid_id in self.storage) + self.assertFalse(self.missing_corrupted_invalid_id in self.storage) + + @istest + def iter(self): + self.assertIn(self.present_valid_id, iter(self.storage)) + self.assertNotIn(self.present_invalid_id, iter(self.storage)) + self.assertNotIn(self.missing_valid_id, iter(self.storage)) + self.assertNotIn(self.missing_invalid_id, iter(self.storage)) + self.assertIn(self.present_corrupted_valid_id, iter(self.storage)) + self.assertNotIn(self.present_corrupted_invalid_id, iter(self.storage)) + self.assertNotIn(self.missing_corrupted_valid_id, iter(self.storage)) + self.assertNotIn(self.missing_corrupted_invalid_id, iter(self.storage)) + + @istest + def len(self): + # Four contents are present, but only two should be valid. 
+ self.assertEqual(2, len(self.storage)) + + @istest + def get(self): + self.assertEqual(self.present_valid_content, + self.storage.get(self.present_valid_id)) + with self.assertRaises(ObjNotFoundError): + self.storage.get(self.present_invalid_id) + with self.assertRaises(ObjNotFoundError): + self.storage.get(self.missing_valid_id) + with self.assertRaises(ObjNotFoundError): + self.storage.get(self.missing_invalid_id) + self.assertEqual(self.present_corrupted_valid_content, + self.storage.get(self.present_corrupted_valid_id)) + with self.assertRaises(ObjNotFoundError): + self.storage.get(self.present_corrupted_invalid_id) + with self.assertRaises(ObjNotFoundError): + self.storage.get(self.missing_corrupted_valid_id) + with self.assertRaises(ObjNotFoundError): + self.storage.get(self.missing_corrupted_invalid_id) + + @istest + def check(self): + self.storage.check(self.present_valid_id) + with self.assertRaises(ObjNotFoundError): + self.storage.check(self.present_invalid_id) + with self.assertRaises(ObjNotFoundError): + self.storage.check(self.missing_valid_id) + with self.assertRaises(ObjNotFoundError): + self.storage.check(self.missing_invalid_id) + with self.assertRaises(Error): + self.storage.check(self.present_corrupted_valid_id) + with self.assertRaises(ObjNotFoundError): + self.storage.check(self.present_corrupted_invalid_id) + with self.assertRaises(ObjNotFoundError): + self.storage.check(self.missing_corrupted_valid_id) + with self.assertRaises(ObjNotFoundError): + self.storage.check(self.missing_corrupted_invalid_id) + + @istest + def get_random(self): + self.assertEqual(0, len(list(self.storage.get_random(0)))) + + random_content = list(self.storage.get_random(1000)) + self.assertIn(self.present_valid_id, random_content) + self.assertNotIn(self.present_invalid_id, random_content) + self.assertNotIn(self.missing_valid_id, random_content) + self.assertNotIn(self.missing_invalid_id, random_content) + self.assertIn(self.present_corrupted_valid_id, 
random_content)
+        self.assertNotIn(self.present_corrupted_invalid_id, random_content)
+        self.assertNotIn(self.missing_corrupted_valid_id, random_content)
+        self.assertNotIn(self.missing_corrupted_invalid_id, random_content)
+
+    @istest
+    def add(self):
+        # Add valid and invalid contents to the storage and check their
+        # presence with the unfiltered storage.
+        valid_content = self.ensure_valid(b'ulepsrjbgt')
+        valid_id = self.base_storage.id(valid_content)
+        invalid_content = self.ensure_invalid(b'znvghkjked')
+        invalid_id = self.base_storage.id(invalid_content)
+        self.storage.add(valid_content)
+        self.storage.add(invalid_content)
+        self.assertTrue(valid_id in self.base_storage)
+        self.assertFalse(invalid_id in self.base_storage)
+
+    @istest
+    def restore(self):
+        # Add corrupted content to the storage and then try to restore it
+        valid_content = self.ensure_valid(b'ulepsrjbgt')
+        valid_id = self.base_storage.id(valid_content)
+        corrupted_content = self.ensure_valid(b'ltjkjsloyb')
+        corrupted_id = self.base_storage.id(corrupted_content)
+        self.storage.add(corrupted_content, obj_id=valid_id)
+        with self.assertRaises(ObjNotFoundError):
+            self.storage.check(corrupted_id)
+        with self.assertRaises(Error):
+            self.storage.check(valid_id)
+        self.storage.restore(valid_content)
+        self.storage.check(valid_id)
+
+
+@attr('!db')
+class TestPrefixFilter(MixinTestIdFilter, unittest.TestCase):
+    def setUp(self):
+        self.prefix = '71'
+        super().setUp()
+
+    def ensure_valid(self, content):
+        obj_id = hashutil.hashdata(content)['sha1']
+        hex_obj_id = hashutil.hash_to_hex(obj_id)
+        self.assertTrue(hex_obj_id.startswith(self.prefix))
+        return content
+
+    def ensure_invalid(self, content):
+        obj_id = hashutil.hashdata(content)['sha1']
+        hex_obj_id = hashutil.hash_to_hex(obj_id)
+        self.assertFalse(hex_obj_id.startswith(self.prefix))
+        return content
+
+    def filter_storage(self, storage):
+        return add_filter(storage, id_prefix(self.prefix))
+
+
+@attr('!db')
+class 
TestRegexFilter(MixinTestIdFilter, unittest.TestCase): + def setUp(self): + self.regex = r'[a-f][0-9].*' + super().setUp() + + def filter_storage(self, storage): + return add_filter(storage, id_regex(self.regex)) diff --git a/swh/objstorage/tests/test_objstorage.py b/swh/objstorage/tests/test_objstorage.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/tests/test_objstorage.py @@ -0,0 +1,17 @@ +# Copyright (C) 2015-2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import unittest +import tempfile + +from swh.objstorage import PathSlicingObjStorage + +from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture + + +class TestObjStorage(ObjStorageTestFixture, unittest.TestCase): + + def setUp(self): + self.storage = PathSlicingObjStorage(tempfile.mkdtemp(), '0:2/0:5') diff --git a/swh/objstorage/tests/test_objstorage_api.py b/swh/objstorage/tests/test_objstorage_api.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/tests/test_objstorage_api.py @@ -0,0 +1,88 @@ +# Copyright (C) 2015 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import tempfile +import unittest + +from nose.tools import istest +from nose.plugins.attrib import attr + +from swh.core import hashutil +from swh.objstorage.exc import ObjNotFoundError, Error +from swh.objstorage.tests.server_testing import ServerTestFixture +from swh.objstorage.api.client import RemoteObjStorage +from swh.objstorage.api.server import app + + +@attr('db') +class TestRemoteObjStorage(ServerTestFixture, unittest.TestCase): + """ Test the remote archive API. 
+ """ + + def setUp(self): + self.config = {'storage_base': tempfile.mkdtemp(), + 'storage_slicing': '0:1/0:5'} + self.app = app + super().setUp() + self.objstorage = RemoteObjStorage(self.url()) + + def tearDown(self): + super().tearDown() + + @istest + def content_add(self): + content = bytes('Test content', 'utf8') + id = self.objstorage.content_add(content) + self.assertEquals(self.objstorage.content_get(id), content) + + @istest + def content_get_present(self): + content = bytes('content_get_present', 'utf8') + content_hash = hashutil.hashdata(content) + id = self.objstorage.content_add(content) + self.assertEquals(content_hash['sha1'], id) + + @istest + def content_get_missing(self): + content = bytes('content_get_missing', 'utf8') + content_hash = hashutil.hashdata(content) + with self.assertRaises(ObjNotFoundError): + self.objstorage.content_get(content_hash['sha1']) + + @istest + def content_get_random(self): + ids = [] + for i in range(100): + content = bytes('content_get_present', 'utf8') + id = self.objstorage.content_add(content) + ids.append(id) + for id in self.objstorage.content_get_random(50): + self.assertIn(id, ids) + + @istest + def content_check_invalid(self): + content = bytes('content_check_invalid', 'utf8') + invalid_id = hashutil.hashdata(b'invalid content')['sha1'] + # Add the content with an invalid id. + self.objstorage.content_add(content, invalid_id) + # Then check it and expect an error. 
+        with self.assertRaises(Error):
+            self.objstorage.content_check(invalid_id)
+
+    @istest
+    def content_check_valid(self):
+        content = bytes('content_check_valid', 'utf8')
+        id = self.objstorage.content_add(content)
+        try:
+            self.objstorage.content_check(id)
+        except Exception:
+            self.fail('Integrity check failed')
+
+    @istest
+    def content_check_missing(self):
+        content = bytes('content_check_missing', 'utf8')
+        content_hash = hashutil.hashdata(content)
+        with self.assertRaises(ObjNotFoundError):
+            self.objstorage.content_check(content_hash['sha1'])
diff --git a/swh/objstorage/tests/test_objstorage_multiplexer.py b/swh/objstorage/tests/test_objstorage_multiplexer.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/tests/test_objstorage_multiplexer.py
@@ -0,0 +1,78 @@
+# Copyright (C) 2015-2016 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import tempfile
+import unittest
+
+from nose.tools import istest
+
+from swh.objstorage import PathSlicingObjStorage
+from swh.objstorage.multiplexer import MultiplexerObjStorage
+from swh.objstorage.multiplexer.filter import add_filter, read_only
+
+from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture
+
+
+class TestMultiplexerObjStorage(ObjStorageTestFixture, unittest.TestCase):
+
+    def setUp(self):
+        super().setUp()
+        self.storage_v1 = PathSlicingObjStorage(tempfile.mkdtemp(), '0:2/2:4')
+        self.storage_v2 = PathSlicingObjStorage(tempfile.mkdtemp(), '0:1/0:5')
+
+        self.r_storage = add_filter(self.storage_v1, read_only())
+        self.w_storage = self.storage_v2
+        self.storage = MultiplexerObjStorage([self.r_storage, self.w_storage])
+
+    @istest
+    def contains(self):
+        content_p, obj_id_p = self.hash_content(b'contains_present')
+        content_m, obj_id_m = self.hash_content(b'contains_missing')
+        self.storage.add(content_p, obj_id=obj_id_p)
+        
self.assertIn(obj_id_p, self.storage) + self.assertNotIn(obj_id_m, self.storage) + + @istest + def iter(self): + content, obj_id = self.hash_content(b'iter') + self.assertEqual(list(iter(self.storage)), []) + self.storage.add(content, obj_id=obj_id) + self.assertEqual(list(iter(self.storage)), [obj_id]) + + @istest + def len(self): + content, obj_id = self.hash_content(b'len') + self.assertEqual(len(self.storage), 0) + self.storage.add(content, obj_id=obj_id) + self.assertEqual(len(self.storage), 1) + + @istest + def len_multiple(self): + content, obj_id = self.hash_content(b'len_multiple') + # Add a content to the read-only storage + self.storage_v1.add(content) + self.assertEqual(len(self.storage), 1) + # By adding the same content to the global storage, it should be + # Replicated. + # len() behavior is to indicates the number of files, not unique + # contents. + self.storage.add(content) + self.assertEqual(len(self.storage), 2) + + @istest + def get_random_contents(self): + content, obj_id = self.hash_content(b'get_random_content') + self.storage.add(content) + random_contents = list(self.storage.get_random(1)) + self.assertEqual(1, len(random_contents)) + self.assertIn(obj_id, random_contents) + + @istest + def access_readonly(self): + # Add a content to the readonly storage + content, obj_id = self.hash_content(b'content in read-only') + self.storage_v1.add(content) + # Try to retrieve it on the main storage + self.assertIn(obj_id, self.storage) diff --git a/swh/objstorage/tests/test_objstorage_pathslicing.py b/swh/objstorage/tests/test_objstorage_pathslicing.py new file mode 100644 --- /dev/null +++ b/swh/objstorage/tests/test_objstorage_pathslicing.py @@ -0,0 +1,76 @@ +# Copyright (C) 2015-2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import tempfile +import unittest + 
+from nose.tools import istest
+
+from swh.core import hashutil
+from swh.objstorage import exc
+from swh.objstorage import PathSlicingObjStorage
+
+from swh.objstorage.tests.objstorage_testing import ObjStorageTestFixture
+
+
+class TestPathSlicingObjStorage(ObjStorageTestFixture, unittest.TestCase):
+
+    def setUp(self):
+        super().setUp()
+        self.slicing = '0:2/2:4/4:6'
+        self.tmpdir = tempfile.mkdtemp()
+        self.storage = PathSlicingObjStorage(self.tmpdir, self.slicing)
+
+    def content_path(self, obj_id):
+        hex_obj_id = hashutil.hash_to_hex(obj_id)
+        return self.storage._obj_path(hex_obj_id)
+
+    @istest
+    def contains(self):
+        content_p, obj_id_p = self.hash_content(b'contains_present')
+        content_m, obj_id_m = self.hash_content(b'contains_missing')
+        self.storage.add(content_p, obj_id=obj_id_p)
+        self.assertIn(obj_id_p, self.storage)
+        self.assertNotIn(obj_id_m, self.storage)
+
+    @istest
+    def iter(self):
+        content, obj_id = self.hash_content(b'iter')
+        self.assertEqual(list(iter(self.storage)), [])
+        self.storage.add(content, obj_id=obj_id)
+        self.assertEqual(list(iter(self.storage)), [obj_id])
+
+    @istest
+    def len(self):
+        content, obj_id = self.hash_content(b'len')
+        self.assertEqual(len(self.storage), 0)
+        self.storage.add(content, obj_id=obj_id)
+        self.assertEqual(len(self.storage), 1)
+
+    @istest
+    def check_not_gzip(self):
+        content, obj_id = self.hash_content(b'check_not_gzip')
+        self.storage.add(content, obj_id=obj_id)
+        with open(self.content_path(obj_id), 'ab') as f:  # Add garbage.
+ f.write(b'garbage') + with self.assertRaises(exc.Error): + self.storage.check(obj_id) + + @istest + def check_id_mismatch(self): + content, obj_id = self.hash_content(b'check_id_mismatch') + self.storage.add(content, obj_id=obj_id) + with open(self.content_path(obj_id), 'wb') as f: + f.write(b'unexpected content') + with self.assertRaises(exc.Error): + self.storage.check(obj_id) + + @istest + def get_random_contents(self): + content, obj_id = self.hash_content(b'get_random_content') + self.storage.add(content, obj_id=obj_id) + random_contents = list(self.storage.get_random(1)) + self.assertEqual(1, len(random_contents)) + self.assertIn(obj_id, random_contents)