Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_buffer.py
Show First 20 Lines • Show All 475 Lines • ▼ Show 20 Lines | def test_buffering_proxy_storage_clear(sample_data) -> None: | ||||
assert len(storage._objects["release"]) == 0 | assert len(storage._objects["release"]) == 0 | ||||
assert len(storage._objects["snapshot"]) == 0 | assert len(storage._objects["snapshot"]) == 0 | ||||
def test_buffer_proxy_with_default_args() -> None: | def test_buffer_proxy_with_default_args() -> None: | ||||
storage = get_storage_with_buffer_config() | storage = get_storage_with_buffer_config() | ||||
assert storage is not None | assert storage is not None | ||||
def test_buffer_flush_stats(sample_data) -> None: | |||||
storage = get_storage_with_buffer_config() | |||||
s = storage.content_add(sample_data.contents) | |||||
assert s == {} | |||||
s = storage.skipped_content_add(sample_data.skipped_contents) | |||||
assert s == {} | |||||
s = storage.directory_add(sample_data.directories) | |||||
assert s == {} | |||||
s = storage.revision_add(sample_data.revisions) | |||||
assert s == {} | |||||
s = storage.release_add(sample_data.releases) | |||||
assert s == {} | |||||
s = storage.snapshot_add(sample_data.snapshots) | |||||
assert s == {} | |||||
# Flush all the things | |||||
s = storage.flush() | |||||
assert s["content:add"] > 0 | |||||
assert s["content:add:bytes"] > 0 | |||||
assert s["skipped_content:add"] > 0 | |||||
assert s["directory:add"] > 0 | |||||
assert s["revision:add"] > 0 | |||||
assert s["release:add"] > 0 | |||||
assert s["snapshot:add"] > 0 |