Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/cvs/tests/test_loader.py
Show First 20 Lines • Show All 150 Lines • ▼ Show 20 Lines | assert stats == { | ||||
"revision": 7, | "revision": 7, | ||||
"skipped_content": 0, | "skipped_content": 0, | ||||
"snapshot": 7, | "snapshot": 7, | ||||
} | } | ||||
check_snapshot(GREEK_SNAPSHOT, loader.storage) | check_snapshot(GREEK_SNAPSHOT, loader.storage) | ||||
def test_loader_cvs_pserver_with_file_additions_and_deletions( | |||||
swh_storage, datadir, tmp_path | |||||
): | |||||
"""Eventful CVS pserver conversion with file additions and deletions""" | |||||
archive_name = "greek-repository" | |||||
archive_path = os.path.join(datadir, f"{archive_name}.tgz") | |||||
repo_url = prepare_repository_from_archive(archive_path, archive_name, tmp_path) | |||||
repo_url += "/greek-tree" # CVS module name | |||||
# Ask our cvsclient to connect via the 'cvs server' command | |||||
repo_url = f"fake://{repo_url[7:]}" | |||||
loader = CvsLoader( | |||||
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, archive_name) | |||||
) | |||||
assert loader.load() == {"status": "eventful"} | |||||
assert_last_visit_matches( | |||||
loader.storage, repo_url, status="full", type="cvs", snapshot=GREEK_SNAPSHOT.id, | |||||
) | |||||
stats = get_stats(loader.storage) | |||||
assert stats == { | |||||
"content": 8, | |||||
"directory": 20, | |||||
"origin": 1, | |||||
"origin_visit": 1, | |||||
"release": 0, | |||||
"revision": 7, | |||||
"skipped_content": 0, | |||||
"snapshot": 7, | |||||
} | |||||
check_snapshot(GREEK_SNAPSHOT, loader.storage) | |||||
GREEK_SNAPSHOT2 = Snapshot( | GREEK_SNAPSHOT2 = Snapshot( | ||||
id=hash_to_bytes("048885ae2145ffe81588aea95dcf75c536ecdf26"), | id=hash_to_bytes("048885ae2145ffe81588aea95dcf75c536ecdf26"), | ||||
branches={ | branches={ | ||||
b"HEAD": SnapshotBranch( | b"HEAD": SnapshotBranch( | ||||
target=hash_to_bytes("55eb1438c03588607ce4b8db8f45e8e23075951b"), | target=hash_to_bytes("55eb1438c03588607ce4b8db8f45e8e23075951b"), | ||||
target_type=TargetType.REVISION, | target_type=TargetType.REVISION, | ||||
) | ) | ||||
}, | }, | ||||
▲ Show 20 Lines • Show All 236 Lines • ▼ Show 20 Lines | assert stats == { | ||||
"origin_visit": 1, | "origin_visit": 1, | ||||
"release": 0, | "release": 0, | ||||
"revision": 8, | "revision": 8, | ||||
"skipped_content": 0, | "skipped_content": 0, | ||||
"snapshot": 8, | "snapshot": 8, | ||||
} | } | ||||
check_snapshot(GREEK_SNAPSHOT4, loader.storage) | check_snapshot(GREEK_SNAPSHOT4, loader.storage) | ||||
GREEK_SNAPSHOT5 = Snapshot( | |||||
id=hash_to_bytes("ee6faeaf50aa513c53c8ba29194116a5ef88add6"), | |||||
branches={ | |||||
b"HEAD": SnapshotBranch( | |||||
target=hash_to_bytes("4320f152cc61ed660d25fdeebc787b3099e55a96"), | |||||
target_type=TargetType.REVISION, | |||||
) | |||||
}, | |||||
) | |||||
def test_loader_cvs_with_file_deleted_and_readded(swh_storage, datadir, tmp_path): | |||||
"""Eventful conversion of history with file deletion and re-addition""" | |||||
archive_name = "greek-repository5" | |||||
extracted_name = "greek-repository" | |||||
archive_path = os.path.join(datadir, f"{archive_name}.tgz") | |||||
repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) | |||||
repo_url += "/greek-tree" # CVS module name | |||||
loader = CvsLoader( | |||||
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) | |||||
) | |||||
assert loader.load() == {"status": "eventful"} | |||||
assert_last_visit_matches( | |||||
loader.storage, | |||||
repo_url, | |||||
status="full", | |||||
type="cvs", | |||||
snapshot=GREEK_SNAPSHOT5.id, | |||||
) | |||||
stats = get_stats(loader.storage) | |||||
assert stats == { | |||||
"content": 9, | |||||
"directory": 22, | |||||
"origin": 1, | |||||
"origin_visit": 1, | |||||
"release": 0, | |||||
"revision": 8, | |||||
"skipped_content": 0, | |||||
"snapshot": 8, | |||||
} | |||||
check_snapshot(GREEK_SNAPSHOT5, loader.storage) | |||||
def test_loader_cvs_pserver_with_file_deleted_and_readded( | |||||
swh_storage, datadir, tmp_path | |||||
): | |||||
"""Eventful pserver conversion with file deletion and re-addition""" | |||||
archive_name = "greek-repository5" | |||||
extracted_name = "greek-repository" | |||||
archive_path = os.path.join(datadir, f"{archive_name}.tgz") | |||||
repo_url = prepare_repository_from_archive(archive_path, extracted_name, tmp_path) | |||||
repo_url += "/greek-tree" # CVS module name | |||||
# Ask our cvsclient to connect via the 'cvs server' command | |||||
repo_url = f"fake://{repo_url[7:]}" | |||||
loader = CvsLoader( | |||||
swh_storage, repo_url, cvsroot_path=os.path.join(tmp_path, extracted_name) | |||||
) | |||||
assert loader.load() == {"status": "eventful"} | |||||
assert_last_visit_matches( | |||||
loader.storage, | |||||
repo_url, | |||||
status="full", | |||||
type="cvs", | |||||
snapshot=GREEK_SNAPSHOT5.id, | |||||
) | |||||
stats = get_stats(loader.storage) | |||||
assert stats == { | |||||
"content": 9, | |||||
"directory": 22, | |||||
"origin": 1, | |||||
"origin_visit": 1, | |||||
"release": 0, | |||||
"revision": 8, | |||||
"skipped_content": 0, | |||||
"snapshot": 8, | |||||
} | |||||
check_snapshot(GREEK_SNAPSHOT5, loader.storage) |