diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py
--- a/swh/storage/__init__.py
+++ b/swh/storage/__init__.py
@@ -13,7 +13,7 @@
     pass
 
 
-STORAGE_IMPLEMENTATION = {'local', 'remote', 'memory', 'filter'}
+STORAGE_IMPLEMENTATION = {'local', 'remote', 'memory', 'filter', 'buffer'}
 
 
 def get_storage(cls, args):
@@ -22,7 +22,8 @@
 
     Args:
         storage (dict): dictionary with keys:
-        - cls (str): storage's class, either local, remote, memory, filter
+        - cls (str): storage's class, either local, remote, memory, filter,
+          buffer
         - args (dict): dictionary with keys
 
     Returns:
@@ -44,5 +45,7 @@
         from .in_memory import Storage
     elif cls == 'filter':
         from .filter import FilteringProxyStorage as Storage
+    elif cls == 'buffer':
+        from .buffer import BufferingProxyStorage as Storage
 
     return Storage(**args)
diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/buffer.py
@@ -0,0 +1,105 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from collections import deque
+from functools import partial
+from typing import Optional, Sequence, Dict
+
+from swh.core.utils import grouper
+from swh.storage import get_storage
+
+
+class BufferingProxyStorage:
+    """Storage implementation in charge of accumulating objects prior to
+    writing them to the "main" storage.
+
+    Sample configuration use case for buffering storage:
+
+    .. code-block:: yaml
+
+        storage:
+          cls: buffer
+          args:
+            storage:
+              cls: remote
+              args: http://storage.internal.staging.swh.network:5002/
+            thresholds:
+              content: 10000
+              content_bytes: 100000000
+              directory: 5000
+              revision: 1000
+
+    """
+    def __init__(self, storage, thresholds=None):
+        self.storage = get_storage(**storage)
+
+        if thresholds is None:
+            thresholds = {}
+
+        self.thresholds = {
+            'content': thresholds.get('content', 10000),
+            'content_bytes': thresholds.get('content_bytes', 100*1024*1024),
+            'directory': thresholds.get('directory', 25000),
+            'revision': thresholds.get('revision', 100000),
+        }
+        self.object_types = ['content', 'directory', 'revision']
+        self._objects = {k: deque() for k in self.object_types}
+
+    def __getattr__(self, key):
+        if key.endswith('_add'):
+            object_type = key.split('_')[0]
+            if object_type in self.object_types:
+                return partial(
+                    self.object_add, object_type=object_type
+                )
+        return getattr(self.storage, key)
+
+    def content_add(self, content: Sequence[Dict]) -> Dict:
+        """Enqueue contents to write to the storage.
+
+        The following policies apply:
+        - First, check whether the queue's count threshold is hit. If it is,
+          flush the contents to the storage.
+
+        - Otherwise, check whether the total size of the enqueued contents
+          hits the size threshold. If it does, flush them to the storage.
+
+        """
+        s = self.object_add(content, object_type='content')
+        if not s:
+            q = self._objects['content']
+            total_size = sum(c['length'] for c in q)
+            if total_size >= self.thresholds['content_bytes']:
+                return self.flush(['content'])
+
+        return s
+
+    def flush(self, object_types: Optional[Sequence[str]] = None) -> Dict:
+        if object_types is None:
+            object_types = self.object_types
+        summary = {}  # type: Dict[str, int]
+        for object_type in object_types:
+            q = self._objects[object_type]
+            for objs in grouper(q, n=self.thresholds[object_type]):
+                add_fn = getattr(self.storage, '%s_add' % object_type)
+                s = add_fn(objs)
+                for k, v in s.items():
+                    summary[k] = v + summary.get(k, 0)
+            q.clear()
+
+        return summary
+
+    def object_add(self, objects: Sequence[Dict], *, object_type: str) -> Dict:
+        """Enqueue objects to write to the storage. This checks whether the
+        queue's threshold is hit. If it is, flush the queues to the storage.
+
+        """
+        q = self._objects[object_type]
+        threshold = self.thresholds[object_type]
+        q.extend(objects)
+        if len(q) >= threshold:
+            return self.flush()
+
+        return {}
diff --git a/swh/storage/tests/test_buffer.py b/swh/storage/tests/test_buffer.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/tests/test_buffer.py
@@ -0,0 +1,179 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from swh.storage.buffer import BufferingProxyStorage
+
+
+def test_buffering_proxy_storage_content_threshold_not_hit(sample_data):
+    contents = sample_data['content']
+    storage = BufferingProxyStorage(
+        storage={'cls': 'memory', 'args': {}},
+        thresholds={
+            'content': 10,
+        }
+    )
+    s = storage.content_add([contents[0], contents[1]])
+    assert s == {}
+
+    # contents have not been written to storage
+    missing_contents = storage.content_missing(
+        [contents[0], contents[1]])
+    assert set(missing_contents) == set(
+        [contents[0]['sha1'], contents[1]['sha1']])
+
+    s = storage.flush()
+    assert s == {
+        'content:add': 1 + 1,
+        'content:add:bytes': contents[0]['length'] + contents[1]['length'],
+        'skipped_content:add': 0
+    }
+
+    missing_contents = storage.content_missing(
+        [contents[0], contents[1]])
+    assert list(missing_contents) == []
+
+
+def test_buffering_proxy_storage_content_threshold_nb_hit(sample_data):
+    contents = sample_data['content']
+    storage = BufferingProxyStorage(
+        storage={'cls': 'memory', 'args': {}},
+        thresholds={
+            'content': 1,
+        }
+    )
+
+    s = storage.content_add([contents[0]])
+    assert s == {
+        'content:add': 1,
+        'content:add:bytes': contents[0]['length'],
+        'skipped_content:add': 0
+    }
+
+    missing_contents = storage.content_missing([contents[0]])
+    assert list(missing_contents) == []
+
+    s = storage.flush()
+    assert s == {}
+
+
+def test_buffering_proxy_storage_content_threshold_bytes_hit(sample_data):
+    contents = sample_data['content']
+    content_bytes_threshold = 20
+    storage = BufferingProxyStorage(
+        storage={'cls': 'memory', 'args': {}},
+        thresholds={
+            'content': 10,
+            'content_bytes': content_bytes_threshold,
+        }
+    )
+
+    assert contents[0]['length'] > content_bytes_threshold
+
+    s = storage.content_add([contents[0]])
+    assert s == {
+        'content:add': 1,
+        'content:add:bytes': contents[0]['length'],
+        'skipped_content:add': 0
+    }
+
+    missing_contents = storage.content_missing([contents[0]])
+    assert list(missing_contents) == []
+
+    s = storage.flush()
+    assert s == {}
+
+
+def test_buffering_proxy_storage_directory_threshold_not_hit(sample_data):
+    directories = sample_data['directory']
+    storage = BufferingProxyStorage(
+        storage={'cls': 'memory', 'args': {}},
+        thresholds={
+            'directory': 10,
+        }
+    )
+    s = storage.directory_add([directories[0]])
+    assert s == {}
+
+    directory_id = directories[0]['id']
+    missing_directories = storage.directory_missing(
+        [directory_id])
+    assert list(missing_directories) == [directory_id]
+
+    s = storage.flush()
+    assert s == {
+        'directory:add': 1,
+    }
+
+    missing_directories = storage.directory_missing(
+        [directory_id])
+    assert list(missing_directories) == []
+
+
+def test_buffering_proxy_storage_directory_threshold_hit(sample_data):
+    directories = sample_data['directory']
+    storage = BufferingProxyStorage(
+        storage={'cls': 'memory', 'args': {}},
+        thresholds={
+            'directory': 1,
+        }
+    )
+    s = storage.directory_add([directories[0]])
+    assert s == {
+        'directory:add': 1,
+    }
+
+    missing_directories = storage.directory_missing(
+        [directories[0]['id']])
+    assert list(missing_directories) == []
+
+    s = storage.flush()
+    assert s == {}
+
+
+def test_buffering_proxy_storage_revision_threshold_not_hit(sample_data):
+    revisions = sample_data['revision']
+    storage = BufferingProxyStorage(
+        storage={'cls': 'memory', 'args': {}},
+        thresholds={
+            'revision': 10,
+        }
+    )
+    s = storage.revision_add([revisions[0]])
+    assert s == {}
+
+    revision_id = revisions[0]['id']
+    missing_revisions = storage.revision_missing(
+        [revision_id])
+    assert list(missing_revisions) == [revision_id]
+
+    s = storage.flush()
+    assert s == {
+        'revision:add': 1,
+    }
+
+    missing_revisions = storage.revision_missing(
+        [revision_id])
+    assert list(missing_revisions) == []
+
+
+def test_buffering_proxy_storage_revision_threshold_hit(sample_data):
+    revisions = sample_data['revision']
+    storage = BufferingProxyStorage(
+        storage={'cls': 'memory', 'args': {}},
+        thresholds={
+            'revision': 1,
+        }
+    )
+    s = storage.revision_add([revisions[0]])
+    assert s == {
+        'revision:add': 1,
+    }
+
+    missing_revisions = storage.revision_missing(
+        [revisions[0]['id']])
+    assert list(missing_revisions) == []
+
+    s = storage.flush()
+    assert s == {}
diff --git a/swh/storage/tests/test_init.py b/swh/storage/tests/test_init.py
--- a/swh/storage/tests/test_init.py
+++ b/swh/storage/tests/test_init.py
@@ -10,6 +10,7 @@
 from swh.storage.api.client import RemoteStorage
 # from swh.storage.storage import Storage as DbStorage
 from swh.storage.in_memory import Storage as MemoryStorage
+from swh.storage.buffer import BufferingProxyStorage
 from swh.storage.filter import FilteringProxyStorage
 
 
@@ -29,7 +30,10 @@
         # }),
         ('filter', FilteringProxyStorage, {'storage': {
             'cls': 'memory', 'args': {}}
-        })
+        }),
+        ('buffer', BufferingProxyStorage, {'storage': {
+            'cls': 'memory', 'args': {}}
+        }),
     ]:
         actual_storage = get_storage(cls, args=dummy_args)
         assert actual_storage is not None
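
A minimal usage sketch of the buffering proxy, not taken from the patch itself: it assumes an in-memory backend and a hypothetical content dict whose hash, data and length keys mirror what the in-memory storage expects; the summary keys such as 'content:add' are whatever the wrapped storage's content_add returns.

from swh.storage import get_storage

# Buffering proxy over an in-memory backend, with deliberately small
# thresholds so the flushing behaviour is easy to observe.
storage = get_storage('buffer', args={
    'storage': {'cls': 'memory', 'args': {}},
    'thresholds': {'content': 2, 'content_bytes': 1024},
})

# Hypothetical content dict; a real loader would compute genuine hashes.
content = {
    'sha1': b'\x01' * 20, 'sha1_git': b'\x02' * 20,
    'sha256': b'\x03' * 32, 'blake2s256': b'\x04' * 32,
    'data': b'hello', 'length': 5, 'status': 'visible',
}

# One content stays below both the count and the byte thresholds: nothing
# is written yet and the returned summary is empty.
assert storage.content_add([content]) == {}

# At the end of a run, an explicit flush writes whatever is still buffered
# and returns the aggregated summary from the backend storage.
print(storage.flush())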