Page MenuHomeSoftware Heritage

test_cli.py
No OneTemporary

test_cli.py

# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from itertools import islice
import logging
import random
import re
import tempfile
from unittest.mock import patch
from click.testing import CliRunner
import pytest
from swh.core.api.classes import stream_results
from swh.model.model import Origin
from swh.scheduler.cli import cli
from swh.scheduler.utils import create_task_dict, utcnow
CLI_CONFIG = """
scheduler:
cls: foo
args: {}
"""
def invoke(scheduler, catch_exceptions, args):
runner = CliRunner()
with patch(
"swh.scheduler.get_scheduler"
) as get_scheduler_mock, tempfile.NamedTemporaryFile(
"a", suffix=".yml"
) as config_fd:
config_fd.write(CLI_CONFIG)
config_fd.seek(0)
get_scheduler_mock.return_value = scheduler
args = ["-C" + config_fd.name,] + args
result = runner.invoke(cli, args, obj={"log_level": logging.WARNING})
if not catch_exceptions and result.exception:
print(result.output)
raise result.exception
return result
def test_schedule_tasks(swh_scheduler):
csv_data = (
b'swh-test-ping;[["arg1", "arg2"]];{"key": "value"};'
+ utcnow().isoformat().encode()
+ b"\n"
+ b'swh-test-ping;[["arg3", "arg4"]];{"key": "value"};'
+ utcnow().isoformat().encode()
+ b"\n"
)
with tempfile.NamedTemporaryFile(suffix=".csv") as csv_fd:
csv_fd.write(csv_data)
csv_fd.seek(0)
result = invoke(
swh_scheduler, False, ["task", "schedule", "-d", ";", csv_fd.name]
)
expected = r"""
Created 2 tasks
Task 1
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: recurring
Args:
\['arg1', 'arg2'\]
Keyword args:
key: 'value'
Task 2
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: recurring
Args:
\['arg3', 'arg4'\]
Keyword args:
key: 'value'
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_schedule_tasks_columns(swh_scheduler):
with tempfile.NamedTemporaryFile(suffix=".csv") as csv_fd:
csv_fd.write(b'swh-test-ping;oneshot;["arg1", "arg2"];{"key": "value"}\n')
csv_fd.seek(0)
result = invoke(
swh_scheduler,
False,
[
"task",
"schedule",
"-c",
"type",
"-c",
"policy",
"-c",
"args",
"-c",
"kwargs",
"-d",
";",
csv_fd.name,
],
)
expected = r"""
Created 1 tasks
Task 1
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Args:
'arg1'
'arg2'
Keyword args:
key: 'value'
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_schedule_task(swh_scheduler):
result = invoke(
swh_scheduler,
False,
["task", "add", "swh-test-ping", "arg1", "arg2", "key=value",],
)
expected = r"""
Created 1 tasks
Task 1
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: recurring
Args:
'arg1'
'arg2'
Keyword args:
key: 'value'
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_list_pending_tasks_none(swh_scheduler):
result = invoke(swh_scheduler, False, ["task", "list-pending", "swh-test-ping",])
expected = r"""
Found 0 swh-test-ping tasks
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_list_pending_tasks(swh_scheduler):
task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
task2["next_run"] += datetime.timedelta(days=1)
swh_scheduler.create_tasks([task1, task2])
result = invoke(swh_scheduler, False, ["task", "list-pending", "swh-test-ping",])
expected = r"""
Found 1 swh-test-ping tasks
Task 1
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Args:
Keyword args:
key: 'value1'
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
swh_scheduler.grab_ready_tasks("swh-test-ping")
result = invoke(swh_scheduler, False, ["task", "list-pending", "swh-test-ping",])
expected = r"""
Found 0 swh-test-ping tasks
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_list_pending_tasks_filter(swh_scheduler):
task = create_task_dict("swh-test-multiping", "oneshot", key="value")
swh_scheduler.create_tasks([task])
result = invoke(swh_scheduler, False, ["task", "list-pending", "swh-test-ping",])
expected = r"""
Found 0 swh-test-ping tasks
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_list_pending_tasks_filter_2(swh_scheduler):
swh_scheduler.create_tasks(
[
create_task_dict("swh-test-multiping", "oneshot", key="value"),
create_task_dict("swh-test-ping", "oneshot", key="value2"),
]
)
result = invoke(swh_scheduler, False, ["task", "list-pending", "swh-test-ping",])
expected = r"""
Found 1 swh-test-ping tasks
Task 2
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Args:
Keyword args:
key: 'value2'
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
# Fails because "task list-pending --limit 3" only returns 2 tasks, because
# of how compute_nb_tasks_from works.
@pytest.mark.xfail
def test_list_pending_tasks_limit(swh_scheduler):
swh_scheduler.create_tasks(
[
create_task_dict("swh-test-ping", "oneshot", key="value%d" % i)
for i in range(10)
]
)
result = invoke(
swh_scheduler, False, ["task", "list-pending", "swh-test-ping", "--limit", "3",]
)
expected = r"""
Found 2 swh-test-ping tasks
Task 1
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Args:
Keyword args:
key: 'value0'
Task 2
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Args:
Keyword args:
key: 'value1'
Task 3
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Args:
Keyword args:
key: 'value2'
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_list_pending_tasks_before(swh_scheduler):
task1 = create_task_dict("swh-test-ping", "oneshot", key="value")
task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
task1["next_run"] += datetime.timedelta(days=3)
task2["next_run"] += datetime.timedelta(days=1)
swh_scheduler.create_tasks([task1, task2])
result = invoke(
swh_scheduler,
False,
[
"task",
"list-pending",
"swh-test-ping",
"--before",
(datetime.date.today() + datetime.timedelta(days=2)).isoformat(),
],
)
expected = r"""
Found 1 swh-test-ping tasks
Task 2
Next run: tomorrow \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Args:
Keyword args:
key: 'value2'
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_list_tasks(swh_scheduler):
task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
task1["next_run"] += datetime.timedelta(days=3, hours=2)
swh_scheduler.create_tasks([task1, task2])
swh_scheduler.grab_ready_tasks("swh-test-ping")
result = invoke(swh_scheduler, False, ["task", "list",])
expected = r"""
Found 2 tasks
Task 1
Next run: .+ \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value1'
Task 2
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value2'
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_list_tasks_id(swh_scheduler):
task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
task3 = create_task_dict("swh-test-ping", "oneshot", key="value3")
swh_scheduler.create_tasks([task1, task2, task3])
result = invoke(swh_scheduler, False, ["task", "list", "--task-id", "2",])
expected = r"""
Found 1 tasks
Task 2
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value2'
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_list_tasks_id_2(swh_scheduler):
task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
task3 = create_task_dict("swh-test-ping", "oneshot", key="value3")
swh_scheduler.create_tasks([task1, task2, task3])
result = invoke(
swh_scheduler, False, ["task", "list", "--task-id", "2", "--task-id", "3"]
)
expected = r"""
Found 2 tasks
Task 2
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value2'
Task 3
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value3'
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_list_tasks_type(swh_scheduler):
task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
task2 = create_task_dict("swh-test-multiping", "oneshot", key="value2")
task3 = create_task_dict("swh-test-ping", "oneshot", key="value3")
swh_scheduler.create_tasks([task1, task2, task3])
result = invoke(
swh_scheduler, False, ["task", "list", "--task-type", "swh-test-ping"]
)
expected = r"""
Found 2 tasks
Task 1
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value1'
Task 3
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value3'
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_list_tasks_limit(swh_scheduler):
task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
task3 = create_task_dict("swh-test-ping", "oneshot", key="value3")
swh_scheduler.create_tasks([task1, task2, task3])
result = invoke(swh_scheduler, False, ["task", "list", "--limit", "2",])
expected = r"""
Found 2 tasks
Task 1
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value1'
Task 2
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value2'
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_list_tasks_before(swh_scheduler):
task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
task1["next_run"] += datetime.timedelta(days=3, hours=2)
swh_scheduler.create_tasks([task1, task2])
swh_scheduler.grab_ready_tasks("swh-test-ping")
result = invoke(
swh_scheduler,
False,
[
"task",
"list",
"--before",
(datetime.date.today() + datetime.timedelta(days=2)).isoformat(),
],
)
expected = r"""
Found 1 tasks
Task 2
Next run: today \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value2'
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_list_tasks_after(swh_scheduler):
task1 = create_task_dict("swh-test-ping", "oneshot", key="value1")
task2 = create_task_dict("swh-test-ping", "oneshot", key="value2")
task1["next_run"] += datetime.timedelta(days=3, hours=2)
swh_scheduler.create_tasks([task1, task2])
swh_scheduler.grab_ready_tasks("swh-test-ping")
result = invoke(
swh_scheduler,
False,
[
"task",
"list",
"--after",
(datetime.date.today() + datetime.timedelta(days=2)).isoformat(),
],
)
expected = r"""
Found 1 tasks
Task 1
Next run: .+ \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value1'
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def _fill_storage_with_origins(storage, nb_origins):
origins = [Origin(url=f"http://example.com/{i}") for i in range(nb_origins)]
storage.origin_add(origins)
return origins
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
def test_task_schedule_origins_dry_run(swh_scheduler, storage):
"""Tests the scheduling when origin_batch_size*task_batch_size is a
divisor of nb_origins."""
_fill_storage_with_origins(storage, 90)
result = invoke(
swh_scheduler,
False,
["task", "schedule_origins", "--dry-run", "swh-test-ping",],
)
# Check the output
expected = r"""
Scheduled 3 tasks \(30 origins\).
Scheduled 6 tasks \(60 origins\).
Scheduled 9 tasks \(90 origins\).
Done.
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output)
# Check scheduled tasks
tasks = swh_scheduler.search_tasks()
assert len(tasks) == 0
def _assert_origin_tasks_contraints(tasks, max_tasks, max_task_size, expected_origins):
# check there are not too many tasks
assert len(tasks) <= max_tasks
# check tasks are not too large
assert all(len(task["arguments"]["args"][0]) <= max_task_size for task in tasks)
# check the tasks are exhaustive
assert sum([len(task["arguments"]["args"][0]) for task in tasks]) == len(
expected_origins
)
assert set.union(*(set(task["arguments"]["args"][0]) for task in tasks)) == {
origin.url for origin in expected_origins
}
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
def test_task_schedule_origins(swh_scheduler, storage):
"""Tests the scheduling when neither origin_batch_size or
task_batch_size is a divisor of nb_origins."""
origins = _fill_storage_with_origins(storage, 70)
result = invoke(
swh_scheduler,
False,
["task", "schedule_origins", "swh-test-ping", "--batch-size", "20",],
)
# Check the output
expected = r"""
Scheduled 3 tasks \(60 origins\).
Scheduled 4 tasks \(70 origins\).
Done.
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output)
# Check tasks
tasks = swh_scheduler.search_tasks()
_assert_origin_tasks_contraints(tasks, 4, 20, origins)
assert all(task["arguments"]["kwargs"] == {} for task in tasks)
def test_task_schedule_origins_kwargs(swh_scheduler, storage):
"""Tests support of extra keyword-arguments."""
origins = _fill_storage_with_origins(storage, 30)
result = invoke(
swh_scheduler,
False,
[
"task",
"schedule_origins",
"swh-test-ping",
"--batch-size",
"20",
'key1="value1"',
'key2="value2"',
],
)
# Check the output
expected = r"""
Scheduled 2 tasks \(30 origins\).
Done.
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output)
# Check tasks
tasks = swh_scheduler.search_tasks()
_assert_origin_tasks_contraints(tasks, 2, 20, origins)
assert all(
task["arguments"]["kwargs"] == {"key1": "value1", "key2": "value2"}
for task in tasks
)
def test_task_schedule_origins_with_limit(swh_scheduler, storage):
"""Tests support of extra keyword-arguments."""
_fill_storage_with_origins(storage, 50)
limit = 20
expected_origins = list(islice(stream_results(storage.origin_list), limit))
nb_origins = len(expected_origins)
assert nb_origins == limit
max_task_size = 5
nb_tasks, remainder = divmod(nb_origins, max_task_size)
assert remainder == 0 # made the numbers go round
result = invoke(
swh_scheduler,
False,
[
"task",
"schedule_origins",
"swh-test-ping",
"--batch-size",
max_task_size,
"--limit",
limit,
],
)
# Check the output
expected = rf"""
Scheduled {nb_tasks} tasks \({nb_origins} origins\).
Done.
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output)
tasks = swh_scheduler.search_tasks()
_assert_origin_tasks_contraints(tasks, max_task_size, nb_origins, expected_origins)
def test_task_schedule_origins_with_page_token(swh_scheduler, storage):
"""Tests support of extra keyword-arguments."""
nb_total_origins = 50
origins = _fill_storage_with_origins(storage, nb_total_origins)
# prepare page_token and origins result expectancy
page_result = storage.origin_list(limit=10)
assert len(page_result.results) == 10
page_token = page_result.next_page_token
assert page_token is not None
# remove the first 10 origins listed as we won't see those in tasks
expected_origins = [o for o in origins if o not in page_result.results]
nb_origins = len(expected_origins)
assert nb_origins == nb_total_origins - len(page_result.results)
max_task_size = 10
nb_tasks, remainder = divmod(nb_origins, max_task_size)
assert remainder == 0
result = invoke(
swh_scheduler,
False,
[
"task",
"schedule_origins",
"swh-test-ping",
"--batch-size",
max_task_size,
"--page-token",
page_token,
],
)
# Check the output
expected = rf"""
Scheduled {nb_tasks} tasks \({nb_origins} origins\).
Done.
""".lstrip()
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), repr(result.output)
# Check tasks
tasks = swh_scheduler.search_tasks()
_assert_origin_tasks_contraints(tasks, max_task_size, nb_origins, expected_origins)
def test_cli_task_runner_unknown_task_types(swh_scheduler, storage):
"""When passing at least one unknown task type, the runner should fail."""
task_types = swh_scheduler.get_task_types()
task_type_names = [t["type"] for t in task_types]
known_task_type = random.choice(task_type_names)
unknown_task_type = "unknown-task-type"
assert unknown_task_type not in task_type_names
with pytest.raises(ValueError, match="Unknown"):
invoke(
swh_scheduler,
False,
[
"start-runner",
"--task-type",
known_task_type,
"--task-type",
unknown_task_type,
],
)
@pytest.mark.parametrize("flag_priority", ["--with-priority", "--without-priority"])
def test_cli_task_runner_with_known_tasks(
swh_scheduler, storage, caplog, flag_priority
):
"""Trigger runner with known tasks runs smoothly."""
task_types = swh_scheduler.get_task_types()
task_type_names = [t["type"] for t in task_types]
task_type_name = random.choice(task_type_names)
task_type_name2 = random.choice(task_type_names)
# The runner will just iterate over the following known tasks and do noop. We are
# just checking the runner does not explode here.
result = invoke(
swh_scheduler,
False,
[
"start-runner",
flag_priority,
"--task-type",
task_type_name,
"--task-type",
task_type_name2,
],
)
assert result.exit_code == 0, result.output
def test_cli_task_runner_no_task(swh_scheduler, storage):
"""Trigger runner with no parameter should run as before."""
# The runner will just iterate over the existing tasks from the scheduler and do
# noop. We are just checking the runner does not explode here.
result = invoke(swh_scheduler, False, ["start-runner",],)
assert result.exit_code == 0, result.output

File Metadata

Mime Type
text/x-python
Expires
Tue, Apr 15, 2:21 AM (2 d, 8 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3285970

Event Timeline