diff --git a/swh/storage/proxies/buffer.py b/swh/storage/proxies/buffer.py
--- a/swh/storage/proxies/buffer.py
+++ b/swh/storage/proxies/buffer.py
@@ -9,7 +9,7 @@
 from typing_extensions import Literal
 
 from swh.core.utils import grouper
-from swh.model.model import BaseModel, Content, Directory, SkippedContent
+from swh.model.model import BaseModel, Content, Directory, Revision, SkippedContent
 from swh.storage import get_storage
 from swh.storage.interface import StorageInterface
 
@@ -39,6 +39,7 @@
     "directory": 25000,
     "directory_entries": 200000,
     "revision": 100000,
+    "revision_parents": 200000,
     "release": 100000,
     "snapshot": 25000,
     "extid": 10000,
@@ -68,6 +69,7 @@
               directory: 5000
               directory_entries: 100000
               revision: 1000
+              revision_parents: 2000
               release: 10000
               snapshot: 5000
 
@@ -83,6 +85,7 @@
         }
         self._contents_size: int = 0
         self._directory_entries: int = 0
+        self._revision_parents: int = 0
 
     def __getattr__(self, key: str):
         if key.endswith("_add"):
@@ -134,6 +137,28 @@
 
         return stats
 
+    def revision_add(self, revisions: Sequence[Revision]) -> Dict[str, int]:
+        """Buffer revisions and flush once enough parents have accumulated.
+
+        If ``object_add`` did not already flush (its result is falsy), the
+        parents of ``revisions`` are added to a running total; when that total
+        reaches the ``revision_parents`` threshold, contents, directories and
+        revisions are flushed together.
+
+        Returns:
+            the result of ``flush`` when the parents threshold triggered one,
+            otherwise whatever ``object_add`` returned.
+        """
+        stats = self.object_add(revisions, object_type="revision", keys=["id"])
+
+        if not stats:
+            # We did not flush based on number of objects; check the number of parents
+            self._revision_parents += sum(len(r.parents) for r in revisions)
+            if self._revision_parents >= self._buffer_thresholds["revision_parents"]:
+                return self.flush(["content", "directory", "revision"])
+
+        return stats
+
     def object_add(
         self,
         objects: Sequence[BaseModel],
@@ -196,5 +221,7 @@
                 self._contents_size = 0
             elif object_type == "directory":
                 self._directory_entries = 0
+            elif object_type == "revision":
+                self._revision_parents = 0
 
         self.storage.clear_buffers(object_types)
diff --git a/swh/storage/tests/test_buffer.py b/swh/storage/tests/test_buffer.py
--- a/swh/storage/tests/test_buffer.py
+++ b/swh/storage/tests/test_buffer.py
@@ -359,6 +359,29 @@
     assert s == {}
 
 
+def test_buffering_proxy_storage_revision_parents_threshold(sample_data) -> None:
+    revisions = sample_data.revisions
+    n_parents = sum(len(r.parents) for r in revisions)
+    threshold = sum(len(r.parents) for r in revisions[:-2])
+
+    # ensure the threshold is in the middle
+    assert 0 < threshold < n_parents
+
+    storage = get_storage_with_buffer_config(
+        min_batch_size={"revision_parents": threshold}
+    )
+    storage.storage = Mock(wraps=storage.storage)
+
+    for revision in revisions:
+        storage.revision_add([revision])
+    storage.flush()
+
+    # We should have called the underlying revision_add at least twice, as
+    # we have hit the threshold for number of parents on revision n-2
+    method_calls = Counter(c[0] for c in storage.storage.method_calls)
+    assert method_calls["revision_add"] >= 2
+
+
 def test_buffering_proxy_storage_release_threshold_not_hit(sample_data) -> None:
     releases = sample_data.releases
     threshold = 10