diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py index 8044134..4ededb9 100644 --- a/swh/scheduler/tests/test_cli.py +++ b/swh/scheduler/tests/test_cli.py @@ -1,683 +1,690 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import re import tempfile from unittest.mock import patch import logging from click.testing import CliRunner import pytest from swh.storage import get_storage from swh.scheduler.cli import cli from swh.scheduler.utils import create_task_dict CLI_CONFIG = ''' scheduler: cls: foo args: {} ''' def invoke(scheduler, catch_exceptions, args): runner = CliRunner() with patch('swh.scheduler.get_scheduler') as get_scheduler_mock, \ tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd: config_fd.write(CLI_CONFIG) config_fd.seek(0) get_scheduler_mock.return_value = scheduler args = ['-C' + config_fd.name, ] + args result = runner.invoke(cli, args, obj={'log_level': logging.WARNING}) if not catch_exceptions and result.exception: print(result.output) raise result.exception return result def test_schedule_tasks(swh_scheduler): csv_data = ( b'swh-test-ping;[["arg1", "arg2"]];{"key": "value"};' + datetime.datetime.utcnow().isoformat().encode() + b'\n' + b'swh-test-ping;[["arg3", "arg4"]];{"key": "value"};' + datetime.datetime.utcnow().isoformat().encode() + b'\n') with tempfile.NamedTemporaryFile(suffix='.csv') as csv_fd: csv_fd.write(csv_data) csv_fd.seek(0) result = invoke(swh_scheduler, False, [ 'task', 'schedule', '-d', ';', csv_fd.name ]) expected = r''' Created 2 tasks Task 1 Next run: just now \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: recurring Args: \['arg1', 'arg2'\] Keyword args: key: 'value' Task 2 Next run: just now \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: recurring Args: \['arg3', 'arg4'\] Keyword args: key: 'value' '''.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_schedule_tasks_columns(swh_scheduler): with tempfile.NamedTemporaryFile(suffix='.csv') as csv_fd: csv_fd.write( b'swh-test-ping;oneshot;["arg1", "arg2"];{"key": "value"}\n') csv_fd.seek(0) result = invoke(swh_scheduler, False, [ 'task', 'schedule', '-c', 'type', '-c', 'policy', '-c', 'args', '-c', 'kwargs', '-d', ';', csv_fd.name ]) expected = r''' Created 1 tasks Task 1 Next run: just now \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: 'arg1' 'arg2' Keyword args: key: 'value' '''.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_schedule_task(swh_scheduler): result = invoke(swh_scheduler, False, [ 'task', 'add', 'swh-test-ping', 'arg1', 'arg2', 'key=value', ]) expected = r''' Created 1 tasks Task 1 Next run: just now \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: recurring Args: 'arg1' 'arg2' Keyword args: key: 'value' '''.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_pending_tasks_none(swh_scheduler): result = invoke(swh_scheduler, False, [ 'task', 'list-pending', 'swh-test-ping', ]) expected = r''' Found 0 swh-test-ping tasks '''.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_pending_tasks(swh_scheduler): task1 = create_task_dict('swh-test-ping', 'oneshot', key='value1') task2 = create_task_dict('swh-test-ping', 'oneshot', key='value2') task2['next_run'] += datetime.timedelta(days=1) swh_scheduler.create_tasks([task1, task2]) result = invoke(swh_scheduler, False, [ 'task', 'list-pending', 'swh-test-ping', ]) expected = r''' Found 1 swh-test-ping tasks Task 1 Next run: just now \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value1' '''.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output swh_scheduler.grab_ready_tasks('swh-test-ping') result = invoke(swh_scheduler, False, [ 'task', 'list-pending', 'swh-test-ping', ]) expected = r''' Found 0 swh-test-ping tasks '''.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_pending_tasks_filter(swh_scheduler): task = create_task_dict('swh-test-multiping', 'oneshot', key='value') swh_scheduler.create_tasks([task]) result = invoke(swh_scheduler, False, [ 'task', 'list-pending', 'swh-test-ping', ]) expected = r''' Found 0 swh-test-ping tasks '''.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_pending_tasks_filter_2(swh_scheduler): swh_scheduler.create_tasks([ create_task_dict('swh-test-multiping', 'oneshot', key='value'), create_task_dict('swh-test-ping', 'oneshot', key='value2'), ]) result = invoke(swh_scheduler, False, [ 'task', 'list-pending', 'swh-test-ping', ]) expected = r''' Found 1 swh-test-ping tasks Task 2 Next run: just now \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value2' '''.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output # Fails because "task list-pending --limit 3" only returns 2 tasks, because # of how compute_nb_tasks_from works. @pytest.mark.xfail def test_list_pending_tasks_limit(swh_scheduler): swh_scheduler.create_tasks([ create_task_dict('swh-test-ping', 'oneshot', key='value%d' % i) for i in range(10) ]) result = invoke(swh_scheduler, False, [ 'task', 'list-pending', 'swh-test-ping', '--limit', '3', ]) expected = r''' Found 2 swh-test-ping tasks Task 1 Next run: just now \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value0' Task 2 Next run: just now \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value1' Task 3 Next run: just now \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value2' '''.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_pending_tasks_before(swh_scheduler): task1 = create_task_dict('swh-test-ping', 'oneshot', key='value') task2 = create_task_dict('swh-test-ping', 'oneshot', key='value2') task1['next_run'] += datetime.timedelta(days=3) task2['next_run'] += datetime.timedelta(days=1) swh_scheduler.create_tasks([task1, task2]) result = invoke(swh_scheduler, False, [ 'task', 'list-pending', 'swh-test-ping', '--before', (datetime.date.today() + datetime.timedelta(days=2)).isoformat() ]) expected = r''' Found 1 swh-test-ping tasks Task 2 Next run: in a day \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Args: Keyword args: key: 'value2' '''.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks(swh_scheduler): task1 = create_task_dict('swh-test-ping', 'oneshot', key='value1') task2 = create_task_dict('swh-test-ping', 'oneshot', key='value2') task1['next_run'] += datetime.timedelta(days=3, hours=2) swh_scheduler.create_tasks([task1, task2]) swh_scheduler.grab_ready_tasks('swh-test-ping') result = invoke(swh_scheduler, False, [ 'task', 'list', ]) expected = r''' Found 2 tasks Task 1 Next run: in 3 days \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value1' Task 2 Next run: just now \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_scheduled Priority:\x20 Args: Keyword args: key: 'value2' '''.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_id(swh_scheduler): task1 = create_task_dict('swh-test-ping', 'oneshot', key='value1') task2 = create_task_dict('swh-test-ping', 'oneshot', key='value2') task3 = create_task_dict('swh-test-ping', 'oneshot', key='value3') swh_scheduler.create_tasks([task1, task2, task3]) result = invoke(swh_scheduler, False, [ 'task', 'list', '--task-id', '2', ]) expected = r''' Found 1 tasks Task 2 Next run: just now \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value2' '''.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_id_2(swh_scheduler): task1 = create_task_dict('swh-test-ping', 'oneshot', key='value1') task2 = create_task_dict('swh-test-ping', 'oneshot', key='value2') task3 = create_task_dict('swh-test-ping', 'oneshot', key='value3') swh_scheduler.create_tasks([task1, task2, task3]) result = invoke(swh_scheduler, False, [ 'task', 'list', '--task-id', '2', '--task-id', '3' ]) expected = r''' Found 2 tasks Task 2 Next run: just now \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value2' Task 3 Next run: just now \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value3' '''.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_type(swh_scheduler): task1 = create_task_dict('swh-test-ping', 'oneshot', key='value1') task2 = create_task_dict('swh-test-multiping', 'oneshot', key='value2') task3 = create_task_dict('swh-test-ping', 'oneshot', key='value3') swh_scheduler.create_tasks([task1, task2, task3]) result = invoke(swh_scheduler, False, [ 'task', 'list', '--task-type', 'swh-test-ping' ]) expected = r''' Found 2 tasks Task 1 Next run: just now \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value1' Task 3 Next run: just now \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value3' '''.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_limit(swh_scheduler): task1 = create_task_dict('swh-test-ping', 'oneshot', key='value1') task2 = create_task_dict('swh-test-ping', 'oneshot', key='value2') task3 = create_task_dict('swh-test-ping', 'oneshot', key='value3') swh_scheduler.create_tasks([task1, task2, task3]) result = invoke(swh_scheduler, False, [ 'task', 'list', '--limit', '2', ]) expected = r''' Found 2 tasks Task 1 Next run: just now \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value1' Task 2 Next run: just now \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value2' '''.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_before(swh_scheduler): task1 = create_task_dict('swh-test-ping', 'oneshot', key='value1') task2 = create_task_dict('swh-test-ping', 'oneshot', key='value2') task1['next_run'] += datetime.timedelta(days=3, hours=2) swh_scheduler.create_tasks([task1, task2]) swh_scheduler.grab_ready_tasks('swh-test-ping') result = invoke(swh_scheduler, False, [ 'task', 'list', '--before', (datetime.date.today() + datetime.timedelta(days=2)).isoformat() ]) expected = r''' Found 1 tasks Task 2 Next run: just now \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_scheduled Priority:\x20 Args: Keyword args: key: 'value2' '''.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def test_list_tasks_after(swh_scheduler): task1 = create_task_dict('swh-test-ping', 'oneshot', key='value1') task2 = create_task_dict('swh-test-ping', 'oneshot', key='value2') task1['next_run'] += datetime.timedelta(days=3, hours=2) swh_scheduler.create_tasks([task1, task2]) swh_scheduler.grab_ready_tasks('swh-test-ping') result = invoke(swh_scheduler, False, [ 'task', 'list', '--after', (datetime.date.today() + datetime.timedelta(days=2)).isoformat() ]) expected = r''' Found 1 tasks Task 1 Next run: in 3 days \(.*\) Interval: 1 day, 0:00:00 Type: swh-test-ping Policy: oneshot Status: next_run_not_scheduled Priority:\x20 Args: Keyword args: key: 'value1' '''.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output def _fill_storage_with_origins(storage, nb_origins): origins = [ { 'type': 'type{}'.format(i), 'url': 'http://example.com/{}'.format(i), } for i in range(nb_origins) ] storage.origin_add(origins) return origins @pytest.fixture def storage(): """An instance of in-memory storage that gets injected into the CLI functions.""" - storage = get_storage('memory') + storage_config = { + 'cls': 'pipeline', + 'steps': [ + {'cls': 'validate'}, + {'cls': 'memory'}, + ] + } + storage = get_storage(**storage_config) with patch('swh.storage.get_storage') as get_storage_mock: get_storage_mock.return_value = storage yield storage @patch('swh.scheduler.cli.utils.TASK_BATCH_SIZE', 3) def test_task_schedule_origins_dry_run( swh_scheduler, storage): """Tests the scheduling when origin_batch_size*task_batch_size is a divisor of nb_origins.""" _fill_storage_with_origins(storage, 90) result = invoke(swh_scheduler, False, [ 'task', 'schedule_origins', '--dry-run', 'swh-test-ping', ]) # Check the output expected = r''' Scheduled 3 tasks \(30 origins\). Scheduled 6 tasks \(60 origins\). Scheduled 9 tasks \(90 origins\). Done. '''.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), \ repr(result.output) # Check scheduled tasks tasks = swh_scheduler.search_tasks() assert len(tasks) == 0 def _assert_origin_tasks_contraints( tasks, max_tasks, max_task_size, expected_origins): # check there are not too many tasks assert len(tasks) <= max_tasks # check tasks are not too large assert all(len(task['arguments']['args'][0]) <= max_task_size for task in tasks) # check the tasks are exhaustive assert sum([len(task['arguments']['args'][0]) for task in tasks]) == \ len(expected_origins) assert \ set.union(*(set(task['arguments']['args'][0]) for task in tasks)) == \ {origin['url'] for origin in expected_origins} @patch('swh.scheduler.cli.utils.TASK_BATCH_SIZE', 3) def test_task_schedule_origins(swh_scheduler, storage): """Tests the scheduling when neither origin_batch_size or task_batch_size is a divisor of nb_origins.""" origins = _fill_storage_with_origins(storage, 70) result = invoke(swh_scheduler, False, [ 'task', 'schedule_origins', 'swh-test-ping', '--batch-size', '20', ]) # Check the output expected = r''' Scheduled 3 tasks \(60 origins\). Scheduled 4 tasks \(70 origins\). Done. '''.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), \ repr(result.output) # Check tasks tasks = swh_scheduler.search_tasks() _assert_origin_tasks_contraints(tasks, 4, 20, origins) assert all(task['arguments']['kwargs'] == {} for task in tasks) def test_task_schedule_origins_kwargs(swh_scheduler, storage): """Tests support of extra keyword-arguments.""" origins = _fill_storage_with_origins(storage, 30) result = invoke(swh_scheduler, False, [ 'task', 'schedule_origins', 'swh-test-ping', '--batch-size', '20', 'key1="value1"', 'key2="value2"', ]) # Check the output expected = r''' Scheduled 2 tasks \(30 origins\). Done. '''.lstrip() assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), \ repr(result.output) # Check tasks tasks = swh_scheduler.search_tasks() _assert_origin_tasks_contraints(tasks, 2, 20, origins) assert all(task['arguments']['kwargs'] == {'key1': 'value1', 'key2': 'value2'} for task in tasks)