Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/opam/tests/test_opam.py
# Copyright (C) 2019-2021 The Software Heritage developers | # Copyright (C) 2019-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from os.path import exists | |||||
import shutil | |||||
import pytest | |||||
from swh.loader.package import __version__ | from swh.loader.package import __version__ | ||||
from swh.loader.package.loader import RawExtrinsicMetadataCore | from swh.loader.package.loader import RawExtrinsicMetadataCore | ||||
from swh.loader.package.opam.loader import OpamLoader, OpamPackageInfo | from swh.loader.package.opam.loader import OpamLoader, OpamPackageInfo | ||||
from swh.loader.tests import assert_last_visit_matches, check_snapshot, get_stats | from swh.loader.tests import assert_last_visit_matches, check_snapshot, get_stats | ||||
from swh.model.hashutil import hash_to_bytes | from swh.model.hashutil import hash_to_bytes | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Person, | Person, | ||||
RawExtrinsicMetadata, | RawExtrinsicMetadata, | ||||
▲ Show 20 Lines • Show All 45 Lines • ▼ Show 20 Lines | url { | ||||
checksum: [ | checksum: [ | ||||
"sha256=aa27684fbda1b8036ae7e3c87de33a98a9cd2662bcc91c8447e00e41476b6a46" | "sha256=aa27684fbda1b8036ae7e3c87de33a98a9cd2662bcc91c8447e00e41476b6a46" | ||||
"sha512=1260344f184dd8c8074b0439dbcc8a5d59550a654c249cd61913d4c150c664f37b76195ddca38f7f6646d08bddb320ceb8d420508450b4f09a233cd5c22e6b9b" | "sha512=1260344f184dd8c8074b0439dbcc8a5d59550a654c249cd61913d4c150c664f37b76195ddca38f7f6646d08bddb320ceb8d420508450b4f09a233cd5c22e6b9b" | ||||
] | ] | ||||
} | } | ||||
""" # noqa | """ # noqa | ||||
@pytest.fixture | |||||
def fake_opam_root(mocker, tmpdir, datadir): | |||||
"""Fixture to initialize the actual opam in test context. It mocks the actual opam init | |||||
calls and installs a fake opam root out of the one present in datadir. | |||||
""" | |||||
# inhibits the real `subprocess.call` which prepares the required internal opam | |||||
# state | |||||
module_name = "swh.loader.package.opam.loader" | |||||
mock_init = mocker.patch(f"{module_name}.call", return_value=None) | |||||
# Installs the fake opam root for the tests to use | |||||
fake_opam_root_src = f"{datadir}/fake_opam_repo" | |||||
fake_opam_root_dst = f"{tmpdir}/opam" | |||||
# old version does not support dirs_exist_ok... | |||||
# TypeError: copytree() got an unexpected keyword argument 'dirs_exist_ok' | |||||
# see: https://docs.python.org/3.7/library/shutil.html | |||||
if exists(fake_opam_root_dst): | |||||
shutil.rmtree(fake_opam_root_dst) | |||||
shutil.copytree(fake_opam_root_src, fake_opam_root_dst) | |||||
yield fake_opam_root_dst | |||||
# loader are initialized with `initialize_opam_root=True` so this should be called | |||||
assert mock_init.called, "This should be called when loader use this fixture" | |||||
def test_opam_loader_no_opam_repository_fails(swh_storage, tmpdir, datadir): | def test_opam_loader_no_opam_repository_fails(swh_storage, tmpdir, datadir): | ||||
"""Running opam loader without a prepared opam repository fails""" | """Running opam loader without a prepared opam repository fails""" | ||||
opam_url = f"file://{datadir}/fake_opam_repo" | opam_url = f"file://{datadir}/fake_opam_repo" | ||||
opam_root = tmpdir | opam_root = tmpdir | ||||
opam_instance = "loadertest" | opam_instance = "loadertest" | ||||
opam_package = "agrid" | opam_package = "agrid" | ||||
url = f"opam+{opam_url}/packages/{opam_package}" | url = f"opam+{opam_url}/packages/{opam_package}" | ||||
loader = OpamLoader( | loader = OpamLoader( | ||||
swh_storage, | swh_storage, | ||||
url, | url, | ||||
opam_root, | opam_root, | ||||
opam_instance, | opam_instance, | ||||
opam_url, | opam_url, | ||||
opam_package, | opam_package, | ||||
initialize_opam_root=False, # The opam directory must be present | initialize_opam_root=False, # The opam directory must be present and no init... | ||||
) | ) | ||||
# No opam root directory init directory from loader. So, at the opam root does not | # No opam root directory init directory from loader. So, at the opam root does not | ||||
# exist, the loading fails. That's the expected use for the production workers | # exist, the loading fails. That's the expected use for the production workers | ||||
# (whose opam_root maintenance will be externally managed). | # (whose opam_root maintenance will be externally managed). | ||||
actual_load_status = loader.load() | actual_load_status = loader.load() | ||||
assert actual_load_status == {"status": "failed"} | assert actual_load_status == {"status": "failed"} | ||||
def test_opam_loader_one_version(tmpdir, requests_mock_datadir, datadir, swh_storage): | def test_opam_loader_one_version( | ||||
tmpdir, requests_mock_datadir, fake_opam_root, datadir, swh_storage | |||||
): | |||||
opam_url = f"file://{datadir}/fake_opam_repo" | opam_url = f"file://{datadir}/fake_opam_repo" | ||||
opam_root = tmpdir | opam_root = fake_opam_root | ||||
opam_instance = "loadertest" | opam_instance = "loadertest" | ||||
opam_package = "agrid" | opam_package = "agrid" | ||||
url = f"opam+{opam_url}/packages/{opam_package}" | url = f"opam+{opam_url}/packages/{opam_package}" | ||||
loader = OpamLoader( | loader = OpamLoader( | ||||
swh_storage, | swh_storage, | ||||
url, | url, | ||||
opam_root, | opam_root, | ||||
opam_instance, | opam_instance, | ||||
opam_url, | opam_url, | ||||
opam_package, | opam_package, | ||||
initialize_opam_root=True, | initialize_opam_root=True, # go through the initialization while mocking it | ||||
) | ) | ||||
actual_load_status = loader.load() | actual_load_status = loader.load() | ||||
expected_snapshot_id = hash_to_bytes("e1159446b00745ba4daa7ee26d74fbd81ecc081c") | expected_snapshot_id = hash_to_bytes("e1159446b00745ba4daa7ee26d74fbd81ecc081c") | ||||
assert actual_load_status == { | assert actual_load_status == { | ||||
"status": "eventful", | "status": "eventful", | ||||
"snapshot_id": expected_snapshot_id.hex(), | "snapshot_id": expected_snapshot_id.hex(), | ||||
Show All 37 Lines | assert { | ||||
"origin_visit": 1, | "origin_visit": 1, | ||||
"release": 1, | "release": 1, | ||||
"revision": 0, | "revision": 0, | ||||
"skipped_content": 0, | "skipped_content": 0, | ||||
"snapshot": 1, | "snapshot": 1, | ||||
} == stats | } == stats | ||||
def test_opam_loader_many_version(tmpdir, requests_mock_datadir, datadir, swh_storage): | def test_opam_loader_many_version( | ||||
tmpdir, requests_mock_datadir, fake_opam_root, datadir, swh_storage | |||||
): | |||||
opam_url = f"file://{datadir}/fake_opam_repo" | opam_url = f"file://{datadir}/fake_opam_repo" | ||||
opam_root = tmpdir | opam_root = fake_opam_root | ||||
opam_instance = "loadertest" | opam_instance = "loadertest" | ||||
opam_package = "directories" | opam_package = "directories" | ||||
url = f"opam+{opam_url}/packages/{opam_package}" | url = f"opam+{opam_url}/packages/{opam_package}" | ||||
loader = OpamLoader( | loader = OpamLoader( | ||||
swh_storage, | swh_storage, | ||||
url, | url, | ||||
opam_root, | opam_root, | ||||
Show All 34 Lines | ): | ||||
assert_last_visit_matches( | assert_last_visit_matches( | ||||
swh_storage, url, status="full", type="opam", snapshot=expected_snapshot_id | swh_storage, url, status="full", type="opam", snapshot=expected_snapshot_id | ||||
) | ) | ||||
check_snapshot(expected_snapshot, swh_storage) | check_snapshot(expected_snapshot, swh_storage) | ||||
def test_opam_release(tmpdir, requests_mock_datadir, swh_storage, datadir): | def test_opam_release( | ||||
tmpdir, requests_mock_datadir, fake_opam_root, swh_storage, datadir | |||||
): | |||||
opam_url = f"file://{datadir}/fake_opam_repo" | opam_url = f"file://{datadir}/fake_opam_repo" | ||||
opam_root = tmpdir | opam_root = fake_opam_root | ||||
opam_instance = "loadertest" | opam_instance = "loadertest" | ||||
opam_package = "ocb" | opam_package = "ocb" | ||||
url = f"opam+{opam_url}/packages/{opam_package}" | url = f"opam+{opam_url}/packages/{opam_package}" | ||||
loader = OpamLoader( | loader = OpamLoader( | ||||
swh_storage, | swh_storage, | ||||
url, | url, | ||||
▲ Show 20 Lines • Show All 51 Lines • ▼ Show 20 Lines | ): | ||||
release = swh_storage.release_get([release_id])[0] | release = swh_storage.release_get([release_id])[0] | ||||
assert release is not None | assert release is not None | ||||
assert release.author == expected_package_info.author | assert release.author == expected_package_info.author | ||||
def test_opam_metadata(tmpdir, requests_mock_datadir, swh_storage, datadir): | def test_opam_metadata( | ||||
tmpdir, requests_mock_datadir, fake_opam_root, swh_storage, datadir | |||||
): | |||||
opam_url = f"file://{datadir}/fake_opam_repo" | opam_url = f"file://{datadir}/fake_opam_repo" | ||||
opam_root = tmpdir | opam_root = fake_opam_root | ||||
opam_instance = "loadertest" | opam_instance = "loadertest" | ||||
opam_package = "ocb" | opam_package = "ocb" | ||||
url = f"opam+{opam_url}/packages/{opam_package}" | url = f"opam+{opam_url}/packages/{opam_package}" | ||||
loader = OpamLoader( | loader = OpamLoader( | ||||
swh_storage, | swh_storage, | ||||
url, | url, | ||||
▲ Show 20 Lines • Show All 58 Lines • Show Last 20 Lines |