Add clear_buffers() across the swh.storage backends and proxies.

The buffering proxy (swh/storage/buffer.py) empties its per-object-type
buffers without writing them to storage. The cassandra, in-memory and
swh/storage/storage.py backends implement the method as a no-op. The
filter, retry and validate proxies forward the call to the wrapped
storage when it exposes clear_buffers. A new test covers clearing a
single object type and then every object type.

NOTE(review): line breaks and indentation were reconstructed from a
whitespace-collapsed copy of this patch. Hunk line counts match their
headers, but the exact context whitespace should be checked against the
target tree before applying.

NOTE(review): Optional and Iterable appear in the new annotations in
buffer.py, cassandra/storage.py, in_memory.py, retry.py and storage.py,
while typing imports are only touched in filter.py and validate.py.
Confirm the other modules already import both names.

diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py
--- a/swh/storage/buffer.py
+++ b/swh/storage/buffer.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -115,3 +115,21 @@
             return self.flush()
 
         return {}
+
+    def clear_buffers(
+            self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Clear objects from current buffer.
+
+        WARNING:
+
+            data that has not been flushed to storage will be lost when this
+            method is called. This should only be called when `flush` fails and
+            you want to continue your processing.
+
+        """
+        if object_types is None:
+            object_types = self.object_types
+
+        for object_type in object_types:
+            q = self._objects[object_type]
+            q.clear()
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -979,3 +979,10 @@
     def metadata_provider_get_by(self, provider):
         # TODO
         raise NotImplementedError('not yet supported for Cassandra')
+
+    def clear_buffers(
+            self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Do nothing
+
+        """
+        return None
diff --git a/swh/storage/filter.py b/swh/storage/filter.py
--- a/swh/storage/filter.py
+++ b/swh/storage/filter.py
@@ -1,10 +1,10 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 
-from typing import Dict, Iterable, Set
+from typing import Dict, Iterable, Optional, Set
 
 from swh.model.model import (
     Content, SkippedContent, Directory, Revision,
@@ -149,3 +149,9 @@
 
         fn = fn_by_object_type[object_type]
         return set(fn(missing_ids))
+
+    def clear_buffers(
+            self, object_types: Optional[Iterable[str]] = None) -> None:
+        if hasattr(self.storage, 'clear_buffers'):
+            return self.storage.clear_buffers(object_types)
+        return None
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -1005,3 +1005,10 @@
 
     def diff_revision(self, revision, track_renaming=False):
         raise NotImplementedError('InMemoryStorage.diff_revision')
+
+    def clear_buffers(
+            self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Do nothing
+
+        """
+        return None
diff --git a/swh/storage/retry.py b/swh/storage/retry.py
--- a/swh/storage/retry.py
+++ b/swh/storage/retry.py
@@ -149,3 +149,9 @@
         if hasattr(self.storage, 'flush'):
             return self.storage.flush(object_types)
         return {}
+
+    def clear_buffers(
+            self, object_types: Optional[Iterable[str]] = None) -> None:
+        if hasattr(self.storage, 'clear_buffers'):
+            return self.storage.clear_buffers(object_types)
+        return None
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -1152,3 +1152,10 @@
     @timed
     def diff_revision(self, revision, track_renaming=False):
         return diff.diff_revision(self, revision, track_renaming)
+
+    def clear_buffers(
+            self, object_types: Optional[Iterable[str]] = None) -> None:
+        """Do nothing
+
+        """
+        return None
diff --git a/swh/storage/tests/test_buffer.py b/swh/storage/tests/test_buffer.py
--- a/swh/storage/tests/test_buffer.py
+++ b/swh/storage/tests/test_buffer.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -277,3 +277,69 @@
 
     s = storage.flush()
     assert s == {}
+
+
+def test_buffering_proxy_storage_clear(sample_data):
+    """Clear operation on buffer
+
+    """
+    threshold = 10
+    contents = sample_data['content']
+    assert 0 < len(contents) < threshold
+    skipped_contents = sample_data['skipped_content']
+    assert 0 < len(skipped_contents) < threshold
+    directories = sample_data['directory']
+    assert 0 < len(directories) < threshold
+    revisions = sample_data['revision']
+    assert 0 < len(revisions) < threshold
+    releases = sample_data['release']
+    assert 0 < len(releases) < threshold
+
+    storage = get_storage_with_buffer_config(
+        min_batch_size={
+            'content': threshold,
+            'skipped_content': threshold,
+            'directory': threshold,
+            'revision': threshold,
+            'release': threshold,
+        }
+    )
+
+    s = storage.content_add(contents)
+    assert s == {}
+    s = storage.skipped_content_add(skipped_contents)
+    assert s == {}
+    s = storage.directory_add(directories)
+    assert s == {}
+    s = storage.revision_add(revisions)
+    assert s == {}
+    s = storage.release_add(releases)
+    assert s == {}
+
+    assert len(storage._objects['content']) == len(contents)
+    assert len(storage._objects['skipped_content']) == len(skipped_contents)
+    assert len(storage._objects['directory']) == len(directories)
+    assert len(storage._objects['revision']) == len(revisions)
+    assert len(storage._objects['release']) == len(releases)
+
+    # clear only content from the buffer
+    s = storage.clear_buffers(['content'])
+    assert s is None
+
+    # specific clear operation on specific object type content only touched
+    # them
+    assert len(storage._objects['content']) == 0
+    assert len(storage._objects['skipped_content']) == len(skipped_contents)
+    assert len(storage._objects['directory']) == len(directories)
+    assert len(storage._objects['revision']) == len(revisions)
+    assert len(storage._objects['release']) == len(releases)
+
+    # clear current buffer from all object types
+    s = storage.clear_buffers()
+    assert s is None
+
+    assert len(storage._objects['content']) == 0
+    assert len(storage._objects['skipped_content']) == 0
+    assert len(storage._objects['directory']) == 0
+    assert len(storage._objects['revision']) == 0
+    assert len(storage._objects['release']) == 0
diff --git a/swh/storage/validate.py b/swh/storage/validate.py
--- a/swh/storage/validate.py
+++ b/swh/storage/validate.py
@@ -5,7 +5,7 @@
 
 import datetime
 import contextlib
-from typing import Dict, Iterable, List
+from typing import Dict, Iterable, List, Optional
 
 from swh.model.model import (
     BaseModel, SkippedContent, Content, Directory, Revision, Release, Snapshot,
@@ -100,3 +100,9 @@
         with convert_validation_exceptions():
             origin = Origin.from_dict(origin)
         return self.storage.origin_add_one(origin)
+
+    def clear_buffers(
+            self, object_types: Optional[Iterable[str]] = None) -> None:
+        if hasattr(self.storage, 'clear_buffers'):
+            return self.storage.clear_buffers(object_types)
+        return None