Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/opam/tests/test_lister.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import io | import io | ||||
from tempfile import mkdtemp | |||||
from unittest.mock import MagicMock | from unittest.mock import MagicMock | ||||
import pytest | import pytest | ||||
from swh.lister.opam.lister import OpamLister | from swh.lister.opam.lister import OpamLister | ||||
module_name = "swh.lister.opam.lister" | module_name = "swh.lister.opam.lister" | ||||
Show All 13 Lines | def mock_opam(mocker): | ||||
return mock_init, mock_open | return mock_init, mock_open | ||||
def test_lister_opam_optional_instance(swh_scheduler): | def test_lister_opam_optional_instance(swh_scheduler): | ||||
"""Instance name should be optional and default to be built out of the netloc.""" | """Instance name should be optional and default to be built out of the netloc.""" | ||||
netloc = "opam.ocaml.org" | netloc = "opam.ocaml.org" | ||||
instance_url = f"https://{netloc}" | instance_url = f"https://{netloc}" | ||||
lister = OpamLister(swh_scheduler, url=instance_url) | lister = OpamLister(swh_scheduler, url=instance_url,) | ||||
assert lister.instance == netloc | assert lister.instance == netloc | ||||
assert lister.opamroot.endswith(lister.instance) | |||||
def test_urls(swh_scheduler, mock_opam): | def test_urls(swh_scheduler, mock_opam): | ||||
mock_init, mock_popen = mock_opam | mock_init, mock_popen = mock_opam | ||||
instance_url = "https://opam.ocaml.org" | instance_url = "https://opam.ocaml.org" | ||||
lister = OpamLister(swh_scheduler, url=instance_url, instance="opam") | lister = OpamLister( | ||||
swh_scheduler, | |||||
url=instance_url, | |||||
instance="opam", | |||||
opam_root=mkdtemp(prefix="swh_opam_lister"), | |||||
) | |||||
assert lister.instance == "opam" | assert lister.instance == "opam" | ||||
# call the lister and get all listed origins urls | # call the lister and get all listed origins urls | ||||
stats = lister.run() | stats = lister.run() | ||||
assert mock_init.called | assert mock_init.called | ||||
assert mock_popen.called | assert mock_popen.called | ||||
Show All 11 Lines | def test_urls(swh_scheduler, mock_opam): | ||||
result_urls = [origin.url for origin in scheduler_origins] | result_urls = [origin.url for origin in scheduler_origins] | ||||
assert expected_urls == result_urls | assert expected_urls == result_urls | ||||
def test_opam_binary(datadir, swh_scheduler): | def test_opam_binary(datadir, swh_scheduler): | ||||
instance_url = f"file://{datadir}/fake_opam_repo" | instance_url = f"file://{datadir}/fake_opam_repo" | ||||
lister = OpamLister(swh_scheduler, url=instance_url, instance="fake") | lister = OpamLister( | ||||
swh_scheduler, | |||||
url=instance_url, | |||||
instance="fake", | |||||
opam_root=mkdtemp(prefix="swh_opam_lister"), | |||||
) | |||||
stats = lister.run() | stats = lister.run() | ||||
assert stats.pages == 4 | assert stats.pages == 4 | ||||
assert stats.origins == 4 | assert stats.origins == 4 | ||||
scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results | scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results | ||||
Show All 10 Lines |