diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py
--- a/swh/storage/buffer.py
+++ b/swh/storage/buffer.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -115,3 +115,32 @@
             return self.flush()
 
         return {}
+
+    def clear_buffers(
+            self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Clear objects from current buffer.
+
+        Args:
+            object_types: object types whose buffered objects are dropped;
+                every buffered object type when None.
+
+        WARNING:
+
+            data that has not been flushed to storage will be lost when this
+            method is called. This should only be called when `flush` fails and
+            you want to continue your processing.
+
+        """
+        if object_types is None:
+            local_types = self.object_types
+        else:
+            # Materialize once: a one-shot iterable would otherwise be
+            # exhausted before being forwarded to the proxied storage.
+            object_types = local_types = list(object_types)
+
+        for object_type in local_types:
+            self._objects[object_type].clear()
+
+        # Forward the caller's own selection, so that None keeps meaning
+        # "all object types" for the storage this proxy wraps.
+        return self.storage.clear_buffers(object_types)
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -979,3 +979,10 @@
     def metadata_provider_get_by(self, provider):
         # TODO
         raise NotImplementedError('not yet supported for Cassandra')
+
+    def clear_buffers(
+            self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Do nothing
+
+        """
+        return None
diff --git a/swh/storage/filter.py b/swh/storage/filter.py
--- a/swh/storage/filter.py
+++ b/swh/storage/filter.py
@@ -1,10 +1,10 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 
-from typing import Dict, Iterable, Set
+from typing import Dict, Iterable, Optional, Set
 
 from swh.model.model import (
     Content, SkippedContent, Directory, Revision,
@@ -28,14 +28,15 @@
             url: http://storage.internal.staging.swh.network:5002/
 
     """
+    object_types = ['content', 'skipped_content', 'directory', 'revision']
+
     def __init__(self, storage):
         self.storage = get_storage(**storage)
-        self.objects_seen = {
-            'content': set(),  # sha256
-            'skipped_content': set(),  # sha1_git
-            'directory': set(),  # sha1_git
-            'revision': set(),  # sha1_git
-        }
+        # Per-instance state: a dict defined on the class would be shared by
+        # (and reset on creation of) every FilteringProxyStorage instance.
+        self.objects_seen: Dict[str, Set[bytes]] = {
+            object_type: set() for object_type in self.object_types
+        }
 
     def __getattr__(self, key):
         if key == 'storage':
@@ -149,3 +150,26 @@
 
         fn = fn_by_object_type[object_type]
         return set(fn(missing_ids))
+
+    def clear_buffers(
+            self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Clear objects from current buffer
+
+        Args:
+            object_types: object types whose seen ids are forgotten; every
+                filtered object type when None.
+
+        """
+        if object_types is None:
+            local_types = self.object_types
+        else:
+            # Materialize once: a one-shot iterable would otherwise be
+            # exhausted before being forwarded to the proxied storage.
+            object_types = local_types = list(object_types)
+
+        for object_type in local_types:
+            self.objects_seen[object_type] = set()
+
+        # Forward the caller's own selection, so that None keeps meaning
+        # "all object types" for the storage this proxy wraps.
+        return self.storage.clear_buffers(object_types)
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -1005,3 +1005,10 @@
 
     def diff_revision(self, revision, track_renaming=False):
         raise NotImplementedError('InMemoryStorage.diff_revision')
+
+    def clear_buffers(
+            self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Do nothing
+
+        """
+        return None
diff --git a/swh/storage/interface.py b/swh/storage/interface.py
--- a/swh/storage/interface.py
+++ b/swh/storage/interface.py
@@ -1288,3 +1288,18 @@
             for more details).
         """
         ...
+
+    @remote_api_endpoint('clear/buffer')
+    def clear_buffers(
+            self, object_types: Optional[Iterable[str]] = None) -> None:
+        """For real backend storages (pg, cassandra, in-memory), this is
+        expected to be a noop operation doing nothing.
+
+        For proxy storages (especially filter, buffer), this is expected to
+        have a real effect of cleaning the internal state.
+
+        Args:
+            object_types: object types to clean; all of them when None.
+
+        """
+        ...
diff --git a/swh/storage/retry.py b/swh/storage/retry.py
--- a/swh/storage/retry.py
+++ b/swh/storage/retry.py
@@ -149,3 +149,8 @@
         if hasattr(self.storage, 'flush'):
             return self.storage.flush(object_types)
         return {}
+
+    def clear_buffers(
+            self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Forward the clear operation to the proxied storage."""
+        return self.storage.clear_buffers(object_types)
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -1154,3 +1154,10 @@
     @timed
     def diff_revision(self, revision, track_renaming=False):
         return diff.diff_revision(self, revision, track_renaming)
+
+    def clear_buffers(
+            self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Do nothing
+
+        """
+        return None
diff --git a/swh/storage/tests/test_buffer.py b/swh/storage/tests/test_buffer.py
--- a/swh/storage/tests/test_buffer.py
+++ b/swh/storage/tests/test_buffer.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -277,3 +277,69 @@
 
     s = storage.flush()
     assert s == {}
+
+
+def test_buffering_proxy_storage_clear(sample_data):
+    """Clear operation on buffer
+
+    """
+    threshold = 10
+    contents = sample_data['content']
+    assert 0 < len(contents) < threshold
+    skipped_contents = sample_data['skipped_content']
+    assert 0 < len(skipped_contents) < threshold
+    directories = sample_data['directory']
+    assert 0 < len(directories) < threshold
+    revisions = sample_data['revision']
+    assert 0 < len(revisions) < threshold
+    releases = sample_data['release']
+    assert 0 < len(releases) < threshold
+
+    storage = get_storage_with_buffer_config(
+        min_batch_size={
+            'content': threshold,
+            'skipped_content': threshold,
+            'directory': threshold,
+            'revision': threshold,
+            'release': threshold,
+        }
+    )
+
+    s = storage.content_add(contents)
+    assert s == {}
+    s = storage.skipped_content_add(skipped_contents)
+    assert s == {}
+    s = storage.directory_add(directories)
+    assert s == {}
+    s = storage.revision_add(revisions)
+    assert s == {}
+    s = storage.release_add(releases)
+    assert s == {}
+
+    assert len(storage._objects['content']) == len(contents)
+    assert len(storage._objects['skipped_content']) == len(skipped_contents)
+    assert len(storage._objects['directory']) == len(directories)
+    assert len(storage._objects['revision']) == len(revisions)
+    assert len(storage._objects['release']) == len(releases)
+
+    # clear only content from the buffer
+    s = storage.clear_buffers(['content'])
+    assert s is None
+
+    # specific clear operation on specific object type content only touched
+    # them
+    assert len(storage._objects['content']) == 0
+    assert len(storage._objects['skipped_content']) == len(skipped_contents)
+    assert len(storage._objects['directory']) == len(directories)
+    assert len(storage._objects['revision']) == len(revisions)
+    assert len(storage._objects['release']) == len(releases)
+
+    # clear current buffer from all object types
+    s = storage.clear_buffers()
+    assert s is None
+
+    assert len(storage._objects['content']) == 0
+    assert len(storage._objects['skipped_content']) == 0
+    assert len(storage._objects['directory']) == 0
+    assert len(storage._objects['revision']) == 0
+    assert len(storage._objects['release']) == 0
diff --git a/swh/storage/tests/test_filter.py b/swh/storage/tests/test_filter.py
--- a/swh/storage/tests/test_filter.py
+++ b/swh/storage/tests/test_filter.py
@@ -3,127 +3,187 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import pytest
 
 from swh.storage import get_storage
 
 
-storage_config = {
-    'cls': 'pipeline',
-    'steps': [
-        {'cls': 'validate'},
-        {'cls': 'filter'},
-        {'cls': 'memory'},
-    ]
-}
+@pytest.fixture
+def swh_storage():
+    storage_config = {
+        'cls': 'pipeline',
+        'steps': [
+            {'cls': 'validate'},
+            {'cls': 'filter'},
+            {'cls': 'memory'},
+        ]
+    }
+
+    return get_storage(**storage_config)
 
 
-def test_filtering_proxy_storage_content(sample_data):
+def test_filtering_proxy_storage_content(swh_storage, sample_data):
     sample_content = sample_data['content'][0]
-    storage = get_storage(**storage_config)
 
-    content = next(storage.content_get([sample_content['sha1']]))
+    content = next(swh_storage.content_get([sample_content['sha1']]))
     assert not content
 
-    s = storage.content_add([sample_content])
+    s = swh_storage.content_add([sample_content])
     assert s == {
         'content:add': 1,
         'content:add:bytes': sample_content['length'],
     }
 
-    content = next(storage.content_get([sample_content['sha1']]))
+    content = next(swh_storage.content_get([sample_content['sha1']]))
     assert content is not None
 
-    s = storage.content_add([sample_content])
+    s = swh_storage.content_add([sample_content])
     assert s == {
         'content:add': 0,
         'content:add:bytes': 0,
     }
 
 
-def test_filtering_proxy_storage_skipped_content(sample_data):
+def test_filtering_proxy_storage_skipped_content(swh_storage, sample_data):
     sample_content = sample_data['skipped_content'][0]
-    storage = get_storage(**storage_config)
 
-    content = next(storage.skipped_content_missing([sample_content]))
+    content = next(swh_storage.skipped_content_missing([sample_content]))
     assert content['sha1'] == sample_content['sha1']
 
-    s = storage.skipped_content_add([sample_content])
+    s = swh_storage.skipped_content_add([sample_content])
     assert s == {
         'skipped_content:add': 1,
     }
 
-    content = list(storage.skipped_content_missing([sample_content]))
+    content = list(swh_storage.skipped_content_missing([sample_content]))
     assert content == []
 
-    s = storage.skipped_content_add([sample_content])
+    s = swh_storage.skipped_content_add([sample_content])
     assert s == {
         'skipped_content:add': 0,
     }
 
 
-def test_filtering_proxy_storage_skipped_content_missing_sha1_git(sample_data):
+def test_filtering_proxy_storage_skipped_content_missing_sha1_git(
+        swh_storage, sample_data):
     sample_content = sample_data['skipped_content'][0]
     sample_content2 = sample_data['skipped_content'][1]
-    storage = get_storage(**storage_config)
     sample_content['sha1_git'] = sample_content2['sha1_git'] = None
 
-    content = next(storage.skipped_content_missing([sample_content]))
+    content = next(swh_storage.skipped_content_missing([sample_content]))
     assert content['sha1'] == sample_content['sha1']
 
-    s = storage.skipped_content_add([sample_content])
+    s = swh_storage.skipped_content_add([sample_content])
     assert s == {
         'skipped_content:add': 1,
     }
 
-    content = list(storage.skipped_content_missing([sample_content]))
+    content = list(swh_storage.skipped_content_missing([sample_content]))
     assert content == []
 
-    s = storage.skipped_content_add([sample_content2])
+    s = swh_storage.skipped_content_add([sample_content2])
     assert s == {
         'skipped_content:add': 1,
     }
 
-    content = list(storage.skipped_content_missing([sample_content2]))
+    content = list(swh_storage.skipped_content_missing([sample_content2]))
     assert content == []
 
 
-def test_filtering_proxy_storage_revision(sample_data):
+def test_filtering_proxy_storage_revision(swh_storage, sample_data):
     sample_revision = sample_data['revision'][0]
-    storage = get_storage(**storage_config)
 
-    revision = next(storage.revision_get([sample_revision['id']]))
+    revision = next(swh_storage.revision_get([sample_revision['id']]))
     assert not revision
 
-    s = storage.revision_add([sample_revision])
+    s = swh_storage.revision_add([sample_revision])
     assert s == {
         'revision:add': 1,
     }
 
-    revision = next(storage.revision_get([sample_revision['id']]))
+    revision = next(swh_storage.revision_get([sample_revision['id']]))
     assert revision is not None
 
-    s = storage.revision_add([sample_revision])
+    s = swh_storage.revision_add([sample_revision])
     assert s == {
         'revision:add': 0,
     }
 
 
-def test_filtering_proxy_storage_directory(sample_data):
+def test_filtering_proxy_storage_directory(swh_storage, sample_data):
     sample_directory = sample_data['directory'][0]
-    storage = get_storage(**storage_config)
 
-    directory = next(storage.directory_missing([sample_directory['id']]))
+    directory = next(swh_storage.directory_missing([sample_directory['id']]))
     assert directory
 
-    s = storage.directory_add([sample_directory])
+    s = swh_storage.directory_add([sample_directory])
     assert s == {
         'directory:add': 1,
     }
 
-    directory = list(storage.directory_missing([sample_directory['id']]))
+    directory = list(swh_storage.directory_missing([sample_directory['id']]))
     assert not directory
 
-    s = storage.directory_add([sample_directory])
+    s = swh_storage.directory_add([sample_directory])
     assert s == {
         'directory:add': 0,
     }
+
+
+def test_filtering_proxy_storage_clear(swh_storage, sample_data):
+    """Clear operation on filter proxy
+
+    """
+    threshold = 10
+    contents = sample_data['content']
+    assert 0 < len(contents) < threshold
+    skipped_contents = sample_data['skipped_content']
+    assert 0 < len(skipped_contents) < threshold
+    directories = sample_data['directory']
+    assert 0 < len(directories) < threshold
+    revisions = sample_data['revision']
+    assert 0 < len(revisions) < threshold
+    releases = sample_data['release']
+    assert 0 < len(releases) < threshold
+
+    s = swh_storage.content_add(contents)
+    assert s['content:add'] == len(contents)
+    s = swh_storage.skipped_content_add(skipped_contents)
+    assert s == {
+        'skipped_content:add': len(skipped_contents),
+    }
+    s = swh_storage.directory_add(directories)
+    assert s == {
+        'directory:add': len(directories),
+    }
+    s = swh_storage.revision_add(revisions)
+    assert s == {
+        'revision:add': len(revisions),
+    }
+
+    assert len(swh_storage.objects_seen['content']) == len(contents)
+    assert len(swh_storage.objects_seen['skipped_content']) == len(
+        skipped_contents)
+    assert len(swh_storage.objects_seen['directory']) == len(directories)
+    assert len(swh_storage.objects_seen['revision']) == len(revisions)
+
+    # clear only content from the buffer
+    s = swh_storage.clear_buffers(['content'])
+    assert s is None
+
+    # specific clear operation on specific object type content only touched
+    # them
+    assert len(swh_storage.objects_seen['content']) == 0
+    assert len(swh_storage.objects_seen['skipped_content']) == len(
+        skipped_contents)
+    assert len(swh_storage.objects_seen['directory']) == len(directories)
+    assert len(swh_storage.objects_seen['revision']) == len(revisions)
+
+    # clear current buffer from all object types
+    s = swh_storage.clear_buffers()
+    assert s is None
+
+    assert len(swh_storage.objects_seen['content']) == 0
+    assert len(swh_storage.objects_seen['skipped_content']) == 0
+    assert len(swh_storage.objects_seen['directory']) == 0
+    assert len(swh_storage.objects_seen['revision']) == 0
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -3900,3 +3900,9 @@
         assert dbdata[1] == (cont2['sha1'], cont2['sha1_git'], cont2['sha256'],
                              cont2['blake2s256'], cont2['length'], 'absent',
                              'Content too long')
+
+    def test_clear_buffers(self, swh_storage):
+        """Calling clear buffers on real storage does nothing
+
+        """
+        assert swh_storage.clear_buffers() is None
diff --git a/swh/storage/validate.py b/swh/storage/validate.py
--- a/swh/storage/validate.py
+++ b/swh/storage/validate.py
@@ -5,7 +5,7 @@
 
 import datetime
 import contextlib
-from typing import Dict, Iterable, List
+from typing import Dict, Iterable, List, Optional
 
 from swh.model.model import (
     BaseModel, SkippedContent, Content, Directory, Revision, Release, Snapshot,
@@ -100,3 +100,8 @@
         with convert_validation_exceptions():
             origin = Origin.from_dict(origin)
         return self.storage.origin_add_one(origin)
+
+    def clear_buffers(
+            self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Forward the clear operation to the proxied storage."""
+        return self.storage.clear_buffers(object_types)