class HashCollision(Exception):
    pass


# Storage backends that get_storage() knows how to instantiate.
STORAGE_IMPLEMENTATION = {'local', 'remote', 'memory', 'filter', 'buffer'}


def get_storage(cls, args):
    """Instantiate a storage backend of class `cls` with arguments `args`.

    Args:
        cls (str): storage's class, either local, remote, memory, filter,
            buffer
        args (dict): keyword arguments passed to the storage constructor

    Returns:
        an instance of swh.storage.Storage (either local or remote)

    Raises:
        ValueError if passed an unknown storage class.

    """
    # Reject unknown classes up front, before attempting any import.
    if cls not in STORAGE_IMPLEMENTATION:
        raise ValueError('Unknown storage class `%s`. Supported: %s' % (
            cls, ', '.join(STORAGE_IMPLEMENTATION)))

    # Imports are deliberately lazy so that only the selected backend's
    # dependencies are loaded.
    if cls == 'remote':
        from .api.client import RemoteStorage as Storage
    elif cls == 'local':
        from .storage import Storage
    elif cls == 'memory':
        from .in_memory import Storage
    elif cls == 'filter':
        from .filter import FilteringProxyStorage as Storage
    else:  # cls == 'buffer', guaranteed by the membership check above
        from .buffer import BufferingProxyStorage as Storage

    return Storage(**args)
class BufferingProxyStorage:
    """Storage implementation in charge of accumulating objects prior to
    discussing with the "main" storage.

    Writes sent to the ``content_add``/``directory_add``/``revision_add``
    endpoints are queued per object type and only forwarded to the backing
    storage once that type's minimum batch size is reached (or when
    ``flush`` is called explicitly).

    Sample configuration use case for buffering storage:

    .. code-block:: yaml

        storage:
          cls: buffer
          args:
            storage:
              cls: remote
              args: http://storage.internal.staging.swh.network:5002/
            min_batch_size:
              content: 10000
              content_bytes: 100000000
              directory: 5000
              revision: 1000

    """
    def __init__(self, storage, min_batch_size=None):
        # Backing storage every flush is forwarded to.
        self.storage = get_storage(**storage)

        thresholds = min_batch_size or {}
        self.min_batch_size = {
            'content': thresholds.get('content', 10000),
            'content_bytes': thresholds.get('content_bytes',
                                            100 * 1024 * 1024),
            'directory': thresholds.get('directory', 25000),
            'revision': thresholds.get('revision', 100000),
        }
        self.object_types = ['content', 'directory', 'revision']
        # One pending-write queue per buffered object type.
        self._objects = {object_type: deque()
                         for object_type in self.object_types}

    def __getattr__(self, key):
        # Route <object_type>_add calls for buffered types through the
        # queueing logic; everything else is delegated untouched to the
        # backing storage.
        if key.endswith('_add'):
            object_type = key.split('_')[0]
            if object_type in self.object_types:
                return partial(
                    self.object_add, object_type=object_type
                )
        return getattr(self.storage, key)

    def content_add(self, content: Iterable[Dict]) -> Dict:
        """Enqueue contents to write to the storage.

        Following policies apply:
        - First, check if the queue's threshold is hit. If it is flush content
          to the storage.

        - If not, check if the total size of enqueued contents's threshold is
          hit. If it is flush content to the storage.

        """
        stats = self.object_add(content, object_type='content')
        if stats:
            return stats

        # Count threshold not reached; check the cumulative byte size.
        queued = self._objects['content']
        total_size = sum(c['length'] for c in queued)
        if total_size >= self.min_batch_size['content_bytes']:
            return self.flush(['content'])

        return stats

    def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict:
        """Write every queued object of the given types (all buffered types
        by default) to the backing storage, in batches, and return the
        merged per-key summary of the add calls."""
        if object_types is None:
            object_types = self.object_types
        summary = {}  # type: Dict[str, Dict]
        for object_type in object_types:
            queue = self._objects[object_type]
            add_fn = getattr(self.storage, '%s_add' % object_type)
            for batch in grouper(queue, n=self.min_batch_size[object_type]):
                stats = add_fn(batch)
                # NOTE: only keys present in the latest stats survive the
                # merge, matching the backing storage's summary shape.
                summary = {key: value + summary.get(key, 0)
                           for key, value in stats.items()}
            queue.clear()

        return summary

    def object_add(self, objects: Iterable[Dict], *, object_type: str) -> Dict:
        """Enqueue objects to write to the storage. This checks if the queue's
        threshold is hit. If it is actually write those to the storage.

        """
        queue = self._objects[object_type]
        queue.extend(objects)
        if len(queue) < self.min_batch_size[object_type]:
            return {}
        # Threshold reached: flush every queue, not just this type's.
        return self.flush()
def _buffer_storage(**min_batch_size):
    """Build a buffering proxy over a fresh in-memory storage."""
    return BufferingProxyStorage(
        storage={'cls': 'memory', 'args': {}},
        min_batch_size=min_batch_size)


