Page MenuHomeSoftware Heritage

D53.id182.diff
No OneTemporary

D53.id182.diff

diff --git a/swh/storage/objstorage/multiplexer/__init__.py b/swh/storage/objstorage/multiplexer/__init__.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/objstorage/multiplexer/__init__.py
@@ -0,0 +1,4 @@
from .multiplexer_objstorage import MultiplexerObjStorage


# __all__ entries must be identifier *strings*, not the objects themselves:
# `from ... import *` looks each entry up by name and raises a TypeError on
# a non-string entry.
__all__ = ['MultiplexerObjStorage']
diff --git a/swh/storage/objstorage/multiplexer/filter/__init__.py b/swh/storage/objstorage/multiplexer/filter/__init__.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/objstorage/multiplexer/filter/__init__.py
@@ -0,0 +1,98 @@
+import functools
+
+from .read_write_filter import ReadObjStorageFilter
+from .id_filter import RegexIdObjStorageFilter, PrefixIdObjStorageFilter
+
+
# Maps a filter type name (the 'type' key of a filter configuration dict,
# see add_filter) to the ObjStorageFilter subclass implementing it.
FILTERS = {
    'readonly': ReadObjStorageFilter,
    'regex': RegexIdObjStorageFilter,
    'prefix': PrefixIdObjStorageFilter
}


# Execution priority per filter type (see _filter_priority): smaller values
# are meant to run first because the filter is quick to evaluate or likely
# to break the chain early.
FILTERS_PRIORITY = {
    'readonly': 0,
    'prefix': 1,
    'regex': 2
}
+
+
def read_only(storage):
    """ Wrap the given storage so that write operations are disabled.

    Args:
        storage (ObjStorage): storage to be changed to read-only mode.

    Returns:
        a new storage that will be read-only (see filters).
    """
    read_only_conf = {'type': 'readonly'}
    return add_filter(storage, read_only_conf)
+
+
def _filter_priority(filter_type):
    """ Get the priority of this filter type.

    Priority is a value that indicates if the operation of the
    filter is time-consuming (smaller values mean quick execution),
    or very likely to be almost always the same value (False being small,
    and True high).

    In case the filters are chained, they will be ordered in a way that
    small priorities (quick execution or instantly breaking the chain) are
    executed first.

    Default value is 1. Value 0 is recommended for storages that change
    behavior only by disabling some operations (making the method return
    None).

    Args:
        filter_type (str): one of the keys of FILTERS_PRIORITY.

    Returns:
        int: the priority of the filter type, defaulting to 1.
    """
    # Fixes two defects: the original took a spurious `self` parameter
    # (this is a module-level function and add_filters calls it with a
    # single argument), and `dict` has no `get_or_default` method — the
    # standard spelling is `dict.get(key, default)`.
    return FILTERS_PRIORITY.get(filter_type, 1)
+
+
def add_filter(storage, filter_conf):
    """ Add a filter to the given storage.

    Args:
        storage (ObjStorage): storage which will be filtered.
        filter_conf (dict): configuration of an ObjStorageFilter, given as
            a dictionary that contains the keys:
            - type: which represents the type of filter, one of the keys
              of FILTERS
            - Every argument that this type of filter requires.

    Returns:
        A filtered storage that performs only the valid operations.
    """
    # Avoid shadowing the `type` and `filter` builtins, and compare strings
    # by value: `is not` checks identity, which is unreliable for string
    # literals and only worked by accident of CPython interning.
    filter_type = filter_conf['type']
    filter_args = {k: v for k, v in filter_conf.items() if k != 'type'}
    return FILTERS[filter_type](storage=storage, **filter_args)
+
+
def add_filters(storage, *filter_confs):
    """ Add multiple filters to the given storage.

    (See filter.add_filter)

    Args:
        storage (ObjStorage): storage which will be filtered.
        filter_confs (list): any number of filter conf, as a dict with:
            - type: which represents the type of filter, one of the keys of
              FILTERS.
            - Every argument that this type of filter requires.

    Returns:
        A filtered storage that fulfills the requirements of all the given
        filters.
    """
    # `*filter_confs` is a tuple, which has no .sort() method; build a
    # sorted list instead. Reverse sorting puts the filter with the biggest
    # priority first.
    ordered_confs = sorted(filter_confs,
                           key=lambda conf: _filter_priority(conf['type']),
                           reverse=True)

    # Wrap the storage with the biggest-priority filter first, then
    # accumulate the remaining filters on top of it, until the smallest
    # (fastest, see _filter_priority) is added. Passing `storage` as the
    # reduce initializer also avoids the original list + tuple TypeError.
    return functools.reduce(add_filter, ordered_confs, storage)
