Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7124733
D53.id182.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
18 KB
Subscribers
None
D53.id182.diff
View Options
diff --git a/swh/storage/objstorage/multiplexer/__init__.py b/swh/storage/objstorage/multiplexer/__init__.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/objstorage/multiplexer/__init__.py
@@ -0,0 +1,4 @@
+from .multiplexer_objstorage import MultiplexerObjStorage
+
+
+__all__ = [MultiplexerObjStorage]
diff --git a/swh/storage/objstorage/multiplexer/filter/__init__.py b/swh/storage/objstorage/multiplexer/filter/__init__.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/objstorage/multiplexer/filter/__init__.py
@@ -0,0 +1,98 @@
+import functools
+
+from .read_write_filter import ReadObjStorageFilter
+from .id_filter import RegexIdObjStorageFilter, PrefixIdObjStorageFilter
+
+
+# Map of filter type name -> filter class; the keys are the accepted
+# values for the 'type' entry of a filter configuration (see add_filter).
+FILTERS = {
+    'readonly': ReadObjStorageFilter,
+    'regex': RegexIdObjStorageFilter,
+    'prefix': PrefixIdObjStorageFilter
+}
+
+
+# Relative execution priority per filter type; smaller values are meant
+# to run first when filters are chained (see _filter_priority).
+FILTERS_PRIORITY = {
+    'readonly': 0,
+    'prefix': 1,
+    'regex': 2
+}
+
+
+def read_only(storage):
+ """ Make the given storage read-only.
+
+ Args:
+ storage (ObjStorage): storage to be changed to read-only mode.
+
+ Returns:
+ a new storage that will be read-only (see filters).
+ """
+ return add_filter(storage, {'type': 'readonly'})
+
+
+def _filter_priority(self, filter_type):
+ """ Get the priority of this filter.
+
+ Priority is a value that indicates if the operation of the
+ filter is time-consuming (smaller values means quick execution),
+ or very likely to be almost always the same value (False being small,
+ and True high).
+
+ In case the filters are chained, they will be ordered in a way that
+ small priorities (quick execution or instantly break the chain) are
+ executed first.
+
+ Default value is 1. Value 0 is recommended for storages that change
+ behavior only by disabling some operations (making the method return
+ None).
+ """
+ return FILTERS_PRIORITY.get_or_default(filter_type, 1)
+
+
+def add_filter(storage, filter_conf):
+ """ Add a filter to the given storage.
+
+ Args:
+ storage (ObjStorage): storage which will be filtered.
+ filter_conf (dict): configuration of an ObjStorageFilter, given as
+ a dictionnary that contains the keys:
+ - type: which represent the type of filter, one of the keys
+ of FILTERS
+ - Every arguments that this type of filter require.
+
+ Returns:
+ A filtered storage that perform only the valid operations.
+ """
+ type = filter_conf['type']
+ args = {k: v for k, v in filter_conf.items() if k is not 'type'}
+ filter = FILTERS[type](storage=storage, **args)
+ return filter
+
+
+def add_filters(storage, *filter_confs):
+ """ Add multiple filters to the given storage.
+
+ (See filter.add_filter)
+
+ Args:
+ storage (ObjStorage): storage which will be filtered.
+ filter_confs (list): any number of filter conf, as a dict with:
+ - type: which represent the type of filter, one of the keys of
+ FILTERS.
+ - Every arguments that this type of filter require.
+
+ Returns:
+ A filtered storage that fulfill the requirement of all the given
+ filters.
+ """
+ # Reverse sorting in order to put the filter with bigest priority first.
+ filter_confs.sort(key=lambda conf: _filter_priority(conf['type']),
+ reverse=True)
+
+ # Add the bigest filter to the storage, and reduce it to accumulate filters
+ # on top of it, until the smallest (fastest, see filter.filter_priority) is
+ # added.
+ return functools.reduce(
+ lambda stor, conf: add_filter(stor, conf),
+ [storage] + filter_confs
+ )
diff --git a/swh/storage/objstorage/multiplexer/filter/filter.py b/swh/storage/objstorage/multiplexer/filter/filter.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/objstorage/multiplexer/filter/filter.py
@@ -0,0 +1,48 @@
+# Copyright (C) 2015-2016 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from ...objstorage import ObjStorage
+
+
+class ObjStorageFilter(ObjStorage):
+ """ Base implementation of a filter that allow inputs on ObjStorage or not
+
+ This class copy the API of ...objstorage in order to filter the inputs
+ of this class.
+ If the operation is allowed, return the result of this operation
+ applied to the destination implementation. Otherwise, just return
+ without any operation.
+
+ This class is an abstract base class for a classic read/write storage.
+ Filters can inherit from it and only redefine some methods in order
+ to change behavior.
+ """
+
+ def __init__(self, storage):
+ self.storage = storage
+
+ def __contains__(self, *args, **kwargs):
+ return self.storage.__contains__(*args, **kwargs)
+
+ def __iter__(self):
+ return self.storage.__iter__()
+
+ def __len__(self):
+ return self.storage.__len__()
+
+ def add(self, *args, **kwargs):
+ return self.storage.add(*args, **kwargs)
+
+ def restore(self, *args, **kwargs):
+ return self.storage.restore(*args, **kwargs)
+
+ def get(self, *args, **kwargs):
+ return self.storage.get(*args, **kwargs)
+
+ def check(self, *args, **kwargs):
+ return self.storage.check(*args, **kwargs)
+
+ def get_random(self, *args, **kwargs):
+ return self.storage.get_random(*args, **kwargs)
diff --git a/swh/storage/objstorage/multiplexer/filter/id_filter.py b/swh/storage/objstorage/multiplexer/filter/id_filter.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/objstorage/multiplexer/filter/id_filter.py
@@ -0,0 +1,87 @@
+# Copyright (C) 2015-2016 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import re
+
+from swh.core import hashutil
+
+from .filter import ObjStorageFilter
+from ...objstorage import ID_HASH_ALGO
+
+
+def compute_hash(content):
+ """ Compute the hash of the given content.
+ """
+ # Checksum is missing, compute it on the fly.
+ h = hashutil._new_hash(ID_HASH_ALGO, len(bytes))
+ h.update(bytes)
+ return h.digest()
+
+
+class IdObjStorageFilter(ObjStorageFilter):
+ """ Filter that only allow operations if the object match a requirement.
+
+ Even for read operations, check before if the id matche the requirements.
+ This may prevent for unnecesary disk access.
+ """
+
+ def is_valid(self, obj_id):
+ """ Indicates if the given id is valid.
+ """
+ raise NotImplementedError('Implementations of an IdObjStorageFilter '
+ 'must have a "is_valid" method')
+
+ def __contains__(self, obj_id, *args, **kwargs):
+ if self.is_valid(obj_id):
+ return self.storage.__contains__(*args, obj_id=obj_id, **kwargs)
+
+ def add(self, content, obj_id=None, check_presence=True, *args, **kwargs):
+ if obj_id is None:
+ obj_id = compute_hash(content)
+ if self.is_valid(obj_id):
+ return self.storage.add(*args, obj_id=obj_id, **kwargs)
+
+ def restore(self, content, obj_id=None, *args, **kwargs):
+ if obj_id is None:
+ obj_id = compute_hash(content)
+ if self.is_valid(obj_id):
+ return self.storage.restore(*args, **kwargs)
+
+ def get(self, obj_id, *args, **kwargs):
+ if self.is_valid(obj_id):
+ return self.storage.get(*args, obj_id=obj_id, **kwargs)
+
+ def check(self, obj_id, *args, **kwargs):
+ if self.is_valid(obj_id):
+ return self.storage.check(*args, obj_id=obj_id, **kwargs)
+
+ def get_random(self, *args, **kwargs):
+ return self.storage.get_random(*args, **kwargs)
+
+
+class RegexIdObjStorageFilter(IdObjStorageFilter):
+ """ Filter that allow operations if the content's id as hex match a regex.
+ """
+
+ def __init__(self, storage, regex):
+ super().__init__(storage)
+ self.regex = re.compile(regex)
+
+ def is_valid(self, obj_id):
+ hex_obj_id = hashutil.hash_to_hex(obj_id)
+ return self.regex.match(hex_obj_id) is not None
+
+
+class PrefixIdObjStorageFilter(IdObjStorageFilter):
+ """ Filter that allow operations if the hexlified id have a given prefix.
+ """
+
+ def __init__(self, storage, prefix):
+ super().__init__(storage)
+ self.prefix = prefix
+
+ def is_valid(self, obj_id):
+ hex_obj_id = hashutil.hash_to_hex(obj_id)
+ return hex_obj_id.startswith(self.prefix)
diff --git a/swh/storage/objstorage/multiplexer/filter/read_write_filter.py b/swh/storage/objstorage/multiplexer/filter/read_write_filter.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/objstorage/multiplexer/filter/read_write_filter.py
@@ -0,0 +1,17 @@
+# Copyright (C) 2015-2016 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from .filter import ObjStorageFilter
+
+
+class ReadObjStorageFilter(ObjStorageFilter):
+ """ Filter that disable write operation of the storage.
+ """
+
+ def add(self, *args, **kwargs):
+ return
+
+ def restore(self, *args, **kwargs):
+ return
diff --git a/swh/storage/objstorage/multiplexer/multiplexer_objstorage.py b/swh/storage/objstorage/multiplexer/multiplexer_objstorage.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/objstorage/multiplexer/multiplexer_objstorage.py
@@ -0,0 +1,166 @@
+# Copyright (C) 2015-2016 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import random
+
+from ..objstorage import ObjStorage
+from ...exc import ObjNotFoundError
+
+
+class MultiplexerObjStorage(ObjStorage):
+    """ Implementation of ObjStorage that distributes between multiple storages.
+
+    The multiplexer object storage allows an input to be demultiplexed
+    among multiple storages that will or will not accept it by themselves
+    (see .filter package).
+
+    As the ids can be different, no pre-computed ids should be submitted.
+    Also, there is no guarantee that the returned ids can be used directly
+    in the storages that the multiplexer manages.
+    """
+
+    def __init__(self, storages):
+        # Backing ObjStorage instances; every operation fans out over them.
+        self.storages = storages
+
+    def __contains__(self, obj_id):
+        # Present as soon as at least one backing storage holds the id.
+        for storage in self.storages:
+            if obj_id in storage:
+                return True
+        return False
+
+    def __iter__(self):
+        # Chain the iterators of every backing storage; an id held by
+        # several storages is yielded once per storage.
+        for storage in self.storages:
+            yield from storage
+
+    def __len__(self):
+        """ Return the number of files in the storage.
+
+        Warning: Multiple files may represent the same content, so this method
+        does not indicate how many different contents are in the storage.
+        """
+        return sum(map(len, self.storages))
+
+    def add(self, content, obj_id=None, check_presence=True):
+        """ Add a new object to the object storage.
+
+        If the adding works in all the storages that accept this content,
+        this is a success. Otherwise, the adding is an error even if it
+        succeeds in some of the storages.
+
+        Args:
+            content: content of the object to be added to the storage.
+            obj_id: checksum of [bytes] using [ID_HASH_ALGO] algorithm. When
+                given, obj_id will be trusted to match the bytes. If missing,
+                obj_id will be computed on the fly.
+            check_presence: indicate if the presence of the content should be
+                verified before adding the file.
+
+        Returns:
+            an id of the object into the storage. As the write-storages are
+            always readable as well, any id will be valid to retrieve a
+            content.
+        """
+        # Fan the write out to every storage; .pop() then returns the id
+        # reported by the last one (any of them is valid per the contract
+        # above). NOTE(review): raises IndexError if `storages` is empty --
+        # TODO confirm that an empty multiplexer is not a supported setup.
+        return [storage.add(content, obj_id, check_presence)
+                for storage in self.storages].pop()
+
+    def restore(self, content, obj_id=None):
+        """ Restore a content that has been corrupted.
+
+        This function is identical to add_bytes but does not check if
+        the object id is already in the file system.
+
+        (see "add" method)
+
+        Args:
+            content: content of the object to be added to the storage
+            obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When
+                given, obj_id will be trusted to match bytes. If missing,
+                obj_id will be computed on the fly.
+
+        Returns:
+            an id of the object into the storage. As the write-storages are
+            always readable as well, any id will be valid to retrieve a
+            content.
+        """
+        # Same fan-out-and-pop pattern as `add`.
+        return [storage.restore(content, obj_id)
+                for storage in self.storages].pop()
+
+    def get(self, obj_id):
+        """ Retrieve the content of a given object.
+
+        Args:
+            obj_id: object id.
+
+        Returns:
+            the content of the requested object as bytes, taken from the
+            first backing storage that holds it.
+
+        Raises:
+            ObjNotFoundError: if the requested object is missing from every
+                backing storage.
+        """
+        for storage in self.storages:
+            try:
+                return storage.get(obj_id)
+            except ObjNotFoundError:
+                continue
+        # If no storage contains this content, raise the error
+        raise ObjNotFoundError(obj_id)
+
+    def check(self, obj_id):
+        """ Perform an integrity check for a given object.
+
+        Verify that the file object is in place and that the content
+        matches the object id.
+
+        Args:
+            obj_id: object id.
+
+        Raises:
+            ObjNotFoundError: if the requested object is missing from every
+                backing storage.
+            Error: if the requested object is corrupted.
+        """
+        nb_present = 0
+        for storage in self.storages:
+            try:
+                storage.check(obj_id)
+            except ObjNotFoundError:
+                continue
+            else:
+                nb_present += 1
+        # If there is an Error because of a corrupted file, then let it
+        # propagate to the caller.
+
+        # Raise the ObjNotFoundError only if the content couldn't be found
+        # in any of the storages.
+        if nb_present == 0:
+            raise ObjNotFoundError(obj_id)
+
+    def get_random(self, batch_size):
+        """ Get random ids of existing contents.
+
+        This method is used in order to get random ids to perform
+        content integrity verifications on random contents.
+
+        Args:
+            batch_size (int): Number of ids that will be given
+
+        Returns:
+            An iterable of ids of contents that are in the current object
+            storage, delegated to one backing storage; an empty list when
+            every backing storage is empty.
+        """
+        # Only consider non-empty storages.
+        storages_set = [storage for storage in self.storages
+                        if len(storage) > 0]
+        if len(storages_set) <= 0:
+            return []
+
+        # Pick a random storage; fall back to another one when the chosen
+        # storage does not support the operation.
+        while storages_set:
+            storage = random.choice(storages_set)
+            try:
+                return storage.get_random(batch_size)
+            except NotImplementedError:
+                storages_set.remove(storage)
+        # There is no storage that allows the get_random operation
+        raise NotImplementedError(
+            "There is no storage implementation into the multiplexer that "
+            "support the 'get_random' operation"
+        )
diff --git a/swh/storage/tests/test_objstorage_multiplexer.py b/swh/storage/tests/test_objstorage_multiplexer.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_objstorage_multiplexer.py
@@ -0,0 +1,78 @@
+# Copyright (C) 2015-2016 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import tempfile
+import unittest
+
+from nose.tools import istest
+
+from swh.storage.objstorage import PathSlicingObjStorage
+from swh.storage.objstorage.multiplexer import MultiplexerObjStorage
+from swh.storage.objstorage.multiplexer import filter as objstorage_filter
+
+from objstorage_testing import ObjStorageTestFixture
+
+
+class TestMultiplexerObjStorage(ObjStorageTestFixture, unittest.TestCase):
+    # Exercises a multiplexer built from one read-only and one writable
+    # PathSlicingObjStorage.
+
+    def setUp(self):
+        super().setUp()
+        # Two independent on-disk storages with different slicing schemes,
+        # each in its own temporary directory.
+        self.storage_v1 = PathSlicingObjStorage(tempfile.mkdtemp(),
+                                                '0:2/2:4/4:6')
+        self.storage_v2 = PathSlicingObjStorage(tempfile.mkdtemp(), '0:1/0:5')
+        # v1 is wrapped read-only, so the multiplexer only writes to v2.
+        self.r_storage = objstorage_filter.read_only(self.storage_v1)
+        self.w_storage = self.storage_v2
+        self.storage = MultiplexerObjStorage([self.r_storage, self.w_storage])
+
+    @istest
+    def contains(self):
+        content_p, obj_id_p = self.hash_content(b'contains_present')
+        content_m, obj_id_m = self.hash_content(b'contains_missing')
+        self.storage.add(content_p, obj_id=obj_id_p)
+        self.assertIn(obj_id_p, self.storage)
+        self.assertNotIn(obj_id_m, self.storage)
+
+    @istest
+    def iter(self):
+        content, obj_id = self.hash_content(b'iter')
+        self.assertEqual(list(iter(self.storage)), [])
+        self.storage.add(content, obj_id=obj_id)
+        self.assertEqual(list(iter(self.storage)), [obj_id])
+
+    @istest
+    def len(self):
+        content, obj_id = self.hash_content(b'len')
+        self.assertEqual(len(self.storage), 0)
+        self.storage.add(content, obj_id=obj_id)
+        self.assertEqual(len(self.storage), 1)
+
+    @istest
+    def len_multiple(self):
+        content, obj_id = self.hash_content(b'len_multiple')
+        # Add a content to the read-only storage directly.
+        self.storage_v1.add(content)
+        self.assertEqual(len(self.storage), 1)
+        # By adding the same content to the global storage, it should be
+        # replicated in the writable branch.
+        # len() behavior is to indicate the number of files, not unique
+        # contents.
+        self.storage.add(content)
+        self.assertEqual(len(self.storage), 2)
+
+    @istest
+    def get_random_contents(self):
+        content, obj_id = self.hash_content(b'get_random_content')
+        self.storage.add(content)
+        random_contents = list(self.storage.get_random(1))
+        self.assertEqual(1, len(random_contents))
+        self.assertIn(obj_id, random_contents)
+
+    @istest
+    def access_readonly(self):
+        # Add a content to the read-only storage directly...
+        content, obj_id = self.hash_content(b'content in read-only')
+        self.storage_v1.add(content)
+        # ...and check it is readable through the multiplexer.
+        self.assertIn(obj_id, self.storage)
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Dec 21 2024, 6:05 PM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3229693
Attached To
D53: Implement the object storage multiplexer
Event Timeline
Log In to Comment