def read_only(storage):
    """ Make the given storage read-only.

    Args:
        storage (ObjStorage): storage to be changed to read-only mode.

    Returns:
        a new storage wrapping the given one with the 'readonly' filter
        (see FILTERS).
    """
    return add_filter(storage, {'type': 'readonly'})


def _filter_priority(filter_type):
    """ Get the priority of the given type of filter.

    Priority is a value that indicates if the operation of the
    filter is time-consuming (smaller values mean quick execution),
    or very likely to be almost always the same value (False being small,
    and True high).

    When filters are chained, they are ordered so that the ones with the
    smallest priority (quick execution or instantly breaking the chain)
    are executed first.

    Value 0 is recommended for filters that change behavior only by
    disabling some operations (making the method return None).

    Args:
        filter_type (str): name of a filter type, as used for the keys of
            FILTERS_PRIORITY.

    Returns:
        the priority registered in FILTERS_PRIORITY for this type, or 1
        when the type has no registered priority.
    """
    # This is a module-level function, so it takes no 'self'; dict exposes
    # the default-aware lookup as 'get' (there is no 'get_or_default').
    return FILTERS_PRIORITY.get(filter_type, 1)


def add_filter(storage, filter_conf):
    """ Add a filter to the given storage.

    Args:
        storage (ObjStorage): storage which will be filtered.
        filter_conf (dict): configuration of an ObjStorageFilter, given as
            a dictionary that contains the keys:
            - type: the type of filter, one of the keys of FILTERS.
            - every argument that this type of filter requires.

    Returns:
        A filtered storage that performs only the valid operations.

    Raises:
        KeyError: if filter_conf has no 'type' key, or if its type is not
            a key of FILTERS.
    """
    filter_type = filter_conf['type']
    # Compare keys by value: 'is not' on strings tests identity, which only
    # works by accident of string interning.
    filter_args = {key: value for key, value in filter_conf.items()
                   if key != 'type'}
    return FILTERS[filter_type](storage=storage, **filter_args)


def add_filters(storage, *filter_confs):
    """ Add multiple filters to the given storage.

    (See add_filter)

    Args:
        storage (ObjStorage): storage which will be filtered.
        filter_confs (list): any number of filter conf, as a dict with:
            - type: the type of filter, one of the keys of FILTERS.
            - every argument that this type of filter requires.

    Returns:
        A filtered storage that fulfills the requirements of all the given
        filters, or the storage itself when no filter conf is given.
    """
    # 'filter_confs' is a tuple (it comes from *args), so it cannot be sorted
    # in place. Biggest priority first: that filter wraps the storage
    # directly and therefore ends up being evaluated last.
    ordered_confs = sorted(filter_confs,
                           key=lambda conf: _filter_priority(conf['type']),
                           reverse=True)

    # Accumulate the filters on top of the storage, until the smallest
    # (fastest, see _filter_priority) one is added as the outermost layer.
    return functools.reduce(add_filter, ordered_confs, storage)
