
D2085.diff

diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py
--- a/swh/storage/__init__.py
+++ b/swh/storage/__init__.py
@@ -13,7 +13,7 @@
     pass
 
 
-STORAGE_IMPLEMENTATION = {'local', 'remote', 'memory', 'filter'}
+STORAGE_IMPLEMENTATION = {'local', 'remote', 'memory', 'filter', 'buffer'}
 
 
 def get_storage(cls, args):
@@ -22,7 +22,8 @@
     Args:
         storage (dict): dictionary with keys:
 
-        - cls (str): storage's class, either local, remote, memory, filter
+        - cls (str): storage's class, either local, remote, memory, filter,
+          buffer
         - args (dict): dictionary with keys
 
     Returns:
@@ -44,5 +45,7 @@
         from .in_memory import Storage
     elif cls == 'filter':
         from .filter import FilteringProxyStorage as Storage
+    elif cls == 'buffer':
+        from .buffer import BufferingProxyStorage as Storage
 
     return Storage(**args)
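
For context (not part of the diff itself), a minimal sketch of how the new 'buffer' class is instantiated through get_storage, using an in-memory backend so the snippet is self-contained; the threshold values here are arbitrary:

    from swh.storage import get_storage

    # get_storage resolves cls='buffer' to BufferingProxyStorage and
    # instantiates it as BufferingProxyStorage(**args).
    storage = get_storage('buffer', args={
        'storage': {'cls': 'memory', 'args': {}},
        'min_batch_size': {'content': 10, 'content_bytes': 1024},
    })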
diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/buffer.py
@@ -0,0 +1,106 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from collections import deque
+from functools import partial
+from typing import Optional, Iterable, Dict
+
+from swh.core.utils import grouper
+from swh.storage import get_storage
+
+
+class BufferingProxyStorage:
+ """Storage implementation in charge of accumulating objects prior to
+ discussing with the "main" storage.
+
+ Sample configuration use case for buffering storage:
+
+ .. code-block:: yaml
+
+ storage:
+ cls: buffer
+ args:
+ storage:
+ cls: remote
+ args: http://storage.internal.staging.swh.network:5002/
+ min_batch_size:
+ content: 10000
+ content_bytes: 100000000
+ directory: 5000
+ revision: 1000
+
+ """
+    def __init__(self, storage, min_batch_size=None):
+        self.storage = get_storage(**storage)
+
+        if min_batch_size is None:
+            min_batch_size = {}
+
+        self.min_batch_size = {
+            'content': min_batch_size.get('content', 10000),
+            'content_bytes': min_batch_size.get('content_bytes',
+                                                100*1024*1024),
+            'directory': min_batch_size.get('directory', 25000),
+            'revision': min_batch_size.get('revision', 100000),
+        }
+        self.object_types = ['content', 'directory', 'revision']
+        self._objects = {k: deque() for k in self.object_types}
+
+    def __getattr__(self, key):
+        if key.endswith('_add'):
+            object_type = key.split('_')[0]
+            if object_type in self.object_types:
+                return partial(
+                    self.object_add, object_type=object_type
+                )
+        return getattr(self.storage, key)
+
+    def content_add(self, content: Iterable[Dict]) -> Dict:
+        """Enqueue contents to write to the storage.
+
+        The following policies apply:
+        - First, check if the queue's threshold is hit. If it is, flush
+          contents to the storage.
+
+        - If not, check if the total size of the enqueued contents hits
+          its threshold. If it does, flush contents to the storage.
+
+        """
+        s = self.object_add(content, object_type='content')
+        if not s:
+            q = self._objects['content']
+            total_size = sum(c['length'] for c in q)
+            if total_size >= self.min_batch_size['content_bytes']:
+                return self.flush(['content'])
+
+        return s
+
+    def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict:
+        if object_types is None:
+            object_types = self.object_types
+        summary = {}  # type: Dict[str, int]
+        for object_type in object_types:
+            q = self._objects[object_type]
+            for objs in grouper(q, n=self.min_batch_size[object_type]):
+                add_fn = getattr(self.storage, '%s_add' % object_type)
+                s = add_fn(objs)
+                for k, v in s.items():
+                    summary[k] = v + summary.get(k, 0)
+            q.clear()
+
+        return summary
+
+    def object_add(self, objects: Iterable[Dict], *, object_type: str) -> Dict:
+        """Enqueue objects to write to the storage. This checks if the queue's
+        threshold is hit. If it is, actually write those objects to the storage.
+
+        """
+        q = self._objects[object_type]
+        threshold = self.min_batch_size[object_type]
+        q.extend(objects)
+        if len(q) >= threshold:
+            return self.flush()
+
+        return {}
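
A brief usage sketch of the buffering behavior (assuming `content1` and `content2` are content dicts carrying at least the 'length' field and the hash keys the backing storage expects):

    from swh.storage.buffer import BufferingProxyStorage

    storage = BufferingProxyStorage(
        storage={'cls': 'memory', 'args': {}},
        min_batch_size={'content': 2},
    )

    s = storage.content_add([content1])  # below threshold: enqueued only
    assert s == {}
    s = storage.content_add([content2])  # threshold of 2 hit: flushed to backend
    storage.flush()                      # force out anything still buffered

Note that directory_add and revision_add are not defined explicitly: __getattr__ synthesizes any *_add method for a known object type by routing it through object_add, and delegates every other attribute to the wrapped storage.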
diff --git a/swh/storage/tests/test_buffer.py b/swh/storage/tests/test_buffer.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_buffer.py
@@ -0,0 +1,179 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from swh.storage.buffer import BufferingProxyStorage
+
+
+def test_buffering_proxy_storage_content_threshold_not_hit(sample_data):
+    contents = sample_data['content']
+    storage = BufferingProxyStorage(
+        storage={'cls': 'memory', 'args': {}},
+        min_batch_size={
+            'content': 10,
+        }
+    )
+    s = storage.content_add([contents[0], contents[1]])
+    assert s == {}
+
+    # contents have not been written to storage
+    missing_contents = storage.content_missing(
+        [contents[0], contents[1]])
+    assert set(missing_contents) == set(
+        [contents[0]['sha1'], contents[1]['sha1']])
+
+    s = storage.flush()
+    assert s == {
+        'content:add': 1 + 1,
+        'content:add:bytes': contents[0]['length'] + contents[1]['length'],
+        'skipped_content:add': 0
+    }
+
+    missing_contents = storage.content_missing(
+        [contents[0], contents[1]])
+    assert list(missing_contents) == []
+
+
+def test_buffering_proxy_storage_content_threshold_nb_hit(sample_data):
+    contents = sample_data['content']
+    storage = BufferingProxyStorage(
+        storage={'cls': 'memory', 'args': {}},
+        min_batch_size={
+            'content': 1,
+        }
+    )
+
+    s = storage.content_add([contents[0]])
+    assert s == {
+        'content:add': 1,
+        'content:add:bytes': contents[0]['length'],
+        'skipped_content:add': 0
+    }
+
+    missing_contents = storage.content_missing([contents[0]])
+    assert list(missing_contents) == []
+
+    s = storage.flush()
+    assert s == {}
+
+
+def test_buffering_proxy_storage_content_threshold_bytes_hit(sample_data):
+    contents = sample_data['content']
+    content_bytes_min_batch_size = 20
+    storage = BufferingProxyStorage(
+        storage={'cls': 'memory', 'args': {}},
+        min_batch_size={
+            'content': 10,
+            'content_bytes': content_bytes_min_batch_size,
+        }
+    )
+
+    assert contents[0]['length'] > content_bytes_min_batch_size
+
+    s = storage.content_add([contents[0]])
+    assert s == {
+        'content:add': 1,
+        'content:add:bytes': contents[0]['length'],
+        'skipped_content:add': 0
+    }
+
+    missing_contents = storage.content_missing([contents[0]])
+    assert list(missing_contents) == []
+
+    s = storage.flush()
+    assert s == {}
+
+
+def test_buffering_proxy_storage_directory_threshold_not_hit(sample_data):
+    directories = sample_data['directory']
+    storage = BufferingProxyStorage(
+        storage={'cls': 'memory', 'args': {}},
+        min_batch_size={
+            'directory': 10,
+        }
+    )
+    s = storage.directory_add([directories[0]])
+    assert s == {}
+
+    directory_id = directories[0]['id']
+    missing_directories = storage.directory_missing(
+        [directory_id])
+    assert list(missing_directories) == [directory_id]
+
+    s = storage.flush()
+    assert s == {
+        'directory:add': 1,
+    }
+
+    missing_directories = storage.directory_missing(
+        [directory_id])
+    assert list(missing_directories) == []
+
+
+def test_buffering_proxy_storage_directory_threshold_hit(sample_data):
+    directories = sample_data['directory']
+    storage = BufferingProxyStorage(
+        storage={'cls': 'memory', 'args': {}},
+        min_batch_size={
+            'directory': 1,
+        }
+    )
+    s = storage.directory_add([directories[0]])
+    assert s == {
+        'directory:add': 1,
+    }
+
+    missing_directories = storage.directory_missing(
+        [directories[0]['id']])
+    assert list(missing_directories) == []
+
+    s = storage.flush()
+    assert s == {}
+
+
+def test_buffering_proxy_storage_revision_threshold_not_hit(sample_data):
+    revisions = sample_data['revision']
+    storage = BufferingProxyStorage(
+        storage={'cls': 'memory', 'args': {}},
+        min_batch_size={
+            'revision': 10,
+        }
+    )
+    s = storage.revision_add([revisions[0]])
+    assert s == {}
+
+    revision_id = revisions[0]['id']
+    missing_revisions = storage.revision_missing(
+        [revision_id])
+    assert list(missing_revisions) == [revision_id]
+
+    s = storage.flush()
+    assert s == {
+        'revision:add': 1,
+    }
+
+    missing_revisions = storage.revision_missing(
+        [revision_id])
+    assert list(missing_revisions) == []
+
+
+def test_buffering_proxy_storage_revision_threshold_hit(sample_data):
+    revisions = sample_data['revision']
+    storage = BufferingProxyStorage(
+        storage={'cls': 'memory', 'args': {}},
+        min_batch_size={
+            'revision': 1,
+        }
+    )
+    s = storage.revision_add([revisions[0]])
+    assert s == {
+        'revision:add': 1,
+    }
+
+    missing_revisions = storage.revision_missing(
+        [revisions[0]['id']])
+    assert list(missing_revisions) == []
+
+    s = storage.flush()
+    assert s == {}
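
These tests rely on a `sample_data` fixture that is not part of this diff. A hypothetical sketch of its shape, inferred only from the accesses above (the real fixture ships with the test suite's conftest):

    import pytest

    @pytest.fixture
    def sample_data():
        # Hypothetical placeholder values; real entries carry the full set
        # of hashes and fields the in-memory storage expects.
        return {
            'content': [{'sha1': b'\x01' * 20, 'length': 42, 'data': b'x' * 42}],
            'directory': [{'id': b'\x02' * 20, 'entries': []}],
            'revision': [{'id': b'\x03' * 20}],
        }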
diff --git a/swh/storage/tests/test_init.py b/swh/storage/tests/test_init.py
--- a/swh/storage/tests/test_init.py
+++ b/swh/storage/tests/test_init.py
@@ -12,6 +12,7 @@
from swh.storage.api.client import RemoteStorage
from swh.storage.storage import Storage as DbStorage
from swh.storage.in_memory import Storage as MemoryStorage
+from swh.storage.buffer import BufferingProxyStorage
from swh.storage.filter import FilteringProxyStorage
@@ -31,7 +32,10 @@
         }),
         ('filter', FilteringProxyStorage, {'storage': {
             'cls': 'memory', 'args': {}}
-        })
+        }),
+        ('buffer', BufferingProxyStorage, {'storage': {
+            'cls': 'memory', 'args': {}}
+        }),
     ]:
         actual_storage = get_storage(cls, args=dummy_args)
         assert actual_storage is not None
