diff --git a/swh/objstorage/multiplexer/filter/__init__.py b/swh/objstorage/multiplexer/filter/__init__.py index a150bfb..1d81952 100644 --- a/swh/objstorage/multiplexer/filter/__init__.py +++ b/swh/objstorage/multiplexer/filter/__init__.py @@ -1,98 +1,99 @@ from .read_write_filter import ReadObjStorageFilter from .id_filter import RegexIdObjStorageFilter, PrefixIdObjStorageFilter _FILTERS_CLASSES = { 'readonly': ReadObjStorageFilter, 'regex': RegexIdObjStorageFilter, 'prefix': PrefixIdObjStorageFilter } _FILTERS_PRIORITY = { 'readonly': 0, 'prefix': 1, 'regex': 2 } def read_only(): return {'type': 'readonly'} def id_prefix(prefix): return {'type': 'prefix', 'prefix': prefix} def id_regex(regex): return {'type': 'regex', 'regex': regex} def _filter_priority(filter_type): """Get the priority of this filter. Priority is a value that indicates if the operation of the filter is time-consuming (smaller values means quick execution), or very likely to be almost always the same value (False being small, and True high). In case the filters are chained, they will be ordered in a way that small priorities (quick execution or instantly break the chain) are executed first. Default value is 1. Value 0 is recommended for storages that change behavior only by disabling some operations (making the method return None). """ return _FILTERS_PRIORITY.get(filter_type, 1) def add_filter(storage, filter_conf): """Add a filter to the given storage. Args: - storage (ObjStorage): storage which will be filtered. + storage (swh.objstorage.ObjStorage): storage which will be filtered. filter_conf (dict): configuration of an ObjStorageFilter, given as a dictionnary that contains the keys: - - type: which represent the type of filter, one of the keys - of _FILTERS_CLASSES - - Every arguments that this type of filter requires. + + - type: which represent the type of filter, one of the keys of + _FILTERS_CLASSES + - Every arguments that this type of filter requires. Returns: A filtered storage that perform only the valid operations. """ type = filter_conf['type'] args = {k: v for k, v in filter_conf.items() if k != 'type'} filtered_storage = _FILTERS_CLASSES[type](storage=storage, **args) return filtered_storage def add_filters(storage, filter_confs): """ Add multiple filters to the given storage. (See filter.add_filter) Args: - storage (ObjStorage): storage which will be filtered. + storage (swh.objstorage.ObjStorage): storage which will be filtered. filter_confs (list): any number of filter conf, as a dict with: - - type: which represent the type of filter, one of the keys of - FILTERS. - - Every arguments that this type of filter require. + + - type: which represent the type of filter, one of the keys of FILTERS. + - Every arguments that this type of filter require. Returns: A filtered storage that fulfill the requirement of all the given filters. """ # Reverse sorting in order to put the filter with biggest priority first. filter_confs.sort(key=lambda conf: _filter_priority(conf['type']), reverse=True) # Add the bigest filter to the storage, and reduce it to accumulate filters # on top of it, until the smallest (fastest, see filter.filter_priority) is # added. for filter_conf in filter_confs: storage = add_filter(storage, filter_conf) return storage diff --git a/swh/objstorage/multiplexer/multiplexer_objstorage.py b/swh/objstorage/multiplexer/multiplexer_objstorage.py index 005d887..3041445 100644 --- a/swh/objstorage/multiplexer/multiplexer_objstorage.py +++ b/swh/objstorage/multiplexer/multiplexer_objstorage.py @@ -1,172 +1,174 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import random from ..objstorage import ObjStorage from ..exc import ObjNotFoundError class MultiplexerObjStorage(ObjStorage): """Implementation of ObjStorage that distributes between multiple storages. The multiplexer object storage allows an input to be demultiplexed among multiple storages that will or will not accept it by themselves (see .filter package). As the ids can be differents, no pre-computed ids should be submitted. Also, there are no guarantees that the returned ids can be used directly into the storages that the multiplexer manage. + Use case examples follow. + + Example 1:: - Use case examples could be: - Example 1: storage_v1 = filter.read_only(PathSlicingObjStorage('/dir1', '0:2/2:4/4:6')) storage_v2 = PathSlicingObjStorage('/dir2', '0:1/0:5') storage = MultiplexerObjStorage([storage_v1, storage_v2]) - When using 'storage', all the new contents will only be added - to the v2 storage, while it will be retrievable from both. + When using 'storage', all the new contents will only be added to the v2 + storage, while it will be retrievable from both. + + Example 2:: - Example 2: storage_v1 = filter.id_regex( PathSlicingObjStorage('/dir1', '0:2/2:4/4:6'), r'[^012].*' ) storage_v2 = filter.if_regex( PathSlicingObjStorage('/dir2', '0:1/0:5'), r'[012]/*' ) storage = MultiplexerObjStorage([storage_v1, storage_v2]) - When using this storage, the contents with a sha1 starting - with 0, 1 or 2 will be redirected (read AND write) to the - storage_v2, while the others will be redirected to the - storage_v1. If a content starting with 0, 1 or 2 is present - in the storage_v1, it would be ignored anyway. + When using this storage, the contents with a sha1 starting with 0, 1 or 2 + will be redirected (read AND write) to the storage_v2, while the others + will be redirected to the storage_v1. If a content starting with 0, 1 or 2 + is present in the storage_v1, it would be ignored anyway. """ def __init__(self, storages): self.storages = storages def check_config(self, *, check_write): return all( storage.check_config(check_write=check_write) for storage in self.storages ) def __contains__(self, obj_id): """Check the object storage for proper configuration. Args: check_write: check whether writes to the objstorage will succeed Returns: True if the storage is properly configured """ for storage in self.storages: if obj_id in storage: return True return False def __iter__(self): """Iterates over the content of each storages Due to the demultiplexer nature, same content can be in multiple storages and may be yielded multiple times. - Warning: The `__iter__` methods frequently have bad performance. You - almost certainly don't want to use this method in production. + Warning: + The ``__iter__`` methods frequently have bad performance. You + almost certainly don't want to use this method in production. """ for storage in self.storages: yield from storage def __len__(self): """Compute the number of objects in the current object storage. Identical objects present in multiple storages will be counted as multiple objects. Warning: this currently uses `__iter__`, its warning about bad performance applies. Returns: number of objects contained in the storage. """ return sum(map(len, self.storages)) def add(self, content, obj_id=None, check_presence=True): """ Add a new object to the object storage. If the adding step works in all the storages that accept this content, this is a success. Otherwise, the full adding step is an error even if it succeed in some of the storages. Args: content: content of the object to be added to the storage. obj_id: checksum of [bytes] using [ID_HASH_ALGO] algorithm. When given, obj_id will be trusted to match the bytes. If missing, obj_id will be computed on the fly. check_presence: indicate if the presence of the content should be verified before adding the file. Returns: an id of the object into the storage. As the write-storages are always readable as well, any id will be valid to retrieve a content. """ return [storage.add(content, obj_id, check_presence) for storage in self.storages].pop() def restore(self, content, obj_id=None): return [storage.restore(content, obj_id) for storage in self.storages].pop() def get(self, obj_id): for storage in self.storages: try: return storage.get(obj_id) except ObjNotFoundError: continue # If no storage contains this content, raise the error raise ObjNotFoundError(obj_id) def check(self, obj_id): nb_present = 0 for storage in self.storages: try: storage.check(obj_id) except ObjNotFoundError: continue else: nb_present += 1 # If there is an Error because of a corrupted file, then let it pass. # Raise the ObjNotFoundError only if the content coulnd't be found in # all the storages. if nb_present == 0: raise ObjNotFoundError(obj_id) def get_random(self, batch_size): storages_set = [storage for storage in self.storages if len(storage) > 0] if len(storages_set) <= 0: return [] while storages_set: storage = random.choice(storages_set) try: return storage.get_random(batch_size) except NotImplementedError: storages_set.remove(storage) # There is no storage that allow the get_random operation raise NotImplementedError( "There is no storage implementation into the multiplexer that " "support the 'get_random' operation" )