diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py --- a/swh/storage/buffer.py +++ b/swh/storage/buffer.py @@ -101,9 +101,9 @@ ) def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: + summary: Dict[str, Dict] = self.storage.flush(object_types) if object_types is None: object_types = self.object_types - summary = {} # type: Dict[str, Dict] for object_type in object_types: buffer_ = self._objects[object_type] batches = grouper(buffer_.values(), n=self.min_batch_size[object_type]) diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py --- a/swh/storage/cassandra/storage.py +++ b/swh/storage/cassandra/storage.py @@ -1002,3 +1002,6 @@ def metadata_provider_get_by(self, provider): # TODO raise NotImplementedError("not yet supported for Cassandra") + + def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: + return {} diff --git a/swh/storage/filter.py b/swh/storage/filter.py --- a/swh/storage/filter.py +++ b/swh/storage/filter.py @@ -4,7 +4,7 @@ # See top-level LICENSE file for more information -from typing import Dict, Iterable, Set +from typing import Dict, Iterable, Optional, Set from swh.model.model import ( Content, @@ -137,3 +137,6 @@ fn = fn_by_object_type[object_type] return set(fn(missing_ids)) + + def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: + return self.storage.flush(object_types) diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py --- a/swh/storage/in_memory.py +++ b/swh/storage/in_memory.py @@ -1016,3 +1016,6 @@ def diff_revision(self, revision, track_renaming=False): raise NotImplementedError("InMemoryStorage.diff_revision") + + def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: + return {} diff --git a/swh/storage/interface.py b/swh/storage/interface.py --- a/swh/storage/interface.py +++ b/swh/storage/interface.py @@ -1303,3 +1303,14 @@ for more details). """ ... + + @remote_api_endpoint("flush") + def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: + """For real backend storages (pg, storage, in-memory), this is expected to be a + noop operation doing nothing. + + For proxy storages (especially buffer), this is expected to have a real + effect of forcing the actual writes to the backend. + + """ + ... diff --git a/swh/storage/retry.py b/swh/storage/retry.py --- a/swh/storage/retry.py +++ b/swh/storage/retry.py @@ -168,11 +168,8 @@ def snapshot_add(self, snapshots: Iterable[Snapshot]) -> Dict: return self.storage.snapshot_add(snapshots) - @swh_retry def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: """Specific case for buffer proxy storage failing to flush data """ - if hasattr(self.storage, "flush"): - return self.storage.flush(object_types) - return {} + return self.storage.flush(object_types) diff --git a/swh/storage/storage.py b/swh/storage/storage.py --- a/swh/storage/storage.py +++ b/swh/storage/storage.py @@ -189,6 +189,7 @@ @process_metrics def content_add(self, content: Iterable[Content]) -> Dict: ctime = now() + contents = [attr.evolve(c, ctime=ctime) for c in content] objstorage_summary = self.objstorage.content_add(contents) @@ -1215,3 +1216,6 @@ @timed def diff_revision(self, revision, track_renaming=False): return diff.diff_revision(self, revision, track_renaming) + + def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: + return {} diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py --- a/swh/storage/tests/test_storage.py +++ b/swh/storage/tests/test_storage.py @@ -3990,3 +3990,9 @@ "absent", "Content too long", ) + + def test_flush(self, swh_storage): + """Calling clear buffers on real storage does nothing + + """ + assert swh_storage.flush() == {} diff --git a/swh/storage/validate.py b/swh/storage/validate.py --- a/swh/storage/validate.py +++ b/swh/storage/validate.py @@ -5,7 +5,7 @@ import datetime import contextlib -from typing import Dict, Iterable, List +from typing import Dict, Iterable, Optional, List from swh.model.model import ( BaseModel, @@ -107,3 +107,6 @@ with convert_validation_exceptions(): origin = Origin.from_dict(origin) return self.storage.origin_add_one(origin) + + def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: + return self.storage.flush(object_types)