Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F8322541
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
19 KB
Subscribers
None
View Options
diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py
index 3744482..22f1ead 100644
--- a/swh/scheduler/tests/test_cli.py
+++ b/swh/scheduler/tests/test_cli.py
@@ -1,712 +1,709 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import re
import tempfile
from unittest.mock import patch
import logging
from click.testing import CliRunner
import pytest
+from swh.model.model import Origin
from swh.storage import get_storage
from swh.scheduler.cli import cli
from swh.scheduler.utils import create_task_dict
# Minimal YAML configuration that invoke() writes to a temporary file and
# hands to the CLI through the "-C" option.
# NOTE(review): the keys below "scheduler:" sit at column 0 here; presumably
# they were nested under "scheduler:" and the indentation was lost when this
# text was extracted -- confirm against the upstream file.
CLI_CONFIG = """
scheduler:
cls: foo
args: {}
"""
def invoke(scheduler, catch_exceptions, args):
    """Run the scheduler CLI with ``args`` and return the click result.

    ``swh.scheduler.get_scheduler`` is patched so that it returns
    ``scheduler``, and ``CLI_CONFIG`` is written to a temporary ``.yml`` file
    whose path is passed to the CLI through ``-C``.

    When ``catch_exceptions`` is false and the run recorded an exception, the
    captured output is printed and that exception is re-raised.
    """
    runner = CliRunner()
    with patch("swh.scheduler.get_scheduler") as get_scheduler_mock:
        with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
            config_fd.write(CLI_CONFIG)
            # Seeking back to the start also flushes the buffered write, so
            # the CLI can read the configuration through the file name.
            config_fd.seek(0)
            get_scheduler_mock.return_value = scheduler
            full_args = ["-C" + config_fd.name] + args
            result = runner.invoke(
                cli, full_args, obj={"log_level": logging.WARNING}
            )

    if catch_exceptions or not result.exception:
        return result

    print(result.output)
    raise result.exception
def test_schedule_tasks(swh_scheduler):
    """``task schedule`` creates one task per row of a ';'-delimited CSV file."""
    row_prefixes = [
        b'swh-test-ping;[["arg1", "arg2"]];{"key": "value"};',
        b'swh-test-ping;[["arg3", "arg4"]];{"key": "value"};',
    ]
    # Each row ends with its own "now" timestamp followed by a newline.
    csv_data = b"".join(
        prefix + datetime.datetime.utcnow().isoformat().encode() + b"\n"
        for prefix in row_prefixes
    )

    with tempfile.NamedTemporaryFile(suffix=".csv") as csv_fd:
        csv_fd.write(csv_data)
        csv_fd.seek(0)
        cli_args = ["task", "schedule", "-d", ";", csv_fd.name]
        outcome = invoke(swh_scheduler, False, cli_args)

    pattern = r"""
Created 2 tasks
Task 1
Next run: just now \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: recurring
Args:
\['arg1', 'arg2'\]
Keyword args:
key: 'value'
Task 2
Next run: just now \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: recurring
Args:
\['arg3', 'arg4'\]
Keyword args:
key: 'value'
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
def test_schedule_tasks_columns(swh_scheduler):
    """``task schedule`` honours the column layout given through ``-c`` options."""
    # Build "-c type -c policy -c args -c kwargs" from the column names.
    column_options = []
    for column in ("type", "policy", "args", "kwargs"):
        column_options += ["-c", column]

    with tempfile.NamedTemporaryFile(suffix=".csv") as csv_fd:
        csv_fd.write(b'swh-test-ping;oneshot;["arg1", "arg2"];{"key": "value"}\n')
        csv_fd.seek(0)
        cli_args = ["task", "schedule"] + column_options + ["-d", ";", csv_fd.name]
        outcome = invoke(swh_scheduler, False, cli_args)

    pattern = r"""