diff --git a/swh/storage/objstorage/multiplexer/filter/filter.py b/swh/storage/objstorage/multiplexer/filter/filter.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/objstorage/multiplexer/filter/filter.py
@@ -0,0 +1,48 @@
+# Copyright (C) 2015-2016 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from ...objstorage import ObjStorage
+
+
class ObjStorageFilter(ObjStorage):
    """ Base implementation of a filter that allows inputs on ObjStorage or not.

    This class mirrors the API of ...objstorage so that the inputs of every
    operation can be filtered. If the operation is allowed, the result of
    that operation applied to the wrapped storage is returned; otherwise the
    call returns without doing anything.

    This abstract base class behaves as a transparent proxy over a classic
    read/write storage. Concrete filters inherit from it and override only
    the methods whose behavior they need to change.
    """

    def __init__(self, storage):
        # The wrapped storage every allowed operation is delegated to.
        self.storage = storage

    def __contains__(self, *args, **kwargs):
        return self.storage.__contains__(*args, **kwargs)

    def __iter__(self):
        return iter(self.storage)

    def __len__(self):
        return len(self.storage)

    def add(self, *args, **kwargs):
        return self.storage.add(*args, **kwargs)

    def restore(self, *args, **kwargs):
        return self.storage.restore(*args, **kwargs)

    def get(self, *args, **kwargs):
        return self.storage.get(*args, **kwargs)

    def check(self, *args, **kwargs):
        return self.storage.check(*args, **kwargs)

    def get_random(self, *args, **kwargs):
        return self.storage.get_random(*args, **kwargs)
diff --git a/swh/storage/objstorage/multiplexer/filter/id_filter.py b/swh/storage/objstorage/multiplexer/filter/id_filter.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/objstorage/multiplexer/filter/id_filter.py
@@ -0,0 +1,87 @@
+# Copyright (C) 2015-2016 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import re
+
+from swh.core import hashutil
+
+from .filter import ObjStorageFilter
+from ...objstorage import ID_HASH_ALGO
+
+
def compute_hash(content):
    """ Compute the hash of the given content.

    Args:
        content (bytes): raw content whose checksum is computed.

    Returns:
        the digest of the content, computed with the ID_HASH_ALGO algorithm.
    """
    # The original referenced the `bytes` builtin instead of the `content`
    # argument, so `len(bytes)` raised a TypeError; hash the actual content.
    h = hashutil._new_hash(ID_HASH_ALGO, len(content))
    h.update(content)
    return h.digest()
+
+
class IdObjStorageFilter(ObjStorageFilter):
    """ Filter that only allows operations if the object id matches a
    requirement.

    Even for read operations, check first whether the id matches the
    requirements. This may prevent unnecessary disk accesses.
    """

    def is_valid(self, obj_id):
        """ Indicate if the given id is valid.
        """
        raise NotImplementedError('Implementations of an IdObjStorageFilter '
                                  'must have a "is_valid" method')

    def __contains__(self, obj_id, *args, **kwargs):
        if self.is_valid(obj_id):
            return self.storage.__contains__(*args, obj_id=obj_id, **kwargs)
        # An id that does not match the filter cannot be present.
        return False

    def add(self, content, obj_id=None, check_presence=True, *args, **kwargs):
        if obj_id is None:
            obj_id = compute_hash(content)
        if self.is_valid(obj_id):
            # Forward the content and the check_presence flag; the original
            # implementation dropped both, so nothing was ever written.
            return self.storage.add(content, *args, obj_id=obj_id,
                                    check_presence=check_presence, **kwargs)

    def restore(self, content, obj_id=None, *args, **kwargs):
        if obj_id is None:
            obj_id = compute_hash(content)
        if self.is_valid(obj_id):
            # Forward the content and the computed obj_id; the original
            # implementation dropped both.
            return self.storage.restore(content, *args, obj_id=obj_id,
                                        **kwargs)

    def get(self, obj_id, *args, **kwargs):
        if self.is_valid(obj_id):
            return self.storage.get(*args, obj_id=obj_id, **kwargs)

    def check(self, obj_id, *args, **kwargs):
        if self.is_valid(obj_id):
            return self.storage.check(*args, obj_id=obj_id, **kwargs)

    def get_random(self, *args, **kwargs):
        # Random sampling cannot be filtered by id beforehand; delegate.
        return self.storage.get_random(*args, **kwargs)
+
+
class RegexIdObjStorageFilter(IdObjStorageFilter):
    """ Filter that allows operations when the content's hex id matches a
    regex.
    """

    def __init__(self, storage, regex):
        super().__init__(storage)
        # Compile once at construction time; is_valid runs per operation.
        self.regex = re.compile(regex)

    def is_valid(self, obj_id):
        return bool(self.regex.match(hashutil.hash_to_hex(obj_id)))
