swh/loader/mercurial/tests/test_loader.py
- This file was moved from swh/loader/mercurial/tests/test_from_disk.py.
(… 20 lines not shown …)
)
from swh.model.from_disk import Content, DentryPerms
from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.model.identifiers import ObjectType
from swh.model.model import RevisionType, Snapshot, SnapshotBranch, TargetType
from swh.storage import get_storage
from swh.storage.algos.snapshot import snapshot_get_latest

from ..loader import EXTID_VERSION, HgDirectory, HgLoader
from .loader_checker import ExpectedSwhids, LoaderChecker

VISIT_DATE = parse_visit_date("2016-05-03 15:16:32+00")
assert VISIT_DATE is not None


def random_content() -> Content:
    """Create minimal content object."""
(… 50 lines not shown …)

@pytest.mark.parametrize(
"archive_name", ("hello", "transplant", "the-sandbox", "example") | "archive_name", ("hello", "transplant", "the-sandbox", "example") | ||||
) | ) | ||||
def test_examples(swh_storage, datadir, tmp_path, archive_name): | def test_examples(swh_storage, datadir, tmp_path, archive_name): | ||||
archive_path = Path(datadir, f"{archive_name}.tgz") | archive_path = Path(datadir, f"{archive_name}.tgz") | ||||
json_path = Path(datadir, f"{archive_name}.json") | json_path = Path(datadir, f"{archive_name}.json") | ||||
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) | repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) | ||||
LoaderChecker( | LoaderChecker( | ||||
loader=HgLoaderFromDisk(swh_storage, repo_url), | loader=HgLoader(swh_storage, repo_url), expected=ExpectedSwhids.load(json_path), | ||||
expected=ExpectedSwhids.load(json_path), | |||||
).check() | ).check() | ||||
# This test has been adapted from the historical `HgBundle20Loader` tests
# to ensure compatibility of `HgLoader`.
# Hashes have been produced by copy-pasting the result of the implementation
# to prevent regressions.
def test_loader_hg_new_visit_no_release(swh_storage, datadir, tmp_path):
    """Eventful visit should yield 1 snapshot"""
    archive_name = "the-sandbox"
    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
    loader = HgLoader(swh_storage, url=repo_url)
    assert loader.load() == {"status": "eventful"}
    tips = {
        b"branch-tip/default": "70e750bb046101fdced06f428e73fee471509c56",
        b"branch-tip/develop": "a9c4534552df370f43f0ef97146f393ef2f2a08c",
    }
    closed = {
(… 49 lines not shown …)

    expected_stats = {
"origin": 1, | "origin": 1, | ||||
"origin_visit": 1, | "origin_visit": 1, | ||||
"release": 0, | "release": 0, | ||||
"revision": 58, | "revision": 58, | ||||
"skipped_content": 0, | "skipped_content": 0, | ||||
"snapshot": 1, | "snapshot": 1, | ||||
} | } | ||||
assert stats == expected_stats | assert stats == expected_stats | ||||
loader2 = HgLoaderFromDisk(swh_storage, url=repo_url) | loader2 = HgLoader(swh_storage, url=repo_url) | ||||
assert loader2.load() == {"status": "uneventful"} # nothing new happened | assert loader2.load() == {"status": "uneventful"} # nothing new happened | ||||
stats2 = get_stats(loader2.storage) | stats2 = get_stats(loader2.storage) | ||||
expected_stats2 = expected_stats.copy() | expected_stats2 = expected_stats.copy() | ||||
expected_stats2["origin_visit"] = 2 # one new visit recorded | expected_stats2["origin_visit"] = 2 # one new visit recorded | ||||
assert stats2 == expected_stats2 | assert stats2 == expected_stats2 | ||||
assert_last_visit_matches( | assert_last_visit_matches( | ||||
loader2.storage, | loader2.storage, | ||||
repo_url, | repo_url, | ||||
status="full", | status="full", | ||||
type="hg", | type="hg", | ||||
snapshot=expected_snapshot.id, | snapshot=expected_snapshot.id, | ||||
) # but we got a snapshot nonetheless | ) # but we got a snapshot nonetheless | ||||
# This test has been adapted from the historical `HgBundle20Loader` tests
# to ensure compatibility of `HgLoader`.
# Hashes have been produced by copy-pasting the result of the implementation
# to prevent regressions.
def test_loader_hg_new_visit_with_release(swh_storage, datadir, tmp_path):
    """Eventful visit with release should yield 1 snapshot"""
    archive_name = "hello"
    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
    loader = HgLoader(swh_storage, url=repo_url, visit_date=VISIT_DATE,)
    actual_load_status = loader.load()
    assert actual_load_status == {"status": "eventful"}
    # then
    stats = get_stats(loader.storage)
    assert stats == {
        "content": 3,
(… 36 lines not shown …)

    assert_last_visit_matches(
        repo_url,
        type=RevisionType.MERCURIAL.value,
        status="full",
        snapshot=expected_snapshot.id,
    )
# This test has been adapted from the historical `HgBundle20Loader` tests
# to ensure compatibility of `HgLoader`.
# Hashes have been produced by copy-pasting the result of the implementation
# to prevent regressions.
def test_visit_repository_with_transplant_operations(swh_storage, datadir, tmp_path):
    """Visiting a Mercurial repository with transplant operations should yield
    a snapshot as well.
    """
    archive_name = "transplant"
    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
    loader = HgLoader(swh_storage, url=repo_url, visit_date=VISIT_DATE,)
    # load hg repository
    actual_load_status = loader.load()
    assert actual_load_status == {"status": "eventful"}
    # collect swh revisions
    assert_last_visit_matches(
        loader.storage, repo_url, type=RevisionType.MERCURIAL.value, status="full"
(… 66 lines not shown …)

def test_load_unchanged_repo_should_be_uneventful(
    swh_storage, datadir, tmp_path,
):
    """Checks the loader can find which revisions it already loaded, using ExtIDs."""
archive_name = "hello" | archive_name = "hello" | ||||
archive_path = os.path.join(datadir, f"{archive_name}.tgz") | archive_path = os.path.join(datadir, f"{archive_name}.tgz") | ||||
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) | repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) | ||||
repo_path = repo_url.replace("file://", "") | repo_path = repo_url.replace("file://", "") | ||||
loader = HgLoaderFromDisk(swh_storage, repo_path) | loader = HgLoader(swh_storage, repo_path) | ||||
assert loader.load() == {"status": "eventful"} | assert loader.load() == {"status": "eventful"} | ||||
assert get_stats(loader.storage) == { | assert get_stats(loader.storage) == { | ||||
"content": 3, | "content": 3, | ||||
"directory": 3, | "directory": 3, | ||||
"origin": 1, | "origin": 1, | ||||
"origin_visit": 1, | "origin_visit": 1, | ||||
"release": 1, | "release": 1, | ||||
"revision": 3, | "revision": 3, | ||||
"skipped_content": 0, | "skipped_content": 0, | ||||
"snapshot": 1, | "snapshot": 1, | ||||
} | } | ||||
visit_status = assert_last_visit_matches( | visit_status = assert_last_visit_matches( | ||||
loader.storage, repo_path, type=RevisionType.MERCURIAL.value, status="full", | loader.storage, repo_path, type=RevisionType.MERCURIAL.value, status="full", | ||||
) | ) | ||||
assert visit_status.snapshot is not None | assert visit_status.snapshot is not None | ||||
    # Create a new loader (to start with a clean slate, e.g. remove the caches),
    # reusing the same, already populated, storage
    loader2 = HgLoader(swh_storage, repo_path)
    assert loader2.load() == {"status": "uneventful"}
    # Should have all the objects
    assert get_stats(loader.storage) == {
        "content": 3,
        "directory": 3,
        "origin": 1,
        "origin_visit": 2,
(… 10 lines not shown …)
def test_closed_branch_incremental(swh_storage, datadir, tmp_path):
    """Test that a repository with a closed branch does not trip an incremental load"""
    archive_name = "example"
    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
    repo_path = repo_url.replace("file://", "")
    loader = HgLoader(swh_storage, repo_path)
    # Test 3 loads: full, and two incremental.
    assert loader.load() == {"status": "eventful"}
    expected_stats = {
        "content": 7,
        "directory": 16,
        "origin": 1,
        "origin_visit": 1,
(… 12 lines not shown …)
def test_load_unchanged_repo__dangling_extid(swh_storage, datadir, tmp_path):
    """Checks the loader will load revisions targeted by an ExtID if the
    revisions are missing from the storage"""
    archive_name = "hello"
    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
    repo_path = repo_url.replace("file://", "")
    loader = HgLoader(swh_storage, repo_path)
    assert loader.load() == {"status": "eventful"}
    assert get_stats(loader.storage) == {
        "content": 3,
        "directory": 3,
        "origin": 1,
        "origin_visit": 1,
        "release": 1,
        "revision": 3,
        "skipped_content": 0,
        "snapshot": 1,
    }
    old_storage = swh_storage
    # Create a new storage, and only copy ExtIDs or head revisions to it.
    # This should be enough for the loader to know revisions were already loaded
    new_storage = _partial_copy_storage(
        old_storage, repo_path, mechanism="extid", copy_revisions=False
    )
    # Create a new loader (to start with a clean slate, e.g. remove the caches),
    # with the new, partial, storage
    loader = HgLoader(new_storage, repo_path)
    assert get_stats(loader.storage) == {
        "content": 0,
        "directory": 0,
        "origin": 1,
        "origin_visit": 1,
        "release": 0,
        "revision": 0,
(… 16 lines not shown …)
def test_missing_filelog_should_not_crash(swh_storage, datadir, tmp_path):
    archive_name = "missing-filelog"
    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
    directory = repo_url.replace("file://", "")
    loader = HgLoader(
        storage=swh_storage,
        url=repo_url,
        directory=directory,  # specify directory to avoid clone
        visit_date=VISIT_DATE,
    )
    actual_load_status = loader.load()
    assert actual_load_status == {"status": "eventful"}
    assert_last_visit_matches(swh_storage, repo_url, status="partial", type="hg")


def test_multiple_open_heads(swh_storage, datadir, tmp_path):
    archive_name = "multiple-heads"
    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
    loader = HgLoader(storage=swh_storage, url=repo_url,)
    actual_load_status = loader.load()
    assert actual_load_status == {"status": "eventful"}
    assert_last_visit_matches(swh_storage, repo_url, status="full", type="hg")
    snapshot = snapshot_get_latest(swh_storage, repo_url)
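    # Two open heads on the `default` branch: each gets its own
    # `branch-heads/default/<n>` entry, in addition to the usual
    # `branch-tip/default` and `HEAD` entries.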
    expected_branches = [
        b"HEAD",
        b"branch-heads/default/0",
        b"branch-heads/default/1",
        b"branch-tip/default",
    ]
    assert sorted(snapshot.branches.keys()) == expected_branches
    # Check that we don't load anything the second time
    loader = HgLoader(storage=swh_storage, url=repo_url,)
    actual_load_status = loader.load()
    assert actual_load_status == {"status": "uneventful"}
def hg_strip(repo: str, revset: str) -> None:
    """Removes `revset` and all of its descendants from the local repository."""
    # Previously called `hg strip`, it was renamed to `hg debugstrip` in Mercurial 5.7
    # because it's most likely not what most users want to do (they should use some kind
    # of history-rewriting tool like `histedit` or `prune`).
    # But here, it's exactly what we want to do.
    subprocess.check_call(["hg", "debugstrip", revset], cwd=repo)
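# hg_strip is used by the tests below, e.g. as
# hg_strip(repo_url.replace("file://", ""), "tip"), to drop the latest
# changesets and simulate a repository that gains new commits between visits.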
def test_load_repo_with_new_commits(swh_storage, datadir, tmp_path):
    archive_name = "hello"
    archive_path = Path(datadir, f"{archive_name}.tgz")
    json_path = Path(datadir, f"{archive_name}.json")
    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
    # first load with missing commits
    hg_strip(repo_url.replace("file://", ""), "tip")
    loader = HgLoader(swh_storage, repo_url)
    assert loader.load() == {"status": "eventful"}
    assert get_stats(loader.storage) == {
        "content": 2,
        "directory": 2,
        "origin": 1,
        "origin_visit": 1,
        "release": 0,
        "revision": 2,
        "skipped_content": 0,
        "snapshot": 1,
    }
    # second load with all commits
    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
    loader = HgLoader(swh_storage, repo_url)
    checker = LoaderChecker(loader=loader, expected=ExpectedSwhids.load(json_path),)
    checker.check()
    assert get_stats(loader.storage) == {
        "content": 3,
        "directory": 3,
        "origin": 1,
        "origin_visit": 2,
        "release": 1,
        "revision": 3,
        "skipped_content": 0,
        "snapshot": 2,
    }


def test_load_repo_check_extids_write_version(swh_storage, datadir, tmp_path):
    """ExtIDs should be stored with a given version when loading is done"""
    archive_name = "hello"
    archive_path = Path(datadir, f"{archive_name}.tgz")
    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
    hg_strip(repo_url.replace("file://", ""), "tip")
    loader = HgLoader(swh_storage, repo_url)
    assert loader.load() == {"status": "eventful"}
    # Ensure we write ExtIDs to a specific version.
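    # EXTID_VERSION identifies the mapping scheme in use; as
    # test_load_new_extid_should_be_eventful below shows, changing it makes the
    # loader ignore ExtIDs written under another version.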
    snapshot = snapshot_get_latest(swh_storage, repo_url)
    # First, collect the revisions from that snapshot
    revision_ids = [
        branch.target

(… 14 lines not shown …)
def test_load_new_extid_should_be_eventful(swh_storage, datadir, tmp_path):
    """Changing the extid version should make loaders ignore existing extids,
    and load the repo again."""
    archive_name = "hello"
    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
    repo_path = repo_url.replace("file://", "")

    with unittest.mock.patch("swh.loader.mercurial.loader.EXTID_VERSION", 0):
        loader = HgLoader(swh_storage, repo_path)
        assert loader.load() == {"status": "eventful"}

    loader = HgLoader(swh_storage, repo_path)
    assert loader.load() == {"status": "eventful"}

    loader = HgLoader(swh_storage, repo_path)
    assert loader.load() == {"status": "uneventful"}

    with unittest.mock.patch("swh.loader.mercurial.loader.EXTID_VERSION", 10000):
        loader = HgLoader(swh_storage, repo_path)
        assert loader.load() == {"status": "eventful"}

        loader = HgLoader(swh_storage, repo_path)
        assert loader.load() == {"status": "uneventful"}
def test_loader_hg_extid_filtering(swh_storage, datadir, tmp_path):
    """The first visit of a fork should filter already seen revisions (through extids)
    """
    archive_name = "the-sandbox"
    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
    loader = HgLoader(swh_storage, url=repo_url)
    assert loader.load() == {"status": "eventful"}
    stats = get_stats(loader.storage)
    expected_stats = {
        "content": 2,
        "directory": 3,
        "origin": 1,
        "origin_visit": 1,
        "release": 0,
        "revision": 58,
        "skipped_content": 0,
        "snapshot": 1,
    }
    assert stats == expected_stats
    visit_status = assert_last_visit_matches(
        loader.storage, repo_url, status="full", type="hg",
    )
    # Make a fork of the first repository we ingested
    fork_url = prepare_repository_from_archive(
        archive_path, "the-sandbox-reloaded", tmp_path
    )
    loader2 = HgLoader(
        swh_storage, url=fork_url, directory=str(tmp_path / archive_name)
    )
    assert loader2.load() == {"status": "uneventful"}
    stats = get_stats(loader.storage)
    expected_stats2 = expected_stats.copy()
    expected_stats2.update(
(… 11 lines not shown …)
def test_loader_repository_with_bookmark_information(swh_storage, datadir, tmp_path):
    """Repository with bookmark information should be ingested correctly
    """
    archive_name = "anomad-d"
    archive_path = os.path.join(datadir, f"{archive_name}.tgz")
    repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path)
    loader = HgLoader(swh_storage, url=repo_url)
    assert loader.load() == {"status": "eventful"}