Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/opam/tests/test_opam.py
Show First 20 Lines • Show All 104 Lines • ▼ Show 20 Lines | loader = OpamLoader( | ||||
opam_instance, | opam_instance, | ||||
opam_url, | opam_url, | ||||
opam_package, | opam_package, | ||||
initialize_opam_root=True, | initialize_opam_root=True, | ||||
) | ) | ||||
actual_load_status = loader.load() | actual_load_status = loader.load() | ||||
expected_snapshot_id = hash_to_bytes("4e4bf977312460329d7f769b0be89937c9827efc") | expected_snapshot_id = hash_to_bytes("50b5961c27dd4f8b138acce8bac4f90d1e33081f") | ||||
assert actual_load_status == { | assert actual_load_status == { | ||||
"status": "eventful", | "status": "eventful", | ||||
"snapshot_id": expected_snapshot_id.hex(), | "snapshot_id": expected_snapshot_id.hex(), | ||||
} | } | ||||
target = b"S\x8c\x8aq\xdcy\xa4/0\xa0\xb2j\xeb\xc1\x16\xad\xce\x06\xeaV" | |||||
expected_snapshot = Snapshot( | expected_snapshot = Snapshot( | ||||
id=expected_snapshot_id, | id=expected_snapshot_id, | ||||
branches={ | branches={ | ||||
b"HEAD": SnapshotBranch(target=b"agrid.0.1", target_type=TargetType.ALIAS,), | b"HEAD": SnapshotBranch(target=b"agrid.0.1", target_type=TargetType.ALIAS,), | ||||
b"agrid.0.1": SnapshotBranch( | b"agrid.0.1": SnapshotBranch( | ||||
target=target, target_type=TargetType.REVISION, | target=hash_to_bytes("efcb9ef9d0f2a85312463251732b42f9e45a5c12"), | ||||
target_type=TargetType.RELEASE, | |||||
), | ), | ||||
}, | }, | ||||
) | ) | ||||
assert_last_visit_matches( | assert_last_visit_matches( | ||||
swh_storage, url, status="full", type="opam", snapshot=expected_snapshot_id | swh_storage, url, status="full", type="opam", snapshot=expected_snapshot_id | ||||
) | ) | ||||
check_snapshot(expected_snapshot, swh_storage) | check_snapshot(expected_snapshot, swh_storage) | ||||
stats = get_stats(swh_storage) | stats = get_stats(swh_storage) | ||||
assert { | assert { | ||||
"content": 18, | "content": 18, | ||||
"directory": 8, | "directory": 8, | ||||
"origin": 1, | "origin": 1, | ||||
"origin_visit": 1, | "origin_visit": 1, | ||||
"release": 0, | "release": 1, | ||||
"revision": 1, | "revision": 0, | ||||
"skipped_content": 0, | "skipped_content": 0, | ||||
"snapshot": 1, | "snapshot": 1, | ||||
} == stats | } == stats | ||||
def test_opam_loader_many_version(tmpdir, requests_mock_datadir, datadir, swh_storage): | def test_opam_loader_many_version(tmpdir, requests_mock_datadir, datadir, swh_storage): | ||||
opam_url = f"file://{datadir}/fake_opam_repo" | opam_url = f"file://{datadir}/fake_opam_repo" | ||||
Show All 9 Lines | loader = OpamLoader( | ||||
opam_instance, | opam_instance, | ||||
opam_url, | opam_url, | ||||
opam_package, | opam_package, | ||||
initialize_opam_root=True, | initialize_opam_root=True, | ||||
) | ) | ||||
actual_load_status = loader.load() | actual_load_status = loader.load() | ||||
expected_snapshot_id = hash_to_bytes("1b49be175dcf17c0f568bcd7aac3d4faadc41249") | expected_snapshot_id = hash_to_bytes("f0a974e47999e74d323f1fb9604fde72527bda28") | ||||
assert actual_load_status == { | assert actual_load_status == { | ||||
"status": "eventful", | "status": "eventful", | ||||
"snapshot_id": expected_snapshot_id.hex(), | "snapshot_id": expected_snapshot_id.hex(), | ||||
} | } | ||||
expected_snapshot = Snapshot( | expected_snapshot = Snapshot( | ||||
id=expected_snapshot_id, | id=expected_snapshot_id, | ||||
branches={ | branches={ | ||||
b"HEAD": SnapshotBranch( | b"HEAD": SnapshotBranch( | ||||
target=b"directories.0.3", target_type=TargetType.ALIAS, | target=b"directories.0.3", target_type=TargetType.ALIAS, | ||||
), | ), | ||||
b"directories.0.1": SnapshotBranch( | b"directories.0.1": SnapshotBranch( | ||||
target=b"N\x92jA\xb2\x892\xeb\xcc\x9c\xa9\xb3\xea\xa7kz\xb08\xa6V", | target=hash_to_bytes("1f839cb1f4720d6b33fdd856e3ff1119497979d9"), | ||||
target_type=TargetType.REVISION, | target_type=TargetType.RELEASE, | ||||
), | ), | ||||
b"directories.0.2": SnapshotBranch( | b"directories.0.2": SnapshotBranch( | ||||
target=b"yj\xc9\x1a\x8f\xe0\xaa\xff[\x88\xffz" | target=hash_to_bytes("4133834d966381804347efbc41e35dd2bdd48962"), | ||||
b"\x91C\xcc\x96\xb7\xd4\xf65", | target_type=TargetType.RELEASE, | ||||
target_type=TargetType.REVISION, | |||||
), | ), | ||||
b"directories.0.3": SnapshotBranch( | b"directories.0.3": SnapshotBranch( | ||||
target=b"hA \xc4\xb5\x18A8\xb8C\x12\xa3\xa5T\xb7/v\x85X\xcb", | target=hash_to_bytes("2f20cabfbacfe447b80dc2a4eb14d461775100c8"), | ||||
target_type=TargetType.REVISION, | target_type=TargetType.RELEASE, | ||||
), | ), | ||||
}, | }, | ||||
) | ) | ||||
assert_last_visit_matches( | assert_last_visit_matches( | ||||
swh_storage, url, status="full", type="opam", snapshot=expected_snapshot_id | swh_storage, url, status="full", type="opam", snapshot=expected_snapshot_id | ||||
) | ) | ||||
check_snapshot(expected_snapshot, swh_storage) | check_snapshot(expected_snapshot, swh_storage) | ||||
def test_opam_revision(tmpdir, requests_mock_datadir, swh_storage, datadir): | def test_opam_release(tmpdir, requests_mock_datadir, swh_storage, datadir): | ||||
opam_url = f"file://{datadir}/fake_opam_repo" | opam_url = f"file://{datadir}/fake_opam_repo" | ||||
opam_root = tmpdir | opam_root = tmpdir | ||||
opam_instance = "loadertest" | opam_instance = "loadertest" | ||||
opam_package = "ocb" | opam_package = "ocb" | ||||
url = f"opam+{opam_url}/packages/{opam_package}" | url = f"opam+{opam_url}/packages/{opam_package}" | ||||
loader = OpamLoader( | loader = OpamLoader( | ||||
swh_storage, | swh_storage, | ||||
url, | url, | ||||
opam_root, | opam_root, | ||||
opam_instance, | opam_instance, | ||||
opam_url, | opam_url, | ||||
opam_package, | opam_package, | ||||
initialize_opam_root=True, | initialize_opam_root=True, | ||||
) | ) | ||||
actual_load_status = loader.load() | actual_load_status = loader.load() | ||||
expected_snapshot_id = hash_to_bytes("398df115b9feb2f463efd21941d69b7d59cd9025") | expected_snapshot_id = hash_to_bytes("987425c6fe94d3972c4c4e97ee27a6a7c8b68e82") | ||||
assert actual_load_status == { | assert actual_load_status == { | ||||
"status": "eventful", | "status": "eventful", | ||||
"snapshot_id": expected_snapshot_id.hex(), | "snapshot_id": expected_snapshot_id.hex(), | ||||
} | } | ||||
info_iter = loader.get_package_info("0.1") | info_iter = loader.get_package_info("0.1") | ||||
branch_name, package_info = next(info_iter) | branch_name, package_info = next(info_iter) | ||||
expected_branch_name = "ocb.0.1" | expected_branch_name = "ocb.0.1" | ||||
Show All 12 Lines | expected_package_info = OpamPackageInfo( | ||||
metadata=OCB_METADATA, format="opam-package-definition", | metadata=OCB_METADATA, format="opam-package-definition", | ||||
) | ) | ||||
], | ], | ||||
) | ) | ||||
assert branch_name == expected_branch_name | assert branch_name == expected_branch_name | ||||
assert package_info == expected_package_info | assert package_info == expected_package_info | ||||
revision_id = b"o\xad\x7f=\x07\xbb\xaah\xdbI(\xb0'\x10z\xfc\xff\x06x\x1b" | release_id = hash_to_bytes("8d0612cdf172e5dff3d876ca2bbc0f6003cc36cc") | ||||
expected_snapshot = Snapshot( | |||||
id=hash_to_bytes(actual_load_status["snapshot_id"]), | |||||
branches={ | |||||
b"HEAD": SnapshotBranch(target=b"ocb.0.1", target_type=TargetType.ALIAS,), | |||||
b"ocb.0.1": SnapshotBranch( | |||||
target=release_id, target_type=TargetType.RELEASE, | |||||
), | |||||
}, | |||||
) | |||||
assert_last_visit_matches( | |||||
swh_storage, url, status="full", type="opam", snapshot=expected_snapshot.id | |||||
) | |||||
check_snapshot(expected_snapshot, swh_storage) | |||||
revision = swh_storage.revision_get([revision_id])[0] | release = swh_storage.release_get([release_id])[0] | ||||
assert revision is not None | assert release is not None | ||||
assert revision.author == expected_package_info.author | assert release.author == expected_package_info.author | ||||
assert revision.committer == expected_package_info.committer | |||||
def test_opam_metadata(tmpdir, requests_mock_datadir, swh_storage, datadir): | def test_opam_metadata(tmpdir, requests_mock_datadir, swh_storage, datadir): | ||||
opam_url = f"file://{datadir}/fake_opam_repo" | opam_url = f"file://{datadir}/fake_opam_repo" | ||||
opam_root = tmpdir | opam_root = tmpdir | ||||
opam_instance = "loadertest" | opam_instance = "loadertest" | ||||
opam_package = "ocb" | opam_package = "ocb" | ||||
url = f"opam+{opam_url}/packages/{opam_package}" | url = f"opam+{opam_url}/packages/{opam_package}" | ||||
loader = OpamLoader( | loader = OpamLoader( | ||||
swh_storage, | swh_storage, | ||||
url, | url, | ||||
opam_root, | opam_root, | ||||
opam_instance, | opam_instance, | ||||
opam_url, | opam_url, | ||||
opam_package, | opam_package, | ||||
initialize_opam_root=True, | initialize_opam_root=True, | ||||
) | ) | ||||
actual_load_status = loader.load() | actual_load_status = loader.load() | ||||
assert actual_load_status["status"] == "eventful" | assert actual_load_status["status"] == "eventful" | ||||
expected_revision_id = b"o\xad\x7f=\x07\xbb\xaah\xdbI(\xb0'\x10z\xfc\xff\x06x\x1b" | expected_release_id = hash_to_bytes("8d0612cdf172e5dff3d876ca2bbc0f6003cc36cc") | ||||
expected_snapshot = Snapshot( | |||||
id=hash_to_bytes(actual_load_status["snapshot_id"]), | |||||
branches={ | |||||
b"HEAD": SnapshotBranch(target=b"ocb.0.1", target_type=TargetType.ALIAS,), | |||||
b"ocb.0.1": SnapshotBranch( | |||||
target=expected_release_id, target_type=TargetType.RELEASE, | |||||
), | |||||
}, | |||||
) | |||||
assert_last_visit_matches( | |||||
swh_storage, url, status="full", type="opam", snapshot=expected_snapshot.id | |||||
) | |||||
check_snapshot(expected_snapshot, swh_storage) | |||||
revision = swh_storage.revision_get([expected_revision_id])[0] | release = swh_storage.release_get([expected_release_id])[0] | ||||
assert revision is not None | assert release is not None | ||||
revision_swhid = CoreSWHID( | release_swhid = CoreSWHID( | ||||
object_type=ObjectType.REVISION, object_id=expected_revision_id | object_type=ObjectType.RELEASE, object_id=expected_release_id | ||||
) | ) | ||||
directory_swhid = ExtendedSWHID( | directory_swhid = ExtendedSWHID( | ||||
object_type=ExtendedObjectType.DIRECTORY, object_id=revision.directory | object_type=ExtendedObjectType.DIRECTORY, object_id=release.target | ||||
) | ) | ||||
metadata_authority = MetadataAuthority( | metadata_authority = MetadataAuthority( | ||||
type=MetadataAuthorityType.FORGE, url=opam_url, | type=MetadataAuthorityType.FORGE, url=opam_url, | ||||
) | ) | ||||
expected_metadata = [ | expected_metadata = [ | ||||
RawExtrinsicMetadata( | RawExtrinsicMetadata( | ||||
target=directory_swhid, | target=directory_swhid, | ||||
authority=metadata_authority, | authority=metadata_authority, | ||||
fetcher=MetadataFetcher( | fetcher=MetadataFetcher( | ||||
name="swh.loader.package.opam.loader.OpamLoader", version=__version__, | name="swh.loader.package.opam.loader.OpamLoader", version=__version__, | ||||
), | ), | ||||
discovery_date=loader.visit_date, | discovery_date=loader.visit_date, | ||||
format="opam-package-definition", | format="opam-package-definition", | ||||
metadata=OCB_METADATA, | metadata=OCB_METADATA, | ||||
origin=url, | origin=url, | ||||
revision=revision_swhid, | release=release_swhid, | ||||
) | ) | ||||
] | ] | ||||
assert swh_storage.raw_extrinsic_metadata_get( | assert swh_storage.raw_extrinsic_metadata_get( | ||||
directory_swhid, metadata_authority, | directory_swhid, metadata_authority, | ||||
) == PagedResult(next_page_token=None, results=expected_metadata,) | ) == PagedResult(next_page_token=None, results=expected_metadata,) |