Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_storage.py
Show First 20 Lines • Show All 60 Lines • ▼ Show 20 Lines | def normalize_entity(entity): | ||||
entity = copy.deepcopy(entity) | entity = copy.deepcopy(entity) | ||||
for key in ("date", "committer_date"): | for key in ("date", "committer_date"): | ||||
if key in entity: | if key in entity: | ||||
entity[key] = identifiers.normalize_timestamp(entity[key]) | entity[key] = identifiers.normalize_timestamp(entity[key]) | ||||
return entity | return entity | ||||
def transform_entries(dir_, *, prefix=b""): | def transform_entries(dir_, *, prefix=b""): | ||||
for ent in dir_["entries"]: | for ent in dir_.entries: | ||||
yield { | yield { | ||||
"dir_id": dir_["id"], | "dir_id": dir_.id, | ||||
"type": ent["type"], | "type": ent.type, | ||||
"target": ent["target"], | "target": ent.target, | ||||
"name": prefix + ent["name"], | "name": prefix + ent.name, | ||||
"perms": ent["perms"], | "perms": ent.perms, | ||||
"status": None, | "status": None, | ||||
"sha1": None, | "sha1": None, | ||||
"sha1_git": None, | "sha1_git": None, | ||||
"sha256": None, | "sha256": None, | ||||
"length": None, | "length": None, | ||||
} | } | ||||
▲ Show 20 Lines • Show All 573 Lines • ▼ Show 20 Lines | def test_generate_content_get_partition_pagination(self, swh_storage, swh_contents): | ||||
actual_contents.extend(actual_result["contents"]) | actual_contents.extend(actual_result["contents"]) | ||||
page_token = actual_result["next_page_token"] | page_token = actual_result["next_page_token"] | ||||
if page_token is None: | if page_token is None: | ||||
break | break | ||||
assert_contents_ok(expected_contents, actual_contents, ["sha1"]) | assert_contents_ok(expected_contents, actual_contents, ["sha1"]) | ||||
def test_content_get_metadata(self, swh_storage): | def test_content_get_metadata(self, swh_storage, sample_data_model): | ||||
cont1 = data.cont | cont1, cont2 = sample_data_model["content"][:2] | ||||
cont2 = data.cont2 | |||||
swh_storage.content_add([cont1, cont2]) | swh_storage.content_add([cont1, cont2]) | ||||
actual_md = swh_storage.content_get_metadata([cont1["sha1"], cont2["sha1"]]) | actual_md = swh_storage.content_get_metadata([cont1.sha1, cont2.sha1]) | ||||
# we only retrieve the metadata | # we only retrieve the metadata, so neither data nor ctime is included | ||||
cont1.pop("data") | expected_cont1, expected_cont2 = [ | ||||
cont2.pop("data") | attr.evolve(c, data=None).to_dict() for c in [cont1, cont2] | ||||
] | |||||
expected_cont1.pop("ctime") | |||||
expected_cont2.pop("ctime") | |||||
assert tuple(actual_md[cont1["sha1"]]) == (cont1,) | assert tuple(actual_md[cont1.sha1]) == (expected_cont1,) | ||||
assert tuple(actual_md[cont2["sha1"]]) == (cont2,) | assert tuple(actual_md[cont2.sha1]) == (expected_cont2,) | ||||
assert len(actual_md.keys()) == 2 | assert len(actual_md.keys()) == 2 | ||||
def test_content_get_metadata_missing_sha1(self, swh_storage): | def test_content_get_metadata_missing_sha1(self, swh_storage, sample_data_model): | ||||
cont1 = data.cont | cont1, cont2 = sample_data_model["content"][:2] | ||||
cont2 = data.cont2 | missing_cont = sample_data_model["skipped_content"][0] | ||||
missing_cont = data.missing_cont | |||||
swh_storage.content_add([cont1, cont2]) | swh_storage.content_add([cont1, cont2]) | ||||
actual_contents = swh_storage.content_get_metadata([missing_cont["sha1"]]) | actual_contents = swh_storage.content_get_metadata([missing_cont.sha1]) | ||||
assert len(actual_contents) == 1 | assert len(actual_contents) == 1 | ||||
assert tuple(actual_contents[missing_cont["sha1"]]) == () | assert tuple(actual_contents[missing_cont.sha1]) == () | ||||
def test_content_get_random(self, swh_storage): | def test_content_get_random(self, swh_storage, sample_data_model): | ||||
swh_storage.content_add([data.cont, data.cont2, data.cont3]) | cont, cont2 = sample_data_model["content"][:2] | ||||
cont3 = sample_data_model["content_metadata"][0] | |||||
swh_storage.content_add([cont, cont2, cont3]) | |||||
assert swh_storage.content_get_random() in { | assert swh_storage.content_get_random() in { | ||||
data.cont["sha1_git"], | cont.sha1_git, | ||||
data.cont2["sha1_git"], | cont2.sha1_git, | ||||
data.cont3["sha1_git"], | cont3.sha1_git, | ||||
} | } | ||||
def test_directory_add(self, swh_storage): | def test_directory_add(self, swh_storage, sample_data_model): | ||||
init_missing = list(swh_storage.directory_missing([data.dir["id"]])) | directory = sample_data_model["directory"][1] | ||||
assert [data.dir["id"]] == init_missing | |||||
actual_result = swh_storage.directory_add([data.dir]) | init_missing = list(swh_storage.directory_missing([directory.id])) | ||||
assert [directory.id] == init_missing | |||||
actual_result = swh_storage.directory_add([directory]) | |||||
assert actual_result == {"directory:add": 1} | assert actual_result == {"directory:add": 1} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("directory", Directory.from_dict(data.dir)) | ("directory", Directory.from_dict(data.dir)) | ||||
] | ] | ||||
actual_data = list(swh_storage.directory_ls(data.dir["id"])) | actual_data = list(swh_storage.directory_ls(directory.id)) | ||||
expected_data = list(transform_entries(data.dir)) | expected_data = list(transform_entries(directory)) | ||||
assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir) | assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir) | ||||
after_missing = list(swh_storage.directory_missing([data.dir["id"]])) | after_missing = list(swh_storage.directory_missing([directory.id])) | ||||
assert after_missing == [] | assert after_missing == [] | ||||
swh_storage.refresh_stat_counters() | swh_storage.refresh_stat_counters() | ||||
assert swh_storage.stat_counters()["directory"] == 1 | assert swh_storage.stat_counters()["directory"] == 1 | ||||
def test_directory_add_from_generator(self, swh_storage): | def test_directory_add_from_generator(self, swh_storage, sample_data_model): | ||||
directory = sample_data_model["directory"][1] | |||||
def _dir_gen(): | def _dir_gen(): | ||||
yield data.dir | yield directory | ||||
actual_result = swh_storage.directory_add(directories=_dir_gen()) | actual_result = swh_storage.directory_add(directories=_dir_gen()) | ||||
assert actual_result == {"directory:add": 1} | assert actual_result == {"directory:add": 1} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("directory", Directory.from_dict(data.dir)) | ("directory", directory) | ||||
] | ] | ||||
swh_storage.refresh_stat_counters() | swh_storage.refresh_stat_counters() | ||||
assert swh_storage.stat_counters()["directory"] == 1 | assert swh_storage.stat_counters()["directory"] == 1 | ||||
def test_directory_add_validation(self, swh_storage): | def test_directory_add_validation(self, swh_storage, sample_data_model): | ||||
dir_ = copy.deepcopy(data.dir) | directory = sample_data_model["directory"][1] | ||||
dir_ = directory.to_dict() | |||||
dir_["entries"][0]["type"] = "foobar" | dir_["entries"][0]["type"] = "foobar" | ||||
with pytest.raises(StorageArgumentException, match="type.*foobar"): | with pytest.raises(StorageArgumentException, match="type.*foobar"): | ||||
swh_storage.directory_add([dir_]) | swh_storage.directory_add([dir_]) | ||||
dir_ = copy.deepcopy(data.dir) | dir_ = directory.to_dict() | ||||
del dir_["entries"][0]["target"] | del dir_["entries"][0]["target"] | ||||
with pytest.raises(StorageArgumentException, match="target") as cm: | with pytest.raises(StorageArgumentException, match="target") as cm: | ||||
swh_storage.directory_add([dir_]) | swh_storage.directory_add([dir_]) | ||||
if type(cm.value) == psycopg2.IntegrityError: | if type(cm.value) == psycopg2.IntegrityError: | ||||
assert cm.value.pgcode == psycopg2.errorcodes.NOT_NULL_VIOLATION | assert cm.value.pgcode == psycopg2.errorcodes.NOT_NULL_VIOLATION | ||||
def test_directory_add_twice(self, swh_storage): | def test_directory_add_twice(self, swh_storage, sample_data_model): | ||||
actual_result = swh_storage.directory_add([data.dir]) | directory = sample_data_model["directory"][1] | ||||
actual_result = swh_storage.directory_add([directory]) | |||||
assert actual_result == {"directory:add": 1} | assert actual_result == {"directory:add": 1} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("directory", Directory.from_dict(data.dir)) | ("directory", directory) | ||||
] | ] | ||||
actual_result = swh_storage.directory_add([data.dir]) | actual_result = swh_storage.directory_add([directory]) | ||||
assert actual_result == {"directory:add": 0} | assert actual_result == {"directory:add": 0} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("directory", Directory.from_dict(data.dir)) | ("directory", directory) | ||||
] | ] | ||||
def test_directory_get_recursive(self, swh_storage): | def test_directory_get_recursive(self, swh_storage, sample_data_model): | ||||
init_missing = list(swh_storage.directory_missing([data.dir["id"]])) | dir1, dir2, dir3 = sample_data_model["directory"][:3] | ||||
assert init_missing == [data.dir["id"]] | |||||
actual_result = swh_storage.directory_add([data.dir, data.dir2, data.dir3]) | init_missing = list(swh_storage.directory_missing([dir1.id])) | ||||
assert init_missing == [dir1.id] | |||||
actual_result = swh_storage.directory_add([dir1, dir2, dir3]) | |||||
assert actual_result == {"directory:add": 3} | assert actual_result == {"directory:add": 3} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("directory", Directory.from_dict(data.dir)), | ("directory", dir1), | ||||
("directory", Directory.from_dict(data.dir2)), | ("directory", dir2), | ||||
("directory", Directory.from_dict(data.dir3)), | ("directory", dir3), | ||||
] | ] | ||||
# List directory containing a file and an unknown subdirectory | # List directory containing a file and an unknown subdirectory | ||||
actual_data = list(swh_storage.directory_ls(data.dir["id"], recursive=True)) | actual_data = list(swh_storage.directory_ls(dir1.id, recursive=True)) | ||||
expected_data = list(transform_entries(data.dir)) | expected_data = list(transform_entries(dir1)) | ||||
assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir) | assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir) | ||||
# List directory containing a file and an unknown subdirectory | # List directory containing a file and an unknown subdirectory | ||||
actual_data = list(swh_storage.directory_ls(data.dir2["id"], recursive=True)) | actual_data = list(swh_storage.directory_ls(dir2.id, recursive=True)) | ||||
expected_data = list(transform_entries(data.dir2)) | expected_data = list(transform_entries(dir2)) | ||||
assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir) | assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir) | ||||
# List directory containing a known subdirectory, entries should | # List directory containing a known subdirectory, entries should | ||||
# be both those of the directory and of the subdir | # be both those of the directory and of the subdir | ||||
actual_data = list(swh_storage.directory_ls(data.dir3["id"], recursive=True)) | actual_data = list(swh_storage.directory_ls(dir3.id, recursive=True)) | ||||
expected_data = list( | expected_data = list( | ||||
itertools.chain( | itertools.chain( | ||||
transform_entries(data.dir3), | transform_entries(dir3), transform_entries(dir2, prefix=b"subdir/"), | ||||
transform_entries(data.dir, prefix=b"subdir/"), | |||||
) | ) | ||||
) | ) | ||||
assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir) | assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir) | ||||
def test_directory_get_non_recursive(self, swh_storage): | def test_directory_get_non_recursive(self, swh_storage, sample_data_model): | ||||
init_missing = list(swh_storage.directory_missing([data.dir["id"]])) | dir1, dir2, dir3 = sample_data_model["directory"][:3] | ||||
assert init_missing == [data.dir["id"]] | |||||
init_missing = list(swh_storage.directory_missing([dir1.id])) | |||||
assert init_missing == [dir1.id] | |||||
actual_result = swh_storage.directory_add([data.dir, data.dir2, data.dir3]) | actual_result = swh_storage.directory_add([dir1, dir2, dir3]) | ||||
assert actual_result == {"directory:add": 3} | assert actual_result == {"directory:add": 3} | ||||
assert list(swh_storage.journal_writer.journal.objects) == [ | assert list(swh_storage.journal_writer.journal.objects) == [ | ||||
("directory", Directory.from_dict(data.dir)), | ("directory", dir1), | ||||
("directory", Directory.from_dict(data.dir2)), | ("directory", dir2), | ||||
("directory", Directory.from_dict(data.dir3)), | ("directory", dir3), | ||||
] | ] | ||||
# List directory containing a file and an unknown subdirectory | # List directory containing a file and an unknown subdirectory | ||||
actual_data = list(swh_storage.directory_ls(data.dir["id"])) | actual_data = list(swh_storage.directory_ls(dir1.id)) | ||||
expected_data = list(transform_entries(data.dir)) | expected_data = list(transform_entries(dir1)) | ||||
assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir) | assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir) | ||||
# List directory containing a single file | # List directory containing a single file | ||||
actual_data = list(swh_storage.directory_ls(data.dir2["id"])) | actual_data = list(swh_storage.directory_ls(dir2.id)) | ||||
expected_data = list(transform_entries(data.dir2)) | expected_data = list(transform_entries(dir2)) | ||||
assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir) | assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir) | ||||
# List directory containing a known subdirectory, entries should | # List directory containing a known subdirectory, entries should | ||||
# only be those of the parent directory, not of the subdir | # only be those of the parent directory, not of the subdir | ||||
actual_data = list(swh_storage.directory_ls(data.dir3["id"])) | actual_data = list(swh_storage.directory_ls(dir3.id)) | ||||
expected_data = list(transform_entries(data.dir3)) | expected_data = list(transform_entries(dir3)) | ||||
assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir) | assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir) | ||||
def test_directory_entry_get_by_path(self, swh_storage): | def test_directory_entry_get_by_path(self, swh_storage, sample_data_model): | ||||
cont = sample_data_model["content"][0] | |||||
dir1, dir2, dir3, dir4 = sample_data_model["directory"][:4] | |||||
# given | # given | ||||
init_missing = list(swh_storage.directory_missing([data.dir3["id"]])) | init_missing = list(swh_storage.directory_missing([dir3.id])) | ||||
assert [data.dir3["id"]] == init_missing | assert init_missing == [dir3.id] | ||||
actual_result = swh_storage.directory_add([data.dir3, data.dir4]) | actual_result = swh_storage.directory_add([dir3, dir4]) | ||||
assert actual_result == {"directory:add": 2} | assert actual_result == {"directory:add": 2} | ||||
expected_entries = [ | expected_entries = [ | ||||
{ | { | ||||
"dir_id": data.dir3["id"], | "dir_id": dir3.id, | ||||
"name": b"foo", | "name": b"foo", | ||||
"type": "file", | "type": "file", | ||||
"target": data.cont["sha1_git"], | "target": cont.sha1_git, | ||||
"sha1": None, | "sha1": None, | ||||
"sha1_git": None, | "sha1_git": None, | ||||
"sha256": None, | "sha256": None, | ||||
"status": None, | "status": None, | ||||
"perms": from_disk.DentryPerms.content, | "perms": from_disk.DentryPerms.content, | ||||
"length": None, | "length": None, | ||||
}, | }, | ||||
{ | { | ||||
"dir_id": data.dir3["id"], | "dir_id": dir3.id, | ||||
"name": b"subdir", | "name": b"subdir", | ||||
"type": "dir", | "type": "dir", | ||||
"target": data.dir["id"], | "target": dir2.id, | ||||
"sha1": None, | "sha1": None, | ||||
"sha1_git": None, | "sha1_git": None, | ||||
"sha256": None, | "sha256": None, | ||||
"status": None, | "status": None, | ||||
"perms": from_disk.DentryPerms.directory, | "perms": from_disk.DentryPerms.directory, | ||||
"length": None, | "length": None, | ||||
}, | }, | ||||
{ | { | ||||
"dir_id": data.dir3["id"], | "dir_id": dir3.id, | ||||
"name": b"hello", | "name": b"hello", | ||||
"type": "file", | "type": "file", | ||||
"target": b"12345678901234567890", | "target": b"12345678901234567890", | ||||
"sha1": None, | "sha1": None, | ||||
"sha1_git": None, | "sha1_git": None, | ||||
"sha256": None, | "sha256": None, | ||||
"status": None, | "status": None, | ||||
"perms": from_disk.DentryPerms.content, | "perms": from_disk.DentryPerms.content, | ||||
"length": None, | "length": None, | ||||
}, | }, | ||||
] | ] | ||||
# when (all must be found here) | # when (all must be found here) | ||||
for entry, expected_entry in zip(data.dir3["entries"], expected_entries): | for entry, expected_entry in zip(dir3.entries, expected_entries): | ||||
actual_entry = swh_storage.directory_entry_get_by_path( | actual_entry = swh_storage.directory_entry_get_by_path( | ||||
data.dir3["id"], [entry["name"]] | dir3.id, [entry.name] | ||||
) | ) | ||||
assert actual_entry == expected_entry | assert actual_entry == expected_entry | ||||
# same, but deeper | # same, but deeper | ||||
for entry, expected_entry in zip(data.dir3["entries"], expected_entries): | for entry, expected_entry in zip(dir3.entries, expected_entries): | ||||
actual_entry = swh_storage.directory_entry_get_by_path( | actual_entry = swh_storage.directory_entry_get_by_path( | ||||
data.dir4["id"], [b"subdir1", entry["name"]] | dir4.id, [b"subdir1", entry.name] | ||||
) | ) | ||||
expected_entry = expected_entry.copy() | expected_entry = expected_entry.copy() | ||||
expected_entry["name"] = b"subdir1/" + expected_entry["name"] | expected_entry["name"] = b"subdir1/" + expected_entry["name"] | ||||
assert actual_entry == expected_entry | assert actual_entry == expected_entry | ||||
# when (nothing should be found here since data.dir is not persisted.) | # when (nothing should be found here since dir2 is not persisted.) | ||||
for entry in data.dir["entries"]: | for entry in dir2.entries: | ||||
actual_entry = swh_storage.directory_entry_get_by_path( | actual_entry = swh_storage.directory_entry_get_by_path( | ||||
data.dir["id"], [entry["name"]] | dir2.id, [entry.name] | ||||
) | ) | ||||
assert actual_entry is None | assert actual_entry is None | ||||
def test_directory_get_random(self, swh_storage): | def test_directory_get_random(self, swh_storage, sample_data_model): | ||||
swh_storage.directory_add([data.dir, data.dir2, data.dir3]) | dir1, dir2, dir3 = sample_data_model["directory"][:3] | ||||
swh_storage.directory_add([dir1, dir2, dir3]) | |||||
assert swh_storage.directory_get_random() in { | assert swh_storage.directory_get_random() in { | ||||
data.dir["id"], | dir1.id, | ||||
data.dir2["id"], | dir2.id, | ||||
data.dir3["id"], | dir3.id, | ||||
} | } | ||||
def test_revision_add(self, swh_storage): | def test_revision_add(self, swh_storage): | ||||
init_missing = swh_storage.revision_missing([data.revision["id"]]) | init_missing = swh_storage.revision_missing([data.revision["id"]]) | ||||
assert list(init_missing) == [data.revision["id"]] | assert list(init_missing) == [data.revision["id"]] | ||||
actual_result = swh_storage.revision_add([data.revision]) | actual_result = swh_storage.revision_add([data.revision]) | ||||
assert actual_result == {"revision:add": 1} | assert actual_result == {"revision:add": 1} | ||||
▲ Show 20 Lines • Show All 3,339 Lines • Show Last 20 Lines |