Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/tests/test_cookers.py
Show First 20 Lines • Show All 537 Lines • ▼ Show 20 Lines | def test_directory_revision_data(self, swh_storage): | ||||
) | ) | ||||
swh_storage.directory_add([dir]) | swh_storage.directory_add([dir]) | ||||
with cook_extract_directory_dircooker(swh_storage, dir.id, fsck=False) as p: | with cook_extract_directory_dircooker(swh_storage, dir.id, fsck=False) as p: | ||||
assert (p / "submodule").is_symlink() | assert (p / "submodule").is_symlink() | ||||
assert os.readlink(str(p / "submodule")) == target_rev | assert os.readlink(str(p / "submodule")) == target_rev | ||||
class TestRevisionCooker: | class RepoFixtures: | ||||
"""Shared loading and checking methods that can be reused by different types | |||||
of tests.""" | |||||
def load_repo_simple(self, git_loader): | def load_repo_simple(self, git_loader): | ||||
# | # | ||||
# 1--2--3--4--5--6--7 | # 1--2--3--4--5--6--7 | ||||
# | # | ||||
repo = TestRepo() | repo = TestRepo() | ||||
with repo as rp: | with repo as rp: | ||||
(rp / "file1").write_text(TEST_CONTENT) | (rp / "file1").write_text(TEST_CONTENT) | ||||
repo.commit("add file1") | repo.commit("add file1") | ||||
Show All 24 Lines | def check_revision_simple(self, ert, p, obj_id): | ||||
assert (p / "link1").is_symlink() | assert (p / "link1").is_symlink() | ||||
assert os.readlink(str(p / "link1")) == "file1" | assert os.readlink(str(p / "link1")) == "file1" | ||||
assert (p / "bin").stat().st_mode == 0o100755 | assert (p / "bin").stat().st_mode == 0o100755 | ||||
assert (p / "bin").read_bytes() == TEST_EXECUTABLE | assert (p / "bin").read_bytes() == TEST_EXECUTABLE | ||||
assert (p / "dir1/dir2/file").read_text() == TEST_CONTENT | assert (p / "dir1/dir2/file").read_text() == TEST_CONTENT | ||||
assert (p / "dir1/dir2/file").stat().st_mode == 0o100644 | assert (p / "dir1/dir2/file").stat().st_mode == 0o100644 | ||||
assert ert.repo.refs[b"HEAD"].decode() == obj_id.hex() | assert ert.repo.refs[b"HEAD"].decode() == obj_id.hex() | ||||
def test_revision_simple(self, git_loader, cook_extract_revision): | |||||
(loader, obj_id) = self.load_repo_simple(git_loader) | |||||
with cook_extract_revision(loader.storage, obj_id) as (ert, p): | |||||
self.check_revision_simple(ert, p, obj_id) | |||||
def load_repo_two_roots(self, git_loader): | def load_repo_two_roots(self, git_loader): | ||||
# | # | ||||
# 1----3---4 | # 1----3---4 | ||||
# / | # / | ||||
# 2---- | # 2---- | ||||
# | # | ||||
repo = TestRepo() | repo = TestRepo() | ||||
with repo as rp: | with repo as rp: | ||||
Show All 9 Lines | def load_repo_two_roots(self, git_loader): | ||||
obj_id = hashutil.hash_to_bytes(obj_id_hex) | obj_id = hashutil.hash_to_bytes(obj_id_hex) | ||||
loader = git_loader(str(rp)) | loader = git_loader(str(rp)) | ||||
loader.load() | loader.load() | ||||
return (loader, obj_id) | return (loader, obj_id) | ||||
def check_revision_two_roots(self, ert, p, obj_id): | def check_revision_two_roots(self, ert, p, obj_id): | ||||
assert ert.repo.refs[b"HEAD"].decode() == obj_id.hex() | assert ert.repo.refs[b"HEAD"].decode() == obj_id.hex() | ||||
def test_revision_two_roots(self, git_loader, cook_extract_revision): | |||||
(loader, obj_id) = self.load_repo_two_roots(git_loader) | |||||
with cook_extract_revision(loader.storage, obj_id) as (ert, p): | |||||
self.check_revision_two_roots(ert, p, obj_id) | |||||
def load_repo_two_double_fork_merge(self, git_loader): | def load_repo_two_double_fork_merge(self, git_loader): | ||||
# | # | ||||
# 2---4---6 | # 2---4---6 | ||||
# / / / | # / / / | ||||
# 1---3---5 | # 1---3---5 | ||||
# | # | ||||
repo = TestRepo() | repo = TestRepo() | ||||
with repo as rp: | with repo as rp: | ||||
Show All 19 Lines | def load_repo_two_double_fork_merge(self, git_loader): | ||||
obj_id = hashutil.hash_to_bytes(obj_id_hex) | obj_id = hashutil.hash_to_bytes(obj_id_hex) | ||||
loader = git_loader(str(rp)) | loader = git_loader(str(rp)) | ||||
loader.load() | loader.load() | ||||
return (loader, obj_id) | return (loader, obj_id) | ||||
def check_revision_two_double_fork_merge(self, ert, p, obj_id): | def check_revision_two_double_fork_merge(self, ert, p, obj_id): | ||||
assert ert.repo.refs[b"HEAD"].decode() == obj_id.hex() | assert ert.repo.refs[b"HEAD"].decode() == obj_id.hex() | ||||
def test_revision_two_double_fork_merge(self, git_loader, cook_extract_revision): | |||||
(loader, obj_id) = self.load_repo_two_double_fork_merge(git_loader) | |||||
with cook_extract_revision(loader.storage, obj_id) as (ert, p): | |||||
self.check_revision_two_double_fork_merge(ert, p, obj_id) | |||||
def load_repo_triple_merge(self, git_loader): | def load_repo_triple_merge(self, git_loader): | ||||
# | # | ||||
# .---.---5 | # .---.---5 | ||||
# / / / | # / / / | ||||
# 2 3 4 | # 2 3 4 | ||||
# / / / | # / / / | ||||
# 1---.---. | # 1---.---. | ||||
# | # | ||||
Show All 13 Lines | def load_repo_triple_merge(self, git_loader): | ||||
obj_id = hashutil.hash_to_bytes(obj_id_hex) | obj_id = hashutil.hash_to_bytes(obj_id_hex) | ||||
loader = git_loader(str(rp)) | loader = git_loader(str(rp)) | ||||
loader.load() | loader.load() | ||||
return (loader, obj_id) | return (loader, obj_id) | ||||
def check_revision_triple_merge(self, ert, p, obj_id): | def check_revision_triple_merge(self, ert, p, obj_id): | ||||
assert ert.repo.refs[b"HEAD"].decode() == obj_id.hex() | assert ert.repo.refs[b"HEAD"].decode() == obj_id.hex() | ||||
def test_revision_triple_merge(self, git_loader, cook_extract_revision): | |||||
(loader, obj_id) = self.load_repo_triple_merge(git_loader) | |||||
with cook_extract_revision(loader.storage, obj_id) as (ert, p): | |||||
self.check_revision_triple_merge(ert, p, obj_id) | |||||
def load_repo_filtered_objects(self, git_loader): | def load_repo_filtered_objects(self, git_loader): | ||||
repo = TestRepo() | repo = TestRepo() | ||||
with repo as rp: | with repo as rp: | ||||
file_1, id_1 = hash_content(b"test1") | file_1, id_1 = hash_content(b"test1") | ||||
file_2, id_2 = hash_content(b"test2") | file_2, id_2 = hash_content(b"test2") | ||||
file_3, id_3 = hash_content(b"test3") | file_3, id_3 = hash_content(b"test3") | ||||
(rp / "file").write_bytes(file_1) | (rp / "file").write_bytes(file_1) | ||||
Show All 35 Lines | def load_repo_filtered_objects(self, git_loader): | ||||
return (loader, obj_id) | return (loader, obj_id) | ||||
def check_revision_filtered_objects(self, ert, p, obj_id): | def check_revision_filtered_objects(self, ert, p, obj_id): | ||||
ert.checkout(b"HEAD") | ert.checkout(b"HEAD") | ||||
assert (p / "file").read_bytes() == b"test1" | assert (p / "file").read_bytes() == b"test1" | ||||
assert (p / "hidden_file").read_bytes() == HIDDEN_MESSAGE | assert (p / "hidden_file").read_bytes() == HIDDEN_MESSAGE | ||||
assert (p / "absent_file").read_bytes() == SKIPPED_MESSAGE | assert (p / "absent_file").read_bytes() == SKIPPED_MESSAGE | ||||
def test_revision_filtered_objects(self, git_loader, cook_extract_revision): | |||||
(loader, obj_id) = self.load_repo_filtered_objects(git_loader) | |||||
with cook_extract_revision(loader.storage, obj_id) as (ert, p): | |||||
self.check_revision_filtered_objects(ert, p, obj_id) | |||||
def load_repo_null_fields(self, git_loader): | def load_repo_null_fields(self, git_loader): | ||||
# Our schema doesn't enforce a lot of non-null revision fields. We need | # Our schema doesn't enforce a lot of non-null revision fields. We need | ||||
# to check these cases don't break the cooker. | # to check these cases don't break the cooker. | ||||
repo = TestRepo() | repo = TestRepo() | ||||
with repo as rp: | with repo as rp: | ||||
(rp / "file").write_text(TEST_CONTENT) | (rp / "file").write_text(TEST_CONTENT) | ||||
c = repo.commit("initial commit") | c = repo.commit("initial commit") | ||||
loader = git_loader(str(rp)) | loader = git_loader(str(rp)) | ||||
Show All 18 Lines | def load_repo_null_fields(self, git_loader): | ||||
storage = loader.storage | storage = loader.storage | ||||
storage.revision_add([test_revision]) | storage.revision_add([test_revision]) | ||||
return (loader, test_revision.id) | return (loader, test_revision.id) | ||||
def check_revision_null_fields(self, ert, p, obj_id): | def check_revision_null_fields(self, ert, p, obj_id): | ||||
ert.checkout(b"HEAD") | ert.checkout(b"HEAD") | ||||
assert (p / "file").stat().st_mode == 0o100644 | assert (p / "file").stat().st_mode == 0o100644 | ||||
class TestRevisionCooker(RepoFixtures): | |||||
def test_revision_simple(self, git_loader, cook_extract_revision): | |||||
(loader, obj_id) = self.load_repo_simple(git_loader) | |||||
with cook_extract_revision(loader.storage, obj_id) as (ert, p): | |||||
self.check_revision_simple(ert, p, obj_id) | |||||
def test_revision_two_roots(self, git_loader, cook_extract_revision): | |||||
(loader, obj_id) = self.load_repo_two_roots(git_loader) | |||||
with cook_extract_revision(loader.storage, obj_id) as (ert, p): | |||||
self.check_revision_two_roots(ert, p, obj_id) | |||||
def test_revision_two_double_fork_merge(self, git_loader, cook_extract_revision): | |||||
(loader, obj_id) = self.load_repo_two_double_fork_merge(git_loader) | |||||
with cook_extract_revision(loader.storage, obj_id) as (ert, p): | |||||
self.check_revision_two_double_fork_merge(ert, p, obj_id) | |||||
def test_revision_triple_merge(self, git_loader, cook_extract_revision): | |||||
(loader, obj_id) = self.load_repo_triple_merge(git_loader) | |||||
with cook_extract_revision(loader.storage, obj_id) as (ert, p): | |||||
self.check_revision_triple_merge(ert, p, obj_id) | |||||
def test_revision_filtered_objects(self, git_loader, cook_extract_revision): | |||||
(loader, obj_id) = self.load_repo_filtered_objects(git_loader) | |||||
with cook_extract_revision(loader.storage, obj_id) as (ert, p): | |||||
self.check_revision_filtered_objects(ert, p, obj_id) | |||||
def test_revision_null_fields(self, git_loader, cook_extract_revision): | def test_revision_null_fields(self, git_loader, cook_extract_revision): | ||||
(loader, obj_id) = self.load_repo_null_fields(git_loader) | (loader, obj_id) = self.load_repo_null_fields(git_loader) | ||||
with cook_extract_revision(loader.storage, obj_id, fsck=False) as (ert, p): | with cook_extract_revision(loader.storage, obj_id, fsck=False) as (ert, p): | ||||
self.check_revision_null_fields(ert, p, obj_id) | self.check_revision_null_fields(ert, p, obj_id) | ||||
def test_revision_revision_data(self, swh_storage): | def test_revision_revision_data(self, swh_storage): | ||||
target_rev = "0e8a3ad980ec179856012b7eecf4327e99cd44cd" | target_rev = "0e8a3ad980ec179856012b7eecf4327e99cd44cd" | ||||
Show All 29 Lines |