diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py
--- a/swh/storage/buffer.py
+++ b/swh/storage/buffer.py
@@ -3,9 +3,8 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from collections import deque
 from functools import partial
-from typing import Optional, Iterable, Dict
+from typing import Dict, Iterable, List, Optional
 
 from swh.core.utils import grouper
 from swh.model.model import Content, BaseModel
@@ -52,14 +51,15 @@
         }
         self.object_types = [
             'content', 'skipped_content', 'directory', 'revision', 'release']
-        self._objects = {k: deque() for k in self.object_types}
+        self._objects = {k: {} for k in self.object_types}
 
     def __getattr__(self, key):
         if key.endswith('_add'):
             object_type = key.rsplit('_', 1)[0]
             if object_type in self.object_types:
                 return partial(
-                    self.object_add, object_type=object_type
+                    self.object_add, object_type=object_type,
+                    keys=['id'],
                 )
         if key == 'storage':
             raise AttributeError(key)
@@ -78,40 +78,52 @@
         """
         content = list(content)
-        s = self.object_add(content, object_type='content')
+        s = self.object_add(
+            content, object_type='content',
+            keys=['sha1', 'sha1_git', 'sha256', 'blake2s256'])
         if not s:
-            q = self._objects['content']
-            total_size = sum(c.length for c in q)
+            buffer_ = self._objects['content'].values()
+            total_size = sum(c.length for c in buffer_)
             if total_size >= self.min_batch_size['content_bytes']:
                 return self.flush(['content'])
         return s
 
+    def skipped_content_add(self, content: Iterable[Content]) -> Dict:
+        return self.object_add(
+            content, object_type='skipped_content',
+            keys=['sha1', 'sha1_git', 'sha256', 'blake2s256'])
+
     def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict:
         if object_types is None:
             object_types = self.object_types
         summary = {}  # type: Dict[str, Dict]
         for object_type in object_types:
-            q = self._objects[object_type]
-            for objs in grouper(q, n=self.min_batch_size[object_type]):
+            buffer_ = self._objects[object_type]
+            batches = grouper(
+                buffer_.values(), n=self.min_batch_size[object_type])
+            for batch in batches:
                 add_fn = getattr(self.storage, '%s_add' % object_type)
-                s = add_fn(objs)
+                s = add_fn(batch)
                 summary = {k: v + summary.get(k, 0) for k, v in s.items()}
-            q.clear()
+            buffer_.clear()
 
         return summary
 
     def object_add(
-            self, objects: Iterable[BaseModel], *, object_type: str) -> Dict:
+            self, objects: Iterable[BaseModel], *,
+            object_type: str, keys: List[str]) -> Dict:
         """Enqueue objects to write to the storage. This checks if the
         queue's threshold is hit. If it is, actually write those to the
         storage.
""" - q = self._objects[object_type] + buffer_ = self._objects[object_type] threshold = self.min_batch_size[object_type] - q.extend(objects) - if len(q) >= threshold: + for obj in objects: + obj_key = tuple(getattr(obj, key) for key in keys) + buffer_[obj_key] = obj + if len(buffer_) >= threshold: return self.flush() return {} diff --git a/swh/storage/tests/conftest.py b/swh/storage/tests/conftest.py --- a/swh/storage/tests/conftest.py +++ b/swh/storage/tests/conftest.py @@ -233,7 +233,7 @@ 'skipped_content': [data.skipped_cont, data.skipped_cont2], 'person': [data.person], 'directory': [data.dir2, data.dir], - 'revision': [data.revision], + 'revision': [data.revision, data.revision2, data.revision3], 'release': [data.release, data.release2, data.release3], 'snapshot': [data.snapshot], 'origin': [data.origin, data.origin2], diff --git a/swh/storage/tests/test_buffer.py b/swh/storage/tests/test_buffer.py --- a/swh/storage/tests/test_buffer.py +++ b/swh/storage/tests/test_buffer.py @@ -67,6 +67,34 @@ assert s == {} +def test_buffering_proxy_storage_content_deduplicate(sample_data): + contents = sample_data['content'] + storage = get_storage_with_buffer_config( + min_batch_size={ + 'content': 2, + } + ) + + s = storage.content_add([contents[0], contents[0]]) + assert s == {} + + s = storage.content_add([contents[0]]) + assert s == {} + + s = storage.content_add([contents[1]]) + assert s == { + 'content:add': 1 + 1, + 'content:add:bytes': contents[0]['length'] + contents[1]['length'], + } + + missing_contents = storage.content_missing( + [contents[0], contents[1]]) + assert list(missing_contents) == [] + + s = storage.flush() + assert s == {} + + def test_buffering_proxy_storage_content_threshold_bytes_hit(sample_data): contents = sample_data['content'] content_bytes_min_batch_size = 2 @@ -139,6 +167,33 @@ assert s == {} +def test_buffering_proxy_storage_skipped_content_deduplicate(sample_data): + contents = sample_data['skipped_content'] + storage = get_storage_with_buffer_config( + min_batch_size={ + 'skipped_content': 2, + } + ) + + s = storage.skipped_content_add([contents[0], contents[0]]) + assert s == {} + + s = storage.skipped_content_add([contents[0]]) + assert s == {} + + s = storage.skipped_content_add([contents[1]]) + assert s == { + 'skipped_content:add': 1 + 1, + } + + missing_contents = storage.skipped_content_missing( + [contents[0], contents[1]]) + assert list(missing_contents) == [] + + s = storage.flush() + assert s == {} + + def test_buffering_proxy_storage_directory_threshold_not_hit(sample_data): directories = sample_data['directory'] storage = get_storage_with_buffer_config( @@ -184,6 +239,33 @@ assert s == {} +def test_buffering_proxy_storage_directory_deduplicate(sample_data): + directories = sample_data['directory'] + storage = get_storage_with_buffer_config( + min_batch_size={ + 'directory': 2, + } + ) + + s = storage.directory_add([directories[0], directories[0]]) + assert s == {} + + s = storage.directory_add([directories[0]]) + assert s == {} + + s = storage.directory_add([directories[1]]) + assert s == { + 'directory:add': 1 + 1, + } + + missing_directories = storage.directory_missing( + [directories[0]['id'], directories[1]['id']]) + assert list(missing_directories) == [] + + s = storage.flush() + assert s == {} + + def test_buffering_proxy_storage_revision_threshold_not_hit(sample_data): revisions = sample_data['revision'] storage = get_storage_with_buffer_config( @@ -229,6 +311,33 @@ assert s == {} +def 
+    revisions = sample_data['revision']
+    storage = get_storage_with_buffer_config(
+        min_batch_size={
+            'revision': 2,
+        }
+    )
+
+    s = storage.revision_add([revisions[0], revisions[0]])
+    assert s == {}
+
+    s = storage.revision_add([revisions[0]])
+    assert s == {}
+
+    s = storage.revision_add([revisions[1]])
+    assert s == {
+        'revision:add': 1 + 1,
+    }
+
+    missing_revisions = storage.revision_missing(
+        [revisions[0]['id'], revisions[1]['id']])
+    assert list(missing_revisions) == []
+
+    s = storage.flush()
+    assert s == {}
+
+
 def test_buffering_proxy_storage_release_threshold_not_hit(sample_data):
     releases = sample_data['release']
     threshold = 10
@@ -277,3 +386,30 @@
 
     s = storage.flush()
     assert s == {}
+
+
+def test_buffering_proxy_storage_release_deduplicate(sample_data):
+    releases = sample_data['release']
+    storage = get_storage_with_buffer_config(
+        min_batch_size={
+            'release': 2,
+        }
+    )
+
+    s = storage.release_add([releases[0], releases[0]])
+    assert s == {}
+
+    s = storage.release_add([releases[0]])
+    assert s == {}
+
+    s = storage.release_add([releases[1]])
+    assert s == {
+        'release:add': 1 + 1,
+    }
+
+    missing_releases = storage.release_missing(
+        [releases[0]['id'], releases[1]['id']])
+    assert list(missing_releases) == []
+
+    s = storage.flush()
+    assert s == {}
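
Note on the approach: the heart of this change is replacing each per-type
deque with a dict keyed by the object's identity columns (the four hashes for
contents and skipped contents, 'id' for everything else), so re-adding an
already-buffered object overwrites its entry instead of growing the buffer
and inflating batch counts. Below is a minimal standalone sketch of that
technique; DedupBuffer, FakeStorage and Obj are hypothetical names for
illustration only and are not part of this patch.

from collections import namedtuple
from typing import Dict, Iterable, List

Obj = namedtuple('Obj', ['id', 'data'])


class DedupBuffer:
    """Buffer objects and write them out once a threshold is reached,
    collapsing duplicates along the way."""

    def __init__(self, storage, threshold: int):
        self.storage = storage
        self.threshold = threshold
        # Keyed by identity tuple: adding an object with the same identity
        # twice overwrites the existing entry instead of appending.
        self._buffer = {}  # type: Dict[tuple, Obj]

    def add(self, objects: Iterable[Obj], keys: List[str]) -> Dict:
        for obj in objects:
            self._buffer[tuple(getattr(obj, k) for k in keys)] = obj
        if len(self._buffer) >= self.threshold:
            return self.flush()
        return {}

    def flush(self) -> Dict:
        summary = self.storage.add(list(self._buffer.values()))
        self._buffer.clear()
        return summary


class FakeStorage:
    def add(self, objects):
        return {'object:add': len(objects)}


buf = DedupBuffer(FakeStorage(), threshold=2)
# Two copies of the same object count once, so the threshold is not hit.
assert buf.add([Obj('a', 1), Obj('a', 1)], keys=['id']) == {}
# A second distinct object reaches the threshold and triggers a flush.
assert buf.add([Obj('b', 2)], keys=['id']) == {'object:add': 2}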