Created 1 tasks
Task 1
Next run: just now \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Args:
'arg1'
'arg2'
Keyword args:
key: 'value'
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
def test_schedule_task(swh_scheduler):
    """``task add`` creates a single task from positional and key=value arguments."""
    cli_args = ["task", "add", "swh-test-ping", "arg1", "arg2", "key=value"]
    outcome = invoke(swh_scheduler, False, cli_args)

    pattern = r"""
Created 1 tasks
Task 1
Next run: just now \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: recurring
Args:
'arg1'
'arg2'
Keyword args:
key: 'value'
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
def test_list_pending_tasks_none(swh_scheduler):
    """``task list-pending`` reports zero tasks when none were created."""
    cli_args = ["task", "list-pending", "swh-test-ping"]
    outcome = invoke(swh_scheduler, False, cli_args)

    pattern = r"""
Found 0 swh-test-ping tasks
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
def test_list_pending_tasks(swh_scheduler):
    """``task list-pending`` shows only the task due now, and nothing once grabbed."""
    due_now = create_task_dict("swh-test-ping", "oneshot", key="value1")
    due_tomorrow = create_task_dict("swh-test-ping", "oneshot", key="value2")
    due_tomorrow["next_run"] += datetime.timedelta(days=1)
    swh_scheduler.create_tasks([due_now, due_tomorrow])

    cli_args = ["task", "list-pending", "swh-test-ping"]

    # First pass: only the task whose next run is now is listed.
    outcome = invoke(swh_scheduler, False, cli_args)
    pattern = r"""
Found 1 swh-test-ping tasks
Task 1
Next run: just now \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Args:
Keyword args:
key: 'value1'
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output

    # Second pass: after grabbing the ready tasks, the listing is empty.
    swh_scheduler.grab_ready_tasks("swh-test-ping")
    outcome = invoke(swh_scheduler, False, cli_args)
    pattern = r"""
Found 0 swh-test-ping tasks
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
def test_list_pending_tasks_filter(swh_scheduler):
    """A task of another type is not listed by ``task list-pending swh-test-ping``."""
    swh_scheduler.create_tasks(
        [create_task_dict("swh-test-multiping", "oneshot", key="value")]
    )

    cli_args = ["task", "list-pending", "swh-test-ping"]
    outcome = invoke(swh_scheduler, False, cli_args)

    pattern = r"""
Found 0 swh-test-ping tasks
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
def test_list_pending_tasks_filter_2(swh_scheduler):
    """Only the task of the requested type is listed when two types coexist."""
    other_type_task = create_task_dict("swh-test-multiping", "oneshot", key="value")
    ping_task = create_task_dict("swh-test-ping", "oneshot", key="value2")
    swh_scheduler.create_tasks([other_type_task, ping_task])

    cli_args = ["task", "list-pending", "swh-test-ping"]
    outcome = invoke(swh_scheduler, False, cli_args)

    pattern = r"""
Found 1 swh-test-ping tasks
Task 2
Next run: just now \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Args:
Keyword args:
key: 'value2'
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
# Expected failure: "task list-pending --limit 3" returns only 2 tasks
# because of how compute_nb_tasks_from works.
@pytest.mark.xfail(
    reason='"task list-pending --limit 3" only returns 2 tasks because of how '
    "compute_nb_tasks_from works"
)
def test_list_pending_tasks_limit(swh_scheduler):
    """``task list-pending --limit 3`` should list the first three of ten tasks.

    Marked as an expected failure for the reason given on the decorator. The
    expected header announces 3 tasks so that it agrees with the three task
    entries listed below it; the previous "Found 2" header contradicted them
    and could never match any output.
    """
    swh_scheduler.create_tasks(
        [
            create_task_dict("swh-test-ping", "oneshot", key="value%d" % i)
            for i in range(10)
        ]
    )

    result = invoke(
        swh_scheduler,
        False,
        ["task", "list-pending", "swh-test-ping", "--limit", "3"],
    )

    expected = r"""
Found 3 swh-test-ping tasks
Task 1
Next run: just now \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Args:
Keyword args:
key: 'value0'
Task 2
Next run: just now \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Args:
Keyword args:
key: 'value1'
Task 3
Next run: just now \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Args:
Keyword args:
key: 'value2'
""".lstrip()
    assert result.exit_code == 0, result.output
    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
