Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/tests/test_cookers.py
Show All 38 Lines | |||||
from swh.vault.to_disk import HIDDEN_MESSAGE, SKIPPED_MESSAGE | from swh.vault.to_disk import HIDDEN_MESSAGE, SKIPPED_MESSAGE | ||||
class TestRepo: | class TestRepo: | ||||
"""A tiny context manager for a test git repository, with some utility | """A tiny context manager for a test git repository, with some utility | ||||
functions to perform basic git stuff. | functions to perform basic git stuff. | ||||
""" | """ | ||||
def __init__(self, repo_dir=None): | |||||
self.repo_dir = repo_dir | |||||
def __enter__(self): | def __enter__(self): | ||||
if self.repo_dir: | |||||
self.tmp_dir = None | |||||
self.repo = dulwich.repo.Repo(self.repo_dir) | |||||
else: | |||||
self.tmp_dir = tempfile.TemporaryDirectory(prefix="tmp-vault-repo-") | self.tmp_dir = tempfile.TemporaryDirectory(prefix="tmp-vault-repo-") | ||||
self.repo_dir = self.tmp_dir.__enter__() | self.repo_dir = self.tmp_dir.__enter__() | ||||
self.repo = dulwich.repo.Repo.init(self.repo_dir) | self.repo = dulwich.repo.Repo.init(self.repo_dir) | ||||
self.author_name = b"Test Author" | self.author_name = b"Test Author" | ||||
self.author_email = b"test@softwareheritage.org" | self.author_email = b"test@softwareheritage.org" | ||||
self.author = b"%s <%s>" % (self.author_name, self.author_email) | self.author = b"%s <%s>" % (self.author_name, self.author_email) | ||||
self.base_date = 258244200 | self.base_date = 258244200 | ||||
self.counter = 0 | self.counter = 0 | ||||
return pathlib.Path(self.repo_dir) | return pathlib.Path(self.repo_dir) | ||||
def __exit__(self, exc, value, tb): | def __exit__(self, exc, value, tb): | ||||
if self.tmp_dir is not None: | |||||
self.tmp_dir.__exit__(exc, value, tb) | self.tmp_dir.__exit__(exc, value, tb) | ||||
self.repo_dir = None | |||||
def checkout(self, rev_sha): | def checkout(self, rev_sha): | ||||
rev = self.repo[rev_sha] | rev = self.repo[rev_sha] | ||||
dulwich.index.build_index_from_tree( | dulwich.index.build_index_from_tree( | ||||
self.repo_dir, self.repo.index_path(), self.repo.object_store, rev.tree | self.repo_dir, self.repo.index_path(), self.repo.object_store, rev.tree | ||||
) | ) | ||||
def git_shell(self, *cmd, stdout=subprocess.DEVNULL, **kwargs): | def git_shell(self, *cmd, stdout=subprocess.DEVNULL, **kwargs): | ||||
▲ Show 20 Lines • Show All 155 Lines • ▼ Show 20 Lines | ): | ||||
cooker.fileobj.seek(0) | cooker.fileobj.seek(0) | ||||
# Extract it | # Extract it | ||||
with tempfile.TemporaryDirectory(prefix="tmp-vault-extract-") as td: | with tempfile.TemporaryDirectory(prefix="tmp-vault-extract-") as td: | ||||
with tarfile.open(fileobj=cooker.fileobj, mode="r") as tar: | with tarfile.open(fileobj=cooker.fileobj, mode="r") as tar: | ||||
tar.extractall(td) | tar.extractall(td) | ||||
# Clone it with Dulwich | # Clone it with Dulwich | ||||
test_repo = TestRepo() | with tempfile.TemporaryDirectory(prefix="tmp-vault-clone-") as clone_dir: | ||||
with test_repo as p: | clone_dir = pathlib.Path(clone_dir) | ||||
test_repo.git_shell( | subprocess.check_call( | ||||
"pull", os.path.join(td, f"swh:1:dir:{obj_id.hex()}.git") | [ | ||||
"git", | |||||
"clone", | |||||
os.path.join(td, f"swh:1:dir:{obj_id.hex()}.git"), | |||||
clone_dir, | |||||
] | |||||
) | ) | ||||
shutil.rmtree(p / ".git") | shutil.rmtree(clone_dir / ".git") | ||||
yield p | yield clone_dir | ||||
@pytest.fixture( | @pytest.fixture( | ||||
scope="module", | scope="module", | ||||
params=[ | params=[ | ||||
cook_extract_directory_dircooker, | cook_extract_directory_dircooker, | ||||
cook_extract_directory_gitfast, | cook_extract_directory_gitfast, | ||||
cook_extract_directory_git_bare, | cook_extract_directory_git_bare, | ||||
▲ Show 20 Lines • Show All 49 Lines • ▼ Show 20 Lines | def cook_extract_revision_git_bare(storage, obj_id, fsck=True): | ||||
cooker.fileobj.seek(0) | cooker.fileobj.seek(0) | ||||
# Extract it | # Extract it | ||||
with tempfile.TemporaryDirectory(prefix="tmp-vault-extract-") as td: | with tempfile.TemporaryDirectory(prefix="tmp-vault-extract-") as td: | ||||
with tarfile.open(fileobj=cooker.fileobj, mode="r") as tar: | with tarfile.open(fileobj=cooker.fileobj, mode="r") as tar: | ||||
tar.extractall(td) | tar.extractall(td) | ||||
# Clone it with Dulwich | # Clone it with Dulwich | ||||
test_repo = TestRepo() | with tempfile.TemporaryDirectory(prefix="tmp-vault-clone-") as clone_dir: | ||||
with test_repo as p: | clone_dir = pathlib.Path(clone_dir) | ||||
test_repo.git_shell( | subprocess.check_call( | ||||
"pull", os.path.join(td, f"swh:1:rev:{obj_id.hex()}.git") | [ | ||||
"git", | |||||
"clone", | |||||
os.path.join(td, f"swh:1:rev:{obj_id.hex()}.git"), | |||||
clone_dir, | |||||
] | |||||
) | ) | ||||
yield test_repo, p | test_repo = TestRepo(clone_dir) | ||||
with test_repo: | |||||
yield test_repo, clone_dir | |||||
@pytest.fixture( | @pytest.fixture( | ||||
scope="module", | scope="module", | ||||
params=[cook_extract_revision_gitfast, cook_extract_revision_git_bare], | params=[cook_extract_revision_gitfast, cook_extract_revision_git_bare], | ||||
) | ) | ||||
def cook_extract_revision(request): | def cook_extract_revision(request): | ||||
"""A fixture that is instantiated as either cook_extract_revision_gitfast or | """A fixture that is instantiated as either cook_extract_revision_gitfast or | ||||
Show All 33 Lines | def test_directory_simple(self, git_loader, cook_extract_directory): | ||||
assert os.readlink(str(p / "link")) == "file" | assert os.readlink(str(p / "link")) == "file" | ||||
assert (p / "dir1/dir2/file").stat().st_mode == 0o100644 | assert (p / "dir1/dir2/file").stat().st_mode == 0o100644 | ||||
assert (p / "dir1/dir2/file").read_text() == TEST_CONTENT | assert (p / "dir1/dir2/file").read_text() == TEST_CONTENT | ||||
directory = from_disk.Directory.from_disk(path=bytes(p)) | directory = from_disk.Directory.from_disk(path=bytes(p)) | ||||
assert obj_id_hex == hashutil.hash_to_hex(directory.hash) | assert obj_id_hex == hashutil.hash_to_hex(directory.hash) | ||||
def test_directory_filtered_objects(self, git_loader, cook_extract_directory): | def test_directory_filtered_objects(self, git_loader, cook_extract_directory): | ||||
if cook_extract_directory is cook_extract_directory_git_bare: | |||||
pytest.xfail("GitBareCooker does not support filtered objects (yet?)") | |||||
repo = TestRepo() | repo = TestRepo() | ||||
with repo as rp: | with repo as rp: | ||||
file_1, id_1 = hash_content(b"test1") | file_1, id_1 = hash_content(b"test1") | ||||
file_2, id_2 = hash_content(b"test2") | file_2, id_2 = hash_content(b"test2") | ||||
file_3, id_3 = hash_content(b"test3") | file_3, id_3 = hash_content(b"test3") | ||||
(rp / "file").write_bytes(file_1) | (rp / "file").write_bytes(file_1) | ||||
(rp / "hidden_file").write_bytes(file_2) | (rp / "hidden_file").write_bytes(file_2) | ||||
Show All 14 Lines | def test_directory_filtered_objects(self, git_loader, cook_extract_directory): | ||||
where sha1 = %s""", | where sha1 = %s""", | ||||
(id_1,), | (id_1,), | ||||
) | ) | ||||
cur.execute( | cur.execute( | ||||
"""update content set status = 'hidden' | """update content set status = 'hidden' | ||||
where sha1 = %s""", | where sha1 = %s""", | ||||
(id_2,), | (id_2,), | ||||
) | ) | ||||
cur.execute( | cur.execute( | ||||
"""update content set status = 'absent' | """ | ||||
where sha1 = %s""", | insert into skipped_content | ||||
(sha1, sha1_git, sha256, blake2s256, length, reason) | |||||
select sha1, sha1_git, sha256, blake2s256, length, 'no reason' | |||||
from content | |||||
where sha1 = %s | |||||
""", | |||||
(id_3,), | (id_3,), | ||||
) | ) | ||||
cur.execute("delete from content where sha1 = %s", (id_3,)) | |||||
with cook_extract_directory(loader.storage, obj_id) as p: | with cook_extract_directory(loader.storage, obj_id) as p: | ||||
assert (p / "file").read_bytes() == b"test1" | assert (p / "file").read_bytes() == b"test1" | ||||
assert (p / "hidden_file").read_bytes() == HIDDEN_MESSAGE | assert (p / "hidden_file").read_bytes() == HIDDEN_MESSAGE | ||||
assert (p / "absent_file").read_bytes() == SKIPPED_MESSAGE | assert (p / "absent_file").read_bytes() == SKIPPED_MESSAGE | ||||
def test_directory_bogus_perms(self, git_loader, cook_extract_directory): | def test_directory_bogus_perms(self, git_loader, cook_extract_directory): | ||||
# Some early git repositories have 664/775 permissions... let's check | # Some early git repositories have 664/775 permissions... let's check | ||||
# if all the weird modes are properly normalized in the directory | # if all the weird modes are properly normalized in the directory | ||||
▲ Show 20 Lines • Show All 234 Lines • ▼ Show 20 Lines | def test_revision_triple_merge(self, git_loader, cook_extract_revision): | ||||
obj_id = hashutil.hash_to_bytes(obj_id_hex) | obj_id = hashutil.hash_to_bytes(obj_id_hex) | ||||
loader = git_loader(str(rp)) | loader = git_loader(str(rp)) | ||||
loader.load() | loader.load() | ||||
with cook_extract_revision(loader.storage, obj_id) as (ert, p): | with cook_extract_revision(loader.storage, obj_id) as (ert, p): | ||||
assert ert.repo.refs[b"HEAD"].decode() == obj_id_hex | assert ert.repo.refs[b"HEAD"].decode() == obj_id_hex | ||||
def test_revision_filtered_objects(self, git_loader, cook_extract_revision): | def test_revision_filtered_objects(self, git_loader, cook_extract_revision): | ||||
if cook_extract_revision is cook_extract_revision_git_bare: | |||||
pytest.xfail("GitBareCooker does not support filtered objects (yet?)") | |||||
repo = TestRepo() | repo = TestRepo() | ||||
with repo as rp: | with repo as rp: | ||||
file_1, id_1 = hash_content(b"test1") | file_1, id_1 = hash_content(b"test1") | ||||
file_2, id_2 = hash_content(b"test2") | file_2, id_2 = hash_content(b"test2") | ||||
file_3, id_3 = hash_content(b"test3") | file_3, id_3 = hash_content(b"test3") | ||||
(rp / "file").write_bytes(file_1) | (rp / "file").write_bytes(file_1) | ||||
(rp / "hidden_file").write_bytes(file_2) | (rp / "hidden_file").write_bytes(file_2) | ||||
Show All 13 Lines | def test_revision_filtered_objects(self, git_loader, cook_extract_revision): | ||||
where sha1 = %s""", | where sha1 = %s""", | ||||
(id_1,), | (id_1,), | ||||
) | ) | ||||
cur.execute( | cur.execute( | ||||
"""update content set status = 'hidden' | """update content set status = 'hidden' | ||||
where sha1 = %s""", | where sha1 = %s""", | ||||
(id_2,), | (id_2,), | ||||
) | ) | ||||
cur.execute( | cur.execute( | ||||
"""update content set status = 'absent' | """ | ||||
where sha1 = %s""", | insert into skipped_content | ||||
(sha1, sha1_git, sha256, blake2s256, length, reason) | |||||
select sha1, sha1_git, sha256, blake2s256, length, 'no reason' | |||||
from content | |||||
where sha1 = %s | |||||
""", | |||||
(id_3,), | (id_3,), | ||||
) | ) | ||||
cur.execute("delete from content where sha1 = %s", (id_3,)) | |||||
with cook_extract_revision(loader.storage, obj_id) as (ert, p): | with cook_extract_revision(loader.storage, obj_id) as (ert, p): | ||||
ert.checkout(b"HEAD") | ert.checkout(b"HEAD") | ||||
assert (p / "file").read_bytes() == b"test1" | assert (p / "file").read_bytes() == b"test1" | ||||
assert (p / "hidden_file").read_bytes() == HIDDEN_MESSAGE | assert (p / "hidden_file").read_bytes() == HIDDEN_MESSAGE | ||||
assert (p / "absent_file").read_bytes() == SKIPPED_MESSAGE | assert (p / "absent_file").read_bytes() == SKIPPED_MESSAGE | ||||
def test_revision_null_fields(self, git_loader, cook_extract_revision): | def test_revision_null_fields(self, git_loader, cook_extract_revision): | ||||
# Our schema doesn't enforce a lot of non-null revision fields. We need | # Our schema doesn't enforce a lot of non-null revision fields. We need | ||||
▲ Show 20 Lines • Show All 63 Lines • Show Last 20 Lines |