diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py
index 29c48cc8..4f52f3c4 100644
--- a/swh/storage/buffer.py
+++ b/swh/storage/buffer.py
@@ -1,106 +1,108 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from collections import deque
 from functools import partial
 from typing import Optional, Iterable, Dict

 from swh.core.utils import grouper
 from swh.storage import get_storage


 class BufferingProxyStorage:
     """Storage implementation in charge of accumulating objects prior to
     sending them to the "main" storage.

     Sample configuration use case for buffering storage:

     .. code-block:: yaml

         storage:
           cls: buffer
           args:
             storage:
               cls: remote
               args: http://storage.internal.staging.swh.network:5002/
             min_batch_size:
               content: 10000
               content_bytes: 100000000
               directory: 5000
               revision: 1000
+              release: 10000

     """
     def __init__(self, storage, min_batch_size=None):
         self.storage = get_storage(**storage)

         if min_batch_size is None:
             min_batch_size = {}

         self.min_batch_size = {
             'content': min_batch_size.get('content', 10000),
             'content_bytes': min_batch_size.get('content_bytes',
                                                 100 * 1024 * 1024),
             'directory': min_batch_size.get('directory', 25000),
             'revision': min_batch_size.get('revision', 100000),
+            'release': min_batch_size.get('release', 100000),
         }
-        self.object_types = ['content', 'directory', 'revision']
+        self.object_types = ['content', 'directory', 'revision', 'release']
         self._objects = {k: deque() for k in self.object_types}

     def __getattr__(self, key):
         if key.endswith('_add'):
             object_type = key.split('_')[0]
             if object_type in self.object_types:
                 return partial(
                     self.object_add, object_type=object_type
                 )
         return getattr(self.storage, key)

     def content_add(self, content: Iterable[Dict]) -> Dict:
         """Enqueue contents to write to the storage.

         The following policies apply:

         - First, check if the queue's count threshold is hit. If it is,
           flush the contents to the storage.

         - If not, check if the total size of the enqueued contents has
           hit the size threshold. If it has, flush the contents to the
           storage.

         """
         s = self.object_add(content, object_type='content')
         if not s:
             q = self._objects['content']
             total_size = sum(c['length'] for c in q)
             if total_size >= self.min_batch_size['content_bytes']:
                 return self.flush(['content'])

         return s

     def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict:
         if object_types is None:
             object_types = self.object_types
         summary = {}  # type: Dict[str, int]
         for object_type in object_types:
             q = self._objects[object_type]
             for objs in grouper(q, n=self.min_batch_size[object_type]):
                 add_fn = getattr(self.storage, '%s_add' % object_type)
                 s = add_fn(objs)
                 summary = {k: v + summary.get(k, 0)
                            for k, v in s.items()}
             q.clear()

         return summary

     def object_add(
             self, objects: Iterable[Dict], *, object_type: str) -> Dict:
         """Enqueue objects to write to the storage. This checks if the
         queue's threshold is hit. If it is, actually write them to the
         storage.

         """
         q = self._objects[object_type]
         threshold = self.min_batch_size[object_type]
         q.extend(objects)
         if len(q) >= threshold:
             return self.flush()

         return {}
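
To make the batching behaviour concrete before the tests, here is a minimal
standalone sketch of the pattern that `object_add` and `flush` implement:
writes below a per-type count threshold are buffered, and hitting the
threshold triggers one batched write. `MiniBuffer` and `fake_release_add`
are hypothetical names used for illustration only, not part of the swh API:

    from collections import deque

    class MiniBuffer:
        """Toy stand-in for BufferingProxyStorage's count-threshold policy."""

        def __init__(self, backend_add, threshold):
            self.backend_add = backend_add  # callable that writes one batch
            self.threshold = threshold
            self.queue = deque()

        def add(self, objects):
            # Mirrors object_add: enqueue, then flush once the count
            # threshold is reached.
            self.queue.extend(objects)
            if len(self.queue) >= self.threshold:
                return self.flush()
            return {}

        def flush(self):
            # Mirrors flush: write everything queued, then clear the queue.
            summary = self.backend_add(list(self.queue))
            self.queue.clear()
            return summary

    writes = []

    def fake_release_add(batch):
        writes.append(batch)
        return {'release:add': len(batch)}

    buf = MiniBuffer(fake_release_add, threshold=2)
    assert buf.add([{'id': 1}]) == {}                   # buffered only
    assert buf.add([{'id': 2}]) == {'release:add': 2}   # threshold hit
    assert writes == [[{'id': 1}, {'id': 2}]]           # one batched write

The real proxy generalizes this with one queue per object type, and its
`__getattr__` turns a call such as `release_add` into
`object_add(..., object_type='release')`.
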
""" q = self._objects[object_type] threshold = self.min_batch_size[object_type] q.extend(objects) if len(q) >= threshold: return self.flush() return {} diff --git a/swh/storage/tests/test_buffer.py b/swh/storage/tests/test_buffer.py index 8c566088..2bfcb24a 100644 --- a/swh/storage/tests/test_buffer.py +++ b/swh/storage/tests/test_buffer.py @@ -1,179 +1,231 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.storage.buffer import BufferingProxyStorage def test_buffering_proxy_storage_content_threshold_not_hit(sample_data): contents = sample_data['content'] storage = BufferingProxyStorage( storage={'cls': 'memory'}, min_batch_size={ 'content': 10, } ) s = storage.content_add([contents[0], contents[1]]) assert s == {} # contents have not been written to storage missing_contents = storage.content_missing( [contents[0], contents[1]]) assert set(missing_contents) == set( [contents[0]['sha1'], contents[1]['sha1']]) s = storage.flush() assert s == { 'content:add': 1 + 1, 'content:add:bytes': contents[0]['length'] + contents[1]['length'], 'skipped_content:add': 0 } missing_contents = storage.content_missing( [contents[0], contents[1]]) assert list(missing_contents) == [] def test_buffering_proxy_storage_content_threshold_nb_hit(sample_data): contents = sample_data['content'] storage = BufferingProxyStorage( storage={'cls': 'memory'}, min_batch_size={ 'content': 1, } ) s = storage.content_add([contents[0]]) assert s == { 'content:add': 1, 'content:add:bytes': contents[0]['length'], 'skipped_content:add': 0 } missing_contents = storage.content_missing([contents[0]]) assert list(missing_contents) == [] s = storage.flush() assert s == {} def test_buffering_proxy_storage_content_threshold_bytes_hit(sample_data): contents = sample_data['content'] content_bytes_min_batch_size = 2 storage = BufferingProxyStorage( storage={'cls': 'memory'}, min_batch_size={ 'content': 10, 'content_bytes': content_bytes_min_batch_size, } ) assert contents[0]['length'] > content_bytes_min_batch_size s = storage.content_add([contents[0]]) assert s == { 'content:add': 1, 'content:add:bytes': contents[0]['length'], 'skipped_content:add': 0 } missing_contents = storage.content_missing([contents[0]]) assert list(missing_contents) == [] s = storage.flush() assert s == {} def test_buffering_proxy_storage_directory_threshold_not_hit(sample_data): directories = sample_data['directory'] storage = BufferingProxyStorage( storage={'cls': 'memory'}, min_batch_size={ 'directory': 10, } ) s = storage.directory_add([directories[0]]) assert s == {} directory_id = directories[0]['id'] missing_directories = storage.directory_missing( [directory_id]) assert list(missing_directories) == [directory_id] s = storage.flush() assert s == { 'directory:add': 1, } missing_directories = storage.directory_missing( [directory_id]) assert list(missing_directories) == [] def test_buffering_proxy_storage_directory_threshold_hit(sample_data): directories = sample_data['directory'] storage = BufferingProxyStorage( storage={'cls': 'memory'}, min_batch_size={ 'directory': 1, } ) s = storage.directory_add([directories[0]]) assert s == { 'directory:add': 1, } missing_directories = storage.directory_missing( [directories[0]['id']]) assert list(missing_directories) == [] s = storage.flush() assert s == {} def 
test_buffering_proxy_storage_revision_threshold_not_hit(sample_data): revisions = sample_data['revision'] storage = BufferingProxyStorage( storage={'cls': 'memory'}, min_batch_size={ 'revision': 10, } ) s = storage.revision_add([revisions[0]]) assert s == {} revision_id = revisions[0]['id'] missing_revisions = storage.revision_missing( [revision_id]) assert list(missing_revisions) == [revision_id] s = storage.flush() assert s == { 'revision:add': 1, } missing_revisions = storage.revision_missing( [revision_id]) assert list(missing_revisions) == [] def test_buffering_proxy_storage_revision_threshold_hit(sample_data): revisions = sample_data['revision'] storage = BufferingProxyStorage( storage={'cls': 'memory'}, min_batch_size={ 'revision': 1, } ) s = storage.revision_add([revisions[0]]) assert s == { 'revision:add': 1, } missing_revisions = storage.revision_missing( [revisions[0]['id']]) assert list(missing_revisions) == [] s = storage.flush() assert s == {} + + +def test_buffering_proxy_storage_release_threshold_not_hit(sample_data): + releases = sample_data['release'] + threshold = 10 + + assert len(releases) < threshold + storage = BufferingProxyStorage( + storage={'cls': 'memory'}, + min_batch_size={ + 'release': threshold, # configuration set + } + ) + s = storage.release_add(releases) + assert s == {} + + release_ids = [r['id'] for r in releases] + missing_releases = storage.release_missing(release_ids) + assert list(missing_releases) == release_ids + + s = storage.flush() + assert s == { + 'release:add': len(releases), + } + + missing_releases = storage.release_missing(release_ids) + assert list(missing_releases) == [] + + +def test_buffering_proxy_storage_release_threshold_hit(sample_data): + releases = sample_data['release'] + threshold = 2 + assert len(releases) > threshold + + storage = BufferingProxyStorage( + storage={'cls': 'memory'}, + min_batch_size={ + 'release': threshold, # configuration set + } + ) + + s = storage.release_add(releases) + assert s == { + 'release:add': len(releases), + } + + release_ids = [r['id'] for r in releases] + missing_releases = storage.release_missing(release_ids) + assert list(missing_releases) == [] + + s = storage.flush() + assert s == {}
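
The tests above all exercise the per-type count thresholds; `content_add`
additionally flushes once the total size of the queued contents reaches
`content_bytes`. Below is a standalone sketch of that second policy, in
the same toy style as the earlier `MiniBuffer` example; the names and the
100-byte threshold are illustrative, not the real 100 MiB default:

    from collections import deque

    def demo_content_bytes_policy():
        min_batch_size = {'content': 10, 'content_bytes': 100}
        queue = deque()
        flushed = []

        def flush():
            summary = {
                'content:add': len(queue),
                'content:add:bytes': sum(c['length'] for c in queue),
            }
            flushed.append(list(queue))
            queue.clear()
            return summary

        def content_add(contents):
            # First policy: count threshold.
            queue.extend(contents)
            if len(queue) >= min_batch_size['content']:
                return flush()
            # Second policy: total size of the enqueued contents.
            total = sum(c['length'] for c in queue)
            if total >= min_batch_size['content_bytes']:
                return flush()
            return {}

        assert content_add([{'length': 60}]) == {}  # 60 bytes queued, no flush
        summary = content_add([{'length': 50}])     # 110 >= 100: size flush
        assert summary == {'content:add': 2, 'content:add:bytes': 110}
        assert len(flushed) == 1 and not queue

    demo_content_bytes_policy()
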