Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/gitea/tests/test_tasks.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from time import sleep | from unittest.mock import patch | ||||
from unittest.mock import call, patch | |||||
from celery.result import GroupResult | from swh.lister.pattern import ListerStats | ||||
from swh.lister.gitea.tasks import NBPAGES | |||||
from swh.lister.utils import split_range | |||||
def test_ping(swh_scheduler_celery_app, swh_scheduler_celery_worker): | def test_ping(swh_scheduler_celery_app, swh_scheduler_celery_worker): | ||||
res = swh_scheduler_celery_app.send_task("swh.lister.gitea.tasks.ping") | res = swh_scheduler_celery_app.send_task("swh.lister.gitea.tasks.ping") | ||||
assert res | assert res | ||||
res.wait() | res.wait() | ||||
assert res.successful() | assert res.successful() | ||||
assert res.result == "OK" | assert res.result == "OK" | ||||
@patch("swh.lister.gitea.tasks.GiteaLister") | @patch("swh.lister.gitea.tasks.GiteaLister") | ||||
def test_incremental(lister, swh_scheduler_celery_app, swh_scheduler_celery_worker): | def test_full_listing(lister, swh_scheduler_celery_app, swh_scheduler_celery_worker): | ||||
# setup the mocked GiteaLister | lister.from_configfile.return_value = lister | ||||
lister.return_value = lister | lister.run.return_value = ListerStats(pages=10, origins=500) | ||||
lister.run.return_value = None | |||||
lister.get_pages_information.return_value = (None, 10, None) | |||||
kwargs = dict(url="https://try.gitea.io/api/v1") | |||||
res = swh_scheduler_celery_app.send_task( | res = swh_scheduler_celery_app.send_task( | ||||
"swh.lister.gitea.tasks.IncrementalGiteaLister" | "swh.lister.gitea.tasks.FullGiteaRelister", kwargs=kwargs, | ||||
) | ) | ||||
assert res | assert res | ||||
res.wait() | res.wait() | ||||
assert res.successful() | assert res.successful() | ||||
lister.assert_called_once_with(order="desc") | actual_kwargs = dict(**kwargs, instance=None, api_token=None, page_size=None) | ||||
lister.db_last_index.assert_not_called() | |||||
lister.get_pages_information.assert_called_once_with() | |||||
lister.run.assert_called_once_with(min_bound=1, max_bound=10, check_existence=True) | |||||
@patch("swh.lister.gitea.tasks.GiteaLister") | |||||
def test_range(lister, swh_scheduler_celery_app, swh_scheduler_celery_worker): | |||||
# setup the mocked GiteaLister | |||||
lister.return_value = lister | |||||
lister.run.return_value = None | |||||
res = swh_scheduler_celery_app.send_task( | |||||
"swh.lister.gitea.tasks.RangeGiteaLister", kwargs=dict(start=12, end=42) | |||||
) | |||||
assert res | |||||
res.wait() | |||||
assert res.successful() | |||||
lister.assert_called_once_with() | lister.from_configfile.assert_called_once_with(**actual_kwargs) | ||||
lister.db_last_index.assert_not_called() | lister.run.assert_called_once_with() | ||||
lister.run.assert_called_once_with(min_bound=12, max_bound=42) | |||||
@patch("swh.lister.gitea.tasks.GiteaLister") | @patch("swh.lister.gitea.tasks.GiteaLister") | ||||
def test_relister(lister, swh_scheduler_celery_app, swh_scheduler_celery_worker): | def test_full_listing_params( | ||||
total_pages = 85 | |||||
# setup the mocked GiteaLister | |||||
lister.return_value = lister | |||||
lister.run.return_value = None | |||||
lister.get_pages_information.return_value = (None, total_pages, None) | |||||
res = swh_scheduler_celery_app.send_task("swh.lister.gitea.tasks.FullGiteaRelister") | |||||
assert res | |||||
res.wait() | |||||
assert res.successful() | |||||
# retrieve the GroupResult for this task and wait for all the subtasks | |||||
# to complete | |||||
promise_id = res.result | |||||
assert promise_id | |||||
promise = GroupResult.restore(promise_id, app=swh_scheduler_celery_app) | |||||
for i in range(5): | |||||
if promise.ready(): | |||||
break | |||||
sleep(1) | |||||
lister.assert_called_with() | |||||
# one by the FullGiteaRelister task | |||||
# + 9 for the RangeGiteaLister subtasks | |||||
assert lister.call_count == 10 | |||||
lister.db_last_index.assert_not_called() | |||||
lister.db_partition_indices.assert_not_called() | |||||
lister.get_pages_information.assert_called_once_with() | |||||
# lister.run should have been called once per partition interval | |||||
for min_bound, max_bound in split_range(total_pages, NBPAGES): | |||||
assert ( | |||||
call(min_bound=min_bound, max_bound=max_bound) in lister.run.call_args_list | |||||
) | |||||
@patch("swh.lister.gitea.tasks.GiteaLister") | |||||
def test_relister_instance( | |||||
lister, swh_scheduler_celery_app, swh_scheduler_celery_worker | lister, swh_scheduler_celery_app, swh_scheduler_celery_worker | ||||
): | ): | ||||
total_pages = 85 | lister.from_configfile.return_value = lister | ||||
# setup the mocked GiteaLister | lister.run.return_value = ListerStats(pages=10, origins=500) | ||||
lister.return_value = lister | |||||
lister.run.return_value = None | |||||
lister.get_pages_information.return_value = (None, total_pages, None) | |||||
kwargs = dict( | |||||
url="https://0xacab.org/api/v4", | |||||
instance="0xacab", | |||||
api_token="test", | |||||
page_size=50, | |||||
) | |||||
res = swh_scheduler_celery_app.send_task( | res = swh_scheduler_celery_app.send_task( | ||||
"swh.lister.gitea.tasks.FullGiteaRelister", | "swh.lister.gitea.tasks.FullGiteaRelister", kwargs=kwargs, | ||||
kwargs=dict(url="https://0xacab.org/api/v4"), | |||||
) | ) | ||||
assert res | assert res | ||||
res.wait() | res.wait() | ||||
assert res.successful() | assert res.successful() | ||||
# retrieve the GroupResult for this task and wait for all the subtasks | lister.from_configfile.assert_called_once_with(**kwargs) | ||||
# to complete | lister.run.assert_called_once_with() | ||||
promise_id = res.result | |||||
assert promise_id | |||||
promise = GroupResult.restore(promise_id, app=swh_scheduler_celery_app) | |||||
for i in range(5): | |||||
if promise.ready(): | |||||
break | |||||
sleep(1) | |||||
lister.assert_called_with(url="https://0xacab.org/api/v4") | |||||
# one by the FullGiteaRelister task | |||||
# + 9 for the RangeGiteaLister subtasks | |||||
assert lister.call_count == 10 | |||||
lister.db_last_index.assert_not_called() | |||||
lister.db_partition_indices.assert_not_called() | |||||
lister.get_pages_information.assert_called_once_with() | |||||
# lister.run should have been called once per partition interval | |||||
for min_bound, max_bound in split_range(total_pages, NBPAGES): | |||||
assert ( | |||||
call(min_bound=min_bound, max_bound=max_bound) in lister.run.call_args_list | |||||
) |