Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/mercurial/tests/test_loader.py
- This file was copied to swh/loader/mercurial/tests/test_from_disk.py.
Show All 16 Lines | from swh.loader.tests import ( | ||||
get_stats, | get_stats, | ||||
prepare_repository_from_archive, | prepare_repository_from_archive, | ||||
) | ) | ||||
from swh.model.hashutil import hash_to_bytes | from swh.model.hashutil import hash_to_bytes | ||||
from swh.model.model import RevisionType, Snapshot, SnapshotBranch, TargetType | from swh.model.model import RevisionType, Snapshot, SnapshotBranch, TargetType | ||||
from swh.storage.algos.snapshot import snapshot_get_latest | from swh.storage.algos.snapshot import snapshot_get_latest | ||||
from ..loader import CloneTimeoutError, HgArchiveBundle20Loader, HgBundle20Loader | from ..loader import CloneTimeoutError, HgArchiveBundle20Loader, HgBundle20Loader | ||||
from .loader_checker import ExpectedSwhids, LoaderChecker | |||||
def test_examples(swh_config, datadir, tmp_path): | |||||
for archive_name in ("hello", "transplant", "the-sandbox", "example"): | |||||
archive_path = os.path.join(datadir, f"{archive_name}.tgz") | |||||
json_path = os.path.join(datadir, f"{archive_name}.json") | |||||
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) | |||||
LoaderChecker( | |||||
loader=HgBundle20Loader(repo_url), expected=ExpectedSwhids.load(json_path), | |||||
).check() | |||||
def test_loader_hg_new_visit_no_release(swh_config, datadir, tmp_path): | def test_loader_hg_new_visit_no_release(swh_config, datadir, tmp_path): | ||||
"""Eventful visit should yield 1 snapshot""" | """Eventful visit should yield 1 snapshot""" | ||||
archive_name = "the-sandbox" | archive_name = "the-sandbox" | ||||
archive_path = os.path.join(datadir, f"{archive_name}.tgz") | archive_path = os.path.join(datadir, f"{archive_name}.tgz") | ||||
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) | repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) | ||||
▲ Show 20 Lines • Show All 64 Lines • ▼ Show 20 Lines | assert_last_visit_matches( | ||||
snapshot=expected_snapshot.id, | snapshot=expected_snapshot.id, | ||||
) | ) | ||||
def test_loader_hg_new_visit_with_release(swh_config, datadir, tmp_path): | def test_loader_hg_new_visit_with_release(swh_config, datadir, tmp_path): | ||||
"""Eventful visit with release should yield 1 snapshot""" | """Eventful visit with release should yield 1 snapshot""" | ||||
archive_name = "hello" | archive_name = "hello" | ||||
archive_path = os.path.join(datadir, f"{archive_name}.tgz") | archive_path = os.path.join(datadir, f"{archive_name}.tgz") | ||||
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) | repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) | ||||
marmoute: What is going on here, and why is it correct ? | |||||
loader = HgBundle20Loader(url=repo_url, visit_date="2016-05-03 15:16:32+00",) | loader = HgBundle20Loader(url=repo_url, visit_date="2016-05-03 15:16:32+00",) | ||||
actual_load_status = loader.load() | actual_load_status = loader.load() | ||||
assert actual_load_status == {"status": "eventful"} | assert actual_load_status == {"status": "eventful"} | ||||
# then | # then | ||||
stats = get_stats(loader.storage) | stats = get_stats(loader.storage) | ||||
▲ Show 20 Lines • Show All 185 Lines • Show Last 20 Lines |
What is going on here, and why is it correct ?