Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_storage.py
Show First 20 Lines • Show All 3,846 Lines • ▼ Show 20 Lines | def test_generate_content_get_metadata(self, swh_storage, swh_contents): | ||||
actual_contents.extend(contents) | actual_contents.extend(contents) | ||||
keys_to_check = {"length", "status", "sha1", "sha1_git", "sha256", "blake2s256"} | keys_to_check = {"length", "status", "sha1", "sha1_git", "sha256", "blake2s256"} | ||||
assert_contents_ok( | assert_contents_ok( | ||||
expected_contents, actual_contents, keys_to_check=keys_to_check | expected_contents, actual_contents, keys_to_check=keys_to_check | ||||
) | ) | ||||
def test_generate_content_get_range(self, swh_storage, swh_contents): | |||||
"""content_get_range returns complete range""" | |||||
present_contents = [c.to_dict() for c in swh_contents if c.status != "absent"] | |||||
get_sha1s = sorted([c.sha1 for c in swh_contents if c.status != "absent"]) | |||||
start = get_sha1s[2] | |||||
end = get_sha1s[-2] | |||||
actual_result = swh_storage.content_get_range(start, end) | |||||
assert actual_result["next"] is None | |||||
actual_contents = actual_result["contents"] | |||||
expected_contents = [c for c in present_contents if start <= c["sha1"] <= end] | |||||
if expected_contents: | |||||
assert_contents_ok(expected_contents, actual_contents, ["sha1"]) | |||||
else: | |||||
assert actual_contents == [] | |||||
def test_generate_content_get_range_full(self, swh_storage, swh_contents): | |||||
"""content_get_range for a full range returns all available contents""" | |||||
present_contents = [c.to_dict() for c in swh_contents if c.status != "absent"] | |||||
start = b"0" * 40 | |||||
end = b"f" * 40 | |||||
actual_result = swh_storage.content_get_range(start, end) | |||||
assert actual_result["next"] is None | |||||
actual_contents = actual_result["contents"] | |||||
expected_contents = [c for c in present_contents if start <= c["sha1"] <= end] | |||||
if expected_contents: | |||||
assert_contents_ok(expected_contents, actual_contents, ["sha1"]) | |||||
else: | |||||
assert actual_contents == [] | |||||
def test_generate_content_get_range_empty(self, swh_storage, swh_contents): | |||||
"""content_get_range for an empty range returns nothing""" | |||||
start = b"0" * 40 | |||||
end = b"f" * 40 | |||||
actual_result = swh_storage.content_get_range(end, start) | |||||
assert actual_result["next"] is None | |||||
assert len(actual_result["contents"]) == 0 | |||||
def test_generate_content_get_range_limit_none(self, swh_storage): | |||||
"""content_get_range call with wrong limit input should fail""" | |||||
with pytest.raises(StorageArgumentException) as e: | |||||
swh_storage.content_get_range(start=None, end=None, limit=None) | |||||
assert e.value.args == ("limit should not be None",) | |||||
def test_generate_content_get_range_no_limit(self, swh_storage, swh_contents): | |||||
"""content_get_range returns contents within range provided""" | |||||
# input the list of sha1s we want from storage | |||||
get_sha1s = sorted([c.sha1 for c in swh_contents if c.status != "absent"]) | |||||
start = get_sha1s[0] | |||||
end = get_sha1s[-1] | |||||
# retrieve contents | |||||
actual_result = swh_storage.content_get_range(start, end) | |||||
actual_contents = actual_result["contents"] | |||||
assert actual_result["next"] is None | |||||
assert len(actual_contents) == len(get_sha1s) | |||||
expected_contents = [c.to_dict() for c in swh_contents if c.status != "absent"] | |||||
assert_contents_ok(expected_contents, actual_contents, ["sha1"]) | |||||
def test_generate_content_get_range_limit(self, swh_storage, swh_contents): | |||||
"""content_get_range paginates results if limit exceeded""" | |||||
contents_map = {c.sha1: c.to_dict() for c in swh_contents} | |||||
# input the list of sha1s we want from storage | |||||
get_sha1s = sorted([c.sha1 for c in swh_contents if c.status != "absent"]) | |||||
start = get_sha1s[0] | |||||
end = get_sha1s[-1] | |||||
# retrieve contents limited to n-1 results | |||||
limited_results = len(get_sha1s) - 1 | |||||
actual_result = swh_storage.content_get_range(start, end, limit=limited_results) | |||||
actual_contents = actual_result["contents"] | |||||
assert actual_result["next"] == get_sha1s[-1] | |||||
assert len(actual_contents) == limited_results | |||||
expected_contents = [contents_map[sha1] for sha1 in get_sha1s[:-1]] | |||||
assert_contents_ok(expected_contents, actual_contents, ["sha1"]) | |||||
# retrieve next part | |||||
actual_results2 = swh_storage.content_get_range(start=end, end=end) | |||||
assert actual_results2["next"] is None | |||||
actual_contents2 = actual_results2["contents"] | |||||
assert len(actual_contents2) == 1 | |||||
assert_contents_ok([contents_map[get_sha1s[-1]]], actual_contents2, ["sha1"]) | |||||
@pytest.mark.parametrize("limit", [1, 7, 10, 100, 1000]) | @pytest.mark.parametrize("limit", [1, 7, 10, 100, 1000]) | ||||
def test_origin_list(self, swh_storage, swh_origins, limit): | def test_origin_list(self, swh_storage, swh_origins, limit): | ||||
returned_origins = [] | returned_origins = [] | ||||
page_token = None | page_token = None | ||||
i = 0 | i = 0 | ||||
while True: | while True: | ||||
actual_page = swh_storage.origin_list(page_token=page_token, limit=limit) | actual_page = swh_storage.origin_list(page_token=page_token, limit=limit) | ||||
▲ Show 20 Lines • Show All 344 Lines • Show Last 20 Lines |