Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_storage.py
Show First 20 Lines • Show All 583 Lines • ▼ Show 20 Lines | def test_content_missing_per_sha1_git(self, swh_storage, sample_data_model): | ||||
contents = [cont.sha1_git, cont2.sha1_git, missing_cont.sha1_git] | contents = [cont.sha1_git, cont2.sha1_git, missing_cont.sha1_git] | ||||
missing_contents = swh_storage.content_missing_per_sha1_git(contents) | missing_contents = swh_storage.content_missing_per_sha1_git(contents) | ||||
assert list(missing_contents) == [missing_cont.sha1_git] | assert list(missing_contents) == [missing_cont.sha1_git] | ||||
def test_content_get_partition(self, swh_storage, swh_contents): | def test_content_get_partition(self, swh_storage, swh_contents): | ||||
"""content_get_partition paginates results if limit exceeded""" | """content_get_partition paginates results if limit exceeded""" | ||||
expected_contents = [c for c in swh_contents if c["status"] != "absent"] | expected_contents = [c.to_dict() for c in swh_contents if c.status != "absent"] | ||||
actual_contents = [] | actual_contents = [] | ||||
for i in range(16): | for i in range(16): | ||||
actual_result = swh_storage.content_get_partition(i, 16) | actual_result = swh_storage.content_get_partition(i, 16) | ||||
assert actual_result["next_page_token"] is None | assert actual_result["next_page_token"] is None | ||||
actual_contents.extend(actual_result["contents"]) | actual_contents.extend(actual_result["contents"]) | ||||
assert_contents_ok(expected_contents, actual_contents, ["sha1"]) | assert_contents_ok(expected_contents, actual_contents, ["sha1"]) | ||||
def test_content_get_partition_full(self, swh_storage, swh_contents): | def test_content_get_partition_full(self, swh_storage, swh_contents): | ||||
"""content_get_partition for a single partition returns all available | """content_get_partition for a single partition returns all available | ||||
contents""" | contents""" | ||||
expected_contents = [c for c in swh_contents if c["status"] != "absent"] | expected_contents = [c.to_dict() for c in swh_contents if c.status != "absent"] | ||||
actual_result = swh_storage.content_get_partition(0, 1) | actual_result = swh_storage.content_get_partition(0, 1) | ||||
assert actual_result["next_page_token"] is None | assert actual_result["next_page_token"] is None | ||||
actual_contents = actual_result["contents"] | actual_contents = actual_result["contents"] | ||||
assert_contents_ok(expected_contents, actual_contents, ["sha1"]) | assert_contents_ok(expected_contents, actual_contents, ["sha1"]) | ||||
def test_content_get_partition_empty(self, swh_storage, swh_contents): | def test_content_get_partition_empty(self, swh_storage, swh_contents): | ||||
"""content_get_partition when at least one of the partitions is | """content_get_partition when at least one of the partitions is | ||||
empty""" | empty""" | ||||
expected_contents = { | expected_contents = { | ||||
cont["sha1"] for cont in swh_contents if cont["status"] != "absent" | cont.sha1 for cont in swh_contents if cont.status != "absent" | ||||
} | } | ||||
# nb_partitions = smallest power of 2 such that at least one of | # nb_partitions = smallest power of 2 such that at least one of | ||||
# the partitions is empty | # the partitions is empty | ||||
nb_partitions = 1 << math.floor(math.log2(len(swh_contents)) + 1) | nb_partitions = 1 << math.floor(math.log2(len(swh_contents)) + 1) | ||||
seen_sha1s = [] | seen_sha1s = [] | ||||
for i in range(nb_partitions): | for i in range(nb_partitions): | ||||
Show All 13 Lines | def test_content_get_partition_limit_none(self, swh_storage): | ||||
"""content_get_partition call with wrong limit input should fail""" | """content_get_partition call with wrong limit input should fail""" | ||||
with pytest.raises(StorageArgumentException) as e: | with pytest.raises(StorageArgumentException) as e: | ||||
swh_storage.content_get_partition(1, 16, limit=None) | swh_storage.content_get_partition(1, 16, limit=None) | ||||
assert e.value.args == ("limit should not be None",) | assert e.value.args == ("limit should not be None",) | ||||
def test_generate_content_get_partition_pagination(self, swh_storage, swh_contents): | def test_generate_content_get_partition_pagination(self, swh_storage, swh_contents): | ||||
"""content_get_partition returns contents within range provided""" | """content_get_partition returns contents within range provided""" | ||||
expected_contents = [c for c in swh_contents if c["status"] != "absent"] | expected_contents = [c.to_dict() for c in swh_contents if c.status != "absent"] | ||||
# retrieve contents | # retrieve contents | ||||
actual_contents = [] | actual_contents = [] | ||||
for i in range(4): | for i in range(4): | ||||
page_token = None | page_token = None | ||||
while True: | while True: | ||||
actual_result = swh_storage.content_get_partition( | actual_result = swh_storage.content_get_partition( | ||||
i, 4, limit=3, page_token=page_token | i, 4, limit=3, page_token=page_token | ||||
▲ Show 20 Lines • Show All 3,063 Lines • ▼ Show 20 Lines | def test_origin_metadata_get__invalid_id_type(self, swh_storage): | ||||
with pytest.raises(StorageArgumentException, match="SWHID"): | with pytest.raises(StorageArgumentException, match="SWHID"): | ||||
swh_storage.object_metadata_get( | swh_storage.object_metadata_get( | ||||
MetadataTargetType.ORIGIN, data.content_metadata.id, authority, | MetadataTargetType.ORIGIN, data.content_metadata.id, authority, | ||||
) | ) | ||||
class TestStorageGeneratedData: | class TestStorageGeneratedData: | ||||
def test_generate_content_get(self, swh_storage, swh_contents): | def test_generate_content_get(self, swh_storage, swh_contents): | ||||
contents_with_data = [c for c in swh_contents if c["status"] != "absent"] | contents_with_data = [c.to_dict() for c in swh_contents if c.status != "absent"] | ||||
# input the list of sha1s we want from storage | # input the list of sha1s we want from storage | ||||
get_sha1s = [c["sha1"] for c in contents_with_data] | get_sha1s = [c["sha1"] for c in contents_with_data] | ||||
# retrieve contents | # retrieve contents | ||||
actual_contents = list(swh_storage.content_get(get_sha1s)) | actual_contents = list(swh_storage.content_get(get_sha1s)) | ||||
assert None not in actual_contents | assert None not in actual_contents | ||||
assert_contents_ok(contents_with_data, actual_contents) | assert_contents_ok(contents_with_data, actual_contents) | ||||
def test_generate_content_get_metadata(self, swh_storage, swh_contents): | def test_generate_content_get_metadata(self, swh_storage, swh_contents): | ||||
# input the list of sha1s we want from storage | # input the list of sha1s we want from storage | ||||
expected_contents = [c for c in swh_contents if c["status"] != "absent"] | expected_contents = [c.to_dict() for c in swh_contents if c.status != "absent"] | ||||
get_sha1s = [c["sha1"] for c in expected_contents] | get_sha1s = [c["sha1"] for c in expected_contents] | ||||
# retrieve contents | # retrieve contents | ||||
meta_contents = swh_storage.content_get_metadata(get_sha1s) | meta_contents = swh_storage.content_get_metadata(get_sha1s) | ||||
assert len(list(meta_contents)) == len(get_sha1s) | assert len(list(meta_contents)) == len(get_sha1s) | ||||
actual_contents = [] | actual_contents = [] | ||||
for contents in meta_contents.values(): | for contents in meta_contents.values(): | ||||
actual_contents.extend(contents) | actual_contents.extend(contents) | ||||
keys_to_check = {"length", "status", "sha1", "sha1_git", "sha256", "blake2s256"} | keys_to_check = {"length", "status", "sha1", "sha1_git", "sha256", "blake2s256"} | ||||
assert_contents_ok( | assert_contents_ok( | ||||
expected_contents, actual_contents, keys_to_check=keys_to_check | expected_contents, actual_contents, keys_to_check=keys_to_check | ||||
) | ) | ||||
def test_generate_content_get_range(self, swh_storage, swh_contents): | def test_generate_content_get_range(self, swh_storage, swh_contents): | ||||
"""content_get_range returns complete range""" | """content_get_range returns complete range""" | ||||
present_contents = [c for c in swh_contents if c["status"] != "absent"] | present_contents = [c.to_dict() for c in swh_contents if c.status != "absent"] | ||||
get_sha1s = sorted([c["sha1"] for c in swh_contents if c["status"] != "absent"]) | get_sha1s = sorted([c.sha1 for c in swh_contents if c.status != "absent"]) | ||||
start = get_sha1s[2] | start = get_sha1s[2] | ||||
end = get_sha1s[-2] | end = get_sha1s[-2] | ||||
actual_result = swh_storage.content_get_range(start, end) | actual_result = swh_storage.content_get_range(start, end) | ||||
assert actual_result["next"] is None | assert actual_result["next"] is None | ||||
actual_contents = actual_result["contents"] | actual_contents = actual_result["contents"] | ||||
expected_contents = [c for c in present_contents if start <= c["sha1"] <= end] | expected_contents = [c for c in present_contents if start <= c["sha1"] <= end] | ||||
if expected_contents: | if expected_contents: | ||||
assert_contents_ok(expected_contents, actual_contents, ["sha1"]) | assert_contents_ok(expected_contents, actual_contents, ["sha1"]) | ||||
else: | else: | ||||
assert actual_contents == [] | assert actual_contents == [] | ||||
def test_generate_content_get_range_full(self, swh_storage, swh_contents): | def test_generate_content_get_range_full(self, swh_storage, swh_contents): | ||||
"""content_get_range for a full range returns all available contents""" | """content_get_range for a full range returns all available contents""" | ||||
present_contents = [c for c in swh_contents if c["status"] != "absent"] | present_contents = [c.to_dict() for c in swh_contents if c.status != "absent"] | ||||
start = b"0" * 40 | start = b"0" * 40 | ||||
end = b"f" * 40 | end = b"f" * 40 | ||||
actual_result = swh_storage.content_get_range(start, end) | actual_result = swh_storage.content_get_range(start, end) | ||||
assert actual_result["next"] is None | assert actual_result["next"] is None | ||||
actual_contents = actual_result["contents"] | actual_contents = actual_result["contents"] | ||||
expected_contents = [c for c in present_contents if start <= c["sha1"] <= end] | expected_contents = [c for c in present_contents if start <= c["sha1"] <= end] | ||||
Show All 15 Lines | def test_generate_content_get_range_limit_none(self, swh_storage): | ||||
with pytest.raises(StorageArgumentException) as e: | with pytest.raises(StorageArgumentException) as e: | ||||
swh_storage.content_get_range(start=None, end=None, limit=None) | swh_storage.content_get_range(start=None, end=None, limit=None) | ||||
assert e.value.args == ("limit should not be None",) | assert e.value.args == ("limit should not be None",) | ||||
def test_generate_content_get_range_no_limit(self, swh_storage, swh_contents): | def test_generate_content_get_range_no_limit(self, swh_storage, swh_contents): | ||||
"""content_get_range returns contents within range provided""" | """content_get_range returns contents within range provided""" | ||||
# input the list of sha1s we want from storage | # input the list of sha1s we want from storage | ||||
get_sha1s = sorted([c["sha1"] for c in swh_contents if c["status"] != "absent"]) | get_sha1s = sorted([c.sha1 for c in swh_contents if c.status != "absent"]) | ||||
start = get_sha1s[0] | start = get_sha1s[0] | ||||
end = get_sha1s[-1] | end = get_sha1s[-1] | ||||
# retrieve contents | # retrieve contents | ||||
actual_result = swh_storage.content_get_range(start, end) | actual_result = swh_storage.content_get_range(start, end) | ||||
actual_contents = actual_result["contents"] | actual_contents = actual_result["contents"] | ||||
assert actual_result["next"] is None | assert actual_result["next"] is None | ||||
assert len(actual_contents) == len(get_sha1s) | assert len(actual_contents) == len(get_sha1s) | ||||
expected_contents = [c for c in swh_contents if c["status"] != "absent"] | expected_contents = [c.to_dict() for c in swh_contents if c.status != "absent"] | ||||
assert_contents_ok(expected_contents, actual_contents, ["sha1"]) | assert_contents_ok(expected_contents, actual_contents, ["sha1"]) | ||||
def test_generate_content_get_range_limit(self, swh_storage, swh_contents): | def test_generate_content_get_range_limit(self, swh_storage, swh_contents): | ||||
"""content_get_range paginates results if limit exceeded""" | """content_get_range paginates results if limit exceeded""" | ||||
contents_map = {c["sha1"]: c for c in swh_contents} | contents_map = {c.sha1: c.to_dict() for c in swh_contents} | ||||
# input the list of sha1s we want from storage | # input the list of sha1s we want from storage | ||||
get_sha1s = sorted([c["sha1"] for c in swh_contents if c["status"] != "absent"]) | get_sha1s = sorted([c.sha1 for c in swh_contents if c.status != "absent"]) | ||||
start = get_sha1s[0] | start = get_sha1s[0] | ||||
end = get_sha1s[-1] | end = get_sha1s[-1] | ||||
# retrieve contents limited to n-1 results | # retrieve contents limited to n-1 results | ||||
limited_results = len(get_sha1s) - 1 | limited_results = len(get_sha1s) - 1 | ||||
actual_result = swh_storage.content_get_range(start, end, limit=limited_results) | actual_result = swh_storage.content_get_range(start, end, limit=limited_results) | ||||
actual_contents = actual_result["contents"] | actual_contents = actual_result["contents"] | ||||
▲ Show 20 Lines • Show All 420 Lines • Show Last 20 Lines |