Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/cgit/tests/test_tasks.py
Show All 16 Lines | |||||
def test_cgit_lister_task( | def test_cgit_lister_task( | ||||
swh_scheduler_celery_app, swh_scheduler_celery_worker, mocker | swh_scheduler_celery_app, swh_scheduler_celery_worker, mocker | ||||
): | ): | ||||
# setup the mocked CGitLister | # setup the mocked CGitLister | ||||
lister = mocker.patch("swh.lister.cgit.tasks.CGitLister") | lister = mocker.patch("swh.lister.cgit.tasks.CGitLister") | ||||
lister.from_configfile.return_value = lister | lister.from_configfile.return_value = lister | ||||
lister.run.return_value = ListerStats(pages=10, origins=500) | lister.run.return_value = ListerStats(pages=10, origins=500) | ||||
kwargs = dict(url="https://git.kernel.org/", instance="kernel") | kwargs = dict(url="https://git.kernel.org/", instance="kernel", base_git_url=None) | ||||
res = swh_scheduler_celery_app.send_task( | res = swh_scheduler_celery_app.send_task( | ||||
"swh.lister.cgit.tasks.CGitListerTask", kwargs=kwargs, | "swh.lister.cgit.tasks.CGitListerTask", kwargs=kwargs, | ||||
) | ) | ||||
assert res | assert res | ||||
res.wait() | res.wait() | ||||
assert res.successful() | assert res.successful() | ||||
lister.from_configfile.assert_called_once_with(**kwargs) | lister.from_configfile.assert_called_once_with(**kwargs) | ||||
lister.run.assert_called_once_with() | lister.run.assert_called_once_with() |