Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/launchpad/tests/test_tasks.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from unittest.mock import patch | from swh.lister.pattern import ListerStats | ||||
@patch("swh.lister.launchpad.tasks.LaunchpadLister") | def test_ping(swh_scheduler_celery_app, swh_scheduler_celery_worker): | ||||
def test_new(lister, swh_scheduler_celery_app, swh_scheduler_celery_worker): | res = swh_scheduler_celery_app.send_task("swh.lister.launchpad.tasks.ping") | ||||
# setup the mocked LaunchpadLister | assert res | ||||
lister.return_value = lister | res.wait() | ||||
lister.run.return_value = None | assert res.successful() | ||||
assert res.result == "OK" | |||||
def test_launchpad_full_listing_task( | |||||
swh_scheduler_celery_app, swh_scheduler_celery_worker, mocker | |||||
): | |||||
lister = mocker.patch("swh.lister.launchpad.tasks.LaunchpadLister") | |||||
lister.from_configfile.return_value = lister | |||||
stats = ListerStats(pages=1, origins=28000) | |||||
lister.run.return_value = stats | |||||
res = swh_scheduler_celery_app.send_task( | res = swh_scheduler_celery_app.send_task( | ||||
"swh.lister.launchpad.tasks.NewLaunchpadLister" | "swh.lister.launchpad.tasks.FullLaunchpadLister" | ||||
) | ) | ||||
assert res | assert res | ||||
res.wait() | res.wait() | ||||
assert res.successful() | assert res.successful() | ||||
assert res.result == stats.dict() | |||||
assert lister.call_count == 2 | lister.from_configfile.assert_called_once_with() | ||||
lister.db_last_threshold.assert_called_once() | lister.run.assert_called_once_with() | ||||
lister.run.assert_called_once() | |||||
@patch("swh.lister.launchpad.tasks.LaunchpadLister") | def test_launchpad_incremental_listing_task( | ||||
def test_full(lister, swh_scheduler_celery_app, swh_scheduler_celery_worker): | swh_scheduler_celery_app, swh_scheduler_celery_worker, mocker | ||||
# setup the mocked LaunchpadLister | ): | ||||
lister.return_value = lister | lister = mocker.patch("swh.lister.launchpad.tasks.LaunchpadLister") | ||||
lister.run.return_value = None | lister.from_configfile.return_value = lister | ||||
stats = ListerStats(pages=1, origins=200) | |||||
lister.run.return_value = stats | |||||
res = swh_scheduler_celery_app.send_task( | res = swh_scheduler_celery_app.send_task( | ||||
"swh.lister.launchpad.tasks.FullLaunchpadLister" | "swh.lister.launchpad.tasks.IncrementalLaunchpadLister" | ||||
) | ) | ||||
assert res | assert res | ||||
res.wait() | res.wait() | ||||
assert res.successful() | assert res.successful() | ||||
assert res.result == stats.dict() | |||||
lister.assert_called_once() | lister.from_configfile.assert_called_once_with(incremental=True) | ||||
lister.db_last_threshold.assert_not_called() | lister.run.assert_called_once_with() | ||||
lister.run.assert_called_once_with(max_bound=None) |