diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py
--- a/swh/storage/buffer.py
+++ b/swh/storage/buffer.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -131,3 +131,33 @@
             return self.flush()
 
         return {}
+
+    def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Clear objects from current buffer.
+
+        WARNING:
+
+        data that has not been flushed to storage will be lost when this
+        method is called. This should only be called when `flush` fails and
+        you want to continue your processing.
+
+        Args:
+            object_types: object types whose buffers are cleared; all the
+                object types handled by this proxy when None.
+
+        """
+        if object_types is None:
+            types_to_clear = self.object_types
+        else:
+            # Materialize first: a one-shot iterator would otherwise be
+            # exhausted here and reach the wrapped storage empty.
+            object_types = list(object_types)
+            types_to_clear = object_types
+
+        for object_type in types_to_clear:
+            self._objects[object_type].clear()
+
+        # Forward the caller's argument unchanged (None stays None) so each
+        # wrapped layer falls back to its own default object types rather
+        # than to this proxy's list, which may not cover all of them.
+        return self.storage.clear_buffers(object_types)
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -1002,3 +1002,9 @@
     def metadata_provider_get_by(self, provider):
         # TODO
         raise NotImplementedError("not yet supported for Cassandra")
+
+    def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Do nothing
+
+        """
+        return None
diff --git a/swh/storage/filter.py b/swh/storage/filter.py
--- a/swh/storage/filter.py
+++ b/swh/storage/filter.py
@@ -1,10 +1,10 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 
-from typing import Dict, Iterable, Set
+from typing import Dict, Iterable, Optional, Set
 
 from swh.model.model import (
     Content,
@@ -32,14 +32,15 @@
 
     """
 
+    object_types = ["content", "skipped_content", "directory", "revision"]
+    # Ids already seen, per object type (sha256 for content, sha1_git for the
+    # others). Initialized per instance in __init__: a dict assigned here would
+    # be shared by, and mutated through, every instance.
+    objects_seen: Dict[str, Set[bytes]]
+
     def __init__(self, storage):
         self.storage = get_storage(**storage)
-        self.objects_seen = {
-            "content": set(),  # sha256
-            "skipped_content": set(),  # sha1_git
-            "directory": set(),  # sha1_git
-            "revision": set(),  # sha1_git
-        }
+        self.objects_seen = {object_type: set() for object_type in self.object_types}
 
     def __getattr__(self, key):
         if key == "storage":
@@ -137,3 +138,27 @@
 
         fn = fn_by_object_type[object_type]
         return set(fn(missing_ids))
+
+    def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Forget the ids already seen for the given object types.
+
+        Args:
+            object_types: object types to reset; all the object types handled
+                by this proxy when None.
+
+        """
+        if object_types is None:
+            types_to_clear = self.object_types
+        else:
+            # Materialize first: a one-shot iterator would otherwise be
+            # exhausted here and reach the wrapped storage empty.
+            object_types = list(object_types)
+            types_to_clear = object_types
+
+        for object_type in types_to_clear:
+            self.objects_seen[object_type] = set()
+
+        # Forward the caller's argument unchanged (None stays None): this
+        # proxy's narrower list would, e.g., leave a wrapped buffer's releases
+        # in place.
+        return self.storage.clear_buffers(object_types)
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -1016,3 +1016,9 @@
 
     def diff_revision(self, revision, track_renaming=False):
         raise NotImplementedError("InMemoryStorage.diff_revision")
+
+    def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Do nothing
+
+        """
+        return None
diff --git a/swh/storage/interface.py b/swh/storage/interface.py
--- a/swh/storage/interface.py
+++ b/swh/storage/interface.py
@@ -1303,3 +1303,19 @@
         for more details).
         """
         ...
+
+    @remote_api_endpoint("clear/buffer")
+    def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Clear the internal state of proxy storages.
+
+        For backend storages (pg, cassandra, in-memory), this is a noop
+        operation. For proxy storages (especially filter and buffer), this
+        cleans their internal state; data buffered but not yet flushed is
+        lost.
+
+        Args:
+            object_types: object types to clear; a stateful proxy falls back
+                to all the object types it handles when None.
+
+        """
+        ...
diff --git a/swh/storage/retry.py b/swh/storage/retry.py
--- a/swh/storage/retry.py
+++ b/swh/storage/retry.py
@@ -176,3 +176,9 @@
         if hasattr(self.storage, "flush"):
             return self.storage.flush(object_types)
         return {}
+
+    def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Forward the clear operation to the wrapped storage
+
+        """
+        return self.storage.clear_buffers(object_types)
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -1215,3 +1215,9 @@
     @timed
     def diff_revision(self, revision, track_renaming=False):
         return diff.diff_revision(self, revision, track_renaming)
+
+    def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Do nothing
+
+        """
+        return None
diff --git a/swh/storage/tests/test_buffer.py b/swh/storage/tests/test_buffer.py
--- a/swh/storage/tests/test_buffer.py
+++ b/swh/storage/tests/test_buffer.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -336,3 +336,69 @@
 
     s = storage.flush()
     assert s == {}
+
+
+def test_buffering_proxy_storage_clear(sample_data):
+    """Clear operation on buffer
+
+    """
+    threshold = 10
+    contents = sample_data["content"]
+    assert 0 < len(contents) < threshold
+    skipped_contents = sample_data["skipped_content"]
+    assert 0 < len(skipped_contents) < threshold
+    directories = sample_data["directory"]
+    assert 0 < len(directories) < threshold
+    revisions = sample_data["revision"]
+    assert 0 < len(revisions) < threshold
+    releases = sample_data["release"]
+    assert 0 < len(releases) < threshold
+
+    storage = get_storage_with_buffer_config(
+        min_batch_size={
+            "content": threshold,
+            "skipped_content": threshold,
+            "directory": threshold,
+            "revision": threshold,
+            "release": threshold,
+        }
+    )
+
+    s = storage.content_add(contents)
+    assert s == {}
+    s = storage.skipped_content_add(skipped_contents)
+    assert s == {}
+    s = storage.directory_add(directories)
+    assert s == {}
+    s = storage.revision_add(revisions)
+    assert s == {}
+    s = storage.release_add(releases)
+    assert s == {}
+
+    assert len(storage._objects["content"]) == len(contents)
+    assert len(storage._objects["skipped_content"]) == len(skipped_contents)
+    assert len(storage._objects["directory"]) == len(directories)
+    assert len(storage._objects["revision"]) == len(revisions)
+    assert len(storage._objects["release"]) == len(releases)
+
+    # clear only content from the buffer
+    s = storage.clear_buffers(["content"])
+    assert s is None
+
+    # specific clear operation on specific object type content only touched
+    # them
+    assert len(storage._objects["content"]) == 0
+    assert len(storage._objects["skipped_content"]) == len(skipped_contents)
+    assert len(storage._objects["directory"]) == len(directories)
+    assert len(storage._objects["revision"]) == len(revisions)
+    assert len(storage._objects["release"]) == len(releases)
+
+    # clear current buffer from all object types
+    s = storage.clear_buffers()
+    assert s is None
+
+    assert len(storage._objects["content"]) == 0
+    assert len(storage._objects["skipped_content"]) == 0
+    assert len(storage._objects["directory"]) == 0
+    assert len(storage._objects["revision"]) == 0
+    assert len(storage._objects["release"]) == 0
diff --git a/swh/storage/tests/test_filter.py b/swh/storage/tests/test_filter.py
--- a/swh/storage/tests/test_filter.py
+++ b/swh/storage/tests/test_filter.py
@@ -123,3 +123,59 @@
     assert s == {
         "directory:add": 0,
     }
+
+
+def test_filtering_proxy_storage_clear(sample_data):
+    """Clear operation on filter proxy
+
+    """
+    threshold = 10
+    contents = sample_data["content"]
+    assert 0 < len(contents) < threshold
+    skipped_contents = sample_data["skipped_content"]
+    assert 0 < len(skipped_contents) < threshold
+    directories = sample_data["directory"]
+    assert 0 < len(directories) < threshold
+    revisions = sample_data["revision"]
+    assert 0 < len(revisions) < threshold
+
+    storage = get_storage(**storage_config)
+
+    s = storage.content_add(contents)
+    assert s["content:add"] == len(contents)
+    s = storage.skipped_content_add(skipped_contents)
+    assert s == {
+        "skipped_content:add": len(skipped_contents),
+    }
+    s = storage.directory_add(directories)
+    assert s == {
+        "directory:add": len(directories),
+    }
+    s = storage.revision_add(revisions)
+    assert s == {
+        "revision:add": len(revisions),
+    }
+
+    assert len(storage.objects_seen["content"]) == len(contents)
+    assert len(storage.objects_seen["skipped_content"]) == len(skipped_contents)
+    assert len(storage.objects_seen["directory"]) == len(directories)
+    assert len(storage.objects_seen["revision"]) == len(revisions)
+
+    # clear only the content ids seen by the filter
+    s = storage.clear_buffers(["content"])
+    assert s is None
+
+    # clearing one object type leaves the other object types untouched
+    assert len(storage.objects_seen["content"]) == 0
+    assert len(storage.objects_seen["skipped_content"]) == len(skipped_contents)
+    assert len(storage.objects_seen["directory"]) == len(directories)
+    assert len(storage.objects_seen["revision"]) == len(revisions)
+
+    # clear the ids seen for all object types
+    s = storage.clear_buffers()
+    assert s is None
+
+    assert len(storage.objects_seen["content"]) == 0
+    assert len(storage.objects_seen["skipped_content"]) == 0
+    assert len(storage.objects_seen["directory"]) == 0
+    assert len(storage.objects_seen["revision"]) == 0
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -3990,3 +3990,9 @@
             "absent",
             "Content too long",
         )
+
+    def test_clear_buffers(self, swh_storage):
+        """Calling clear buffers on real storage does nothing
+
+        """
+        assert swh_storage.clear_buffers() is None
diff --git a/swh/storage/validate.py b/swh/storage/validate.py
--- a/swh/storage/validate.py
+++ b/swh/storage/validate.py
@@ -5,7 +5,7 @@
 import datetime
 import contextlib
 
-from typing import Dict, Iterable, List
+from typing import Dict, Iterable, List, Optional
 
 from swh.model.model import (
     BaseModel,
@@ -107,3 +107,9 @@
         with convert_validation_exceptions():
             origin = Origin.from_dict(origin)
         return self.storage.origin_add_one(origin)
+
+    def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Forward the clear operation to the wrapped storage
+
+        """
+        return self.storage.clear_buffers(object_types)