def test_list_pending_tasks_before(swh_scheduler):
    """``task list-pending --before`` keeps only tasks due before the given date."""
    in_three_days = create_task_dict("swh-test-ping", "oneshot", key="value")
    in_one_day = create_task_dict("swh-test-ping", "oneshot", key="value2")
    in_three_days["next_run"] += datetime.timedelta(days=3)
    in_one_day["next_run"] += datetime.timedelta(days=1)
    swh_scheduler.create_tasks([in_three_days, in_one_day])

    # The cut-off date falls between the two tasks' next runs.
    cutoff = datetime.date.today() + datetime.timedelta(days=2)
    cli_args = [
        "task",
        "list-pending",
        "swh-test-ping",
        "--before",
        cutoff.isoformat(),
    ]
    outcome = invoke(swh_scheduler, False, cli_args)

    pattern = r"""
Found 1 swh-test-ping tasks
Task 2
Next run: in a day \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Args:
Keyword args:
key: 'value2'
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
def test_list_tasks(swh_scheduler):
    """``task list`` shows every task with its status, grabbed or not."""
    later_task = create_task_dict("swh-test-ping", "oneshot", key="value1")
    ready_task = create_task_dict("swh-test-ping", "oneshot", key="value2")
    later_task["next_run"] += datetime.timedelta(days=3, hours=2)
    swh_scheduler.create_tasks([later_task, ready_task])
    swh_scheduler.grab_ready_tasks("swh-test-ping")

    outcome = invoke(swh_scheduler, False, ["task", "list"])

    pattern = r"""
Found 2 tasks
Task 1
Next run: in 3 days \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value1'
Task 2
Next run: just now \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value2'
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
def test_list_tasks_id(swh_scheduler):
    """``task list --task-id 2`` shows only the task with that id."""
    swh_scheduler.create_tasks(
        [
            create_task_dict("swh-test-ping", "oneshot", key="value1"),
            create_task_dict("swh-test-ping", "oneshot", key="value2"),
            create_task_dict("swh-test-ping", "oneshot", key="value3"),
        ]
    )

    outcome = invoke(swh_scheduler, False, ["task", "list", "--task-id", "2"])

    pattern = r"""
Found 1 tasks
Task 2
Next run: just now \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value2'
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
def test_list_tasks_id_2(swh_scheduler):
    """``task list`` accepts ``--task-id`` several times and shows each match."""
    swh_scheduler.create_tasks(
        [
            create_task_dict("swh-test-ping", "oneshot", key="value1"),
            create_task_dict("swh-test-ping", "oneshot", key="value2"),
            create_task_dict("swh-test-ping", "oneshot", key="value3"),
        ]
    )

    cli_args = ["task", "list", "--task-id", "2", "--task-id", "3"]
    outcome = invoke(swh_scheduler, False, cli_args)

    pattern = r"""
Found 2 tasks
Task 2
Next run: just now \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value2'
Task 3
Next run: just now \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value3'
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
def test_list_tasks_type(swh_scheduler):
    """``task list --task-type`` shows only tasks of the requested type."""
    swh_scheduler.create_tasks(
        [
            create_task_dict("swh-test-ping", "oneshot", key="value1"),
            create_task_dict("swh-test-multiping", "oneshot", key="value2"),
            create_task_dict("swh-test-ping", "oneshot", key="value3"),
        ]
    )

    cli_args = ["task", "list", "--task-type", "swh-test-ping"]
    outcome = invoke(swh_scheduler, False, cli_args)

    pattern = r"""
Found 2 tasks
Task 1
Next run: just now \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value1'
Task 3
Next run: just now \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value3'
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
def test_list_tasks_limit(swh_scheduler):
    """``task list --limit 2`` shows the first two of three tasks."""
    swh_scheduler.create_tasks(
        [
            create_task_dict("swh-test-ping", "oneshot", key="value1"),
            create_task_dict("swh-test-ping", "oneshot", key="value2"),
            create_task_dict("swh-test-ping", "oneshot", key="value3"),
        ]
    )

    outcome = invoke(swh_scheduler, False, ["task", "list", "--limit", "2"])

    pattern = r"""
