Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/tests/test_cookers.py
Show First 20 Lines • Show All 538 Lines • ▼ Show 20 Lines | def test_directory_revision_data(self, swh_storage): | ||||
swh_storage.directory_add([dir]) | swh_storage.directory_add([dir]) | ||||
with cook_extract_directory_dircooker(swh_storage, dir.id, fsck=False) as p: | with cook_extract_directory_dircooker(swh_storage, dir.id, fsck=False) as p: | ||||
assert (p / "submodule").is_symlink() | assert (p / "submodule").is_symlink() | ||||
assert os.readlink(str(p / "submodule")) == target_rev | assert os.readlink(str(p / "submodule")) == target_rev | ||||
class TestRevisionCooker: | class TestRevisionCooker: | ||||
def test_revision_simple(self, git_loader, cook_extract_revision): | def load_repo_simple(self, git_loader): | ||||
# | # | ||||
# 1--2--3--4--5--6--7 | # 1--2--3--4--5--6--7 | ||||
# | # | ||||
repo = TestRepo() | repo = TestRepo() | ||||
with repo as rp: | with repo as rp: | ||||
(rp / "file1").write_text(TEST_CONTENT) | (rp / "file1").write_text(TEST_CONTENT) | ||||
repo.commit("add file1") | repo.commit("add file1") | ||||
(rp / "file2").write_text(TEST_CONTENT) | (rp / "file2").write_text(TEST_CONTENT) | ||||
Show All 9 Lines | def load_repo_simple(self, git_loader): | ||||
(rp / "file2").unlink() | (rp / "file2").unlink() | ||||
repo.commit("remove file2") | repo.commit("remove file2") | ||||
(rp / "bin1").rename(rp / "bin") | (rp / "bin1").rename(rp / "bin") | ||||
repo.commit("rename bin1 to bin") | repo.commit("rename bin1 to bin") | ||||
loader = git_loader(str(rp)) | loader = git_loader(str(rp)) | ||||
loader.load() | loader.load() | ||||
obj_id_hex = repo.repo.refs[b"HEAD"].decode() | obj_id_hex = repo.repo.refs[b"HEAD"].decode() | ||||
obj_id = hashutil.hash_to_bytes(obj_id_hex) | obj_id = hashutil.hash_to_bytes(obj_id_hex) | ||||
return (loader, obj_id) | |||||
with cook_extract_revision(loader.storage, obj_id) as (ert, p): | def check_revision_simple(self, ert, p, obj_id): | ||||
ert.checkout(b"HEAD") | ert.checkout(b"HEAD") | ||||
assert (p / "file1").stat().st_mode == 0o100644 | assert (p / "file1").stat().st_mode == 0o100644 | ||||
assert (p / "file1").read_text() == TEST_CONTENT | assert (p / "file1").read_text() == TEST_CONTENT | ||||
assert (p / "link1").is_symlink() | assert (p / "link1").is_symlink() | ||||
assert os.readlink(str(p / "link1")) == "file1" | assert os.readlink(str(p / "link1")) == "file1" | ||||
assert (p / "bin").stat().st_mode == 0o100755 | assert (p / "bin").stat().st_mode == 0o100755 | ||||
assert (p / "bin").read_bytes() == TEST_EXECUTABLE | assert (p / "bin").read_bytes() == TEST_EXECUTABLE | ||||
assert (p / "dir1/dir2/file").read_text() == TEST_CONTENT | assert (p / "dir1/dir2/file").read_text() == TEST_CONTENT | ||||
assert (p / "dir1/dir2/file").stat().st_mode == 0o100644 | assert (p / "dir1/dir2/file").stat().st_mode == 0o100644 | ||||
assert ert.repo.refs[b"HEAD"].decode() == obj_id_hex | assert ert.repo.refs[b"HEAD"].decode() == obj_id.hex() | ||||
def test_revision_two_roots(self, git_loader, cook_extract_revision): | def test_revision_simple(self, git_loader, cook_extract_revision): | ||||
(loader, obj_id) = self.load_repo_simple(git_loader) | |||||
with cook_extract_revision(loader.storage, obj_id) as (ert, p): | |||||
self.check_revision_simple(ert, p, obj_id) | |||||
def load_repo_two_roots(self, git_loader): | |||||
# | # | ||||
# 1----3---4 | # 1----3---4 | ||||
# / | # / | ||||
# 2---- | # 2---- | ||||
# | # | ||||
repo = TestRepo() | repo = TestRepo() | ||||
with repo as rp: | with repo as rp: | ||||
(rp / "file1").write_text(TEST_CONTENT) | (rp / "file1").write_text(TEST_CONTENT) | ||||
c1 = repo.commit("Add file1") | c1 = repo.commit("Add file1") | ||||
del repo.repo.refs[b"refs/heads/master"] # git update-ref -d HEAD | del repo.repo.refs[b"refs/heads/master"] # git update-ref -d HEAD | ||||
(rp / "file2").write_text(TEST_CONTENT) | (rp / "file2").write_text(TEST_CONTENT) | ||||
repo.commit("Add file2") | repo.commit("Add file2") | ||||
repo.merge([c1]) | repo.merge([c1]) | ||||
(rp / "file3").write_text(TEST_CONTENT) | (rp / "file3").write_text(TEST_CONTENT) | ||||
repo.commit("add file3") | repo.commit("add file3") | ||||
obj_id_hex = repo.repo.refs[b"HEAD"].decode() | obj_id_hex = repo.repo.refs[b"HEAD"].decode() | ||||
obj_id = hashutil.hash_to_bytes(obj_id_hex) | obj_id = hashutil.hash_to_bytes(obj_id_hex) | ||||
loader = git_loader(str(rp)) | loader = git_loader(str(rp)) | ||||
loader.load() | loader.load() | ||||
return (loader, obj_id) | |||||
def check_revision_two_roots(self, ert, p, obj_id): | |||||
assert ert.repo.refs[b"HEAD"].decode() == obj_id.hex() | |||||
def test_revision_two_roots(self, git_loader, cook_extract_revision): | |||||
(loader, obj_id) = self.load_repo_two_roots(git_loader) | |||||
with cook_extract_revision(loader.storage, obj_id) as (ert, p): | with cook_extract_revision(loader.storage, obj_id) as (ert, p): | ||||
assert ert.repo.refs[b"HEAD"].decode() == obj_id_hex | self.check_revision_two_roots(ert, p, obj_id) | ||||
def test_revision_two_double_fork_merge(self, git_loader, cook_extract_revision): | def load_repo_two_double_fork_merge(self, git_loader): | ||||
# | # | ||||
# 2---4---6 | # 2---4---6 | ||||
# / / / | # / / / | ||||
# 1---3---5 | # 1---3---5 | ||||
# | # | ||||
repo = TestRepo() | repo = TestRepo() | ||||
with repo as rp: | with repo as rp: | ||||
(rp / "file1").write_text(TEST_CONTENT) | (rp / "file1").write_text(TEST_CONTENT) | ||||
Show All 13 Lines | def load_repo_two_double_fork_merge(self, git_loader): | ||||
c5 = repo.commit("Add file3", ref=b"refs/heads/c3") | c5 = repo.commit("Add file3", ref=b"refs/heads/c3") | ||||
repo.merge([c5]) | repo.merge([c5]) | ||||
obj_id_hex = repo.repo.refs[b"HEAD"].decode() | obj_id_hex = repo.repo.refs[b"HEAD"].decode() | ||||
obj_id = hashutil.hash_to_bytes(obj_id_hex) | obj_id = hashutil.hash_to_bytes(obj_id_hex) | ||||
loader = git_loader(str(rp)) | loader = git_loader(str(rp)) | ||||
loader.load() | loader.load() | ||||
return (loader, obj_id) | |||||
def check_revision_two_double_fork_merge(self, ert, p, obj_id): | |||||
assert ert.repo.refs[b"HEAD"].decode() == obj_id.hex() | |||||
def test_revision_two_double_fork_merge(self, git_loader, cook_extract_revision): | |||||
(loader, obj_id) = self.load_repo_two_double_fork_merge(git_loader) | |||||
with cook_extract_revision(loader.storage, obj_id) as (ert, p): | with cook_extract_revision(loader.storage, obj_id) as (ert, p): | ||||
assert ert.repo.refs[b"HEAD"].decode() == obj_id_hex | self.check_revision_two_double_fork_merge(ert, p, obj_id) | ||||
def test_revision_triple_merge(self, git_loader, cook_extract_revision): | def load_repo_triple_merge(self, git_loader): | ||||
# | # | ||||
# .---.---5 | # .---.---5 | ||||
# / / / | # / / / | ||||
# 2 3 4 | # 2 3 4 | ||||
# / / / | # / / / | ||||
# 1---.---. | # 1---.---. | ||||
# | # | ||||
repo = TestRepo() | repo = TestRepo() | ||||
with repo as rp: | with repo as rp: | ||||
(rp / "file1").write_text(TEST_CONTENT) | (rp / "file1").write_text(TEST_CONTENT) | ||||
c1 = repo.commit("Commit 1") | c1 = repo.commit("Commit 1") | ||||
repo.repo.refs[b"refs/heads/b1"] = c1 | repo.repo.refs[b"refs/heads/b1"] = c1 | ||||
repo.repo.refs[b"refs/heads/b2"] = c1 | repo.repo.refs[b"refs/heads/b2"] = c1 | ||||
repo.commit("Commit 2") | repo.commit("Commit 2") | ||||
c3 = repo.commit("Commit 3", ref=b"refs/heads/b1") | c3 = repo.commit("Commit 3", ref=b"refs/heads/b1") | ||||
c4 = repo.commit("Commit 4", ref=b"refs/heads/b2") | c4 = repo.commit("Commit 4", ref=b"refs/heads/b2") | ||||
repo.merge([c3, c4]) | repo.merge([c3, c4]) | ||||
obj_id_hex = repo.repo.refs[b"HEAD"].decode() | obj_id_hex = repo.repo.refs[b"HEAD"].decode() | ||||
obj_id = hashutil.hash_to_bytes(obj_id_hex) | obj_id = hashutil.hash_to_bytes(obj_id_hex) | ||||
loader = git_loader(str(rp)) | loader = git_loader(str(rp)) | ||||
loader.load() | loader.load() | ||||
return (loader, obj_id) | |||||
def check_revision_triple_merge(self, ert, p, obj_id): | |||||
assert ert.repo.refs[b"HEAD"].decode() == obj_id.hex() | |||||
def test_revision_triple_merge(self, git_loader, cook_extract_revision): | |||||
(loader, obj_id) = self.load_repo_triple_merge(git_loader) | |||||
with cook_extract_revision(loader.storage, obj_id) as (ert, p): | with cook_extract_revision(loader.storage, obj_id) as (ert, p): | ||||
assert ert.repo.refs[b"HEAD"].decode() == obj_id_hex | self.check_revision_triple_merge(ert, p, obj_id) | ||||
def test_revision_filtered_objects(self, git_loader, cook_extract_revision): | def load_repo_filtered_objects(self, git_loader): | ||||
repo = TestRepo() | repo = TestRepo() | ||||
with repo as rp: | with repo as rp: | ||||
file_1, id_1 = hash_content(b"test1") | file_1, id_1 = hash_content(b"test1") | ||||
file_2, id_2 = hash_content(b"test2") | file_2, id_2 = hash_content(b"test2") | ||||
file_3, id_3 = hash_content(b"test3") | file_3, id_3 = hash_content(b"test3") | ||||
(rp / "file").write_bytes(file_1) | (rp / "file").write_bytes(file_1) | ||||
(rp / "hidden_file").write_bytes(file_2) | (rp / "hidden_file").write_bytes(file_2) | ||||
Show All 26 Lines | def load_repo_filtered_objects(self, git_loader): | ||||
select sha1, sha1_git, sha256, blake2s256, length, 'no reason' | select sha1, sha1_git, sha256, blake2s256, length, 'no reason' | ||||
from content | from content | ||||
where sha1 = %s | where sha1 = %s | ||||
""", | """, | ||||
(id_3,), | (id_3,), | ||||
) | ) | ||||
cur.execute("delete from content where sha1 = %s", (id_3,)) | cur.execute("delete from content where sha1 = %s", (id_3,)) | ||||
return (loader, obj_id) | |||||
with cook_extract_revision(loader.storage, obj_id) as (ert, p): | def check_revision_filtered_objects(self, ert, p, obj_id): | ||||
ert.checkout(b"HEAD") | ert.checkout(b"HEAD") | ||||
assert (p / "file").read_bytes() == b"test1" | assert (p / "file").read_bytes() == b"test1" | ||||
assert (p / "hidden_file").read_bytes() == HIDDEN_MESSAGE | assert (p / "hidden_file").read_bytes() == HIDDEN_MESSAGE | ||||
assert (p / "absent_file").read_bytes() == SKIPPED_MESSAGE | assert (p / "absent_file").read_bytes() == SKIPPED_MESSAGE | ||||
def test_revision_null_fields(self, git_loader, cook_extract_revision): | def test_revision_filtered_objects(self, git_loader, cook_extract_revision): | ||||
(loader, obj_id) = self.load_repo_filtered_objects(git_loader) | |||||
with cook_extract_revision(loader.storage, obj_id) as (ert, p): | |||||
self.check_revision_filtered_objects(ert, p, obj_id) | |||||
def load_repo_null_fields(self, git_loader): | |||||
# Our schema doesn't enforce a lot of non-null revision fields. We need | # Our schema doesn't enforce a lot of non-null revision fields. We need | ||||
# to check these cases don't break the cooker. | # to check these cases don't break the cooker. | ||||
repo = TestRepo() | repo = TestRepo() | ||||
with repo as rp: | with repo as rp: | ||||
(rp / "file").write_text(TEST_CONTENT) | (rp / "file").write_text(TEST_CONTENT) | ||||
c = repo.commit("initial commit") | c = repo.commit("initial commit") | ||||
loader = git_loader(str(rp)) | loader = git_loader(str(rp)) | ||||
loader.load() | loader.load() | ||||
Show All 11 Lines | def load_repo_null_fields(self, git_loader): | ||||
type=RevisionType.GIT, | type=RevisionType.GIT, | ||||
directory=dir_id, | directory=dir_id, | ||||
metadata={}, | metadata={}, | ||||
synthetic=True, | synthetic=True, | ||||
) | ) | ||||
storage = loader.storage | storage = loader.storage | ||||
storage.revision_add([test_revision]) | storage.revision_add([test_revision]) | ||||
return (loader, test_revision.id) | |||||
with cook_extract_revision(storage, test_revision.id, fsck=False) as (ert, p): | def check_revision_null_fields(self, ert, p, obj_id): | ||||
ert.checkout(b"HEAD") | ert.checkout(b"HEAD") | ||||
assert (p / "file").stat().st_mode == 0o100644 | assert (p / "file").stat().st_mode == 0o100644 | ||||
def test_revision_null_fields(self, git_loader, cook_extract_revision): | |||||
(loader, obj_id) = self.load_repo_null_fields(git_loader) | |||||
with cook_extract_revision(loader.storage, obj_id, fsck=False) as (ert, p): | |||||
self.check_revision_null_fields(ert, p, obj_id) | |||||
def test_revision_revision_data(self, swh_storage): | def test_revision_revision_data(self, swh_storage): | ||||
target_rev = "0e8a3ad980ec179856012b7eecf4327e99cd44cd" | target_rev = "0e8a3ad980ec179856012b7eecf4327e99cd44cd" | ||||
dir = Directory( | dir = Directory( | ||||
entries=( | entries=( | ||||
DirectoryEntry( | DirectoryEntry( | ||||
name=b"submodule", | name=b"submodule", | ||||
type="rev", | type="rev", | ||||
Show All 24 Lines |