class IdObjStorageFilter(ObjStorageFilter):
    """ Filter that only allows operations if the object id matches a
    requirement.

    Even for read operations, the id is checked against the requirement
    before the wrapped storage is called. When the id is rejected the
    wrapped storage is not called at all and the operation returns None
    (False for a membership test).

    Subclasses must implement "is_valid".
    """

    def is_valid(self, obj_id):
        """ Indicates if the given id is valid.

        Args:
            obj_id: id of the object the operation is about.

        Returns:
            a true value if operations on this id must be forwarded to
            the wrapped storage.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError('Implementations of an IdObjStorageFilter '
                                  'must have a "is_valid" method')

    def __contains__(self, obj_id, *args, **kwargs):
        """ Tell if the wrapped storage contains an accepted id.

        Returns:
            False if the id is rejected by the filter, otherwise the answer
            of the wrapped storage.
        """
        if not self.is_valid(obj_id):
            return False
        # The id is forwarded positionally: passing it as a keyword after
        # *args would collide with any extra positional argument.
        return self.storage.__contains__(obj_id, *args, **kwargs)

    def add(self, content, obj_id=None, check_presence=True, *args, **kwargs):
        """ Add the content to the wrapped storage if its id is accepted.

        Args:
            content: content of the object to add.
            obj_id: id of the content; computed with compute_hash when
                missing.
            check_presence: forwarded as is to the wrapped storage.

        Returns:
            the result of the wrapped storage's add, or None if the id is
            rejected by the filter.
        """
        if obj_id is None:
            obj_id = compute_hash(content)
        if not self.is_valid(obj_id):
            return None
        # Forward everything that was received, including the content itself
        # and check_presence.
        return self.storage.add(content, obj_id, check_presence,
                                *args, **kwargs)

    def restore(self, content, obj_id=None, *args, **kwargs):
        """ Restore the content in the wrapped storage if its id is accepted.

        Args:
            content: content of the object to restore.
            obj_id: id of the content; computed with compute_hash when
                missing.

        Returns:
            the result of the wrapped storage's restore, or None if the id
            is rejected by the filter.
        """
        if obj_id is None:
            obj_id = compute_hash(content)
        if not self.is_valid(obj_id):
            return None
        # The wrapped storage needs both the content and its id.
        return self.storage.restore(content, obj_id, *args, **kwargs)

    def get(self, obj_id, *args, **kwargs):
        """ Get the content from the wrapped storage if the id is accepted.

        Returns:
            the result of the wrapped storage's get, or None if the id is
            rejected by the filter.
        """
        if not self.is_valid(obj_id):
            return None
        return self.storage.get(obj_id, *args, **kwargs)

    def check(self, obj_id, *args, **kwargs):
        """ Check the object in the wrapped storage if the id is accepted.

        Returns:
            the result of the wrapped storage's check, or None (without
            any check being performed) if the id is rejected by the filter.
        """
        if not self.is_valid(obj_id):
            return None
        return self.storage.check(obj_id, *args, **kwargs)

    def get_random(self, *args, **kwargs):
        """ Forward the call to the wrapped storage, without filtering. """
        return self.storage.get_random(*args, **kwargs)
class MultiplexerObjStorage(ObjStorage):
    """ Implementation of ObjStorage that distributes between multiple
    storages.

    The multiplexer object storage allows an input to be demultiplexed
    among multiple storages that will or will not accept it by themselves
    (see .filter package).

    As the ids can be different, no pre-computed ids should be submitted.
    Also, there is no guarantee that the returned ids can be used directly
    into the storages that the multiplexer manages.
    """

    def __init__(self, storages):
        """ Create a multiplexer on top of the given storages.

        Args:
            storages: the storages (possibly wrapped by filters) to which
                every operation is dispatched. They are tried in the given
                order by the read operations.
        """
        self.storages = storages

    def __contains__(self, obj_id):
        """ Tell if at least one of the storages contains the given id. """
        return any(obj_id in storage for storage in self.storages)

    def __iter__(self):
        """ Iterate over the ids of every storage, one storage after the
        other. An id present in several storages is yielded several times.
        """
        for storage in self.storages:
            yield from storage

    def __len__(self):
        """ Returns the number of files in the storage.

        Warning: Multiple files may represent the same content, so this method
        does not indicate how many different contents are in the storage.
        """
        return sum(map(len, self.storages))

    @staticmethod
    def _last_accepted_id(results):
        """ Return the last result that is not None, or None if there is
        none.
        """
        accepted = [result for result in results if result is not None]
        return accepted[-1] if accepted else None

    def add(self, content, obj_id=None, check_presence=True):
        """ Add a new object to the object storage.

        The content is submitted to every storage. An exception raised by
        one of them is not caught: it propagates, and the storages that
        come after it are not called.

        Args:
            content: content of the object to be added to the storage.
            obj_id: id of the content, forwarded as is to every storage.
            check_presence: forwarded as is to every storage.

        Returns:
            the id returned by the last storage that returned one. Filters
            that refuse the write return None (see .filter package); these
            results are skipped. None if no storage returned an id.
        """
        results = [storage.add(content, obj_id, check_presence)
                   for storage in self.storages]
        # Taking blindly the last result would give None whenever the last
        # storage is a filter that refused the write.
        return self._last_accepted_id(results)

    def restore(self, content, obj_id=None):
        """ Restore a content that has been corrupted.

        The content is submitted to the restore operation of every storage
        (see "add" method).

        Args:
            content: content of the object to be restored.
            obj_id: id of the content, forwarded as is to every storage.

        Returns:
            the id returned by the last storage that returned one, or None
            if no storage returned an id.
        """
        results = [storage.restore(content, obj_id)
                   for storage in self.storages]
        return self._last_accepted_id(results)

    def get(self, obj_id):
        """ Retrieve the content of a given object.

        The storages are tried in order and the first content found is
        returned.

        Args:
            obj_id: object id.

        Returns:
            the content returned by the first storage that has it.

        Raises:
            ObjNotFoundError: if no storage provides the requested object.
        """
        for storage in self.storages:
            try:
                content = storage.get(obj_id)
            except ObjNotFoundError:
                continue
            # An id filter that rejects the id returns None instead of
            # raising: this is a miss too, another storage may have it.
            if content is not None:
                return content
        # If no storage contains this content, raise the error
        raise ObjNotFoundError(obj_id)

    def check(self, obj_id):
        """ Perform an integrity check for a given object.

        The check is run on every storage. Any error other than
        ObjNotFoundError raised by a storage is not caught and propagates.

        Args:
            obj_id: object id.

        Raises:
            ObjNotFoundError: if every storage reports the object as
                missing.
        """
        nb_present = 0
        for storage in self.storages:
            try:
                storage.check(obj_id)
            except ObjNotFoundError:
                continue
            else:
                # NOTE(review): an id filter that rejects the id returns
                # without raising, so it is counted as present here --
                # confirm this is intended.
                nb_present += 1

        # Raise the ObjNotFoundError only if the content couldn't be found
        # in any of the storages.
        if nb_present == 0:
            raise ObjNotFoundError(obj_id)

    def get_random(self, batch_size):
        """ Get random ids of existing contents.

        A storage is picked at random among the non-empty ones, and the
        request is forwarded to it. If it does not implement the
        operation, another one is picked.

        Args:
            batch_size (int): number of ids requested, forwarded to the
                picked storage.

        Returns:
            the result of the picked storage's get_random, or an empty list
            if every storage is empty.

        Raises:
            NotImplementedError: if none of the non-empty storages supports
                the operation.
        """
        candidates = [storage for storage in self.storages
                      if len(storage) > 0]
        if not candidates:
            return []

        while candidates:
            storage = random.choice(candidates)
            try:
                return storage.get_random(batch_size)
            except NotImplementedError:
                candidates.remove(storage)
        # There is no storage that allow the get_random operation
        raise NotImplementedError(
            "There is no storage implementation into the multiplexer that "
            "support the 'get_random' operation"
        )
self.assertNotIn(obj_id_m, self.storage) + + @istest + def iter(self): + content, obj_id = self.hash_content(b'iter') + self.assertEqual(list(iter(self.storage)), []) + self.storage.add(content, obj_id=obj_id) + self.assertEqual(list(iter(self.storage)), [obj_id]) + + @istest + def len(self): + content, obj_id = self.hash_content(b'len') + self.assertEqual(len(self.storage), 0) + self.storage.add(content, obj_id=obj_id) + self.assertEqual(len(self.storage), 1) + + @istest + def len_multiple(self): + content, obj_id = self.hash_content(b'len_multiple') + # Add a content to the read-only storage + self.storage_v1.add(content) + self.assertEqual(len(self.storage), 1) + # By adding the same content to the global storage, it should be + # Replicated. + # len() behavior is to indicates the number of files, not unique + # contents. + self.storage.add(content) + self.assertEqual(len(self.storage), 2) + + @istest + def get_random_contents(self): + content, obj_id = self.hash_content(b'get_random_content') + self.storage.add(content) + random_contents = list(self.storage.get_random(1)) + self.assertEqual(1, len(random_contents)) + self.assertIn(obj_id, random_contents) + + @istest + def access_readonly(self): + # Add a content to the readonly storage + content, obj_id = self.hash_content(b'content in read-only') + self.storage_v1.add(content) + # Try to retrieve it on the main storage + self.assertIn(obj_id, self.storage)