Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/test_cli.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
import re | import re | ||||
import tempfile | import tempfile | ||||
from unittest.mock import patch | |||||
import logging | import logging | ||||
from itertools import islice | |||||
from unittest.mock import patch | |||||
from click.testing import CliRunner | from click.testing import CliRunner | ||||
import pytest | import pytest | ||||
from swh.model.model import Origin | from swh.model.model import Origin | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.scheduler.cli import cli | from swh.scheduler.cli import cli | ||||
from swh.scheduler.utils import create_task_dict | from swh.scheduler.utils import create_task_dict | ||||
from swh.core.api.classes import stream_results | |||||
CLI_CONFIG = """ | CLI_CONFIG = """ | ||||
scheduler: | scheduler: | ||||
cls: foo | cls: foo | ||||
args: {} | args: {} | ||||
""" | """ | ||||
▲ Show 20 Lines • Show All 674 Lines • ▼ Show 20 Lines | """.lstrip() | ||||
# Check tasks | # Check tasks | ||||
tasks = swh_scheduler.search_tasks() | tasks = swh_scheduler.search_tasks() | ||||
_assert_origin_tasks_contraints(tasks, 2, 20, origins) | _assert_origin_tasks_contraints(tasks, 2, 20, origins) | ||||
assert all( | assert all( | ||||
task["arguments"]["kwargs"] == {"key1": "value1", "key2": "value2"} | task["arguments"]["kwargs"] == {"key1": "value1", "key2": "value2"} | ||||
for task in tasks | for task in tasks | ||||
) | ) | ||||
def test_task_schedule_origins_with_limit(swh_scheduler, storage): | |||||
"""Tests support of extra keyword-arguments.""" | |||||
_fill_storage_with_origins(storage, 50) | |||||
limit = 20 | |||||
expected_origins = list(islice(stream_results(storage.origin_list), limit)) | |||||
nb_origins = len(expected_origins) | |||||
assert nb_origins == limit | |||||
max_task_size = 5 | |||||
nb_tasks, remainder = divmod(nb_origins, max_task_size) | |||||
assert remainder == 0 # made the numbers go round | |||||
result = invoke( | |||||
swh_scheduler, | |||||
False, | |||||
[ | |||||
"task", | |||||
"schedule_origins", | |||||
"swh-test-ping", | |||||
"--batch-size", | |||||
max_task_size, | |||||
"--limit", | |||||
limit, | |||||
], | |||||
) | |||||
# Check the output | |||||
expected = rf""" | |||||
Scheduled {nb_tasks} tasks \({nb_origins} origins\). | |||||
Done. | |||||
""".lstrip() | |||||
assert result.exit_code == 0, result.output | |||||
assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output) | |||||
tasks = swh_scheduler.search_tasks() | |||||
_assert_origin_tasks_contraints(tasks, max_task_size, nb_origins, expected_origins) | |||||
def test_task_schedule_origins_with_page_token(swh_scheduler, storage): | |||||
"""Tests support of extra keyword-arguments.""" | |||||
nb_total_origins = 50 | |||||
origins = _fill_storage_with_origins(storage, nb_total_origins) | |||||
# prepare page_token and origins result expectancy | |||||
page_result = storage.origin_list(limit=10) | |||||
assert len(page_result.results) == 10 | |||||
page_token = page_result.next_page_token | |||||
assert page_token is not None | |||||
# remove the first 10 origins listed as we won't see those in tasks | |||||
expected_origins = [o for o in origins if o not in page_result.results] | |||||
nb_origins = len(expected_origins) | |||||
assert nb_origins == nb_total_origins - len(page_result.results) | |||||
max_task_size = 10 | |||||
nb_tasks, remainder = divmod(nb_origins, max_task_size) | |||||
assert remainder == 0 | |||||
result = invoke( | |||||
swh_scheduler, | |||||
False, | |||||
[ | |||||
"task", | |||||
"schedule_origins", | |||||
"swh-test-ping", | |||||
"--batch-size", | |||||
max_task_size, | |||||
"--page-token", | |||||
page_token, | |||||
], | |||||
) | |||||
# Check the output | |||||
expected = rf""" | |||||
Scheduled {nb_tasks} tasks \({nb_origins} origins\). | |||||
Done. | |||||
""".lstrip() | |||||
assert result.exit_code == 0, result.output | |||||
assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output) | |||||
# Check tasks | |||||
tasks = swh_scheduler.search_tasks() | |||||
_assert_origin_tasks_contraints(tasks, max_task_size, nb_origins, expected_origins) |