diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py
--- a/swh/storage/buffer.py
+++ b/swh/storage/buffer.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -131,3 +131,26 @@
             return self.flush()
 
         return {}
+
+    def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Clear objects from current buffer.
+
+        WARNING:
+
+            data that has not been flushed to storage will be lost when this
+            method is called. This should only be called when `flush` fails and
+            you want to continue your processing.
+
+        """
+        if object_types is None:
+            object_types = self.object_types
+        else:
+            # Materialize once: a one-shot iterable would otherwise be exhausted
+            # by the loop below before being forwarded to the wrapped storage.
+            object_types = list(object_types)
+
+        for object_type in object_types:
+            q = self._objects[object_type]
+            q.clear()
+
+        return self.storage.clear_buffers(object_types)
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -1002,3 +1002,9 @@
     def metadata_provider_get_by(self, provider):
         # TODO
         raise NotImplementedError("not yet supported for Cassandra")
+
+    def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Do nothing
+
+        """
+        return None
diff --git a/swh/storage/filter.py b/swh/storage/filter.py
--- a/swh/storage/filter.py
+++ b/swh/storage/filter.py
@@ -1,10 +1,10 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 
-from typing import Dict, Iterable, Set
+from typing import Dict, Iterable, Optional, Set
 
 from swh.model.model import (
     Content,
@@ -32,14 +32,15 @@
 
     """
 
+    object_types = ["content", "skipped_content", "directory", "revision"]
+
     def __init__(self, storage):
         self.storage = get_storage(**storage)
-        self.objects_seen = {
-            "content": set(),  # sha256
-            "skipped_content": set(),  # sha1_git
-            "directory": set(),  # sha1_git
-            "revision": set(),  # sha1_git
-        }
+        # Ids already seen, per object type (content: sha256, others: sha1_git).
+        # Built per instance: a class-level dict would be shared by all proxies.
+        self.objects_seen: Dict[str, Set[bytes]] = {
+            object_type: set() for object_type in self.object_types
+        }
 
     def __getattr__(self, key):
         if key == "storage":
@@ -137,3 +138,19 @@
 
         fn = fn_by_object_type[object_type]
         return set(fn(missing_ids))
+
+    def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Clear objects from current buffer
+
+        """
+        if object_types is None:
+            object_types = self.object_types
+        else:
+            # Materialize once: a one-shot iterable would otherwise be exhausted
+            # by the loop below before being forwarded to the wrapped storage.
+            object_types = list(object_types)
+
+        for object_type in object_types:
+            self.objects_seen[object_type] = set()
+
+        return self.storage.clear_buffers(object_types)
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -1016,3 +1016,9 @@
 
     def diff_revision(self, revision, track_renaming=False):
         raise NotImplementedError("InMemoryStorage.diff_revision")
+
+    def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Do nothing
+
+        """
+        return None
diff --git a/swh/storage/interface.py b/swh/storage/interface.py
--- a/swh/storage/interface.py
+++ b/swh/storage/interface.py
@@ -1303,3 +1303,18 @@
             for more details).
         """
         ...
+
+    @remote_api_endpoint("clear/buffer")
+    def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None:
+        """For real backend storages (pg, cassandra, in-memory), this is expected
+        to be a noop operation doing nothing.
+
+        For proxy storages (especially filter, buffer), this is expected to
+        have a real effect of cleaning the internal state.
+
+        Args:
+            object_types: object types whose internal state must be cleared;
+                when None, proxies clear every object type they handle.
+
+        """
+        ...
diff --git a/swh/storage/retry.py b/swh/storage/retry.py
--- a/swh/storage/retry.py
+++ b/swh/storage/retry.py
@@ -176,3 +176,6 @@
         if hasattr(self.storage, "flush"):
             return self.storage.flush(object_types)
         return {}
+
+    def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None:
+        return self.storage.clear_buffers(object_types)
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -1215,3 +1215,9 @@
     @timed
     def diff_revision(self, revision, track_renaming=False):
         return diff.diff_revision(self, revision, track_renaming)
+
+    def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Do nothing
+
+        """
+        return None
diff --git a/swh/storage/tests/test_buffer.py b/swh/storage/tests/test_buffer.py
--- a/swh/storage/tests/test_buffer.py
+++ b/swh/storage/tests/test_buffer.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -336,3 +336,69 @@
 
     s = storage.flush()
     assert s == {}
+
+
+def test_buffering_proxy_storage_clear(sample_data):
+    """Clear operation on buffer
+
+    """
+    threshold = 10
+    contents = sample_data["content"]
+    assert 0 < len(contents) < threshold
+    skipped_contents = sample_data["skipped_content"]
+    assert 0 < len(skipped_contents) < threshold
+    directories = sample_data["directory"]
+    assert 0 < len(directories) < threshold
+    revisions = sample_data["revision"]
+    assert 0 < len(revisions) < threshold
+    releases = sample_data["release"]
+    assert 0 < len(releases) < threshold
+
+    storage = get_storage_with_buffer_config(
+        min_batch_size={
+            "content": threshold,
+            "skipped_content": threshold,
+            "directory": threshold,
+            "revision": threshold,
+            "release": threshold,
+        }
+    )
+
+    s = storage.content_add(contents)
+    assert s == {}
+    s = storage.skipped_content_add(skipped_contents)
+    assert s == {}
+    s = storage.directory_add(directories)
+    assert s == {}
+    s = storage.revision_add(revisions)
+    assert s == {}
+    s = storage.release_add(releases)
+    assert s == {}
+
+    assert len(storage._objects["content"]) == len(contents)
+    assert len(storage._objects["skipped_content"]) == len(skipped_contents)
+    assert len(storage._objects["directory"]) == len(directories)
+    assert len(storage._objects["revision"]) == len(revisions)
+    assert len(storage._objects["release"]) == len(releases)
+
+    # clear only content from the buffer
+    s = storage.clear_buffers(["content"])
+    assert s is None
+
+    # specific clear operation on specific object type content only touched
+    # them
+    assert len(storage._objects["content"]) == 0
+    assert len(storage._objects["skipped_content"]) == len(skipped_contents)
+    assert len(storage._objects["directory"]) == len(directories)
+    assert len(storage._objects["revision"]) == len(revisions)
+    assert len(storage._objects["release"]) == len(releases)
+
+    # clear current buffer from all object types
+    s = storage.clear_buffers()
+    assert s is None
+
+    assert len(storage._objects["content"]) == 0
+    assert len(storage._objects["skipped_content"]) == 0
+    assert len(storage._objects["directory"]) == 0
+    assert len(storage._objects["revision"]) == 0
+    assert len(storage._objects["release"]) == 0
diff --git a/swh/storage/tests/test_filter.py b/swh/storage/tests/test_filter.py
--- a/swh/storage/tests/test_filter.py
+++ b/swh/storage/tests/test_filter.py
@@ -3,123 +3,182 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import pytest
 
 from swh.storage import get_storage
 
 
-storage_config = {
-    "cls": "pipeline",
-    "steps": [{"cls": "validate"}, {"cls": "filter"}, {"cls": "memory"},],
-}
+@pytest.fixture
+def swh_storage():
+    storage_config = {
+        "cls": "pipeline",
+        "steps": [{"cls": "validate"}, {"cls": "filter"}, {"cls": "memory"},],
+    }
+
+    return get_storage(**storage_config)
 
 
-def test_filtering_proxy_storage_content(sample_data):
+def test_filtering_proxy_storage_content(swh_storage, sample_data):
     sample_content = sample_data["content"][0]
-    storage = get_storage(**storage_config)
 
-    content = next(storage.content_get([sample_content["sha1"]]))
+    content = next(swh_storage.content_get([sample_content["sha1"]]))
     assert not content
 
-    s = storage.content_add([sample_content])
+    s = swh_storage.content_add([sample_content])
     assert s == {
         "content:add": 1,
         "content:add:bytes": sample_content["length"],
     }
 
-    content = next(storage.content_get([sample_content["sha1"]]))
+    content = next(swh_storage.content_get([sample_content["sha1"]]))
     assert content is not None
 
-    s = storage.content_add([sample_content])
+    s = swh_storage.content_add([sample_content])
     assert s == {
         "content:add": 0,
         "content:add:bytes": 0,
     }
 
 
-def test_filtering_proxy_storage_skipped_content(sample_data):
+def test_filtering_proxy_storage_skipped_content(swh_storage, sample_data):
     sample_content = sample_data["skipped_content"][0]
-    storage = get_storage(**storage_config)
 
-    content = next(storage.skipped_content_missing([sample_content]))
+    content = next(swh_storage.skipped_content_missing([sample_content]))
     assert content["sha1"] == sample_content["sha1"]
 
-    s = storage.skipped_content_add([sample_content])
+    s = swh_storage.skipped_content_add([sample_content])
     assert s == {
         "skipped_content:add": 1,
     }
 
-    content = list(storage.skipped_content_missing([sample_content]))
+    content = list(swh_storage.skipped_content_missing([sample_content]))
     assert content == []
 
-    s = storage.skipped_content_add([sample_content])
+    s = swh_storage.skipped_content_add([sample_content])
     assert s == {
         "skipped_content:add": 0,
     }
 
 
-def test_filtering_proxy_storage_skipped_content_missing_sha1_git(sample_data):
+def test_filtering_proxy_storage_skipped_content_missing_sha1_git(
+    swh_storage, sample_data
+):
     sample_content = sample_data["skipped_content"][0]
     sample_content2 = sample_data["skipped_content"][1]
-    storage = get_storage(**storage_config)
 
     sample_content["sha1_git"] = sample_content2["sha1_git"] = None
-    content = next(storage.skipped_content_missing([sample_content]))
+    content = next(swh_storage.skipped_content_missing([sample_content]))
     assert content["sha1"] == sample_content["sha1"]
 
-    s = storage.skipped_content_add([sample_content])
+    s = swh_storage.skipped_content_add([sample_content])
     assert s == {
         "skipped_content:add": 1,
     }
 
-    content = list(storage.skipped_content_missing([sample_content]))
+    content = list(swh_storage.skipped_content_missing([sample_content]))
     assert content == []
 
-    s = storage.skipped_content_add([sample_content2])
+    s = swh_storage.skipped_content_add([sample_content2])
     assert s == {
         "skipped_content:add": 1,
     }
 
-    content = list(storage.skipped_content_missing([sample_content2]))
+    content = list(swh_storage.skipped_content_missing([sample_content2]))
     assert content == []
 
 
-def test_filtering_proxy_storage_revision(sample_data):
+def test_filtering_proxy_storage_revision(swh_storage, sample_data):
     sample_revision = sample_data["revision"][0]
-    storage = get_storage(**storage_config)
 
-    revision = next(storage.revision_get([sample_revision["id"]]))
+    revision = next(swh_storage.revision_get([sample_revision["id"]]))
     assert not revision
 
-    s = storage.revision_add([sample_revision])
+    s = swh_storage.revision_add([sample_revision])
     assert s == {
         "revision:add": 1,
     }
 
-    revision = next(storage.revision_get([sample_revision["id"]]))
+    revision = next(swh_storage.revision_get([sample_revision["id"]]))
     assert revision is not None
 
-    s = storage.revision_add([sample_revision])
+    s = swh_storage.revision_add([sample_revision])
     assert s == {
         "revision:add": 0,
     }
 
 
-def test_filtering_proxy_storage_directory(sample_data):
+def test_filtering_proxy_storage_directory(swh_storage, sample_data):
     sample_directory = sample_data["directory"][0]
-    storage = get_storage(**storage_config)
 
-    directory = next(storage.directory_missing([sample_directory["id"]]))
+    directory = next(swh_storage.directory_missing([sample_directory["id"]]))
     assert directory
 
-    s = storage.directory_add([sample_directory])
+    s = swh_storage.directory_add([sample_directory])
     assert s == {
         "directory:add": 1,
     }
 
-    directory = list(storage.directory_missing([sample_directory["id"]]))
+    directory = list(swh_storage.directory_missing([sample_directory["id"]]))
     assert not directory
 
-    s = storage.directory_add([sample_directory])
+    s = swh_storage.directory_add([sample_directory])
     assert s == {
         "directory:add": 0,
     }
+
+
+def test_filtering_proxy_storage_clear(swh_storage, sample_data):
+    """Clear operation on filter proxy
+
+    """
+    threshold = 10
+    contents = sample_data["content"]
+    assert 0 < len(contents) < threshold
+    skipped_contents = sample_data["skipped_content"]
+    assert 0 < len(skipped_contents) < threshold
+    directories = sample_data["directory"]
+    assert 0 < len(directories) < threshold
+    revisions = sample_data["revision"]
+    assert 0 < len(revisions) < threshold
+    releases = sample_data["release"]
+    assert 0 < len(releases) < threshold
+
+    s = swh_storage.content_add(contents)
+    assert s["content:add"] == len(contents)
+    s = swh_storage.skipped_content_add(skipped_contents)
+    assert s == {
+        "skipped_content:add": len(skipped_contents),
+    }
+    s = swh_storage.directory_add(directories)
+    assert s == {
+        "directory:add": len(directories),
+    }
+    s = swh_storage.revision_add(revisions)
+    assert s == {
+        "revision:add": len(revisions),
+    }
+
+    assert len(swh_storage.objects_seen["content"]) == len(contents)
+    assert len(swh_storage.objects_seen["skipped_content"]) == len(skipped_contents)
+    assert len(swh_storage.objects_seen["directory"]) == len(directories)
+    assert len(swh_storage.objects_seen["revision"]) == len(revisions)
+
+    # clear only content from the buffer
+    s = swh_storage.clear_buffers(["content"])
+    assert s is None
+
+    # specific clear operation on specific object type content only touched
+    # them
+    assert len(swh_storage.objects_seen["content"]) == 0
+    assert len(swh_storage.objects_seen["skipped_content"]) == len(skipped_contents)
+    assert len(swh_storage.objects_seen["directory"]) == len(directories)
+    assert len(swh_storage.objects_seen["revision"]) == len(revisions)
+
+    # clear current buffer from all object types
+    s = swh_storage.clear_buffers()
+    assert s is None
+
+    assert len(swh_storage.objects_seen["content"]) == 0
+    assert len(swh_storage.objects_seen["skipped_content"]) == 0
+    assert len(swh_storage.objects_seen["directory"]) == 0
+    assert len(swh_storage.objects_seen["revision"]) == 0
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -3990,3 +3990,9 @@
             "absent",
             "Content too long",
         )
+
+    def test_clear_buffers(self, swh_storage):
+        """Calling clear buffers on real storage does nothing
+
+        """
+        assert swh_storage.clear_buffers() is None
diff --git a/swh/storage/validate.py b/swh/storage/validate.py
--- a/swh/storage/validate.py
+++ b/swh/storage/validate.py
@@ -5,7 +5,7 @@
 
 import datetime
 import contextlib
-from typing import Dict, Iterable, List
+from typing import Dict, Iterable, List, Optional
 
 from swh.model.model import (
     BaseModel,
@@ -107,3 +107,6 @@
         with convert_validation_exceptions():
             origin = Origin.from_dict(origin)
         return self.storage.origin_add_one(origin)
+
+    def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None:
+        return self.storage.clear_buffers(object_types)