diff --git a/swh/storage/proxies/buffer.py b/swh/storage/proxies/buffer.py
--- a/swh/storage/proxies/buffer.py
+++ b/swh/storage/proxies/buffer.py
@@ -9,7 +9,7 @@
 from typing_extensions import Literal
 
 from swh.core.utils import grouper
-from swh.model.model import BaseModel, Content, SkippedContent
+from swh.model.model import BaseModel, Content, Directory, SkippedContent
 from swh.storage import get_storage
 from swh.storage.interface import StorageInterface
 
@@ -37,6 +37,7 @@
     "content_bytes": 100 * 1024 * 1024,
     "skipped_content": 10000,
     "directory": 25000,
+    "directory_entries": 200000,
     "revision": 100000,
     "release": 100000,
     "snapshot": 25000,
@@ -65,6 +66,7 @@
               content_bytes: 100000000
               skipped_content: 10000
               directory: 5000
+              directory_entries: 100000
               revision: 1000
               release: 10000
               snapshot: 5000
@@ -80,6 +82,7 @@
             k: {} for k in OBJECT_TYPES
         }
         self._contents_size: int = 0
+        self._directory_entries: int = 0
 
     def __getattr__(self, key: str):
         if key.endswith("_add"):
@@ -105,7 +108,8 @@
             object_type="content",
             keys=["sha1", "sha1_git", "sha256", "blake2s256"],
         )
-        if not stats:  # We did not flush already
+        if not stats:
+            # We did not flush based on number of objects; check total size
             self._contents_size += sum(c.length for c in contents)
             if self._contents_size >= self._buffer_thresholds["content_bytes"]:
                 return self.flush(["content"])
@@ -119,6 +123,25 @@
             keys=["sha1", "sha1_git", "sha256", "blake2s256"],
         )
 
+    def directory_add(self, directories: Sequence[Directory]) -> Dict[str, int]:
+        """Buffer ``directories``; flush once too many entries are pending.
+
+        The directories are handed to :meth:`object_add` first. When that call
+        reports no stats, the number of entries of the given directories is
+        added to a running counter; once the counter reaches the
+        ``directory_entries`` threshold, :meth:`flush` is called for the
+        ``content`` and ``directory`` object types and its result is returned.
+        """
+        stats = self.object_add(directories, object_type="directory", keys=["id"])
+
+        if not stats:
+            # We did not flush based on number of objects; check the number of entries
+            self._directory_entries += sum(len(d.entries) for d in directories)
+            if self._directory_entries >= self._buffer_thresholds["directory_entries"]:
+                return self.flush(["content", "directory"])
+
+        return stats
+
     def object_add(
         self,
         objects: Sequence[BaseModel],
@@ -179,5 +202,7 @@
             buffer_.clear()
             if object_type == "content":
                 self._contents_size = 0
+            elif object_type == "directory":
+                self._directory_entries = 0
 
         self.storage.clear_buffers(object_types)
diff --git a/swh/storage/tests/test_buffer.py b/swh/storage/tests/test_buffer.py
--- a/swh/storage/tests/test_buffer.py
+++ b/swh/storage/tests/test_buffer.py
@@ -3,6 +3,7 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from collections import Counter
 from typing import Optional
 from unittest.mock import Mock
 
@@ -280,6 +281,30 @@
     assert s == {}
 
 
+def test_buffering_proxy_storage_directory_entries_threshold(sample_data) -> None:
+    """Hitting the directory_entries threshold triggers an intermediate flush."""
+    directories = sample_data.directories
+    n_entries = sum(len(d.entries) for d in directories)
+    threshold = sum(len(d.entries) for d in directories[:-2])
+
+    # ensure the threshold is in the middle
+    assert 0 < threshold < n_entries
+
+    storage = get_storage_with_buffer_config(
+        min_batch_size={"directory_entries": threshold}
+    )
+    storage.storage = Mock(wraps=storage.storage)
+
+    for directory in directories:
+        storage.directory_add([directory])
+    storage.flush()
+
+    # We should have called the underlying directory_add at least twice, as
+    # we have hit the threshold for number of entries on directory n-2
+    method_calls = Counter(c[0] for c in storage.storage.method_calls)
+    assert method_calls["directory_add"] >= 2
+
+
 def test_buffering_proxy_storage_revision_threshold_not_hit(sample_data) -> None:
     revision = sample_data.revision
     storage = get_storage_with_buffer_config(min_batch_size={"revision": 10,})