Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/tests/test_cookers.py
Show All 19 Lines | |||||
import dulwich.fastexport | import dulwich.fastexport | ||||
import dulwich.index | import dulwich.index | ||||
import dulwich.objects | import dulwich.objects | ||||
import dulwich.porcelain | import dulwich.porcelain | ||||
import dulwich.repo | import dulwich.repo | ||||
import pytest | import pytest | ||||
from swh.loader.git.from_disk import GitLoaderFromDisk | from swh.loader.git.from_disk import GitLoaderFromDisk | ||||
from swh.model import from_disk, hashutil, identifiers | from swh.model import from_disk, hashutil | ||||
from swh.model.identifiers import CoreSWHID, ObjectType | |||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Directory, | Directory, | ||||
DirectoryEntry, | DirectoryEntry, | ||||
Person, | Person, | ||||
Revision, | Revision, | ||||
RevisionType, | RevisionType, | ||||
TimestampWithTimezone, | TimestampWithTimezone, | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 130 Lines • ▼ Show 20 Lines | def _create_loader(directory): | ||||
directory=directory, | directory=directory, | ||||
visit_date=datetime.datetime.now(datetime.timezone.utc), | visit_date=datetime.datetime.now(datetime.timezone.utc), | ||||
) | ) | ||||
return _create_loader | return _create_loader | ||||
@contextlib.contextmanager | @contextlib.contextmanager | ||||
def cook_extract_directory_dircooker(storage, obj_id, fsck=True): | def cook_extract_directory_dircooker(storage, swhid, fsck=True): | ||||
"""Context manager that cooks a directory and extract it.""" | """Context manager that cooks a directory and extract it.""" | ||||
backend = unittest.mock.MagicMock() | backend = unittest.mock.MagicMock() | ||||
backend.storage = storage | backend.storage = storage | ||||
cooker = DirectoryCooker("directory", obj_id, backend=backend, storage=storage) | cooker = DirectoryCooker(swhid, backend=backend, storage=storage) | ||||
cooker.fileobj = io.BytesIO() | cooker.fileobj = io.BytesIO() | ||||
assert cooker.check_exists() | assert cooker.check_exists() | ||||
cooker.prepare_bundle() | cooker.prepare_bundle() | ||||
cooker.fileobj.seek(0) | cooker.fileobj.seek(0) | ||||
with tempfile.TemporaryDirectory(prefix="tmp-vault-extract-") as td: | with tempfile.TemporaryDirectory(prefix="tmp-vault-extract-") as td: | ||||
with tarfile.open(fileobj=cooker.fileobj, mode="r") as tar: | with tarfile.open(fileobj=cooker.fileobj, mode="r") as tar: | ||||
tar.extractall(td) | tar.extractall(td) | ||||
yield pathlib.Path(td) / hashutil.hash_to_hex(obj_id) | yield pathlib.Path(td) / str(swhid) | ||||
cooker.storage = None | cooker.storage = None | ||||
@contextlib.contextmanager | @contextlib.contextmanager | ||||
def cook_extract_directory_gitfast(storage, obj_id, fsck=True): | def cook_extract_directory_gitfast(storage, swhid, fsck=True): | ||||
"""Context manager that cooks a revision containing a directory and extract it, | """Context manager that cooks a revision containing a directory and extract it, | ||||
using RevisionGitfastCooker""" | using RevisionGitfastCooker""" | ||||
test_repo = TestRepo() | test_repo = TestRepo() | ||||
with test_repo as p: | with test_repo as p: | ||||
date = TimestampWithTimezone.from_datetime( | date = TimestampWithTimezone.from_datetime( | ||||
datetime.datetime.now(datetime.timezone.utc) | datetime.datetime.now(datetime.timezone.utc) | ||||
) | ) | ||||
revision = Revision( | revision = Revision( | ||||
directory=obj_id, | directory=swhid.object_id, | ||||
message=b"dummy message", | message=b"dummy message", | ||||
author=Person.from_fullname(b"someone"), | author=Person.from_fullname(b"someone"), | ||||
committer=Person.from_fullname(b"someone"), | committer=Person.from_fullname(b"someone"), | ||||
date=date, | date=date, | ||||
committer_date=date, | committer_date=date, | ||||
type=RevisionType.GIT, | type=RevisionType.GIT, | ||||
synthetic=False, | synthetic=False, | ||||
) | ) | ||||
storage.revision_add([revision]) | storage.revision_add([revision]) | ||||
with cook_stream_revision_gitfast(storage, revision.id) as stream, test_repo as p: | with cook_stream_revision_gitfast( | ||||
storage, revision.swhid() | |||||
) as stream, test_repo as p: | |||||
processor = dulwich.fastexport.GitImportProcessor(test_repo.repo) | processor = dulwich.fastexport.GitImportProcessor(test_repo.repo) | ||||
processor.import_stream(stream) | processor.import_stream(stream) | ||||
test_repo.checkout(b"HEAD") | test_repo.checkout(b"HEAD") | ||||
shutil.rmtree(p / ".git") | shutil.rmtree(p / ".git") | ||||
yield p | yield p | ||||
@contextlib.contextmanager | @contextlib.contextmanager | ||||
def cook_extract_directory_git_bare( | def cook_extract_directory_git_bare(storage, swhid, fsck=True, direct_objstorage=False): | ||||
storage, obj_id, fsck=True, direct_objstorage=False | |||||
): | |||||
"""Context manager that cooks a revision and extract it, | """Context manager that cooks a revision and extract it, | ||||
using GitBareCooker""" | using GitBareCooker""" | ||||
backend = unittest.mock.MagicMock() | backend = unittest.mock.MagicMock() | ||||
backend.storage = storage | backend.storage = storage | ||||
# Cook the object | # Cook the object | ||||
cooker = GitBareCooker( | cooker = GitBareCooker( | ||||
"directory", | swhid, | ||||
obj_id, | |||||
backend=backend, | backend=backend, | ||||
storage=storage, | storage=storage, | ||||
objstorage=storage.objstorage if direct_objstorage else None, | objstorage=storage.objstorage if direct_objstorage else None, | ||||
) | ) | ||||
cooker.use_fsck = fsck # Some tests try edge-cases that git-fsck rejects | cooker.use_fsck = fsck # Some tests try edge-cases that git-fsck rejects | ||||
cooker.fileobj = io.BytesIO() | cooker.fileobj = io.BytesIO() | ||||
assert cooker.check_exists() | assert cooker.check_exists() | ||||
cooker.prepare_bundle() | cooker.prepare_bundle() | ||||
cooker.fileobj.seek(0) | cooker.fileobj.seek(0) | ||||
# Extract it | # Extract it | ||||
with tempfile.TemporaryDirectory(prefix="tmp-vault-extract-") as td: | with tempfile.TemporaryDirectory(prefix="tmp-vault-extract-") as td: | ||||
with tarfile.open(fileobj=cooker.fileobj, mode="r") as tar: | with tarfile.open(fileobj=cooker.fileobj, mode="r") as tar: | ||||
tar.extractall(td) | tar.extractall(td) | ||||
# Clone it with Dulwich | # Clone it with Dulwich | ||||
with tempfile.TemporaryDirectory(prefix="tmp-vault-clone-") as clone_dir: | with tempfile.TemporaryDirectory(prefix="tmp-vault-clone-") as clone_dir: | ||||
clone_dir = pathlib.Path(clone_dir) | clone_dir = pathlib.Path(clone_dir) | ||||
subprocess.check_call( | subprocess.check_call( | ||||
[ | ["git", "clone", os.path.join(td, f"{swhid}.git"), clone_dir,] | ||||
"git", | |||||
"clone", | |||||
os.path.join(td, f"swh:1:dir:{obj_id.hex()}.git"), | |||||
clone_dir, | |||||
] | |||||
) | ) | ||||
shutil.rmtree(clone_dir / ".git") | shutil.rmtree(clone_dir / ".git") | ||||
yield clone_dir | yield clone_dir | ||||
@pytest.fixture( | @pytest.fixture( | ||||
scope="module", | scope="module", | ||||
params=[ | params=[ | ||||
cook_extract_directory_dircooker, | cook_extract_directory_dircooker, | ||||
cook_extract_directory_gitfast, | cook_extract_directory_gitfast, | ||||
cook_extract_directory_git_bare, | cook_extract_directory_git_bare, | ||||
], | ], | ||||
) | ) | ||||
def cook_extract_directory(request): | def cook_extract_directory(request): | ||||
"""A fixture that is instantiated as either cook_extract_directory_dircooker or | """A fixture that is instantiated as either cook_extract_directory_dircooker or | ||||
cook_extract_directory_git_bare.""" | cook_extract_directory_git_bare.""" | ||||
return request.param | return request.param | ||||
@contextlib.contextmanager | @contextlib.contextmanager | ||||
def cook_stream_revision_gitfast(storage, obj_id): | def cook_stream_revision_gitfast(storage, swhid): | ||||
"""Context manager that cooks a revision and stream its fastexport.""" | """Context manager that cooks a revision and stream its fastexport.""" | ||||
backend = unittest.mock.MagicMock() | backend = unittest.mock.MagicMock() | ||||
backend.storage = storage | backend.storage = storage | ||||
cooker = RevisionGitfastCooker( | cooker = RevisionGitfastCooker(swhid, backend=backend, storage=storage) | ||||
"revision_gitfast", obj_id, backend=backend, storage=storage | |||||
) | |||||
cooker.fileobj = io.BytesIO() | cooker.fileobj = io.BytesIO() | ||||
assert cooker.check_exists() | assert cooker.check_exists() | ||||
cooker.prepare_bundle() | cooker.prepare_bundle() | ||||
cooker.fileobj.seek(0) | cooker.fileobj.seek(0) | ||||
fastexport_stream = gzip.GzipFile(fileobj=cooker.fileobj) | fastexport_stream = gzip.GzipFile(fileobj=cooker.fileobj) | ||||
yield fastexport_stream | yield fastexport_stream | ||||
cooker.storage = None | cooker.storage = None | ||||
@contextlib.contextmanager | @contextlib.contextmanager | ||||
def cook_extract_revision_gitfast(storage, obj_id, fsck=True): | def cook_extract_revision_gitfast(storage, swhid, fsck=True): | ||||
"""Context manager that cooks a revision and extract it, | """Context manager that cooks a revision and extract it, | ||||
using RevisionGitfastCooker""" | using RevisionGitfastCooker""" | ||||
test_repo = TestRepo() | test_repo = TestRepo() | ||||
with cook_stream_revision_gitfast(storage, obj_id) as stream, test_repo as p: | with cook_stream_revision_gitfast(storage, swhid) as stream, test_repo as p: | ||||
processor = dulwich.fastexport.GitImportProcessor(test_repo.repo) | processor = dulwich.fastexport.GitImportProcessor(test_repo.repo) | ||||
processor.import_stream(stream) | processor.import_stream(stream) | ||||
yield test_repo, p | yield test_repo, p | ||||
@contextlib.contextmanager | @contextlib.contextmanager | ||||
def cook_extract_git_bare(storage, swhid, fsck=True): | def cook_extract_git_bare(storage, swhid, fsck=True): | ||||
"""Context manager that cooks a revision and extract it, | """Context manager that cooks a revision and extract it, | ||||
using GitBareCooker""" | using GitBareCooker""" | ||||
backend = unittest.mock.MagicMock() | backend = unittest.mock.MagicMock() | ||||
backend.storage = storage | backend.storage = storage | ||||
# Cook the object | # Cook the object | ||||
cooker = GitBareCooker( | cooker = GitBareCooker(swhid, backend=backend, storage=storage) | ||||
swhid.object_type.name.lower(), | |||||
swhid.object_id, | |||||
backend=backend, | |||||
storage=storage, | |||||
) | |||||
cooker.use_fsck = fsck # Some tests try edge-cases that git-fsck rejects | cooker.use_fsck = fsck # Some tests try edge-cases that git-fsck rejects | ||||
cooker.fileobj = io.BytesIO() | cooker.fileobj = io.BytesIO() | ||||
assert cooker.check_exists() | assert cooker.check_exists() | ||||
cooker.prepare_bundle() | cooker.prepare_bundle() | ||||
cooker.fileobj.seek(0) | cooker.fileobj.seek(0) | ||||
# Extract it | # Extract it | ||||
with tempfile.TemporaryDirectory(prefix="tmp-vault-extract-") as td: | with tempfile.TemporaryDirectory(prefix="tmp-vault-extract-") as td: | ||||
with tarfile.open(fileobj=cooker.fileobj, mode="r") as tar: | with tarfile.open(fileobj=cooker.fileobj, mode="r") as tar: | ||||
tar.extractall(td) | tar.extractall(td) | ||||
# Clone it with Dulwich | # Clone it with Dulwich | ||||
with tempfile.TemporaryDirectory(prefix="tmp-vault-clone-") as clone_dir: | with tempfile.TemporaryDirectory(prefix="tmp-vault-clone-") as clone_dir: | ||||
clone_dir = pathlib.Path(clone_dir) | clone_dir = pathlib.Path(clone_dir) | ||||
subprocess.check_call( | subprocess.check_call( | ||||
["git", "clone", os.path.join(td, f"{swhid}.git"), clone_dir,] | ["git", "clone", os.path.join(td, f"{swhid}.git"), clone_dir,] | ||||
) | ) | ||||
test_repo = TestRepo(clone_dir) | test_repo = TestRepo(clone_dir) | ||||
with test_repo: | with test_repo: | ||||
yield test_repo, clone_dir | yield test_repo, clone_dir | ||||
@contextlib.contextmanager | @contextlib.contextmanager | ||||
def cook_extract_revision_git_bare(storage, obj_id, fsck=True): | def cook_extract_revision_git_bare(storage, swhid, fsck=True): | ||||
with cook_extract_git_bare( | with cook_extract_git_bare(storage, swhid, fsck=fsck,) as res: | ||||
storage, | |||||
identifiers.CoreSWHID( | |||||
object_type=identifiers.ObjectType.REVISION, object_id=obj_id | |||||
), | |||||
fsck=fsck, | |||||
) as res: | |||||
yield res | yield res | ||||
@pytest.fixture( | @pytest.fixture( | ||||
scope="module", | scope="module", | ||||
params=[cook_extract_revision_gitfast, cook_extract_revision_git_bare], | params=[cook_extract_revision_gitfast, cook_extract_revision_git_bare], | ||||
) | ) | ||||
def cook_extract_revision(request): | def cook_extract_revision(request): | ||||
"""A fixture that is instantiated as either cook_extract_revision_gitfast or | """A fixture that is instantiated as either cook_extract_revision_gitfast or | ||||
cook_extract_revision_git_bare.""" | cook_extract_revision_git_bare.""" | ||||
return request.param | return request.param | ||||
@contextlib.contextmanager | @contextlib.contextmanager | ||||
def cook_extract_snapshot_git_bare(storage, obj_id, fsck=True): | def cook_extract_snapshot_git_bare(storage, swhid, fsck=True): | ||||
with cook_extract_git_bare( | with cook_extract_git_bare(storage, swhid, fsck=fsck,) as res: | ||||
storage, | |||||
identifiers.CoreSWHID( | |||||
object_type=identifiers.ObjectType.SNAPSHOT, object_id=obj_id | |||||
), | |||||
fsck=fsck, | |||||
) as res: | |||||
yield res | yield res | ||||
@pytest.fixture( | @pytest.fixture( | ||||
scope="module", params=[cook_extract_snapshot_git_bare], | scope="module", params=[cook_extract_snapshot_git_bare], | ||||
) | ) | ||||
def cook_extract_snapshot(request): | def cook_extract_snapshot(request): | ||||
"""Equivalent to cook_extract_snapshot_git_bare; but analogous to | """Equivalent to cook_extract_snapshot_git_bare; but analogous to | ||||
Show All 18 Lines | def test_directory_simple(self, git_loader, cook_extract_directory): | ||||
(rp / "dir1/dir2").mkdir(parents=True) | (rp / "dir1/dir2").mkdir(parents=True) | ||||
(rp / "dir1/dir2/file").write_text(TEST_CONTENT) | (rp / "dir1/dir2/file").write_text(TEST_CONTENT) | ||||
c = repo.commit() | c = repo.commit() | ||||
loader = git_loader(str(rp)) | loader = git_loader(str(rp)) | ||||
loader.load() | loader.load() | ||||
obj_id_hex = repo.repo[c].tree.decode() | obj_id_hex = repo.repo[c].tree.decode() | ||||
obj_id = hashutil.hash_to_bytes(obj_id_hex) | obj_id = hashutil.hash_to_bytes(obj_id_hex) | ||||
swhid = CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=obj_id) | |||||
with cook_extract_directory(loader.storage, obj_id) as p: | with cook_extract_directory(loader.storage, swhid) as p: | ||||
assert (p / "file").stat().st_mode == 0o100644 | assert (p / "file").stat().st_mode == 0o100644 | ||||
assert (p / "file").read_text() == TEST_CONTENT | assert (p / "file").read_text() == TEST_CONTENT | ||||
assert (p / "executable").stat().st_mode == 0o100755 | assert (p / "executable").stat().st_mode == 0o100755 | ||||
assert (p / "executable").read_bytes() == TEST_EXECUTABLE | assert (p / "executable").read_bytes() == TEST_EXECUTABLE | ||||
assert (p / "link").is_symlink() | assert (p / "link").is_symlink() | ||||
assert os.readlink(str(p / "link")) == "file" | assert os.readlink(str(p / "link")) == "file" | ||||
assert (p / "dir1/dir2/file").stat().st_mode == 0o100644 | assert (p / "dir1/dir2/file").stat().st_mode == 0o100644 | ||||
assert (p / "dir1/dir2/file").read_text() == TEST_CONTENT | assert (p / "dir1/dir2/file").read_text() == TEST_CONTENT | ||||
Show All 13 Lines | def test_directory_filtered_objects(self, git_loader, cook_extract_directory): | ||||
(rp / "absent_file").write_bytes(file_3) | (rp / "absent_file").write_bytes(file_3) | ||||
c = repo.commit() | c = repo.commit() | ||||
loader = git_loader(str(rp)) | loader = git_loader(str(rp)) | ||||
loader.load() | loader.load() | ||||
obj_id_hex = repo.repo[c].tree.decode() | obj_id_hex = repo.repo[c].tree.decode() | ||||
obj_id = hashutil.hash_to_bytes(obj_id_hex) | obj_id = hashutil.hash_to_bytes(obj_id_hex) | ||||
swhid = CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=obj_id) | |||||
# FIXME: storage.content_update() should be changed to allow things | # FIXME: storage.content_update() should be changed to allow things | ||||
# like that | # like that | ||||
with loader.storage.get_db().transaction() as cur: | with loader.storage.get_db().transaction() as cur: | ||||
cur.execute( | cur.execute( | ||||
"""update content set status = 'visible' | """update content set status = 'visible' | ||||
where sha1 = %s""", | where sha1 = %s""", | ||||
(id_1,), | (id_1,), | ||||
Show All 12 Lines | def test_directory_filtered_objects(self, git_loader, cook_extract_directory): | ||||
from content | from content | ||||
where sha1 = %s | where sha1 = %s | ||||
""", | """, | ||||
(id_3,), | (id_3,), | ||||
) | ) | ||||
cur.execute("delete from content where sha1 = %s", (id_3,)) | cur.execute("delete from content where sha1 = %s", (id_3,)) | ||||
with cook_extract_directory(loader.storage, obj_id) as p: | with cook_extract_directory(loader.storage, swhid) as p: | ||||
assert (p / "file").read_bytes() == b"test1" | assert (p / "file").read_bytes() == b"test1" | ||||
assert (p / "hidden_file").read_bytes() == HIDDEN_MESSAGE | assert (p / "hidden_file").read_bytes() == HIDDEN_MESSAGE | ||||
assert (p / "absent_file").read_bytes() == SKIPPED_MESSAGE | assert (p / "absent_file").read_bytes() == SKIPPED_MESSAGE | ||||
def test_directory_bogus_perms(self, git_loader, cook_extract_directory): | def test_directory_bogus_perms(self, git_loader, cook_extract_directory): | ||||
# Some early git repositories have 664/775 permissions... let's check | # Some early git repositories have 664/775 permissions... let's check | ||||
# if all the weird modes are properly normalized in the directory | # if all the weird modes are properly normalized in the directory | ||||
# cooker. | # cooker. | ||||
Show All 29 Lines | def test_directory_bogus_perms(self, git_loader, cook_extract_directory): | ||||
assert {entry["perms"] for entry in dir_entries} == { | assert {entry["perms"] for entry in dir_entries} == { | ||||
0o100664, | 0o100664, | ||||
0o100775, | 0o100775, | ||||
0o100604, | 0o100604, | ||||
} | } | ||||
obj_id_hex = repo.repo[c].tree.decode() | obj_id_hex = repo.repo[c].tree.decode() | ||||
obj_id = hashutil.hash_to_bytes(obj_id_hex) | obj_id = hashutil.hash_to_bytes(obj_id_hex) | ||||
swhid = CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=obj_id) | |||||
with cook_extract_directory(loader.storage, obj_id) as p: | with cook_extract_directory(loader.storage, swhid) as p: | ||||
assert (p / "file").stat().st_mode == 0o100644 | assert (p / "file").stat().st_mode == 0o100644 | ||||
assert (p / "executable").stat().st_mode == 0o100755 | assert (p / "executable").stat().st_mode == 0o100755 | ||||
assert (p / "wat").stat().st_mode == 0o100644 | assert (p / "wat").stat().st_mode == 0o100644 | ||||
@pytest.mark.parametrize("direct_objstorage", [True, False]) | @pytest.mark.parametrize("direct_objstorage", [True, False]) | ||||
def test_directory_objstorage( | def test_directory_objstorage( | ||||
self, swh_storage, git_loader, mocker, direct_objstorage | self, swh_storage, git_loader, mocker, direct_objstorage | ||||
): | ): | ||||
"""Like test_directory_simple, but using swh_objstorage directly, without | """Like test_directory_simple, but using swh_objstorage directly, without | ||||
going through swh_storage.content_get_data()""" | going through swh_storage.content_get_data()""" | ||||
repo = TestRepo() | repo = TestRepo() | ||||
with repo as rp: | with repo as rp: | ||||
(rp / "file").write_text(TEST_CONTENT) | (rp / "file").write_text(TEST_CONTENT) | ||||
(rp / "executable").write_bytes(TEST_EXECUTABLE) | (rp / "executable").write_bytes(TEST_EXECUTABLE) | ||||
(rp / "executable").chmod(0o755) | (rp / "executable").chmod(0o755) | ||||
(rp / "link").symlink_to("file") | (rp / "link").symlink_to("file") | ||||
(rp / "dir1/dir2").mkdir(parents=True) | (rp / "dir1/dir2").mkdir(parents=True) | ||||
(rp / "dir1/dir2/file").write_text(TEST_CONTENT) | (rp / "dir1/dir2/file").write_text(TEST_CONTENT) | ||||
c = repo.commit() | c = repo.commit() | ||||
loader = git_loader(str(rp)) | loader = git_loader(str(rp)) | ||||
loader.load() | loader.load() | ||||
obj_id_hex = repo.repo[c].tree.decode() | obj_id_hex = repo.repo[c].tree.decode() | ||||
obj_id = hashutil.hash_to_bytes(obj_id_hex) | obj_id = hashutil.hash_to_bytes(obj_id_hex) | ||||
swhid = CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=obj_id) | |||||
# Set-up spies | # Set-up spies | ||||
storage_content_get_data = mocker.patch.object( | storage_content_get_data = mocker.patch.object( | ||||
swh_storage, "content_get_data", wraps=swh_storage.content_get_data | swh_storage, "content_get_data", wraps=swh_storage.content_get_data | ||||
) | ) | ||||
objstorage_content_batch = mocker.patch.object( | objstorage_content_batch = mocker.patch.object( | ||||
swh_storage.objstorage, "get_batch", wraps=swh_storage.objstorage.get_batch | swh_storage.objstorage, "get_batch", wraps=swh_storage.objstorage.get_batch | ||||
) | ) | ||||
with cook_extract_directory_git_bare( | with cook_extract_directory_git_bare( | ||||
loader.storage, obj_id, direct_objstorage=direct_objstorage | loader.storage, swhid, direct_objstorage=direct_objstorage | ||||
) as p: | ) as p: | ||||
assert (p / "file").stat().st_mode == 0o100644 | assert (p / "file").stat().st_mode == 0o100644 | ||||
assert (p / "file").read_text() == TEST_CONTENT | assert (p / "file").read_text() == TEST_CONTENT | ||||
assert (p / "executable").stat().st_mode == 0o100755 | assert (p / "executable").stat().st_mode == 0o100755 | ||||
assert (p / "executable").read_bytes() == TEST_EXECUTABLE | assert (p / "executable").read_bytes() == TEST_EXECUTABLE | ||||
assert (p / "link").is_symlink() | assert (p / "link").is_symlink() | ||||
assert os.readlink(str(p / "link")) == "file" | assert os.readlink(str(p / "link")) == "file" | ||||
assert (p / "dir1/dir2/file").stat().st_mode == 0o100644 | assert (p / "dir1/dir2/file").stat().st_mode == 0o100644 | ||||
Show All 19 Lines | def test_directory_revision_data(self, swh_storage): | ||||
type="rev", | type="rev", | ||||
target=hashutil.hash_to_bytes(target_rev), | target=hashutil.hash_to_bytes(target_rev), | ||||
perms=0o100644, | perms=0o100644, | ||||
), | ), | ||||
), | ), | ||||
) | ) | ||||
swh_storage.directory_add([dir]) | swh_storage.directory_add([dir]) | ||||
with cook_extract_directory_dircooker(swh_storage, dir.id, fsck=False) as p: | with cook_extract_directory_dircooker( | ||||
swh_storage, dir.swhid(), fsck=False | |||||
) as p: | |||||
assert (p / "submodule").is_symlink() | assert (p / "submodule").is_symlink() | ||||
assert os.readlink(str(p / "submodule")) == target_rev | assert os.readlink(str(p / "submodule")) == target_rev | ||||
class RepoFixtures: | class RepoFixtures: | ||||
"""Shared loading and checking methods that can be reused by different types | """Shared loading and checking methods that can be reused by different types | ||||
of tests.""" | of tests.""" | ||||
def load_repo_simple(self, git_loader): | def load_repo_simple(self, git_loader): | ||||
# | # | ||||
# 1--2--3--4--5--6--7 | # 1--2--3--4--5--6--7 | ||||
# | # | ||||
repo = TestRepo() | repo = TestRepo() | ||||
with repo as rp: | with repo as rp: | ||||
(rp / "file1").write_text(TEST_CONTENT) | (rp / "file1").write_text(TEST_CONTENT) | ||||
repo.commit("add file1") | repo.commit("add file1") | ||||
(rp / "file2").write_text(TEST_CONTENT) | (rp / "file2").write_text(TEST_CONTENT) | ||||
repo.commit("add file2") | repo.commit("add file2") | ||||
(rp / "dir1/dir2").mkdir(parents=True) | (rp / "dir1/dir2").mkdir(parents=True) | ||||
(rp / "dir1/dir2/file").write_text(TEST_CONTENT) | (rp / "dir1/dir2/file").write_text(TEST_CONTENT) | ||||
repo.commit("add dir1/dir2/file") | |||||
(rp / "bin1").write_bytes(TEST_EXECUTABLE) | (rp / "bin1").write_bytes(TEST_EXECUTABLE) | ||||
(rp / "bin1").chmod(0o755) | (rp / "bin1").chmod(0o755) | ||||
repo.commit("add bin1") | repo.commit("add bin1") | ||||
(rp / "link1").symlink_to("file1") | (rp / "link1").symlink_to("file1") | ||||
repo.commit("link link1 to file1") | repo.commit("link link1 to file1") | ||||
(rp / "file2").unlink() | (rp / "file2").unlink() | ||||
repo.commit("remove file2") | repo.commit("remove file2") | ||||
(rp / "bin1").rename(rp / "bin") | (rp / "bin1").rename(rp / "bin") | ||||
repo.commit("rename bin1 to bin") | repo.commit("rename bin1 to bin") | ||||
loader = git_loader(str(rp)) | loader = git_loader(str(rp)) | ||||
loader.load() | loader.load() | ||||
obj_id_hex = repo.repo.refs[b"HEAD"].decode() | obj_id_hex = repo.repo.refs[b"HEAD"].decode() | ||||
obj_id = hashutil.hash_to_bytes(obj_id_hex) | obj_id = hashutil.hash_to_bytes(obj_id_hex) | ||||
return (loader, obj_id) | swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=obj_id) | ||||
return (loader, swhid) | |||||
def check_revision_simple(self, ert, p, obj_id): | def check_revision_simple(self, ert, p, swhid): | ||||
ert.checkout(b"HEAD") | ert.checkout(b"HEAD") | ||||
assert (p / "file1").stat().st_mode == 0o100644 | assert (p / "file1").stat().st_mode == 0o100644 | ||||
assert (p / "file1").read_text() == TEST_CONTENT | assert (p / "file1").read_text() == TEST_CONTENT | ||||
assert (p / "link1").is_symlink() | assert (p / "link1").is_symlink() | ||||
assert os.readlink(str(p / "link1")) == "file1" | assert os.readlink(str(p / "link1")) == "file1" | ||||
assert (p / "bin").stat().st_mode == 0o100755 | assert (p / "bin").stat().st_mode == 0o100755 | ||||
assert (p / "bin").read_bytes() == TEST_EXECUTABLE | assert (p / "bin").read_bytes() == TEST_EXECUTABLE | ||||
assert (p / "dir1/dir2/file").read_text() == TEST_CONTENT | assert (p / "dir1/dir2/file").read_text() == TEST_CONTENT | ||||
assert (p / "dir1/dir2/file").stat().st_mode == 0o100644 | assert (p / "dir1/dir2/file").stat().st_mode == 0o100644 | ||||
assert ert.repo.refs[b"HEAD"].decode() == obj_id.hex() | assert ert.repo.refs[b"HEAD"].decode() == swhid.object_id.hex() | ||||
def load_repo_two_roots(self, git_loader): | def load_repo_two_roots(self, git_loader): | ||||
# | # | ||||
# 1----3---4 | # 1----3---4 | ||||
# / | # / | ||||
# 2---- | # 2---- | ||||
# | # | ||||
repo = TestRepo() | repo = TestRepo() | ||||
with repo as rp: | with repo as rp: | ||||
(rp / "file1").write_text(TEST_CONTENT) | (rp / "file1").write_text(TEST_CONTENT) | ||||
c1 = repo.commit("Add file1") | c1 = repo.commit("Add file1") | ||||
del repo.repo.refs[b"refs/heads/master"] # git update-ref -d HEAD | del repo.repo.refs[b"refs/heads/master"] # git update-ref -d HEAD | ||||
(rp / "file2").write_text(TEST_CONTENT) | (rp / "file2").write_text(TEST_CONTENT) | ||||
repo.commit("Add file2") | repo.commit("Add file2") | ||||
repo.merge([c1]) | repo.merge([c1]) | ||||
(rp / "file3").write_text(TEST_CONTENT) | (rp / "file3").write_text(TEST_CONTENT) | ||||
repo.commit("add file3") | repo.commit("add file3") | ||||
obj_id_hex = repo.repo.refs[b"HEAD"].decode() | obj_id_hex = repo.repo.refs[b"HEAD"].decode() | ||||
obj_id = hashutil.hash_to_bytes(obj_id_hex) | obj_id = hashutil.hash_to_bytes(obj_id_hex) | ||||
swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=obj_id) | |||||
loader = git_loader(str(rp)) | loader = git_loader(str(rp)) | ||||
loader.load() | loader.load() | ||||
return (loader, obj_id) | return (loader, swhid) | ||||
def check_revision_two_roots(self, ert, p, obj_id): | def check_revision_two_roots(self, ert, p, swhid): | ||||
assert ert.repo.refs[b"HEAD"].decode() == obj_id.hex() | assert ert.repo.refs[b"HEAD"].decode() == swhid.object_id.hex() | ||||
(c3,) = ert.repo[hashutil.hash_to_bytehex(obj_id)].parents | (c3,) = ert.repo[hashutil.hash_to_bytehex(swhid.object_id)].parents | ||||
assert len(ert.repo[c3].parents) == 2 | assert len(ert.repo[c3].parents) == 2 | ||||
def load_repo_two_heads(self, git_loader): | def load_repo_two_heads(self, git_loader): | ||||
# | # | ||||
# 1---2----4 <-- master and b1 | # 1---2----4 <-- master and b1 | ||||
# \ | # \ | ||||
# ----3 <-- b2 | # ----3 <-- b2 | ||||
# | # | ||||
Show All 11 Lines | def load_repo_two_heads(self, git_loader): | ||||
repo.commit("add file3", ref=b"refs/heads/b2") | repo.commit("add file3", ref=b"refs/heads/b2") | ||||
(rp / "file4").write_text(TEST_CONTENT) | (rp / "file4").write_text(TEST_CONTENT) | ||||
c4 = repo.commit("add file4", ref=b"refs/heads/master") | c4 = repo.commit("add file4", ref=b"refs/heads/master") | ||||
repo.repo.refs[b"refs/heads/b1"] = c4 # branch b1 from master | repo.repo.refs[b"refs/heads/b1"] = c4 # branch b1 from master | ||||
obj_id_hex = repo.repo.refs[b"HEAD"].decode() | obj_id_hex = repo.repo.refs[b"HEAD"].decode() | ||||
obj_id = hashutil.hash_to_bytes(obj_id_hex) | obj_id = hashutil.hash_to_bytes(obj_id_hex) | ||||
swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=obj_id) | |||||
loader = git_loader(str(rp)) | loader = git_loader(str(rp)) | ||||
loader.load() | loader.load() | ||||
return (loader, obj_id) | return (loader, swhid) | ||||
def check_snapshot_two_heads(self, ert, p, obj_id): | def check_snapshot_two_heads(self, ert, p, swhid): | ||||
assert ( | assert ( | ||||
hashutil.hash_to_bytehex(obj_id) | hashutil.hash_to_bytehex(swhid.object_id) | ||||
== ert.repo.refs[b"HEAD"] | == ert.repo.refs[b"HEAD"] | ||||
== ert.repo.refs[b"refs/heads/master"] | == ert.repo.refs[b"refs/heads/master"] | ||||
== ert.repo.refs[b"refs/remotes/origin/HEAD"] | == ert.repo.refs[b"refs/remotes/origin/HEAD"] | ||||
== ert.repo.refs[b"refs/remotes/origin/master"] | == ert.repo.refs[b"refs/remotes/origin/master"] | ||||
== ert.repo.refs[b"refs/remotes/origin/b1"] | == ert.repo.refs[b"refs/remotes/origin/b1"] | ||||
) | ) | ||||
c4_id = hashutil.hash_to_bytehex(obj_id) | c4_id = hashutil.hash_to_bytehex(swhid.object_id) | ||||
c3_id = ert.repo.refs[b"refs/remotes/origin/b2"] | c3_id = ert.repo.refs[b"refs/remotes/origin/b2"] | ||||
assert ert.repo[c3_id].parents == ert.repo[c4_id].parents | assert ert.repo[c3_id].parents == ert.repo[c4_id].parents | ||||
def load_repo_two_double_fork_merge(self, git_loader): | def load_repo_two_double_fork_merge(self, git_loader): | ||||
# | # | ||||
# 2---4---6 | # 2---4---6 | ||||
# / / / | # / / / | ||||
Show All 16 Lines | def load_repo_two_double_fork_merge(self, git_loader): | ||||
(rp / "file5").write_text(TEST_CONTENT) | (rp / "file5").write_text(TEST_CONTENT) | ||||
c5 = repo.commit("Add file3", ref=b"refs/heads/c3") # create commit 5 on c3 | c5 = repo.commit("Add file3", ref=b"refs/heads/c3") # create commit 5 on c3 | ||||
repo.merge([c5]) # create commit 6 | repo.merge([c5]) # create commit 6 | ||||
obj_id_hex = repo.repo.refs[b"HEAD"].decode() | obj_id_hex = repo.repo.refs[b"HEAD"].decode() | ||||
obj_id = hashutil.hash_to_bytes(obj_id_hex) | obj_id = hashutil.hash_to_bytes(obj_id_hex) | ||||
swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=obj_id) | |||||
loader = git_loader(str(rp)) | loader = git_loader(str(rp)) | ||||
loader.load() | loader.load() | ||||
return (loader, obj_id) | return (loader, swhid) | ||||
def check_revision_two_double_fork_merge(self, ert, p, obj_id): | def check_revision_two_double_fork_merge(self, ert, p, swhid): | ||||
assert ert.repo.refs[b"HEAD"].decode() == obj_id.hex() | assert ert.repo.refs[b"HEAD"].decode() == swhid.object_id.hex() | ||||
def check_snapshot_two_double_fork_merge(self, ert, p, obj_id): | def check_snapshot_two_double_fork_merge(self, ert, p, swhid): | ||||
assert ( | assert ( | ||||
hashutil.hash_to_bytehex(obj_id) | hashutil.hash_to_bytehex(swhid.object_id) | ||||
== ert.repo.refs[b"HEAD"] | == ert.repo.refs[b"HEAD"] | ||||
== ert.repo.refs[b"refs/heads/master"] | == ert.repo.refs[b"refs/heads/master"] | ||||
== ert.repo.refs[b"refs/remotes/origin/HEAD"] | == ert.repo.refs[b"refs/remotes/origin/HEAD"] | ||||
== ert.repo.refs[b"refs/remotes/origin/master"] | == ert.repo.refs[b"refs/remotes/origin/master"] | ||||
) | ) | ||||
(c4_id, c5_id) = ert.repo[obj_id.hex().encode()].parents | (c4_id, c5_id) = ert.repo[swhid.object_id.hex().encode()].parents | ||||
assert c5_id == ert.repo.refs[b"refs/remotes/origin/c3"] | assert c5_id == ert.repo.refs[b"refs/remotes/origin/c3"] | ||||
(c2_id, c3_id) = ert.repo[c4_id].parents | (c2_id, c3_id) = ert.repo[c4_id].parents | ||||
assert c3_id == ert.repo.refs[b"refs/remotes/origin/c1"] | assert c3_id == ert.repo.refs[b"refs/remotes/origin/c1"] | ||||
def load_repo_triple_merge(self, git_loader): | def load_repo_triple_merge(self, git_loader): | ||||
# | # | ||||
# .---.---5 | # .---.---5 | ||||
Show All 11 Lines | def load_repo_triple_merge(self, git_loader): | ||||
repo.commit("Commit 2") | repo.commit("Commit 2") | ||||
c3 = repo.commit("Commit 3", ref=b"refs/heads/b1") | c3 = repo.commit("Commit 3", ref=b"refs/heads/b1") | ||||
c4 = repo.commit("Commit 4", ref=b"refs/heads/b2") | c4 = repo.commit("Commit 4", ref=b"refs/heads/b2") | ||||
repo.merge([c3, c4]) | repo.merge([c3, c4]) | ||||
obj_id_hex = repo.repo.refs[b"HEAD"].decode() | obj_id_hex = repo.repo.refs[b"HEAD"].decode() | ||||
obj_id = hashutil.hash_to_bytes(obj_id_hex) | obj_id = hashutil.hash_to_bytes(obj_id_hex) | ||||
swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=obj_id) | |||||
loader = git_loader(str(rp)) | loader = git_loader(str(rp)) | ||||
loader.load() | loader.load() | ||||
return (loader, obj_id) | return (loader, swhid) | ||||
def check_revision_triple_merge(self, ert, p, obj_id): | def check_revision_triple_merge(self, ert, p, swhid): | ||||
assert ert.repo.refs[b"HEAD"].decode() == obj_id.hex() | assert ert.repo.refs[b"HEAD"].decode() == swhid.object_id.hex() | ||||
def check_snapshot_triple_merge(self, ert, p, obj_id): | def check_snapshot_triple_merge(self, ert, p, swhid): | ||||
assert ( | assert ( | ||||
hashutil.hash_to_bytehex(obj_id) | hashutil.hash_to_bytehex(swhid.object_id) | ||||
== ert.repo.refs[b"HEAD"] | == ert.repo.refs[b"HEAD"] | ||||
== ert.repo.refs[b"refs/heads/master"] | == ert.repo.refs[b"refs/heads/master"] | ||||
== ert.repo.refs[b"refs/remotes/origin/HEAD"] | == ert.repo.refs[b"refs/remotes/origin/HEAD"] | ||||
== ert.repo.refs[b"refs/remotes/origin/master"] | == ert.repo.refs[b"refs/remotes/origin/master"] | ||||
) | ) | ||||
(c2_id, c3_id, c4_id) = ert.repo[obj_id.hex().encode()].parents | (c2_id, c3_id, c4_id) = ert.repo[swhid.object_id.hex().encode()].parents | ||||
assert c3_id == ert.repo.refs[b"refs/remotes/origin/b1"] | assert c3_id == ert.repo.refs[b"refs/remotes/origin/b1"] | ||||
assert c4_id == ert.repo.refs[b"refs/remotes/origin/b2"] | assert c4_id == ert.repo.refs[b"refs/remotes/origin/b2"] | ||||
assert ( | assert ( | ||||
ert.repo[c2_id].parents | ert.repo[c2_id].parents | ||||
== ert.repo[c3_id].parents | == ert.repo[c3_id].parents | ||||
== ert.repo[c4_id].parents | == ert.repo[c4_id].parents | ||||
) | ) | ||||
def load_repo_filtered_objects(self, git_loader): | def load_repo_filtered_objects(self, git_loader): | ||||
repo = TestRepo() | repo = TestRepo() | ||||
with repo as rp: | with repo as rp: | ||||
file_1, id_1 = hash_content(b"test1") | file_1, id_1 = hash_content(b"test1") | ||||
file_2, id_2 = hash_content(b"test2") | file_2, id_2 = hash_content(b"test2") | ||||
file_3, id_3 = hash_content(b"test3") | file_3, id_3 = hash_content(b"test3") | ||||
(rp / "file").write_bytes(file_1) | (rp / "file").write_bytes(file_1) | ||||
(rp / "hidden_file").write_bytes(file_2) | (rp / "hidden_file").write_bytes(file_2) | ||||
(rp / "absent_file").write_bytes(file_3) | (rp / "absent_file").write_bytes(file_3) | ||||
repo.commit() | repo.commit() | ||||
obj_id_hex = repo.repo.refs[b"HEAD"].decode() | obj_id_hex = repo.repo.refs[b"HEAD"].decode() | ||||
obj_id = hashutil.hash_to_bytes(obj_id_hex) | obj_id = hashutil.hash_to_bytes(obj_id_hex) | ||||
swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=obj_id) | |||||
loader = git_loader(str(rp)) | loader = git_loader(str(rp)) | ||||
loader.load() | loader.load() | ||||
# FIXME: storage.content_update() should be changed to allow things | # FIXME: storage.content_update() should be changed to allow things | ||||
# like that | # like that | ||||
with loader.storage.get_db().transaction() as cur: | with loader.storage.get_db().transaction() as cur: | ||||
cur.execute( | cur.execute( | ||||
"""update content set status = 'visible' | """update content set status = 'visible' | ||||
Show All 13 Lines | def load_repo_filtered_objects(self, git_loader): | ||||
select sha1, sha1_git, sha256, blake2s256, length, 'no reason' | select sha1, sha1_git, sha256, blake2s256, length, 'no reason' | ||||
from content | from content | ||||
where sha1 = %s | where sha1 = %s | ||||
""", | """, | ||||
(id_3,), | (id_3,), | ||||
) | ) | ||||
cur.execute("delete from content where sha1 = %s", (id_3,)) | cur.execute("delete from content where sha1 = %s", (id_3,)) | ||||
return (loader, obj_id) | return (loader, swhid) | ||||
def check_revision_filtered_objects(self, ert, p, obj_id): | def check_revision_filtered_objects(self, ert, p, swhid): | ||||
ert.checkout(b"HEAD") | ert.checkout(b"HEAD") | ||||
assert (p / "file").read_bytes() == b"test1" | assert (p / "file").read_bytes() == b"test1" | ||||
assert (p / "hidden_file").read_bytes() == HIDDEN_MESSAGE | assert (p / "hidden_file").read_bytes() == HIDDEN_MESSAGE | ||||
assert (p / "absent_file").read_bytes() == SKIPPED_MESSAGE | assert (p / "absent_file").read_bytes() == SKIPPED_MESSAGE | ||||
def load_repo_null_fields(self, git_loader): | def load_repo_null_fields(self, git_loader): | ||||
# Our schema doesn't enforce a lot of non-null revision fields. We need | # Our schema doesn't enforce a lot of non-null revision fields. We need | ||||
# to check these cases don't break the cooker. | # to check these cases don't break the cooker. | ||||
Show All 17 Lines | def load_repo_null_fields(self, git_loader): | ||||
type=RevisionType.GIT, | type=RevisionType.GIT, | ||||
directory=dir_id, | directory=dir_id, | ||||
metadata={}, | metadata={}, | ||||
synthetic=True, | synthetic=True, | ||||
) | ) | ||||
storage = loader.storage | storage = loader.storage | ||||
storage.revision_add([test_revision]) | storage.revision_add([test_revision]) | ||||
return (loader, test_revision.id) | return (loader, test_revision.swhid()) | ||||
def check_revision_null_fields(self, ert, p, obj_id): | def check_revision_null_fields(self, ert, p, swhid): | ||||
ert.checkout(b"HEAD") | ert.checkout(b"HEAD") | ||||
assert (p / "file").stat().st_mode == 0o100644 | assert (p / "file").stat().st_mode == 0o100644 | ||||
def load_repo_tags(self, git_loader): | def load_repo_tags(self, git_loader): | ||||
# v-- t2 | # v-- t2 | ||||
# | # | ||||
# 1---2----5 <-- master, t5, and t5a (annotated) | # 1---2----5 <-- master, t5, and t5a (annotated) | ||||
# \ | # \ | ||||
Show All 23 Lines | def load_repo_tags(self, git_loader): | ||||
(rp / "file5").write_text(TEST_CONTENT) | (rp / "file5").write_text(TEST_CONTENT) | ||||
repo.commit("add file5") # create c5 | repo.commit("add file5") # create c5 | ||||
repo.tag(b"t5") | repo.tag(b"t5") | ||||
repo.tag(b"t5a", message=b"tag 5") | repo.tag(b"t5a", message=b"tag 5") | ||||
obj_id_hex = repo.repo.refs[b"HEAD"].decode() | obj_id_hex = repo.repo.refs[b"HEAD"].decode() | ||||
obj_id = hashutil.hash_to_bytes(obj_id_hex) | obj_id = hashutil.hash_to_bytes(obj_id_hex) | ||||
swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=obj_id) | |||||
loader = git_loader(str(rp)) | loader = git_loader(str(rp)) | ||||
loader.load() | loader.load() | ||||
return (loader, obj_id) | return (loader, swhid) | ||||
def check_snapshot_tags(self, ert, p, obj_id): | def check_snapshot_tags(self, ert, p, swhid): | ||||
assert ( | assert ( | ||||
hashutil.hash_to_bytehex(obj_id) | hashutil.hash_to_bytehex(swhid.object_id) | ||||
== ert.repo.refs[b"HEAD"] | == ert.repo.refs[b"HEAD"] | ||||
== ert.repo.refs[b"refs/heads/master"] | == ert.repo.refs[b"refs/heads/master"] | ||||
== ert.repo.refs[b"refs/remotes/origin/HEAD"] | == ert.repo.refs[b"refs/remotes/origin/HEAD"] | ||||
== ert.repo.refs[b"refs/remotes/origin/master"] | == ert.repo.refs[b"refs/remotes/origin/master"] | ||||
== ert.repo.refs[b"refs/tags/t5"] | == ert.repo.refs[b"refs/tags/t5"] | ||||
) | ) | ||||
c2_id = ert.repo.refs[b"refs/tags/t2"] | c2_id = ert.repo.refs[b"refs/tags/t2"] | ||||
c5_id = hashutil.hash_to_bytehex(obj_id) | c5_id = hashutil.hash_to_bytehex(swhid.object_id) | ||||
assert ert.repo[c5_id].parents == [c2_id] | assert ert.repo[c5_id].parents == [c2_id] | ||||
t5a = ert.repo[ert.repo.refs[b"refs/tags/t5a"]] | t5a = ert.repo[ert.repo.refs[b"refs/tags/t5a"]] | ||||
assert t5a.message == b"tag 5\n" # TODO: investigate why new dulwich adds \n | assert t5a.message == b"tag 5\n" # TODO: investigate why new dulwich adds \n | ||||
assert t5a.object == (dulwich.objects.Commit, c5_id) | assert t5a.object == (dulwich.objects.Commit, c5_id) | ||||
t4a = ert.repo[ert.repo.refs[b"refs/tags/t4a"]] | t4a = ert.repo[ert.repo.refs[b"refs/tags/t4a"]] | ||||
(_, c4_id) = t4a.object | (_, c4_id) = t4a.object | ||||
assert ert.repo[c4_id].message == b"add file4\n" # TODO: ditto | assert ert.repo[c4_id].message == b"add file4\n" # TODO: ditto | ||||
(c3_id,) = ert.repo[c4_id].parents | (c3_id,) = ert.repo[c4_id].parents | ||||
assert ert.repo[c3_id].message == b"add file3\n" # TODO: ditto | assert ert.repo[c3_id].message == b"add file3\n" # TODO: ditto | ||||
assert ert.repo[c3_id].parents == [c2_id] | assert ert.repo[c3_id].parents == [c2_id] | ||||
class TestRevisionCooker(RepoFixtures): | class TestRevisionCooker(RepoFixtures): | ||||
def test_revision_simple(self, git_loader, cook_extract_revision): | def test_revision_simple(self, git_loader, cook_extract_revision): | ||||
(loader, obj_id) = self.load_repo_simple(git_loader) | (loader, swhid) = self.load_repo_simple(git_loader) | ||||
with cook_extract_revision(loader.storage, obj_id) as (ert, p): | with cook_extract_revision(loader.storage, swhid) as (ert, p): | ||||
self.check_revision_simple(ert, p, obj_id) | self.check_revision_simple(ert, p, swhid) | ||||
def test_revision_two_roots(self, git_loader, cook_extract_revision): | def test_revision_two_roots(self, git_loader, cook_extract_revision): | ||||
(loader, obj_id) = self.load_repo_two_roots(git_loader) | (loader, swhid) = self.load_repo_two_roots(git_loader) | ||||
with cook_extract_revision(loader.storage, obj_id) as (ert, p): | with cook_extract_revision(loader.storage, swhid) as (ert, p): | ||||
self.check_revision_two_roots(ert, p, obj_id) | self.check_revision_two_roots(ert, p, swhid) | ||||
def test_revision_two_double_fork_merge(self, git_loader, cook_extract_revision): | def test_revision_two_double_fork_merge(self, git_loader, cook_extract_revision): | ||||
(loader, obj_id) = self.load_repo_two_double_fork_merge(git_loader) | (loader, swhid) = self.load_repo_two_double_fork_merge(git_loader) | ||||
with cook_extract_revision(loader.storage, obj_id) as (ert, p): | with cook_extract_revision(loader.storage, swhid) as (ert, p): | ||||
self.check_revision_two_double_fork_merge(ert, p, obj_id) | self.check_revision_two_double_fork_merge(ert, p, swhid) | ||||
def test_revision_triple_merge(self, git_loader, cook_extract_revision): | def test_revision_triple_merge(self, git_loader, cook_extract_revision): | ||||
(loader, obj_id) = self.load_repo_triple_merge(git_loader) | (loader, swhid) = self.load_repo_triple_merge(git_loader) | ||||
with cook_extract_revision(loader.storage, obj_id) as (ert, p): | with cook_extract_revision(loader.storage, swhid) as (ert, p): | ||||
self.check_revision_triple_merge(ert, p, obj_id) | self.check_revision_triple_merge(ert, p, swhid) | ||||
def test_revision_filtered_objects(self, git_loader, cook_extract_revision): | def test_revision_filtered_objects(self, git_loader, cook_extract_revision): | ||||
(loader, obj_id) = self.load_repo_filtered_objects(git_loader) | (loader, swhid) = self.load_repo_filtered_objects(git_loader) | ||||
with cook_extract_revision(loader.storage, obj_id) as (ert, p): | with cook_extract_revision(loader.storage, swhid) as (ert, p): | ||||
self.check_revision_filtered_objects(ert, p, obj_id) | self.check_revision_filtered_objects(ert, p, swhid) | ||||
def test_revision_null_fields(self, git_loader, cook_extract_revision): | def test_revision_null_fields(self, git_loader, cook_extract_revision): | ||||
(loader, obj_id) = self.load_repo_null_fields(git_loader) | (loader, swhid) = self.load_repo_null_fields(git_loader) | ||||
with cook_extract_revision(loader.storage, obj_id, fsck=False) as (ert, p): | with cook_extract_revision(loader.storage, swhid, fsck=False) as (ert, p): | ||||
self.check_revision_null_fields(ert, p, obj_id) | self.check_revision_null_fields(ert, p, swhid) | ||||
def test_revision_revision_data(self, swh_storage): | def test_revision_revision_data(self, swh_storage): | ||||
target_rev = "0e8a3ad980ec179856012b7eecf4327e99cd44cd" | target_rev = "0e8a3ad980ec179856012b7eecf4327e99cd44cd" | ||||
dir = Directory( | dir = Directory( | ||||
entries=( | entries=( | ||||
DirectoryEntry( | DirectoryEntry( | ||||
name=b"submodule", | name=b"submodule", | ||||
Show All 14 Lines | def test_revision_revision_data(self, swh_storage): | ||||
parents=(), | parents=(), | ||||
type=RevisionType.GIT, | type=RevisionType.GIT, | ||||
directory=dir.id, | directory=dir.id, | ||||
metadata={}, | metadata={}, | ||||
synthetic=True, | synthetic=True, | ||||
) | ) | ||||
swh_storage.revision_add([rev]) | swh_storage.revision_add([rev]) | ||||
with cook_stream_revision_gitfast(swh_storage, rev.id) as stream: | with cook_stream_revision_gitfast(swh_storage, rev.swhid()) as stream: | ||||
pattern = "M 160000 {} submodule".format(target_rev).encode() | pattern = "M 160000 {} submodule".format(target_rev).encode() | ||||
assert pattern in stream.read() | assert pattern in stream.read() | ||||
class TestSnapshotCooker(RepoFixtures): | class TestSnapshotCooker(RepoFixtures): | ||||
def test_snapshot_simple(self, git_loader, cook_extract_snapshot): | def test_snapshot_simple(self, git_loader, cook_extract_snapshot): | ||||
(loader, main_rev_id) = self.load_repo_simple(git_loader) | (loader, main_rev_id) = self.load_repo_simple(git_loader) | ||||
snp_id = loader.loaded_snapshot_id | snp_id = loader.loaded_snapshot_id | ||||
with cook_extract_snapshot(loader.storage, snp_id) as (ert, p): | swhid = CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=snp_id) | ||||
with cook_extract_snapshot(loader.storage, swhid) as (ert, p): | |||||
self.check_revision_simple(ert, p, main_rev_id) | self.check_revision_simple(ert, p, main_rev_id) | ||||
def test_snapshot_two_roots(self, git_loader, cook_extract_snapshot): | def test_snapshot_two_roots(self, git_loader, cook_extract_snapshot): | ||||
(loader, main_rev_id) = self.load_repo_two_roots(git_loader) | (loader, main_rev_id) = self.load_repo_two_roots(git_loader) | ||||
snp_id = loader.loaded_snapshot_id | snp_id = loader.loaded_snapshot_id | ||||
with cook_extract_snapshot(loader.storage, snp_id) as (ert, p): | swhid = CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=snp_id) | ||||
with cook_extract_snapshot(loader.storage, swhid) as (ert, p): | |||||
self.check_revision_two_roots(ert, p, main_rev_id) | self.check_revision_two_roots(ert, p, main_rev_id) | ||||
def test_snapshot_two_heads(self, git_loader, cook_extract_snapshot): | def test_snapshot_two_heads(self, git_loader, cook_extract_snapshot): | ||||
(loader, main_rev_id) = self.load_repo_two_heads(git_loader) | (loader, main_rev_id) = self.load_repo_two_heads(git_loader) | ||||
snp_id = loader.loaded_snapshot_id | snp_id = loader.loaded_snapshot_id | ||||
with cook_extract_snapshot(loader.storage, snp_id) as (ert, p): | swhid = CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=snp_id) | ||||
with cook_extract_snapshot(loader.storage, swhid) as (ert, p): | |||||
self.check_snapshot_two_heads(ert, p, main_rev_id) | self.check_snapshot_two_heads(ert, p, main_rev_id) | ||||
def test_snapshot_two_double_fork_merge(self, git_loader, cook_extract_snapshot): | def test_snapshot_two_double_fork_merge(self, git_loader, cook_extract_snapshot): | ||||
(loader, main_rev_id) = self.load_repo_two_double_fork_merge(git_loader) | (loader, main_rev_id) = self.load_repo_two_double_fork_merge(git_loader) | ||||
snp_id = loader.loaded_snapshot_id | snp_id = loader.loaded_snapshot_id | ||||
with cook_extract_snapshot(loader.storage, snp_id) as (ert, p): | swhid = CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=snp_id) | ||||
with cook_extract_snapshot(loader.storage, swhid) as (ert, p): | |||||
self.check_revision_two_double_fork_merge(ert, p, main_rev_id) | self.check_revision_two_double_fork_merge(ert, p, main_rev_id) | ||||
self.check_snapshot_two_double_fork_merge(ert, p, main_rev_id) | self.check_snapshot_two_double_fork_merge(ert, p, main_rev_id) | ||||
def test_snapshot_triple_merge(self, git_loader, cook_extract_snapshot): | def test_snapshot_triple_merge(self, git_loader, cook_extract_snapshot): | ||||
(loader, main_rev_id) = self.load_repo_triple_merge(git_loader) | (loader, main_rev_id) = self.load_repo_triple_merge(git_loader) | ||||
snp_id = loader.loaded_snapshot_id | snp_id = loader.loaded_snapshot_id | ||||
with cook_extract_snapshot(loader.storage, snp_id) as (ert, p): | swhid = CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=snp_id) | ||||
with cook_extract_snapshot(loader.storage, swhid) as (ert, p): | |||||
self.check_revision_triple_merge(ert, p, main_rev_id) | self.check_revision_triple_merge(ert, p, main_rev_id) | ||||
self.check_snapshot_triple_merge(ert, p, main_rev_id) | self.check_snapshot_triple_merge(ert, p, main_rev_id) | ||||
def test_snapshot_filtered_objects(self, git_loader, cook_extract_snapshot): | def test_snapshot_filtered_objects(self, git_loader, cook_extract_snapshot): | ||||
(loader, main_rev_id) = self.load_repo_filtered_objects(git_loader) | (loader, main_rev_id) = self.load_repo_filtered_objects(git_loader) | ||||
snp_id = loader.loaded_snapshot_id | snp_id = loader.loaded_snapshot_id | ||||
with cook_extract_snapshot(loader.storage, snp_id) as (ert, p): | swhid = CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=snp_id) | ||||
with cook_extract_snapshot(loader.storage, swhid) as (ert, p): | |||||
self.check_revision_filtered_objects(ert, p, main_rev_id) | self.check_revision_filtered_objects(ert, p, main_rev_id) | ||||
def test_snapshot_tags(self, git_loader, cook_extract_snapshot): | def test_snapshot_tags(self, git_loader, cook_extract_snapshot): | ||||
(loader, main_rev_id) = self.load_repo_tags(git_loader) | (loader, main_rev_id) = self.load_repo_tags(git_loader) | ||||
snp_id = loader.loaded_snapshot_id | snp_id = loader.loaded_snapshot_id | ||||
with cook_extract_snapshot(loader.storage, snp_id) as (ert, p): | swhid = CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=snp_id) | ||||
with cook_extract_snapshot(loader.storage, swhid) as (ert, p): | |||||
self.check_snapshot_tags(ert, p, main_rev_id) | self.check_snapshot_tags(ert, p, main_rev_id) |