Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/opam/tests/test_lister.py
Show First 20 Lines • Show All 42 Lines • ▼ Show 20 Lines | def test_mock_init_repository_init(mock_opam, tmp_path, datadir): | ||||
assert mock_init.called | assert mock_init.called | ||||
def test_mock_init_repository_update(mock_opam, tmp_path, datadir): | def test_mock_init_repository_update(mock_opam, tmp_path, datadir): | ||||
"""Updating opam root directory with another instance should be ok""" | """Updating opam root directory with another instance should be ok""" | ||||
mock_init, mock_popen = mock_opam | mock_init, mock_popen = mock_opam | ||||
instance = "fake_opam_repo" | instance = "fake_opam_repo" | ||||
instance_url = f"file://{datadir}/{instance}" | instance_url = f"http://example.org/{instance}" | ||||
opam_root = str(tmp_path / "test-opam") | opam_root = str(tmp_path / "test-opam") | ||||
os.makedirs(opam_root, exist_ok=True) | os.makedirs(opam_root, exist_ok=True) | ||||
with open(os.path.join(opam_root, "opam"), "w") as f: | with open(os.path.join(opam_root, "opam"), "w") as f: | ||||
f.write("one file to avoid empty folder") | f.write("one file to avoid empty folder") | ||||
assert os.path.exists(opam_root) | assert os.path.exists(opam_root) | ||||
assert os.listdir(opam_root) == ["opam"] # not empty | assert os.listdir(opam_root) == ["opam"] # not empty | ||||
▲ Show 20 Lines • Show All 47 Lines • ▼ Show 20 Lines | expected_urls = [ | ||||
f"opam+{instance_url}/packages/foo/", | f"opam+{instance_url}/packages/foo/", | ||||
] | ] | ||||
result_urls = [origin.url for origin in scheduler_origins] | result_urls = [origin.url for origin in scheduler_origins] | ||||
assert expected_urls == result_urls | assert expected_urls == result_urls | ||||
def test_opam_binary(datadir, swh_scheduler, tmp_path): | def test_opam_binary(datadir, swh_scheduler, tmp_path, mocker): | ||||
instance_url = f"file://{datadir}/fake_opam_repo" | from swh.lister.opam.lister import opam_init | ||||
instance_url = "http://example.org/fake_opam_repo" | |||||
def mock_opam_init(opam_root, instance, url, env): | |||||
assert url == instance_url | |||||
return opam_init(opam_root, instance, f"{datadir}/fake_opam_repo", env) | |||||
# Patch opam_init to use the local directory | |||||
mocker.patch("swh.lister.opam.lister.opam_init", side_effect=mock_opam_init) | |||||
lister = OpamLister( | lister = OpamLister( | ||||
swh_scheduler, | swh_scheduler, | ||||
url=instance_url, | url=instance_url, | ||||
instance="fake", | instance="fake", | ||||
opam_root=mkdtemp(dir=tmp_path, prefix="swh_opam_lister"), | opam_root=mkdtemp(dir=tmp_path, prefix="swh_opam_lister"), | ||||
) | ) | ||||
Show All 11 Lines | expected_urls = [ | ||||
f"opam+{instance_url}/packages/ocb/", | f"opam+{instance_url}/packages/ocb/", | ||||
] | ] | ||||
result_urls = [origin.url for origin in scheduler_origins] | result_urls = [origin.url for origin in scheduler_origins] | ||||
assert expected_urls == result_urls | assert expected_urls == result_urls | ||||
def test_opam_multi_instance(datadir, swh_scheduler, tmp_path): | def test_opam_multi_instance(datadir, swh_scheduler, tmp_path, mocker): | ||||
instance_url = f"file://{datadir}/fake_opam_repo" | from swh.lister.opam.lister import opam_init | ||||
instance_url = "http://example.org/fake_opam_repo" | |||||
def mock_opam_init(opam_root, instance, url, env): | |||||
assert url == instance_url | |||||
return opam_init(opam_root, instance, f"{datadir}/fake_opam_repo", env) | |||||
# Patch opam_init to use the local directory | |||||
mocker.patch("swh.lister.opam.lister.opam_init", side_effect=mock_opam_init) | |||||
lister = OpamLister( | lister = OpamLister( | ||||
swh_scheduler, | swh_scheduler, | ||||
url=instance_url, | url=instance_url, | ||||
instance="fake", | instance="fake", | ||||
opam_root=mkdtemp(dir=tmp_path, prefix="swh_opam_lister"), | opam_root=mkdtemp(dir=tmp_path, prefix="swh_opam_lister"), | ||||
) | ) | ||||
Show All 17 Lines |