Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/mercurial/tests/test_from_disk.py
Show First 20 Lines • Show All 235 Lines • ▼ Show 20 Lines | for rev in loader.storage.revision_log(revisions): | ||||
for k, v in rev["extra_headers"]: | for k, v in rev["extra_headers"]: | ||||
if k == b"transplant_source": | if k == b"transplant_source": | ||||
transplant_sources.add(v.decode("ascii")) | transplant_sources.add(v.decode("ascii")) | ||||
# check extracted data are valid | # check extracted data are valid | ||||
assert len(hg_changesets) > 0 | assert len(hg_changesets) > 0 | ||||
assert len(transplant_sources) > 0 | assert len(transplant_sources) > 0 | ||||
assert transplant_sources.issubset(hg_changesets) | assert transplant_sources.issubset(hg_changesets) | ||||
def test_load_unchanged_repo_should_be_uneventfull(swh_config, datadir, tmp_path): | |||||
ardumont: swh_config is no longer needed since you inject the swh_storage instance into the loader now. | |||||
archive_name = "hello" | |||||
archive_path = os.path.join(datadir, f"{archive_name}.tgz") | |||||
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) | |||||
repo_path = repo_url.replace("file://", "") | |||||
print(repo_path) | |||||
loader = HgLoaderFromDisk(repo_url) | |||||
assert loader.load() == {"status": "eventful"} | |||||
assert get_stats(loader.storage) == { | |||||
"content": 3, | |||||
"directory": 3, | |||||
"origin": 1, | |||||
"origin_visit": 1, | |||||
"release": 1, | |||||
"revision": 3, | |||||
"skipped_content": 0, | |||||
"snapshot": 1, | |||||
} | |||||
assert loader.load() == {"status": "uneventful"} | |||||
assert get_stats(loader.storage) == { | |||||
"content": 3, | |||||
"directory": 3, | |||||
"origin": 1, | |||||
"origin_visit": 2, | |||||
"release": 1, | |||||
"revision": 3, | |||||
"skipped_content": 0, | |||||
"snapshot": 1, | |||||
} |
swh_config is no longer needed since you inject the swh_storage instance into the loader now.