diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py --- a/swh/storage/buffer.py +++ b/swh/storage/buffer.py @@ -101,9 +101,9 @@ ) def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: + summary: Dict[str, int] = self.storage.flush(object_types) if object_types is None: object_types = self.object_types - summary = {} # type: Dict[str, Dict] for object_type in object_types: buffer_ = self._objects[object_type] batches = grouper(buffer_.values(), n=self.min_batch_size[object_type]) diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py --- a/swh/storage/cassandra/storage.py +++ b/swh/storage/cassandra/storage.py @@ -1008,3 +1008,6 @@ """ return None + + def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: + return {} diff --git a/swh/storage/filter.py b/swh/storage/filter.py --- a/swh/storage/filter.py +++ b/swh/storage/filter.py @@ -148,3 +148,6 @@ self.objects_seen[object_type] = set() return self.storage.clear_buffers(object_types) + + def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: + return self.storage.flush(object_types) diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py --- a/swh/storage/in_memory.py +++ b/swh/storage/in_memory.py @@ -1022,3 +1022,6 @@ """ return None + + def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: + return {} diff --git a/swh/storage/interface.py b/swh/storage/interface.py --- a/swh/storage/interface.py +++ b/swh/storage/interface.py @@ -1310,5 +1310,10 @@ storages (especially filter, buffer), this is an operation which cleans internal state. + @remote_api_endpoint("flush") + def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: + """For backend storages (pg, storage, in-memory), this is expected to be a noop + operation. For proxy storages (especially buffer), this is expected to trigger + actual writes to the backend. """ ... 
diff --git a/swh/storage/retry.py b/swh/storage/retry.py --- a/swh/storage/retry.py +++ b/swh/storage/retry.py @@ -168,14 +168,11 @@ def snapshot_add(self, snapshots: Iterable[Snapshot]) -> Dict: return self.storage.snapshot_add(snapshots) - @swh_retry + def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None: + return self.storage.clear_buffers(object_types) + def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: """Specific case for buffer proxy storage failing to flush data """ - if hasattr(self.storage, "flush"): - return self.storage.flush(object_types) - return {} - - def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None: - return self.storage.clear_buffers(object_types) + return self.storage.flush(object_types) diff --git a/swh/storage/storage.py b/swh/storage/storage.py --- a/swh/storage/storage.py +++ b/swh/storage/storage.py @@ -189,6 +189,7 @@ @process_metrics def content_add(self, content: Iterable[Content]) -> Dict: ctime = now() + contents = [attr.evolve(c, ctime=ctime) for c in content] objstorage_summary = self.objstorage.content_add(contents) @@ -1221,3 +1222,6 @@ """ return None + + def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: + return {} diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py --- a/swh/storage/tests/test_storage.py +++ b/swh/storage/tests/test_storage.py @@ -3996,3 +3996,9 @@ """ assert swh_storage.clear_buffers() is None + + def test_flush(self, swh_storage): + """Calling flush on real storage does nothing + + """ + assert swh_storage.flush() == {} diff --git a/swh/storage/validate.py b/swh/storage/validate.py --- a/swh/storage/validate.py +++ b/swh/storage/validate.py @@ -5,7 +5,7 @@ import datetime import contextlib -from typing import Dict, Iterable, List, Optional +from typing import Dict, Iterable, Optional, List from swh.model.model import ( BaseModel, @@ -110,3 +110,6 @@ def clear_buffers(self, 
object_types: Optional[Iterable[str]] = None) -> None: return self.storage.clear_buffers(object_types) + + def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: + return self.storage.flush(object_types)