Found 2 tasks
Task 1
Next run: just now \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value1'
Task 2
Next run: just now \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value2'
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
def test_list_tasks_before(swh_scheduler):
    """``task list --before`` keeps only the task due before the given date."""
    later_task = create_task_dict("swh-test-ping", "oneshot", key="value1")
    ready_task = create_task_dict("swh-test-ping", "oneshot", key="value2")
    later_task["next_run"] += datetime.timedelta(days=3, hours=2)
    swh_scheduler.create_tasks([later_task, ready_task])
    swh_scheduler.grab_ready_tasks("swh-test-ping")

    # The cut-off date falls between the two tasks' next runs.
    cutoff = datetime.date.today() + datetime.timedelta(days=2)
    cli_args = ["task", "list", "--before", cutoff.isoformat()]
    outcome = invoke(swh_scheduler, False, cli_args)

    pattern = r"""
Found 1 tasks
Task 2
Next run: just now \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value2'
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
def test_list_tasks_after(swh_scheduler):
    """``task list --after`` keeps only the task due after the given date."""
    later_task = create_task_dict("swh-test-ping", "oneshot", key="value1")
    ready_task = create_task_dict("swh-test-ping", "oneshot", key="value2")
    later_task["next_run"] += datetime.timedelta(days=3, hours=2)
    swh_scheduler.create_tasks([later_task, ready_task])
    swh_scheduler.grab_ready_tasks("swh-test-ping")

    # The cut-off date falls between the two tasks' next runs.
    cutoff = datetime.date.today() + datetime.timedelta(days=2)
    cli_args = ["task", "list", "--after", cutoff.isoformat()]
    outcome = invoke(swh_scheduler, False, cli_args)

    pattern = r"""
