diff --git a/swh/storage/proxies/buffer.py b/swh/storage/proxies/buffer.py --- a/swh/storage/proxies/buffer.py +++ b/swh/storage/proxies/buffer.py @@ -9,7 +9,14 @@ from typing_extensions import Literal from swh.core.utils import grouper -from swh.model.model import BaseModel, Content, Directory, Revision, SkippedContent +from swh.model.model import ( + BaseModel, + Content, + Directory, + Release, + Revision, + SkippedContent, +) from swh.storage import get_storage from swh.storage.interface import StorageInterface @@ -40,12 +47,39 @@ "directory_entries": 200000, "revision": 100000, "revision_parents": 200000, + "revision_bytes": 100 * 1024 * 1024, "release": 100000, + "release_bytes": 100 * 1024 * 1024, "snapshot": 25000, "extid": 10000, } +def estimate_revision_size(revision: Revision) -> int: + """Estimate the size of a revision, by summing the size of variable length fields""" + s = 20 * len(revision.parents) + + if revision.message: + s += len(revision.message) + + s += len(revision.author.fullname) if revision.author is not None else 0 + s += len(revision.committer.fullname) if revision.committer is not None else 0 + s += sum(len(h) + len(v) for h, v in revision.extra_headers) + + return s + + +def estimate_release_size(release: Release) -> int: + """Estimate the size of a release, by summing the size of variable length fields""" + s = 0 + if release.message: + s += len(release.message) + if release.author: + s += len(release.author.fullname) + + return s + + class BufferingProxyStorage: """Storage implementation in charge of accumulating objects prior to discussing with the "main" storage. 
@@ -70,7 +104,9 @@ directory_entries: 100000 revision: 1000 revision_parents: 2000 + revision_bytes: 100000000 release: 10000 + release_bytes: 100000000 snapshot: 5000 """ @@ -86,6 +122,8 @@ self._contents_size: int = 0 self._directory_entries: int = 0 self._revision_parents: int = 0 + self._revision_size: int = 0 + self._release_size: int = 0 def __getattr__(self, key: str): if key.endswith("_add"): @@ -141,13 +179,29 @@ stats = self.object_add(revisions, object_type="revision", keys=["id"]) if not stats: - # We did not flush based on number of objects; check the number of parents + # We did not flush based on number of objects; check the number of + # parents and estimated size self._revision_parents += sum(len(r.parents) for r in revisions) - if self._revision_parents >= self._buffer_thresholds["revision_parents"]: + self._revision_size += sum(estimate_revision_size(r) for r in revisions) + if ( + self._revision_parents >= self._buffer_thresholds["revision_parents"] + or self._revision_size >= self._buffer_thresholds["revision_bytes"] + ): return self.flush(["content", "directory", "revision"]) return stats + def release_add(self, releases: Sequence[Release]) -> Dict[str, int]: + stats = self.object_add(releases, object_type="release", keys=["id"]) + + if not stats: + # We did not flush based on number of objects; check the estimated size + self._release_size += sum(estimate_release_size(r) for r in releases) + if self._release_size >= self._buffer_thresholds["release_bytes"]: + return self.flush(["content", "directory", "revision", "release"]) + + return stats + def object_add( self, objects: Sequence[BaseModel], @@ -212,5 +266,8 @@ self._directory_entries = 0 elif object_type == "revision": self._revision_parents = 0 + self._revision_size = 0 + elif object_type == "release": + self._release_size = 0 self.storage.clear_buffers(object_types) diff --git a/swh/storage/tests/test_buffer.py b/swh/storage/tests/test_buffer.py --- a/swh/storage/tests/test_buffer.py +++
b/swh/storage/tests/test_buffer.py @@ -8,7 +8,11 @@ from unittest.mock import Mock from swh.storage import get_storage -from swh.storage.proxies.buffer import BufferingProxyStorage +from swh.storage.proxies.buffer import ( + BufferingProxyStorage, + estimate_release_size, + estimate_revision_size, +) def get_storage_with_buffer_config(**buffer_config) -> BufferingProxyStorage: @@ -382,6 +386,29 @@ assert method_calls["revision_add"] >= 2 +def test_buffering_proxy_storage_revision_size_threshold(sample_data) -> None: + revisions = sample_data.revisions + total_size = sum(estimate_revision_size(r) for r in revisions) + threshold = sum(estimate_revision_size(r) for r in revisions[:-2]) + + # ensure the threshold is in the middle + assert 0 < threshold < total_size + + storage = get_storage_with_buffer_config( + min_batch_size={"revision_bytes": threshold} + ) + storage.storage = Mock(wraps=storage.storage) + + for revision in revisions: + storage.revision_add([revision]) + storage.flush() + + # We should have called the underlying revision_add at least twice, as + # we have hit the estimated size threshold on revision n-2 + method_calls = Counter(c[0] for c in storage.storage.method_calls) + assert method_calls["revision_add"] >= 2 + + def test_buffering_proxy_storage_release_threshold_not_hit(sample_data) -> None: releases = sample_data.releases threshold = 10 @@ -450,6 +477,29 @@ assert s == {} +def test_buffering_proxy_storage_release_size_threshold(sample_data) -> None: + releases = sample_data.releases + total_size = sum(estimate_release_size(r) for r in releases) + threshold = sum(estimate_release_size(r) for r in releases[:-2]) + + # ensure the threshold is in the middle + assert 0 < threshold < total_size + + storage = get_storage_with_buffer_config( + min_batch_size={"release_bytes": threshold} + ) + storage.storage = Mock(wraps=storage.storage) + + for release in releases: + storage.release_add([release]) + storage.flush() + + # We should have called
the underlying release_add at least twice, as + # we have hit the estimated size threshold on release n-2 + method_calls = Counter(c[0] for c in storage.storage.method_calls) + assert method_calls["release_add"] >= 2 + + def test_buffering_proxy_storage_snapshot_threshold_not_hit(sample_data) -> None: snapshots = sample_data.snapshots threshold = 10