Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_buffer.py
# Copyright (C) 2019-2021 The Software Heritage developers | # Copyright (C) 2019-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import Counter | from collections import Counter | ||||
from typing import Optional | from typing import Optional | ||||
from unittest.mock import Mock | from unittest.mock import Mock | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.storage.proxies.buffer import BufferingProxyStorage | from swh.storage.proxies.buffer import ( | ||||
BufferingProxyStorage, | |||||
estimate_release_size, | |||||
estimate_revision_size, | |||||
) | |||||
def get_storage_with_buffer_config(**buffer_config) -> BufferingProxyStorage: | def get_storage_with_buffer_config(**buffer_config) -> BufferingProxyStorage: | ||||
steps = [ | steps = [ | ||||
{"cls": "buffer", **buffer_config}, | {"cls": "buffer", **buffer_config}, | ||||
{"cls": "memory"}, | {"cls": "memory"}, | ||||
] | ] | ||||
▲ Show 20 Lines • Show All 357 Lines • ▼ Show 20 Lines | def test_buffering_proxy_storage_revision_parents_threshold(sample_data) -> None: | ||||
storage.flush() | storage.flush() | ||||
# We should have called the underlying revision_add at least twice, as | # We should have called the underlying revision_add at least twice, as | ||||
# we have hit the threshold for number of parents on revision n-2 | # we have hit the threshold for number of parents on revision n-2 | ||||
method_calls = Counter(c[0] for c in storage.storage.method_calls) | method_calls = Counter(c[0] for c in storage.storage.method_calls) | ||||
assert method_calls["revision_add"] >= 2 | assert method_calls["revision_add"] >= 2 | ||||
def test_buffering_proxy_storage_revision_size_threshold(sample_data) -> None:
    """Check that the ``revision_bytes`` threshold causes an early flush.

    The threshold is set to the estimated size of every sample revision but
    the last two. Revisions are then added one at a time, followed by an
    explicit ``flush()``; the backend's ``revision_add`` is expected to have
    been called more than once.
    """
    revisions = sample_data.revisions
    total_size = sum(estimate_revision_size(r) for r in revisions)
    threshold = sum(estimate_revision_size(r) for r in revisions[:-2])

    # ensure the threshold is in the middle
    assert 0 < threshold < total_size

    storage = get_storage_with_buffer_config(
        min_batch_size={"revision_bytes": threshold}
    )
    # Wrap the backend in a Mock so its method calls are recorded while still
    # being forwarded to the real object.
    storage.storage = Mock(wraps=storage.storage)

    for revision in revisions:
        storage.revision_add([revision])
    storage.flush()

    # We should have called the underlying revision_add at least twice, as
    # we have hit the threshold for estimated revision size on revision n-2
    method_calls = Counter(c[0] for c in storage.storage.method_calls)
    assert method_calls["revision_add"] >= 2
def test_buffering_proxy_storage_release_threshold_not_hit(sample_data) -> None: | def test_buffering_proxy_storage_release_threshold_not_hit(sample_data) -> None: | ||||
releases = sample_data.releases | releases = sample_data.releases | ||||
threshold = 10 | threshold = 10 | ||||
assert len(releases) < threshold | assert len(releases) < threshold | ||||
storage = get_storage_with_buffer_config( | storage = get_storage_with_buffer_config( | ||||
min_batch_size={"release": threshold,} # configuration set | min_batch_size={"release": threshold,} # configuration set | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 52 Lines • ▼ Show 20 Lines | def test_buffering_proxy_storage_release_deduplicate(sample_data) -> None: | ||||
missing_releases = storage.release_missing([r.id for r in releases]) | missing_releases = storage.release_missing([r.id for r in releases]) | ||||
assert list(missing_releases) == [] | assert list(missing_releases) == [] | ||||
s = storage.flush() | s = storage.flush() | ||||
assert s == {} | assert s == {} | ||||
def test_buffering_proxy_storage_release_size_threshold(sample_data) -> None:
    """Check that the ``release_bytes`` threshold causes an early flush.

    The threshold is set to the estimated size of every sample release but
    the last two. Releases are then added one at a time, followed by an
    explicit ``flush()``; the backend's ``release_add`` is expected to have
    been called more than once.
    """
    releases = sample_data.releases
    total_size = sum(estimate_release_size(r) for r in releases)
    threshold = sum(estimate_release_size(r) for r in releases[:-2])

    # ensure the threshold is in the middle
    assert 0 < threshold < total_size

    storage = get_storage_with_buffer_config(
        min_batch_size={"release_bytes": threshold}
    )
    # Wrap the backend in a Mock so its method calls are recorded while still
    # being forwarded to the real object.
    storage.storage = Mock(wraps=storage.storage)

    for release in releases:
        storage.release_add([release])
    storage.flush()

    # We should have called the underlying release_add at least twice, as
    # we have hit the threshold for estimated release size on release n-2
    method_calls = Counter(c[0] for c in storage.storage.method_calls)
    assert method_calls["release_add"] >= 2
def test_buffering_proxy_storage_snapshot_threshold_not_hit(sample_data) -> None: | def test_buffering_proxy_storage_snapshot_threshold_not_hit(sample_data) -> None: | ||||
snapshots = sample_data.snapshots | snapshots = sample_data.snapshots | ||||
threshold = 10 | threshold = 10 | ||||
assert len(snapshots) < threshold | assert len(snapshots) < threshold | ||||
storage = get_storage_with_buffer_config( | storage = get_storage_with_buffer_config( | ||||
min_batch_size={"snapshot": threshold,} # configuration set | min_batch_size={"snapshot": threshold,} # configuration set | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 219 Lines • Show Last 20 Lines |