Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_buffer.py
# Copyright (C) 2019-2021 The Software Heritage developers | # Copyright (C) 2019-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import Counter | |||||
from typing import Optional | from typing import Optional | ||||
from unittest.mock import Mock | from unittest.mock import Mock | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.storage.proxies.buffer import BufferingProxyStorage | from swh.storage.proxies.buffer import BufferingProxyStorage | ||||
def get_storage_with_buffer_config(**buffer_config) -> BufferingProxyStorage: | def get_storage_with_buffer_config(**buffer_config) -> BufferingProxyStorage: | ||||
▲ Show 20 Lines • Show All 261 Lines • ▼ Show 20 Lines | def test_buffering_proxy_storage_directory_deduplicate(sample_data) -> None: | ||||
missing_directories = storage.directory_missing([d.id for d in directories]) | missing_directories = storage.directory_missing([d.id for d in directories]) | ||||
assert list(missing_directories) == [] | assert list(missing_directories) == [] | ||||
s = storage.flush() | s = storage.flush() | ||||
assert s == {} | assert s == {} | ||||
def test_buffering_proxy_storage_directory_entries_threshold(sample_data) -> None: | |||||
directories = sample_data.directories | |||||
n_entries = sum(len(d.entries) for d in directories) | |||||
threshold = sum(len(d.entries) for d in directories[:-2]) | |||||
# ensure the threshold is in the middle | |||||
assert 0 < threshold < n_entries | |||||
storage = get_storage_with_buffer_config( | |||||
min_batch_size={"directory_entries": threshold} | |||||
) | |||||
storage.storage = Mock(wraps=storage.storage) | |||||
for directory in directories: | |||||
storage.directory_add([directory]) | |||||
storage.flush() | |||||
# We should have called the underlying directory_add at least twice, as | |||||
# we have hit the threshold for number of entries on directory n-2 | |||||
method_calls = Counter(c[0] for c in storage.storage.method_calls) | |||||
assert method_calls["directory_add"] >= 2 | |||||
def test_buffering_proxy_storage_revision_threshold_not_hit(sample_data) -> None: | def test_buffering_proxy_storage_revision_threshold_not_hit(sample_data) -> None: | ||||
revision = sample_data.revision | revision = sample_data.revision | ||||
storage = get_storage_with_buffer_config(min_batch_size={"revision": 10,}) | storage = get_storage_with_buffer_config(min_batch_size={"revision": 10,}) | ||||
s = storage.revision_add([revision]) | s = storage.revision_add([revision]) | ||||
assert s == {} | assert s == {} | ||||
missing_revisions = storage.revision_missing([revision.id]) | missing_revisions = storage.revision_missing([revision.id]) | ||||
assert list(missing_revisions) == [revision.id] | assert list(missing_revisions) == [revision.id] | ||||
▲ Show 20 Lines • Show All 342 Lines • Show Last 20 Lines |