D2085: swh.storage.buffer: Add buffering proxy storage implementation

diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py
--- a/swh/storage/__init__.py
+++ b/swh/storage/__init__.py
@@ -13,7 +13,7 @@
     pass
 
 
-STORAGE_IMPLEMENTATION = {'local', 'remote', 'memory', 'filter'}
+STORAGE_IMPLEMENTATION = {'local', 'remote', 'memory', 'filter', 'buffer'}
 
 
 def get_storage(cls, args):
@@ -22,7 +22,8 @@
     Args:
         storage (dict): dictionary with keys:
 
-        - cls (str): storage's class, either local, remote, memory, filter
+        - cls (str): storage's class, either local, remote, memory, filter,
+          buffer
         - args (dict): dictionary with keys
 
     Returns:
@@ -44,5 +45,7 @@
         from .in_memory import Storage
     elif cls == 'filter':
         from .filter import FilteringProxyStorage as Storage
+    elif cls == 'buffer':
+        from .buffer import BufferingProxyStorage as Storage
 
     return Storage(**args)
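
(Aside: a minimal sketch of how the new 'buffer' class is instantiated through
get_storage; the configuration values below are illustrative, not prescribed
defaults.)

    from swh.storage import get_storage

    # The nested 'storage' dict is itself a get_storage configuration:
    # the buffering proxy wraps whichever backend it describes.
    storage = get_storage('buffer', args={
        'storage': {'cls': 'memory', 'args': {}},
        'min_batch_size': {'content': 10000},
    })
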
diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/buffer.py
@@ -0,0 +1,106 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from collections import deque
+from functools import partial
+from typing import Optional, Iterable, Dict
+
+from swh.core.utils import grouper
+from swh.storage import get_storage
+
+
+class BufferingProxyStorage:
+ """Storage implementation in charge of accumulating objects prior to
+ discussing with the "main" storage.
+
+    Sample configuration use case for buffering storage:
+
+    .. code-block:: yaml
+
+        storage:
+          cls: buffer
+          args:
+            storage:
+              cls: remote
+              args: {url: 'http://storage.internal.staging.swh.network:5002/'}
+            min_batch_size:
+              content: 10000
+              content_bytes: 100000000
+              directory: 5000
+              revision: 1000
+
+    """
+    def __init__(self, storage, min_batch_size=None):
+        self.storage = get_storage(**storage)
+
+        if min_batch_size is None:
+            min_batch_size = {}
+
+        self.min_batch_size = {
+            'content': min_batch_size.get('content', 10000),
+            'content_bytes': min_batch_size.get('content_bytes',
+                                                100*1024*1024),
+            'directory': min_batch_size.get('directory', 25000),
+            'revision': min_batch_size.get('revision', 100000),
+        }
+        self.object_types = ['content', 'directory', 'revision']
+        self._objects = {k: deque() for k in self.object_types}
+
+    def __getattr__(self, key):
+        if key.endswith('_add'):
+            object_type = key.split('_')[0]
+            if object_type in self.object_types:
+                return partial(
+                    self.object_add, object_type=object_type
+                )
+        return getattr(self.storage, key)
+
+    def content_add(self, content: Iterable[Dict]) -> Dict:
+        """Enqueue contents to write to the storage.
+
+        The following policies apply:
+        - First, check if the queue's count threshold is hit. If it is,
+          flush the queued contents to the storage.
+
+        - If not, check if the total size of the enqueued contents hits
+          the size threshold. If it does, flush them to the storage.
+
+        """
+        s = self.object_add(content, object_type='content')
+        if not s:
+            q = self._objects['content']
+            total_size = sum(c['length'] for c in q)
+            if total_size >= self.min_batch_size['content_bytes']:
+                return self.flush(['content'])
+
+        return s
+
+    def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict:
+        if object_types is None:
+            object_types = self.object_types
+        summary = {}  # type: Dict[str, int]
+        for object_type in object_types:
+            q = self._objects[object_type]
+            for objs in grouper(q, n=self.min_batch_size[object_type]):
+                add_fn = getattr(self.storage, '%s_add' % object_type)
+                s = add_fn(objs)
+                summary = {k: v + summary.get(k, 0)
+                           for k, v in s.items()}
+            q.clear()
+
+        return summary
+
+    def object_add(self, objects: Iterable[Dict], *, object_type: str) -> Dict:
+ """Enqueue objects to write to the storage. This checks if the queue's
+ threshold is hit. If it is actually write those to the storage.
+
+        """
+        q = self._objects[object_type]
+        threshold = self.min_batch_size[object_type]
+        q.extend(objects)
+        if len(q) >= threshold:
+            return self.flush()
+
+        return {}
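
(Aside: a usage sketch mirroring the tests below. Adds under the per-type
threshold are only buffered and return an empty summary; once a threshold is
reached, or flush() is called explicitly, the queued objects are written
through to the wrapped storage in batches cut by swh.core.utils.grouper. The
directory dict here is a minimal illustrative value, not real sample data.)

    from swh.storage.buffer import BufferingProxyStorage

    storage = BufferingProxyStorage(
        storage={'cls': 'memory', 'args': {}},
        min_batch_size={'directory': 2},
    )

    # One directory is below the threshold of 2: it is queued and nothing
    # is written yet, so the returned summary is empty.
    summary = storage.directory_add([{'id': b'\x01' * 20, 'entries': []}])
    assert summary == {}

    # flush() drains the queues and aggregates the wrapped storage's counters.
    summary = storage.flush()
    assert summary == {'directory:add': 1}
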
diff --git a/swh/storage/tests/test_buffer.py b/swh/storage/tests/test_buffer.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_buffer.py
@@ -0,0 +1,179 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from swh.storage.buffer import BufferingProxyStorage
+
+
+def test_buffering_proxy_storage_content_threshold_not_hit(sample_data):
+    contents = sample_data['content']
+    storage = BufferingProxyStorage(
+        storage={'cls': 'memory', 'args': {}},
+        min_batch_size={
+            'content': 10,
+        }
+    )
+    s = storage.content_add([contents[0], contents[1]])
+    assert s == {}
+
+    # contents have not been written to storage
+    missing_contents = storage.content_missing(
+        [contents[0], contents[1]])
+    assert set(missing_contents) == set(
+        [contents[0]['sha1'], contents[1]['sha1']])
+
+    s = storage.flush()
+    assert s == {
+        'content:add': 1 + 1,
+        'content:add:bytes': contents[0]['length'] + contents[1]['length'],
+        'skipped_content:add': 0
+    }
+
+    missing_contents = storage.content_missing(
+        [contents[0], contents[1]])
+    assert list(missing_contents) == []
+
+
+def test_buffering_proxy_storage_content_threshold_nb_hit(sample_data):
+    contents = sample_data['content']
+    storage = BufferingProxyStorage(
+        storage={'cls': 'memory', 'args': {}},
+        min_batch_size={
+            'content': 1,
+        }
+    )
+
+    s = storage.content_add([contents[0]])
+    assert s == {
+        'content:add': 1,
+        'content:add:bytes': contents[0]['length'],
+        'skipped_content:add': 0
+    }
+
+    missing_contents = storage.content_missing([contents[0]])
+    assert list(missing_contents) == []
+
+    s = storage.flush()
+    assert s == {}
+
+
+def test_buffering_proxy_storage_content_threshold_bytes_hit(sample_data):
+    contents = sample_data['content']
+    content_bytes_min_batch_size = 20
+    storage = BufferingProxyStorage(
+        storage={'cls': 'memory', 'args': {}},
+        min_batch_size={
+            'content': 10,
+            'content_bytes': content_bytes_min_batch_size,
+        }
+    )
+
+    assert contents[0]['length'] > content_bytes_min_batch_size
+
+    s = storage.content_add([contents[0]])
+    assert s == {
+        'content:add': 1,
+        'content:add:bytes': contents[0]['length'],
+        'skipped_content:add': 0
+    }
+
+    missing_contents = storage.content_missing([contents[0]])
+    assert list(missing_contents) == []
+
+    s = storage.flush()
+    assert s == {}
+
+
+def test_buffering_proxy_storage_directory_threshold_not_hit(sample_data):
+    directories = sample_data['directory']
+    storage = BufferingProxyStorage(
+        storage={'cls': 'memory', 'args': {}},
+        min_batch_size={
+            'directory': 10,
+        }
+    )
+    s = storage.directory_add([directories[0]])
+    assert s == {}
+
+    directory_id = directories[0]['id']
+    missing_directories = storage.directory_missing(
+        [directory_id])
+    assert list(missing_directories) == [directory_id]
+
+    s = storage.flush()
+    assert s == {
+        'directory:add': 1,
+    }
+
+    missing_directories = storage.directory_missing(
+        [directory_id])
+    assert list(missing_directories) == []
+
+
+def test_buffering_proxy_storage_directory_threshold_hit(sample_data):
+    directories = sample_data['directory']
+    storage = BufferingProxyStorage(
+        storage={'cls': 'memory', 'args': {}},
+        min_batch_size={
+            'directory': 1,
+        }
+    )
+    s = storage.directory_add([directories[0]])
+    assert s == {
+        'directory:add': 1,
+    }
+
+    missing_directories = storage.directory_missing(
+        [directories[0]['id']])
+    assert list(missing_directories) == []
+
+    s = storage.flush()
+    assert s == {}
+
+
+def test_buffering_proxy_storage_revision_threshold_not_hit(sample_data):
+    revisions = sample_data['revision']
+    storage = BufferingProxyStorage(
+        storage={'cls': 'memory', 'args': {}},
+        min_batch_size={
+            'revision': 10,
+        }
+    )
+    s = storage.revision_add([revisions[0]])
+    assert s == {}
+
+    revision_id = revisions[0]['id']
+    missing_revisions = storage.revision_missing(
+        [revision_id])
+    assert list(missing_revisions) == [revision_id]
+
+    s = storage.flush()
+    assert s == {
+        'revision:add': 1,
+    }
+
+    missing_revisions = storage.revision_missing(
+        [revision_id])
+    assert list(missing_revisions) == []
+
+
+def test_buffering_proxy_storage_revision_threshold_hit(sample_data):
+    revisions = sample_data['revision']
+    storage = BufferingProxyStorage(
+        storage={'cls': 'memory', 'args': {}},
+        min_batch_size={
+            'revision': 1,
+        }
+    )
+    s = storage.revision_add([revisions[0]])
+    assert s == {
+        'revision:add': 1,
+    }
+
+    missing_revisions = storage.revision_missing(
+        [revisions[0]['id']])
+    assert list(missing_revisions) == []
+
+    s = storage.flush()
+    assert s == {}
diff --git a/swh/storage/tests/test_init.py b/swh/storage/tests/test_init.py
--- a/swh/storage/tests/test_init.py
+++ b/swh/storage/tests/test_init.py
@@ -12,6 +12,7 @@
 from swh.storage.api.client import RemoteStorage
 from swh.storage.storage import Storage as DbStorage
 from swh.storage.in_memory import Storage as MemoryStorage
+from swh.storage.buffer import BufferingProxyStorage
 from swh.storage.filter import FilteringProxyStorage
@@ -31,7 +32,10 @@
         }),
         ('filter', FilteringProxyStorage, {'storage': {
             'cls': 'memory', 'args': {}}
-        })
+        }),
+        ('buffer', BufferingProxyStorage, {'storage': {
+            'cls': 'memory', 'args': {}}
+        }),
     ]:
         actual_storage = get_storage(cls, args=dummy_args)
         assert actual_storage is not None