def test_buffering_proxy_storage_content_threshold_not_hit(sample_data):
    contents = sample_data['content']
    storage = _buffer_storage(content=10)

    s = storage.content_add([contents[0], contents[1]])
    assert s == {}

    # contents have not been written to storage
    missing_contents = storage.content_missing(
        [contents[0], contents[1]])
    assert set(missing_contents) == set(
        [contents[0]['sha1'], contents[1]['sha1']])

    s = storage.flush()
    assert s == {
        'content:add': 1 + 1,
        'content:add:bytes': contents[0]['length'] + contents[1]['length'],
        'skipped_content:add': 0
    }

    missing_contents = storage.content_missing(
        [contents[0], contents[1]])
    assert list(missing_contents) == []


def test_buffering_proxy_storage_content_threshold_nb_hit(sample_data):
    contents = sample_data['content']
    storage = _buffer_storage(content=1)

    s = storage.content_add([contents[0]])
    assert s == {
        'content:add': 1,
        'content:add:bytes': contents[0]['length'],
        'skipped_content:add': 0
    }

    missing_contents = storage.content_missing([contents[0]])
    assert list(missing_contents) == []

    # everything already flushed; nothing left in the queues
    s = storage.flush()
    assert s == {}


def test_buffering_proxy_storage_content_threshold_bytes_hit(sample_data):
    contents = sample_data['content']
    content_bytes_min_batch_size = 20
    storage = _buffer_storage(
        content=10,
        content_bytes=content_bytes_min_batch_size)

    # a single content is enough to cross the byte threshold
    assert contents[0]['length'] > content_bytes_min_batch_size

    s = storage.content_add([contents[0]])
    assert s == {
        'content:add': 1,
        'content:add:bytes': contents[0]['length'],
        'skipped_content:add': 0
    }

    missing_contents = storage.content_missing([contents[0]])
    assert list(missing_contents) == []

    s = storage.flush()
    assert s == {}


def test_buffering_proxy_storage_directory_threshold_not_hit(sample_data):
    directories = sample_data['directory']
    storage = _buffer_storage(directory=10)

    s = storage.directory_add([directories[0]])
    assert s == {}

    # directory still buffered, not yet in the backing storage
    directory_id = directories[0]['id']
    missing_directories = storage.directory_missing(
        [directory_id])
    assert list(missing_directories) == [directory_id]

    s = storage.flush()
    assert s == {
        'directory:add': 1,
    }

    missing_directories = storage.directory_missing(
        [directory_id])
    assert list(missing_directories) == []


def test_buffering_proxy_storage_directory_threshold_hit(sample_data):
    directories = sample_data['directory']
    storage = _buffer_storage(directory=1)

    s = storage.directory_add([directories[0]])
    assert s == {
        'directory:add': 1,
    }

    missing_directories = storage.directory_missing(
        [directories[0]['id']])
    assert list(missing_directories) == []

    s = storage.flush()
    assert s == {}


def test_buffering_proxy_storage_revision_threshold_not_hit(sample_data):
    revisions = sample_data['revision']
    storage = _buffer_storage(revision=10)

    s = storage.revision_add([revisions[0]])
    assert s == {}

    # revision still buffered, not yet in the backing storage
    revision_id = revisions[0]['id']
    missing_revisions = storage.revision_missing(
        [revision_id])
    assert list(missing_revisions) == [revision_id]

    s = storage.flush()
    assert s == {
        'revision:add': 1,
    }

    missing_revisions = storage.revision_missing(
        [revision_id])
    assert list(missing_revisions) == []


def test_buffering_proxy_storage_revision_threshold_hit(sample_data):
    revisions = sample_data['revision']
    storage = _buffer_storage(revision=1)

    s = storage.revision_add([revisions[0]])
    assert s == {
        'revision:add': 1,
    }

    missing_revisions = storage.revision_missing(
        [revisions[0]['id']])
    assert list(missing_revisions) == []

    s = storage.flush()
    assert s == {}
@patch('swh.storage.storage.psycopg2.pool')
def test_get_storage(mock_pool):
    """Instantiating an existing storage should be ok

    """
    # Prevent the 'local' case from opening a real database connection.
    mock_pool.ThreadedConnectionPool.return_value = None

    cases = [
        ('remote', RemoteStorage, {'url': 'url'}),
        ('memory', MemoryStorage, {}),
        ('local', DbStorage, {
            'db': 'postgresql://db',
            'objstorage': {
                'cls': 'memory', 'args': {},
            },
        }),
        ('filter', FilteringProxyStorage, {'storage': {
            'cls': 'memory', 'args': {}}
        }),
        ('buffer', BufferingProxyStorage, {'storage': {
            'cls': 'memory', 'args': {}}
        }),
    ]
    for cls, expected_class, dummy_args in cases:
        actual_storage = get_storage(cls, args=dummy_args)
        assert actual_storage is not None
        assert isinstance(actual_storage, expected_class)


def test_get_storage_failure():
    """Instantiating an unknown storage should raise

    """
    with pytest.raises(ValueError, match='Unknown storage class `unknown`'):
        get_storage('unknown', args=[])