def read_only():
    """Build the configuration of a 'readonly' filter.

    Returns:
        dict: a filter configuration usable with add_filter, holding
        only the 'type' key set to 'readonly'.

    """
    return {'type': 'readonly'}


def id_prefix(prefix):
    """Build the configuration of a 'prefix' filter.

    Args:
        prefix: value forwarded as the `prefix` argument of the filter
            class registered under 'prefix' in _FILTERS_CLASSES.

    Returns:
        dict: a filter configuration usable with add_filter.

    """
    return {'type': 'prefix', 'prefix': prefix}


def id_regex(regex):
    """Build the configuration of a 'regex' filter.

    Args:
        regex: value forwarded as the `regex` argument of the filter
            class registered under 'regex' in _FILTERS_CLASSES.

    Returns:
        dict: a filter configuration usable with add_filter.

    """
    return {'type': 'regex', 'regex': regex}


def _filter_priority(filter_type):
    """Get the priority of this filter.

    Priority is a value that indicates if the operation of the filter
    is time-consuming (smaller values means quick execution), or very
    likely to be almost always the same value (False being small, and
    True high).

    In case the filters are chained, they will be ordered in a way
    that small priorities (quick execution or instantly break the
    chain) are executed first.

    Default value is 1. Value 0 is recommended for storages that
    change behavior only by disabling some operations (making the
    method return None).

    Args:
        filter_type (str): one of the keys of _FILTERS_PRIORITY; any
            other value gets the default priority.

    Returns:
        The priority registered for `filter_type`, 1 if there is none.

    """
    return _FILTERS_PRIORITY.get(filter_type, 1)


def add_filter(storage, filter_conf):
    """Add a filter to the given storage.

    Args:
        storage (ObjStorage): storage which will be filtered.
        filter_conf (dict): configuration of an ObjStorageFilter, given as
            a dictionary that contains the keys:
            - type: which represent the type of filter, one of the keys
                of _FILTERS_CLASSES
            - Every arguments that this type of filter requires.

    Returns:
        A filtered storage that perform only the valid operations.

    Raises:
        KeyError: if `filter_conf` has no 'type' key, or if its type is
            not registered in _FILTERS_CLASSES.

    """
    filter_type = filter_conf['type']
    # Compare keys by value: an identity test against the 'type' literal
    # only works for interned strings and would let an equal-but-distinct
    # key leak into the filter constructor arguments.
    filter_args = {
        key: value for key, value in filter_conf.items() if key != 'type'
    }
    return _FILTERS_CLASSES[filter_type](storage=storage, **filter_args)


def add_filters(storage, filter_confs):
    """Add multiple filters to the given storage.

    (See filter.add_filter)

    Args:
        storage (ObjStorage): storage which will be filtered.
        filter_confs (iterable): any number of filter conf, as a dict with:
            - type: which represent the type of filter, one of the keys of
                _FILTERS_CLASSES.
            - Every arguments that this type of filter requires.
            The iterable is left untouched.

    Returns:
        A filtered storage that fulfill the requirement of all the given
        filters; `storage` itself when no filter is given.

    """
    # Biggest priority first: that filter wraps the raw storage, and the
    # smallest one (fastest, see _filter_priority) ends up outermost so it
    # is evaluated first. sorted() works on a copy, so the caller's
    # configuration keeps its order.
    ordered_confs = sorted(
        filter_confs,
        key=lambda conf: _filter_priority(conf['type']),
        reverse=True,
    )
    filtered = storage
    for conf in ordered_confs:
        filtered = add_filter(filtered, conf)
    return filtered
class ObjStorageFilter(ObjStorage):
    """Base implementation of a filter that allow inputs on ObjStorage or
    not.

    The class mirrors the ObjStorage API and wraps another storage. As
    written here, every operation is forwarded unchanged to the wrapped
    storage and its result is returned.

    It is meant as a base class for a classic read/write storage:
    filters inherit from it and redefine only the methods whose
    behavior they want to change.

    """

    def __init__(self, storage):
        # Storage every operation is delegated to.
        self.storage = storage

    def __contains__(self, *args, **kwargs):
        """Forward the membership test to the wrapped storage."""
        wrapped = self.storage
        return wrapped.__contains__(*args, **kwargs)

    def __iter__(self):
        """Iterate over the content of the wrapped storage.

        Warning: The `__iter__` methods frequently have bad performance.
        You almost certainly don't want to use this method in production,
        as the wrapped storage may cause performance issues.

        """
        wrapped = self.storage
        return wrapped.__iter__()

    def __len__(self):
        """Compute the number of objects in the wrapped storage.

        Warning: performance issue in `__iter__` also applies here.

        Returns:
            number of objects contained in the storage.

        """
        wrapped = self.storage
        return wrapped.__len__()

    def add(self, content, obj_id=None, check_presence=True, *args, **kwargs):
        """Forward the addition of `content` to the wrapped storage."""
        wrapped = self.storage
        return wrapped.add(content, obj_id, check_presence, *args, **kwargs)

    def restore(self, content, obj_id=None, *args, **kwargs):
        """Forward the restoration of `content` to the wrapped storage."""
        wrapped = self.storage
        return wrapped.restore(content, obj_id, *args, **kwargs)

    def get(self, obj_id, *args, **kwargs):
        """Forward the retrieval of `obj_id` to the wrapped storage."""
        wrapped = self.storage
        return wrapped.get(obj_id, *args, **kwargs)

    def check(self, obj_id, *args, **kwargs):
        """Forward the integrity check of `obj_id` to the wrapped storage."""
        wrapped = self.storage
        return wrapped.check(obj_id, *args, **kwargs)

    def get_random(self, batch_size, *args, **kwargs):
        """Forward the random pick of `batch_size` ids to the wrapped
        storage.
        """
        wrapped = self.storage
        return wrapped.get_random(batch_size, *args, **kwargs)