+
+
class PrefixIdObjStorageFilter(IdObjStorageFilter):
    """ Filter that allows operations when the hexlified id starts with a
    given prefix.
    """

    def __init__(self, storage, prefix):
        super().__init__(storage)
        self.prefix = prefix

    def is_valid(self, obj_id):
        return hashutil.hash_to_hex(obj_id).startswith(self.prefix)
diff --git a/swh/storage/objstorage/multiplexer/filter/read_write_filter.py b/swh/storage/objstorage/multiplexer/filter/read_write_filter.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/objstorage/multiplexer/filter/read_write_filter.py
@@ -0,0 +1,17 @@
+# Copyright (C) 2015-2016 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from .filter import ObjStorageFilter
+
+
class ReadObjStorageFilter(ObjStorageFilter):
    """ Filter that disables the write operations of the wrapped storage.
    """

    def add(self, *args, **kwargs):
        # Writes are disabled: silently ignore the request.
        return None

    def restore(self, *args, **kwargs):
        # Writes are disabled: silently ignore the request.
        return None
diff --git a/swh/storage/objstorage/multiplexer/multiplexer_objstorage.py b/swh/storage/objstorage/multiplexer/multiplexer_objstorage.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/objstorage/multiplexer/multiplexer_objstorage.py
@@ -0,0 +1,166 @@
+# Copyright (C) 2015-2016 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import random
+
+from ..objstorage import ObjStorage
+from ...exc import ObjNotFoundError
+
+
class MultiplexerObjStorage(ObjStorage):
    """ Implementation of ObjStorage that distributes between multiple
    storages.

    The multiplexer object storage allows an input to be demultiplexed
    among multiple storages that will or will not accept it by themselves
    (see .filter package).

    As the ids can be different, no pre-computed ids should be submitted.
    Also, there is no guarantee that the returned ids can be used directly
    into the storages that the multiplexer manages.
    """

    def __init__(self, storages):
        self.storages = storages

    def __contains__(self, obj_id):
        # Present as soon as any of the underlying storages has it.
        return any(obj_id in storage for storage in self.storages)

    def __iter__(self):
        for storage in self.storages:
            yield from storage

    def __len__(self):
        """ Return the number of files in the storage.

        Warning: Multiple files may represent the same content, so this
        method does not indicate how many different contents are in the
        storage.
        """
        return sum(map(len, self.storages))

    def add(self, content, obj_id=None, check_presence=True):
        """ Add a new object to the object storage.

        If the adding works in all the storages that accept this content,
        this is a success. Otherwise, the adding is an error even if it
        succeeds in some of the storages.

        Args:
            content: content of the object to be added to the storage.
            obj_id: checksum of [bytes] using [ID_HASH_ALGO] algorithm. When
                given, obj_id will be trusted to match the bytes. If missing,
                obj_id will be computed on the fly.
            check_presence: indicate if the presence of the content should be
                verified before adding the file.

        Returns:
            an id of the object into the storage. As the write-storages are
            always readable as well, any id will be valid to retrieve a
            content.
        """
        results = [storage.add(content, obj_id, check_presence)
                   for storage in self.storages]
        # Any returned id is valid for retrieval; arbitrarily keep the last.
        return results[-1]

    def restore(self, content, obj_id=None):
        """ Restore a content that has been corrupted.

        This function is identical to add but does not check if
        the object id is already in the file system.

        (see "add" method)

        Args:
            content: content of the object to be added to the storage
            obj_id: checksum of `bytes` as computed by ID_HASH_ALGO. When
                given, obj_id will be trusted to match bytes. If missing,
                obj_id will be computed on the fly.

        Returns:
            an id of the object into the storage. As the write-storages are
            always readable as well, any id will be valid to retrieve a
            content.
        """
        results = [storage.restore(content, obj_id)
                   for storage in self.storages]
        return results[-1]

    def get(self, obj_id):
        """ Retrieve the content of a given object.

        Args:
            obj_id: object id.

        Returns:
            the content of the requested object as bytes.

        Raises:
            ObjNotFoundError: if the requested object is missing from all
                the storages.
        """
        for storage in self.storages:
            try:
                return storage.get(obj_id)
            except ObjNotFoundError:
                continue
        # If no storage contains this content, raise the error.
        raise ObjNotFoundError(obj_id)

    def check(self, obj_id):
        """ Perform an integrity check for a given object.

        Verify that the file object is in place and that the gzipped content
        matches the object id.

        Args:
            obj_id: object id.

        Raises:
            ObjNotFoundError: if the requested object is missing from all
                the storages.
            Error: if the requested object is corrupted.
        """
        nb_present = 0
        for storage in self.storages:
            try:
                storage.check(obj_id)
            except ObjNotFoundError:
                continue
            else:
                nb_present += 1
        # If there is an Error because of a corrupted file, let it propagate.

        # Raise ObjNotFoundError only if the content couldn't be found in
        # any of the storages.
        if nb_present == 0:
            raise ObjNotFoundError(obj_id)

    def get_random(self, batch_size):
        """ Get random ids of existing contents.

        This method is used in order to get random ids to perform
        content integrity verifications on random contents.

        Args:
            batch_size (int): Number of ids that will be given

        Returns:
            An iterable of ids of contents that are in the current object
            storage.

        Raises:
            NotImplementedError: if none of the non-empty storages support
                the get_random operation.
        """
        candidates = [storage for storage in self.storages
                      if len(storage) > 0]
        if not candidates:
            return []

        while candidates:
            storage = random.choice(candidates)
            try:
                return storage.get_random(batch_size)
            except NotImplementedError:
                # This storage cannot sample; retry with another one.
                candidates.remove(storage)
        # There is no storage that allows the get_random operation.
        raise NotImplementedError(
            "There is no storage implementation into the multiplexer that "
            "support the 'get_random' operation"
        )
