Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/test_recurrent_visits.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from datetime import timedelta | from datetime import timedelta | ||||
import logging | import logging | ||||
from queue import Queue | from queue import Queue | ||||
from unittest.mock import MagicMock | from unittest.mock import MagicMock | ||||
import pytest | import pytest | ||||
from swh.scheduler.celery_backend.recurrent_visits import ( | from swh.scheduler.celery_backend.recurrent_visits import ( | ||||
POLICY_ADDITIONAL_PARAMETERS, | |||||
VisitSchedulerThreads, | VisitSchedulerThreads, | ||||
grab_next_visits_policy_weights, | |||||
send_visits_for_visit_type, | send_visits_for_visit_type, | ||||
spawn_visit_scheduler_thread, | spawn_visit_scheduler_thread, | ||||
terminate_visit_scheduler_threads, | terminate_visit_scheduler_threads, | ||||
visit_scheduler_thread, | visit_scheduler_thread, | ||||
) | ) | ||||
from .test_cli import invoke | from .test_cli import invoke | ||||
▲ Show 20 Lines • Show All 104 Lines • ▼ Show 20 Lines | for task_type in task_types: | ||||
f"in queue {queue_name}" | f"in queue {queue_name}" | ||||
) | ) | ||||
expected_records.add(msg) | expected_records.add(msg) | ||||
for expected_record in expected_records: | for expected_record in expected_records: | ||||
assert expected_record in set(records) | assert expected_record in set(records) | ||||
@pytest.mark.parametrize( | |||||
"visit_type, tablesamples", | |||||
[("hg", {}), ("git", POLICY_ADDITIONAL_PARAMETERS["git"])], | |||||
) | |||||
def test_recurrent_visit_additional_parameters( | |||||
swh_scheduler, mocker, visit_type, tablesamples | |||||
): | |||||
"""Testing additional policy parameters""" | |||||
mock_grab_next_visits = mocker.patch.object(swh_scheduler, "grab_next_visits") | |||||
mock_grab_next_visits.return_value = [] | |||||
grab_next_visits_policy_weights(swh_scheduler, visit_type, 10) | |||||
for call in mock_grab_next_visits.call_args_list: | |||||
assert call[1].get("tablesample") == tablesamples.get( | |||||
call[1]["policy"], {} | |||||
).get("tablesample") | |||||
@pytest.fixture | @pytest.fixture | ||||
def scheduler_config(swh_scheduler_config): | def scheduler_config(swh_scheduler_config): | ||||
return {"scheduler": {"cls": "local", **swh_scheduler_config}, "celery": {}} | return {"scheduler": {"cls": "local", **swh_scheduler_config}, "celery": {}} | ||||
def test_visit_scheduler_thread_unknown_task( | def test_visit_scheduler_thread_unknown_task( | ||||
swh_scheduler, scheduler_config, | swh_scheduler, scheduler_config, | ||||
): | ): | ||||
Show All 38 Lines |