diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py
--- a/swh/storage/buffer.py
+++ b/swh/storage/buffer.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -115,3 +115,23 @@
             return self.flush()
 
         return {}
+
+    def clear_buffers(
+            self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Clear objects from current buffer.
+
+        WARNING:
+
+            data that has not been flushed to storage will be lost when this
+            method is called. This should only be called when `flush` fails and
+            you want to continue your processing.
+
+        """
+        if object_types is None:
+            object_types = self.object_types
+
+        for object_type in object_types:
+            q = self._objects[object_type]
+            q.clear()
+
+        return self.storage.clear_buffers(object_types)
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -979,3 +979,10 @@
     def metadata_provider_get_by(self, provider):
         # TODO
         raise NotImplementedError('not yet supported for Cassandra')
+
+    def clear_buffers(
+            self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Do nothing
+
+        """
+        return None
diff --git a/swh/storage/filter.py b/swh/storage/filter.py
--- a/swh/storage/filter.py
+++ b/swh/storage/filter.py
@@ -1,10 +1,10 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 
-from typing import Dict, Iterable, Set
+from typing import Dict, Iterable, Optional, Set
 
 from swh.model.model import (
     Content, SkippedContent, Directory, Revision,
@@ -28,14 +28,15 @@
             url: http://storage.internal.staging.swh.network:5002/
 
     """
+    object_types = ['content', 'skipped_content', 'directory', 'revision']
+
     def __init__(self, storage):
         self.storage = get_storage(**storage)
-        self.objects_seen = {
-            'content': set(),  # sha256
-            'skipped_content': set(),  # sha1_git
-            'directory': set(),  # sha1_git
-            'revision': set(),  # sha1_git
-        }
+        # Per-instance state: a class-level dict would be shared by (and
+        # reset for) every FilteringProxyStorage instance.
+        self.objects_seen: Dict[str, Set[bytes]] = {
+            object_type: set() for object_type in self.object_types
+        }
 
     def __getattr__(self, key):
         if key == 'storage':
@@ -149,3 +150,16 @@
 
         fn = fn_by_object_type[object_type]
         return set(fn(missing_ids))
+
+    def clear_buffers(
+            self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Clear objects from current buffer
+
+        """
+        if object_types is None:
+            object_types = self.object_types
+
+        for object_type in object_types:
+            self.objects_seen[object_type] = set()
+
+        return self.storage.clear_buffers(object_types)
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -1005,3 +1005,10 @@
 
     def diff_revision(self, revision, track_renaming=False):
         raise NotImplementedError('InMemoryStorage.diff_revision')
+
+    def clear_buffers(
+            self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Do nothing
+
+        """
+        return None
diff --git a/swh/storage/retry.py b/swh/storage/retry.py
--- a/swh/storage/retry.py
+++ b/swh/storage/retry.py
@@ -149,3 +149,7 @@
         if hasattr(self.storage, 'flush'):
             return self.storage.flush(object_types)
         return {}
+
+    def clear_buffers(
+            self, object_types: Optional[Iterable[str]] = None) -> None:
+        return self.storage.clear_buffers(object_types)
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -1152,3 +1152,10 @@
     @timed
     def diff_revision(self, revision, track_renaming=False):
         return diff.diff_revision(self, revision, track_renaming)
+
+    def clear_buffers(
+            self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Do nothing
+
+        """
+        return None
diff --git a/swh/storage/tests/test_buffer.py b/swh/storage/tests/test_buffer.py
--- a/swh/storage/tests/test_buffer.py
+++ b/swh/storage/tests/test_buffer.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -277,3 +277,69 @@
 
     s = storage.flush()
     assert s == {}
+
+
+def test_buffering_proxy_storage_clear(sample_data):
+    """Clear operation on buffer
+
+    """
+    threshold = 10
+    contents = sample_data['content']
+    assert 0 < len(contents) < threshold
+    skipped_contents = sample_data['skipped_content']
+    assert 0 < len(skipped_contents) < threshold
+    directories = sample_data['directory']
+    assert 0 < len(directories) < threshold
+    revisions = sample_data['revision']
+    assert 0 < len(revisions) < threshold
+    releases = sample_data['release']
+    assert 0 < len(releases) < threshold
+
+    storage = get_storage_with_buffer_config(
+        min_batch_size={
+            'content': threshold,
+            'skipped_content': threshold,
+            'directory': threshold,
+            'revision': threshold,
+            'release': threshold,
+        }
+    )
+
+    s = storage.content_add(contents)
+    assert s == {}
+    s = storage.skipped_content_add(skipped_contents)
+    assert s == {}
+    s = storage.directory_add(directories)
+    assert s == {}
+    s = storage.revision_add(revisions)
+    assert s == {}
+    s = storage.release_add(releases)
+    assert s == {}
+
+    assert len(storage._objects['content']) == len(contents)
+    assert len(storage._objects['skipped_content']) == len(skipped_contents)
+    assert len(storage._objects['directory']) == len(directories)
+    assert len(storage._objects['revision']) == len(revisions)
+    assert len(storage._objects['release']) == len(releases)
+
+    # clear only content from the buffer
+    s = storage.clear_buffers(['content'])
+    assert s is None
+
+    # specific clear operation on specific object type content only touched
+    # them
+    assert len(storage._objects['content']) == 0
+    assert len(storage._objects['skipped_content']) == len(skipped_contents)
+    assert len(storage._objects['directory']) == len(directories)
+    assert len(storage._objects['revision']) == len(revisions)
+    assert len(storage._objects['release']) == len(releases)
+
+    # clear current buffer from all object types
+    s = storage.clear_buffers()
+    assert s is None
+
+    assert len(storage._objects['content']) == 0
+    assert len(storage._objects['skipped_content']) == 0
+    assert len(storage._objects['directory']) == 0
+    assert len(storage._objects['revision']) == 0
+    assert len(storage._objects['release']) == 0
diff --git a/swh/storage/tests/test_filter.py b/swh/storage/tests/test_filter.py
--- a/swh/storage/tests/test_filter.py
+++ b/swh/storage/tests/test_filter.py
@@ -7,19 +7,22 @@
 from swh.storage import get_storage
 
 
-storage_config = {
-    'cls': 'pipeline',
-    'steps': [
-        {'cls': 'validate'},
-        {'cls': 'filter'},
-        {'cls': 'memory'},
-    ]
-}
+def get_storage_with_filter_config():
+    storage_config = {
+        'cls': 'pipeline',
+        'steps': [
+            {'cls': 'validate'},
+            {'cls': 'filter'},
+            {'cls': 'memory'},
+        ]
+    }
+
+    return get_storage(**storage_config)
 
 
 def test_filtering_proxy_storage_content(sample_data):
     sample_content = sample_data['content'][0]
-    storage = get_storage(**storage_config)
+    storage = get_storage_with_filter_config()
 
     content = next(storage.content_get([sample_content['sha1']]))
     assert not content
@@ -42,7 +45,7 @@
 
 def test_filtering_proxy_storage_skipped_content(sample_data):
     sample_content = sample_data['skipped_content'][0]
-    storage = get_storage(**storage_config)
+    storage = get_storage_with_filter_config()
 
     content = next(storage.skipped_content_missing([sample_content]))
     assert content['sha1'] == sample_content['sha1']
@@ -64,7 +67,7 @@
 def test_filtering_proxy_storage_skipped_content_missing_sha1_git(sample_data):
     sample_content = sample_data['skipped_content'][0]
     sample_content2 = sample_data['skipped_content'][1]
-    storage = get_storage(**storage_config)
+    storage = get_storage_with_filter_config()
 
     sample_content['sha1_git'] = sample_content2['sha1_git'] = None
     content = next(storage.skipped_content_missing([sample_content]))
@@ -89,7 +92,7 @@
 
 def test_filtering_proxy_storage_revision(sample_data):
     sample_revision = sample_data['revision'][0]
-    storage = get_storage(**storage_config)
+    storage = get_storage_with_filter_config()
 
     revision = next(storage.revision_get([sample_revision['id']]))
     assert not revision
@@ -110,7 +113,7 @@
 
 def test_filtering_proxy_storage_directory(sample_data):
     sample_directory = sample_data['directory'][0]
-    storage = get_storage(**storage_config)
+    storage = get_storage_with_filter_config()
 
     directory = next(storage.directory_missing([sample_directory['id']]))
     assert directory
@@ -127,3 +130,64 @@
     assert s == {
         'directory:add': 0,
     }
+
+
+def test_filtering_proxy_storage_clear(sample_data):
+    """Clear operation on filter proxy
+
+    """
+    threshold = 10
+    contents = sample_data['content']
+    assert 0 < len(contents) < threshold
+    skipped_contents = sample_data['skipped_content']
+    assert 0 < len(skipped_contents) < threshold
+    directories = sample_data['directory']
+    assert 0 < len(directories) < threshold
+    revisions = sample_data['revision']
+    assert 0 < len(revisions) < threshold
+    releases = sample_data['release']
+    assert 0 < len(releases) < threshold
+
+    storage = get_storage_with_filter_config()
+
+    s = storage.content_add(contents)
+    assert s['content:add'] == len(contents)
+    s = storage.skipped_content_add(skipped_contents)
+    assert s == {
+        'skipped_content:add': len(skipped_contents),
+    }
+    s = storage.directory_add(directories)
+    assert s == {
+        'directory:add': len(directories),
+    }
+    s = storage.revision_add(revisions)
+    assert s == {
+        'revision:add': len(revisions),
+    }
+
+    assert len(storage.objects_seen['content']) == len(contents)
+    assert len(storage.objects_seen['skipped_content']) == len(
+        skipped_contents)
+    assert len(storage.objects_seen['directory']) == len(directories)
+    assert len(storage.objects_seen['revision']) == len(revisions)
+
+    # clear only content from the buffer
+    s = storage.clear_buffers(['content'])
+    assert s is None
+
+    # specific clear operation on specific object type content only touched
+    # them
+    assert len(storage.objects_seen['content']) == 0
+    assert len(storage.objects_seen['skipped_content']) == len(
+        skipped_contents)
+    assert len(storage.objects_seen['directory']) == len(directories)
+    assert len(storage.objects_seen['revision']) == len(revisions)
+
+    # clear current buffer from all object types
+    s = storage.clear_buffers()
+    assert s is None
+
+    assert len(storage.objects_seen['content']) == 0
+    assert len(storage.objects_seen['skipped_content']) == 0
+    assert len(storage.objects_seen['directory']) == 0
+    assert len(storage.objects_seen['revision']) == 0
diff --git a/swh/storage/validate.py b/swh/storage/validate.py
--- a/swh/storage/validate.py
+++ b/swh/storage/validate.py
@@ -5,7 +5,7 @@
 
 import datetime
 import contextlib
-from typing import Dict, Iterable, List
+from typing import Dict, Iterable, List, Optional
 
 from swh.model.model import (
     BaseModel, SkippedContent, Content, Directory, Revision, Release, Snapshot,
@@ -100,3 +100,7 @@
         with convert_validation_exceptions():
             origin = Origin.from_dict(origin)
         return self.storage.origin_add_one(origin)
+
+    def clear_buffers(
+            self, object_types: Optional[Iterable[str]] = None) -> None:
+        return self.storage.clear_buffers(object_types)