diff --git a/swh/storage/tests/test_objstorage_multiplexer.py b/swh/storage/tests/test_objstorage_multiplexer.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_objstorage_multiplexer.py
@@ -0,0 +1,78 @@
+# Copyright (C) 2015-2016 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import tempfile
+import unittest
+
+from nose.tools import istest
+
+from swh.storage.objstorage import PathSlicingObjStorage
+from swh.storage.objstorage.multiplexer import MultiplexerObjStorage
+from swh.storage.objstorage.multiplexer import filter as objstorage_filter
+
+from objstorage_testing import ObjStorageTestFixture
+
+
class TestMultiplexerObjStorage(ObjStorageTestFixture, unittest.TestCase):

    def setUp(self):
        super().setUp()
        # Remember the temporary directories so tearDown can remove them
        # (the original version leaked one per storage per test).
        self.tmpdirs = [tempfile.mkdtemp(), tempfile.mkdtemp()]
        self.storage_v1 = PathSlicingObjStorage(self.tmpdirs[0],
                                                '0:2/2:4/4:6')
        self.storage_v2 = PathSlicingObjStorage(self.tmpdirs[1], '0:1/0:5')
        self.r_storage = objstorage_filter.read_only(self.storage_v1)
        self.w_storage = self.storage_v2
        self.storage = MultiplexerObjStorage([self.r_storage, self.w_storage])

    def tearDown(self):
        # Local import so the module-level import block stays untouched.
        import shutil
        for tmpdir in self.tmpdirs:
            shutil.rmtree(tmpdir, ignore_errors=True)
        super().tearDown()

    @istest
    def contains(self):
        content_p, obj_id_p = self.hash_content(b'contains_present')
        content_m, obj_id_m = self.hash_content(b'contains_missing')
        self.storage.add(content_p, obj_id=obj_id_p)
        self.assertIn(obj_id_p, self.storage)
        self.assertNotIn(obj_id_m, self.storage)

    @istest
    def iter(self):
        content, obj_id = self.hash_content(b'iter')
        self.assertEqual(list(iter(self.storage)), [])
        self.storage.add(content, obj_id=obj_id)
        self.assertEqual(list(iter(self.storage)), [obj_id])

    @istest
    def len(self):
        content, obj_id = self.hash_content(b'len')
        self.assertEqual(len(self.storage), 0)
        self.storage.add(content, obj_id=obj_id)
        self.assertEqual(len(self.storage), 1)

    @istest
    def len_multiple(self):
        content, obj_id = self.hash_content(b'len_multiple')
        # Add a content to the read-only storage
        self.storage_v1.add(content)
        self.assertEqual(len(self.storage), 1)
        # By adding the same content to the global storage, it should be
        # replicated.
        # len() behavior is to indicate the number of files, not unique
        # contents.
        self.storage.add(content)
        self.assertEqual(len(self.storage), 2)

    @istest
    def get_random_contents(self):
        content, obj_id = self.hash_content(b'get_random_content')
        self.storage.add(content)
        random_contents = list(self.storage.get_random(1))
        self.assertEqual(1, len(random_contents))
        self.assertIn(obj_id, random_contents)

    @istest
    def access_readonly(self):
        # Add a content to the readonly storage
        content, obj_id = self.hash_content(b'content in read-only')
        self.storage_v1.add(content)
        # Try to retrieve it on the main storage
        self.assertIn(obj_id, self.storage)

File Metadata

Mime Type
text/plain
Expires
Dec 21 2024, 6:05 PM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3229693

Event Timeline