class MultiplexerObjStorage(ObjStorage):
    """Implementation of ObjStorage that distributes between multiple
    storages.

    The multiplexer object storage allows an input to be demultiplexed
    among multiple storages that will or will not accept it by
    themselves (see .filter package).

    As the ids can be different, no pre-computed ids should be
    submitted. Also, there are no guarantees that the returned ids
    can be used directly into the storages that the multiplexer
    manages.

    Use case examples could be:
        Example 1:
            storage_v1 = filter.add_filter(
                PathSlicingObjStorage('/dir1', '0:2/2:4/4:6'),
                filter.read_only()
            )
            storage_v2 = PathSlicingObjStorage('/dir2', '0:1/0:5')
            storage = MultiplexerObjStorage([storage_v1, storage_v2])

            When using 'storage', all the new contents will only be added
            to the v2 storage, while it will be retrievable from both.

        Example 2:
            storage_v1 = filter.add_filter(
                PathSlicingObjStorage('/dir1', '0:2/2:4/4:6'),
                filter.id_regex(r'[^012].*')
            )
            storage_v2 = filter.add_filter(
                PathSlicingObjStorage('/dir2', '0:1/0:5'),
                filter.id_regex(r'[012].*')
            )
            storage = MultiplexerObjStorage([storage_v1, storage_v2])

            When using this storage, the contents with a sha1 starting
            with 0, 1 or 2 will be redirected (read AND write) to the
            storage_v2, while the others will be redirected to the
            storage_v1.  If a content starting with 0, 1 or 2 is present
            in the storage_v1, it would be ignored anyway.

    """

    def __init__(self, storages):
        # Storages the operations are dispatched to, in lookup order.
        self.storages = storages

    def __contains__(self, obj_id):
        """Tell whether at least one of the storages contains `obj_id`."""
        return any(obj_id in storage for storage in self.storages)

    def __iter__(self):
        """Iterates over the content of each storages

        Due to the demultiplexer nature, same content can be in multiple
        storages and may be yielded multiple times.

        Warning: The `__iter__` methods frequently have bad performance. You
        almost certainly don't want to use this method in production.

        """
        for storage in self.storages:
            yield from storage

    def __len__(self):
        """Compute the number of objects in the current object storage.

        Identical objects present in multiple storages will be counted as
        multiple objects.
        Warning: this currently uses `__iter__`, its warning about bad
        performance applies.

        Returns:
            number of objects contained in the storage.

        """
        return sum(len(storage) for storage in self.storages)

    def add(self, content, obj_id=None, check_presence=True):
        """Add a new object to the object storage.

        The content is submitted to every storage, in order. If one of
        them raises, the error propagates even though the previous
        storages may already have accepted the content.

        Args:
            content: content of the object to be added to the storage.
            obj_id: checksum of [bytes] using [ID_HASH_ALGO] algorithm. When
                given, obj_id will be trusted to match the bytes. If missing,
                obj_id will be computed on the fly.
            check_presence: indicate if the presence of the content should be
                verified before adding the file.

        Returns:
            the id returned by the last storage of the multiplexer.

        Raises:
            IndexError: if the multiplexer holds no storage.

        """
        obj_ids = [
            storage.add(content, obj_id, check_presence)
            for storage in self.storages
        ]
        return obj_ids[-1]

    def restore(self, content, obj_id=None):
        """Restore a content in every storage.

        Returns:
            the id returned by the last storage of the multiplexer.

        Raises:
            IndexError: if the multiplexer holds no storage.

        """
        obj_ids = [
            storage.restore(content, obj_id) for storage in self.storages
        ]
        return obj_ids[-1]

    def get(self, obj_id):
        """Retrieve the content of `obj_id` from the first storage that
        has it.

        Raises:
            ObjNotFoundError: if no storage contains this content.

        """
        for storage in self.storages:
            try:
                return storage.get(obj_id)
            except ObjNotFoundError:
                continue
        # If no storage contains this content, raise the error
        raise ObjNotFoundError(obj_id)

    def check(self, obj_id):
        """Check `obj_id` in every storage that holds it.

        Any error other than ObjNotFoundError raised by a storage (e.g.
        because of a corrupted file) is left to propagate.

        Raises:
            ObjNotFoundError: if the content couldn't be found in any of
                the storages.

        """
        found = False
        for storage in self.storages:
            try:
                storage.check(obj_id)
            except ObjNotFoundError:
                continue
            found = True
        if not found:
            raise ObjNotFoundError(obj_id)

    def get_random(self, batch_size):
        """Get random ids from one randomly chosen, non-empty storage.

        Args:
            batch_size: forwarded to the `get_random` of the chosen storage.

        Returns:
            the result of the first tried storage that supports
            `get_random`, or an empty list when every storage is empty.

        Raises:
            NotImplementedError: if none of the non-empty storages
                supports the operation.

        """
        candidates = [
            storage for storage in self.storages if len(storage) > 0
        ]
        if not candidates:
            return []

        # Try the storages in a random order; each one is tried at most
        # once, skipping those that do not implement the operation.
        random.shuffle(candidates)
        for storage in candidates:
            try:
                return storage.get_random(batch_size)
            except NotImplementedError:
                continue

        # There is no storage that allow the get_random operation
        raise NotImplementedError(
            "There is no storage implementation into the multiplexer that "
            "support the 'get_random' operation"
        )