diff --git a/swh/storage/objstorage/multiplexer/__init__.py b/swh/storage/objstorage/multiplexer/__init__.py new file mode 100644 index 000000000..3b9be1120 --- /dev/null +++ b/swh/storage/objstorage/multiplexer/__init__.py @@ -0,0 +1,4 @@ +from .multiplexer_objstorage import MultiplexerObjStorage + + +__all__ = [MultiplexerObjStorage] diff --git a/swh/storage/objstorage/multiplexer/multiplexer_objstorage.py b/swh/storage/objstorage/multiplexer/multiplexer_objstorage.py new file mode 100644 index 000000000..9923e4210 --- /dev/null +++ b/swh/storage/objstorage/multiplexer/multiplexer_objstorage.py @@ -0,0 +1,166 @@ +# Copyright (C) 2015-2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import random + +from ..objstorage import ObjStorage +from ...exc import ObjNotFoundError + + +class MultiplexerObjStorage(ObjStorage): + """ Implementation of ObjStorage that distribute between multiple storages + + The multiplexer object storage allows an input to be demultiplexed + among multiple storages that will or will not accept it by themselves + (see .filter package). + + As the ids can be differents, no pre-computed ids should be submitted. + Also, there are no guarantees that the returned ids can be used directly + into the storages that the multiplexer manage. + """ + + def __init__(self, storages): + self.storages = storages + + def __contains__(self, obj_id): + for storage in self.storages: + if obj_id in storage: + return True + return False + + def __iter__(self): + for storage in self.storages: + yield from storage + + def __len__(self): + """ Returns the number of files in the storage. + + Warning: Multiple files may represent the same content, so this method + does not indicate how many different contents are in the storage. + """ + return sum(map(len, self.storages)) + + def add(self, content, obj_id=None, check_presence=True): + """ Add a new object to the object storage. + + If the adding step works in all the storages that accept this content, + this is a success. Otherwise, the full adding step is an error even if + it succeed in some of the storages. + + Args: + content: content of the object to be added to the storage. + obj_id: checksum of [bytes] using [ID_HASH_ALGO] algorithm. When + given, obj_id will be trusted to match the bytes. If missing, + obj_id will be computed on the fly. + check_presence: indicate if the presence of the content should be + verified before adding the file. + + Returns: + an id of the object into the storage. As the write-storages are + always readable as well, any id will be valid to retrieve a + content. + """ + return [storage.add(content, obj_id, check_presence) + for storage in self.storages].pop() + + def restore(self, content, obj_id=None): + """ Restore a content that have been corrupted. + + This function is identical to add_bytes but does not check if + the object id is already in the file system. + + (see "add" method) + + Args: + content: content of the object to be added to the storage + obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When + given, obj_id will be trusted to match bytes. If missing, + obj_id will be computed on the fly. + + Returns: + an id of the object into the storage. As the write-storages are + always readable as well, any id will be valid to retrieve a + content. + """ + return [storage.restore(content, obj_id) + for storage in self.storages].pop() + + def get(self, obj_id): + """ Retrieve the content of a given object. + + Args: + obj_id: object id. + + Returns: + the content of the requested object as bytes. + + Raises: + ObjNotFoundError: if the requested object is missing. + """ + for storage in self.storages: + try: + return storage.get(obj_id) + except ObjNotFoundError: + continue + # If no storage contains this content, raise the error + raise ObjNotFoundError(obj_id) + + def check(self, obj_id): + """ Perform an integrity check for a given object. + + Verify that the file object is in place and that the gziped content + matches the object id. + + Args: + obj_id: object id. + + Raises: + ObjNotFoundError: if the requested object is missing. + Error: if the request object is corrupted. + """ + nb_present = 0 + for storage in self.storages: + try: + storage.check(obj_id) + except ObjNotFoundError: + continue + else: + nb_present += 1 + # If there is an Error because of a corrupted file, then let it pass. + + # Raise the ObjNotFoundError only if the content coulnd't be found in + # all the storages. + if nb_present == 0: + raise ObjNotFoundError(obj_id) + + def get_random(self, batch_size): + """ Get random ids of existing contents + + This method is used in order to get random ids to perform + content integrity verifications on random contents. + + Attributes: + batch_size (int): Number of ids that will be given + + Yields: + An iterable of ids of contents that are in the current object + storage. + """ + storages_set = [storage for storage in self.storages + if len(storage) > 0] + if len(storages_set) <= 0: + return [] + + while storages_set: + storage = random.choice(storages_set) + try: + return storage.get_random(batch_size) + except NotImplementedError: + storages_set.remove(storage) + # There is no storage that allow the get_random operation + raise NotImplementedError( + "There is no storage implementation into the multiplexer that " + "support the 'get_random' operation" + )