Found 1 tasks
Task 1
Next run: in 3 days \(.*\)
Interval: 1 day, 0:00:00
Type: swh-test-ping
Policy: oneshot
Status: next_run_not_scheduled
Priority:\x20
Args:
Keyword args:
key: 'value1'
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), outcome.output
# Build nb_origins origins with URLs http://example.com/<i>, add them to
# `storage` through storage.origin_add(), and return the list that was added.
# NOTE(review): the lines prefixed with "-" and "+" below look like the removed
# and added sides of a patch hunk (plain dicts replaced by Origin objects);
# only one of the two assignments belongs in actual Python source -- confirm
# which side applies before using this block.
def _fill_storage_with_origins(storage, nb_origins):
- origins = [{"url": "http://example.com/{}".format(i),} for i in range(nb_origins)]
+ origins = [Origin(url=f"http://example.com/{i}") for i in range(nb_origins)]
storage.origin_add(origins)
return origins
# Pytest fixture: builds a storage instance, patches swh.storage.get_storage
# so that it returns that same instance, and yields it for the duration of
# the test.
# NOTE(review): the lines prefixed with "-" and "+" below look like the removed
# and added sides of a patch hunk (a "pipeline" config with "validate" and
# "memory" steps replaced by get_storage(cls="memory")); only one side belongs
# in actual Python source -- confirm which side applies.
@pytest.fixture
def storage():
"""An instance of in-memory storage that gets injected
into the CLI functions."""
- storage_config = {
- "cls": "pipeline",
- "steps": [{"cls": "validate"}, {"cls": "memory"},],
- }
- storage = get_storage(**storage_config)
+ storage = get_storage(cls="memory")
with patch("swh.storage.get_storage") as get_storage_mock:
get_storage_mock.return_value = storage
yield storage
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
def test_task_schedule_origins_dry_run(swh_scheduler, storage):
    """Dry-run scheduling of 90 origins reports progress but creates no task.

    ``TASK_BATCH_SIZE`` is patched to 3, and the expected output shows the
    origin count advancing in steps of 30 up to exactly 90.
    """
    _fill_storage_with_origins(storage, 90)

    cli_args = ["task", "schedule_origins", "--dry-run", "swh-test-ping"]
    outcome = invoke(swh_scheduler, False, cli_args)

    # The CLI output must report three progress lines, then "Done.".
    pattern = r"""
Scheduled 3 tasks \(30 origins\).
Scheduled 6 tasks \(60 origins\).
Scheduled 9 tasks \(90 origins\).
Done.
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), repr(outcome.output)

    # A dry run must leave the scheduler without any task.
    assert len(swh_scheduler.search_tasks()) == 0
# Assert that `tasks` cover `expected_origins` within the given limits:
#   - there are at most `max_tasks` tasks;
#   - the first positional argument of each task holds at most
#     `max_task_size` entries;
#   - the sizes of those first arguments add up to len(expected_origins);
#   - the union of those first arguments equals the set of expected origin
#     URLs.
# NOTE(review): the lines prefixed with "-" and "+" below look like the removed
# and added sides of a patch hunk (origin["url"] replaced by origin.url); only
# one side belongs in actual Python source -- confirm which side applies.
def _assert_origin_tasks_contraints(tasks, max_tasks, max_task_size, expected_origins):
# check there are not too many tasks
assert len(tasks) <= max_tasks
# check tasks are not too large
assert all(len(task["arguments"]["args"][0]) <= max_task_size for task in tasks)
# check the tasks are exhaustive
assert sum([len(task["arguments"]["args"][0]) for task in tasks]) == len(
expected_origins
)
assert set.union(*(set(task["arguments"]["args"][0]) for task in tasks)) == {
- origin["url"] for origin in expected_origins
+ origin.url for origin in expected_origins
}
@patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3)
def test_task_schedule_origins(swh_scheduler, storage):
    """Scheduling 70 origins in batches of 20 yields four tasks without kwargs.

    ``TASK_BATCH_SIZE`` is patched to 3; 70 is a multiple of neither the
    batch size of 20 nor of 3 * 20, so the last progress line covers a
    partial batch.
    """
    origins = _fill_storage_with_origins(storage, 70)

    cli_args = ["task", "schedule_origins", "swh-test-ping", "--batch-size", "20"]
    outcome = invoke(swh_scheduler, False, cli_args)

    # The CLI output must report two progress lines, then "Done.".
    pattern = r"""
Scheduled 3 tasks \(60 origins\).
Scheduled 4 tasks \(70 origins\).
Done.
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), repr(outcome.output)

    # The created tasks must cover every origin within the size limits.
    created = swh_scheduler.search_tasks()
    _assert_origin_tasks_contraints(created, 4, 20, origins)
    assert all(task["arguments"]["kwargs"] == {} for task in created)
def test_task_schedule_origins_kwargs(swh_scheduler, storage):
    """Extra ``key="value"`` arguments end up as keyword arguments of each task."""
    origins = _fill_storage_with_origins(storage, 30)

    cli_args = ["task", "schedule_origins", "swh-test-ping", "--batch-size", "20"]
    cli_args += ['key1="value1"', 'key2="value2"']
    outcome = invoke(swh_scheduler, False, cli_args)

    # The CLI output must report a single progress line, then "Done.".
    pattern = r"""
Scheduled 2 tasks \(30 origins\).
Done.
""".lstrip()
    assert outcome.exit_code == 0, outcome.output
    assert re.fullmatch(pattern, outcome.output, re.MULTILINE), repr(outcome.output)

    # Every created task must carry both extra keyword arguments.
    created = swh_scheduler.search_tasks()
    _assert_origin_tasks_contraints(created, 2, 20, origins)
    wanted_kwargs = {"key1": "value1", "key2": "value2"}
    assert all(task["arguments"]["kwargs"] == wanted_kwargs for task in created)
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Tue, Jun 3, 7:37 AM (1 w, 5 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3272278
Attached To
rDSCH Scheduling utilities
Event Timeline
Log In to Comment