swh/storage/tests/test_buffer.py
(This changeset updates every test to take the "sample_data" fixture in place of "sample_data_model"; the updated version of the file is reproduced below.)

# (the first 9 lines of the file are not shown in this changeset view)
def get_storage_with_buffer_config(**buffer_config):
    storage_config = {
        "cls": "pipeline",
        "steps": [{"cls": "buffer", **buffer_config}, {"cls": "memory"},],
    }
    return get_storage(**storage_config)
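
# The tests below exercise the "buffer" proxy set up above: objects handed to
# the various *_add() endpoints are held in memory, per object type, and are
# only written to the underlying storage once the corresponding
# "min_batch_size" threshold is reached or flush() is called explicitly.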


def test_buffering_proxy_storage_content_threshold_not_hit(sample_data):
    contents = sample_data["content"][:2]
    contents_dict = [c.to_dict() for c in contents]
    storage = get_storage_with_buffer_config(min_batch_size={"content": 10,})
    s = storage.content_add(contents)
    assert s == {}
    # contents have not been written to storage
    missing_contents = storage.content_missing(contents_dict)
    assert set(missing_contents) == set([contents[0].sha1, contents[1].sha1])
    s = storage.flush()
    assert s == {
        "content:add": 1 + 1,
        "content:add:bytes": contents[0].length + contents[1].length,
    }
    missing_contents = storage.content_missing(contents_dict)
    assert list(missing_contents) == []
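
# As checked above, each effective write reports an "<object_type>:add" count
# for the objects actually written; for contents, "content:add:bytes"
# additionally reports the total number of bytes written.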


def test_buffering_proxy_storage_content_threshold_nb_hit(sample_data):
    content = sample_data["content"][0]
    content_dict = content.to_dict()
    storage = get_storage_with_buffer_config(min_batch_size={"content": 1,})
    s = storage.content_add([content])
    assert s == {
        "content:add": 1,
        "content:add:bytes": content.length,
    }
    missing_contents = storage.content_missing([content_dict])
    assert list(missing_contents) == []
    s = storage.flush()
    assert s == {}


def test_buffering_proxy_storage_content_deduplicate(sample_data):
    contents = sample_data["content"][:2]
    storage = get_storage_with_buffer_config(min_batch_size={"content": 2,})
    s = storage.content_add([contents[0], contents[0]])
    assert s == {}
    s = storage.content_add([contents[0]])
    assert s == {}
    s = storage.content_add([contents[1]])
    assert s == {
        "content:add": 1 + 1,
        "content:add:bytes": contents[0].length + contents[1].length,
    }
    missing_contents = storage.content_missing([c.to_dict() for c in contents])
    assert list(missing_contents) == []
    s = storage.flush()
    assert s == {}
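
# Contents are also subject to a size threshold: besides the object count, a
# "content_bytes" value in min_batch_size triggers a write once the cumulative
# length of buffered contents exceeds it, as the next test shows.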


def test_buffering_proxy_storage_content_threshold_bytes_hit(sample_data):
    contents = sample_data["content"][:2]
    content_bytes_min_batch_size = 2
    storage = get_storage_with_buffer_config(
        min_batch_size={"content": 10, "content_bytes": content_bytes_min_batch_size,}
    )
    assert contents[0].length > content_bytes_min_batch_size
    s = storage.content_add([contents[0]])
    assert s == {
        "content:add": 1,
        "content:add:bytes": contents[0].length,
    }
    missing_contents = storage.content_missing([contents[0].to_dict()])
    assert list(missing_contents) == []
    s = storage.flush()
    assert s == {}


def test_buffering_proxy_storage_skipped_content_threshold_not_hit(sample_data):
    contents = sample_data["skipped_content"]
    contents_dict = [c.to_dict() for c in contents]
    storage = get_storage_with_buffer_config(min_batch_size={"skipped_content": 10,})
    s = storage.skipped_content_add([contents[0], contents[1]])
    assert s == {}
    # contents have not been written to storage
    missing_contents = storage.skipped_content_missing(contents_dict)
    assert {c["sha1"] for c in missing_contents} == {c.sha1 for c in contents}
    s = storage.flush()
    assert s == {"skipped_content:add": 1 + 1}
    missing_contents = storage.skipped_content_missing(contents_dict)
    assert list(missing_contents) == []


def test_buffering_proxy_storage_skipped_content_threshold_nb_hit(sample_data):
    contents = sample_data["skipped_content"]
    storage = get_storage_with_buffer_config(min_batch_size={"skipped_content": 1,})
    s = storage.skipped_content_add([contents[0]])
    assert s == {"skipped_content:add": 1}
    missing_contents = storage.skipped_content_missing([contents[0].to_dict()])
    assert list(missing_contents) == []
    s = storage.flush()
    assert s == {}


def test_buffering_proxy_storage_skipped_content_deduplicate(sample_data):
    contents = sample_data["skipped_content"][:2]
    storage = get_storage_with_buffer_config(min_batch_size={"skipped_content": 2,})
    s = storage.skipped_content_add([contents[0], contents[0]])
    assert s == {}
    s = storage.skipped_content_add([contents[0]])
    assert s == {}
    s = storage.skipped_content_add([contents[1]])
    assert s == {
        "skipped_content:add": 1 + 1,
    }
    missing_contents = storage.skipped_content_missing([c.to_dict() for c in contents])
    assert list(missing_contents) == []
    s = storage.flush()
    assert s == {}
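
# The same count-threshold and deduplication behavior is checked below for
# directories, revisions and releases, which are looked up by their id.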


def test_buffering_proxy_storage_directory_threshold_not_hit(sample_data):
    directories = sample_data["directory"]
    storage = get_storage_with_buffer_config(min_batch_size={"directory": 10,})
    s = storage.directory_add([directories[0]])
    assert s == {}
    directory_id = directories[0].id
    missing_directories = storage.directory_missing([directory_id])
    assert list(missing_directories) == [directory_id]
    s = storage.flush()
    assert s == {
        "directory:add": 1,
    }
    missing_directories = storage.directory_missing([directory_id])
    assert list(missing_directories) == []


def test_buffering_proxy_storage_directory_threshold_hit(sample_data):
    directories = sample_data["directory"]
    storage = get_storage_with_buffer_config(min_batch_size={"directory": 1,})
    s = storage.directory_add([directories[0]])
    assert s == {
        "directory:add": 1,
    }
    missing_directories = storage.directory_missing([directories[0].id])
    assert list(missing_directories) == []
    s = storage.flush()
    assert s == {}


def test_buffering_proxy_storage_directory_deduplicate(sample_data):
    directories = sample_data["directory"][:2]
    storage = get_storage_with_buffer_config(min_batch_size={"directory": 2,})
    s = storage.directory_add([directories[0], directories[0]])
    assert s == {}
    s = storage.directory_add([directories[0]])
    assert s == {}
    s = storage.directory_add([directories[1]])
    assert s == {
        "directory:add": 1 + 1,
    }
    missing_directories = storage.directory_missing([d.id for d in directories])
    assert list(missing_directories) == []
    s = storage.flush()
    assert s == {}


def test_buffering_proxy_storage_revision_threshold_not_hit(sample_data):
    revisions = sample_data["revision"]
    storage = get_storage_with_buffer_config(min_batch_size={"revision": 10,})
    s = storage.revision_add([revisions[0]])
    assert s == {}
    revision_id = revisions[0].id
    missing_revisions = storage.revision_missing([revision_id])
    assert list(missing_revisions) == [revision_id]
    s = storage.flush()
    assert s == {
        "revision:add": 1,
    }
    missing_revisions = storage.revision_missing([revision_id])
    assert list(missing_revisions) == []


def test_buffering_proxy_storage_revision_threshold_hit(sample_data):
    revisions = sample_data["revision"]
    storage = get_storage_with_buffer_config(min_batch_size={"revision": 1,})
    s = storage.revision_add([revisions[0]])
    assert s == {
        "revision:add": 1,
    }
    missing_revisions = storage.revision_missing([revisions[0].id])
    assert list(missing_revisions) == []
    s = storage.flush()
    assert s == {}


def test_buffering_proxy_storage_revision_deduplicate(sample_data):
    revisions = sample_data["revision"][:2]
    storage = get_storage_with_buffer_config(min_batch_size={"revision": 2,})
    s = storage.revision_add([revisions[0], revisions[0]])
    assert s == {}
    s = storage.revision_add([revisions[0]])
    assert s == {}
    s = storage.revision_add([revisions[1]])
    assert s == {
        "revision:add": 1 + 1,
    }
    missing_revisions = storage.revision_missing([r.id for r in revisions])
    assert list(missing_revisions) == []
    s = storage.flush()
    assert s == {}


def test_buffering_proxy_storage_release_threshold_not_hit(sample_data):
    releases = sample_data["release"]
    threshold = 10
    assert len(releases) < threshold
    storage = get_storage_with_buffer_config(
        min_batch_size={"release": threshold,}  # configuration set
    )
    s = storage.release_add(releases)
    assert s == {}
    release_ids = [r.id for r in releases]
    missing_releases = storage.release_missing(release_ids)
    assert list(missing_releases) == release_ids
    s = storage.flush()
    assert s == {
        "release:add": len(releases),
    }
    missing_releases = storage.release_missing(release_ids)
    assert list(missing_releases) == []


def test_buffering_proxy_storage_release_threshold_hit(sample_data):
    releases = sample_data["release"]
    threshold = 2
    assert len(releases) > threshold
    storage = get_storage_with_buffer_config(
        min_batch_size={"release": threshold,}  # configuration set
    )
    s = storage.release_add(releases)
    assert s == {
        "release:add": len(releases),
    }
    release_ids = [r.id for r in releases]
    missing_releases = storage.release_missing(release_ids)
    assert list(missing_releases) == []
    s = storage.flush()
    assert s == {}


def test_buffering_proxy_storage_release_deduplicate(sample_data):
    releases = sample_data["release"][:2]
    storage = get_storage_with_buffer_config(min_batch_size={"release": 2,})
    s = storage.release_add([releases[0], releases[0]])
    assert s == {}
    s = storage.release_add([releases[0]])
    assert s == {}
    s = storage.release_add([releases[1]])
    assert s == {
        "release:add": 1 + 1,
    }
    missing_releases = storage.release_missing([r.id for r in releases])
    assert list(missing_releases) == []
    s = storage.flush()
    assert s == {}
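
# The final test fills every buffer with fewer objects than its threshold, so
# nothing is written yet; the remainder of the test (truncated in this view)
# presumably checks that clearing the buffers discards those pending objects
# without writing them to the backend.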


def test_buffering_proxy_storage_clear(sample_data):
    """Clear operation on buffer
    """
    threshold = 10
    contents = sample_data["content"]
    assert 0 < len(contents) < threshold
    skipped_contents = sample_data["skipped_content"]
    assert 0 < len(skipped_contents) < threshold
    directories = sample_data["directory"]
    assert 0 < len(directories) < threshold
    revisions = sample_data["revision"]
    assert 0 < len(revisions) < threshold
    releases = sample_data["release"]
    assert 0 < len(releases) < threshold
    storage = get_storage_with_buffer_config(
        min_batch_size={
            "content": threshold,
            "skipped_content": threshold,
            "directory": threshold,
            "revision": threshold,
            # ... (42 more lines of the file are not shown in this changeset view)