diff --git a/conftest.py b/conftest.py
index eb6de3d..8009b53 100644
--- a/conftest.py
+++ b/conftest.py
@@ -1,6 +1,16 @@
 from hypothesis import settings

 # define test profiles. Full documentation is at:
 # https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles
 settings.register_profile("fast", max_examples=5, deadline=5000)
 settings.register_profile("slow", max_examples=20, deadline=5000)
+
+# Modules that should not be loaded by --doctest-modules
+collect_ignore = [
+    # ImportError
+    'swh/scheduler/updater/ghtorrent/fake.py',
+    # NotImplementedError: save_group is not supported by this backend.
+    'swh/scheduler/tests/tasks.py',
+    # OSError: Configuration file must be defined
+    'swh/scheduler/api/wsgi.py',
+]
diff --git a/swh/scheduler/cli.py b/swh/scheduler/cli.py
index 149d419..4f4d601 100644
--- a/swh/scheduler/cli.py
+++ b/swh/scheduler/cli.py
@@ -1,709 +1,749 @@
 # Copyright (C) 2016-2018  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import arrow
 import click
 import csv
 import itertools
 import json
 import locale
 import logging
 import time
 import datetime

 from swh.core import utils, config
 from . import compute_nb_tasks_from
 from .backend_es import SWHElasticSearchClient
 from . import get_scheduler, DEFAULT_CONFIG


 locale.setlocale(locale.LC_ALL, '')
 ARROW_LOCALE = locale.getlocale(locale.LC_TIME)[0]


 class DateTimeType(click.ParamType):
     name = 'time and date'

     def convert(self, value, param, ctx):
         if not isinstance(value, arrow.Arrow):
             value = arrow.get(value)
         return value


 DATETIME = DateTimeType()

 CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])


 def format_dict(d):
     ret = {}
     for k, v in d.items():
         if isinstance(v, (arrow.Arrow, datetime.date, datetime.datetime)):
             v = arrow.get(v).format()
         elif isinstance(v, dict):
             v = format_dict(v)
         ret[k] = v
     return ret


 def pretty_print_list(list, indent=0):
     """Pretty-print a list"""
     return ''.join('%s%s\n' % (' ' * indent, item) for item in list)


 def pretty_print_dict(dict, indent=0):
     """Pretty-print a dict"""
     return ''.join('%s%s: %s\n' % (' ' * indent,
                                    click.style(key, bold=True), value)
                    for key, value in dict.items())


 def pretty_print_run(run, indent=4):
     fmt = ('{indent}{backend_id} [{status}]\n'
            '{indent} scheduled: {scheduled} [{started}:{ended}]')
     return fmt.format(indent=' '*indent, **format_dict(run))


 def pretty_print_task(task, full=False):
     """Pretty-print a task

     If 'full' is True, also print the status and priority fields.
+
+    >>> task = {
+    ...     'id': 1234,
+    ...     'arguments': {
+    ...         'args': ['foo', 'bar'],
+    ...         'kwargs': {'key': 'value'},
+    ...     },
+    ...     'current_interval': datetime.timedelta(hours=1),
+    ...     'next_run': datetime.datetime(2019, 2, 21, 13, 52, 35, 407818),
+    ...     'policy': 'oneshot',
+    ...     'priority': None,
+    ...     'status': 'next_run_not_scheduled',
+    ...     'type': 'test_task',
+    ... }
+    >>> print(click.unstyle(pretty_print_task(task)))
+    Task 1234
+     Next run: ... (2019-02-21 13:52:35+00:00)
+     Interval: 1:00:00
+     Type: test_task
+     Policy: oneshot
+     Args:
+        foo
+        bar
+     Keyword args:
+        key: value
+
+    >>> print(click.unstyle(pretty_print_task(task, full=True)))
+    Task 1234
+     Next run: ... (2019-02-21 13:52:35+00:00)
+     Interval: 1:00:00
+     Type: test_task
+     Policy: oneshot
+     Status: next_run_not_scheduled
+     Priority:\x20
+     Args:
+        foo
+        bar
+     Keyword args:
+        key: value
+
     """
     next_run = arrow.get(task['next_run'])
     lines = [
         '%s %s\n' % (click.style('Task', bold=True), task['id']),
         click.style(' Next run: ', bold=True),
         "%s (%s)" % (next_run.humanize(locale=ARROW_LOCALE),
                      next_run.format()),
         '\n',
         click.style(' Interval: ', bold=True),
         str(task['current_interval']), '\n',
         click.style(' Type: ', bold=True), task['type'] or '', '\n',
         click.style(' Policy: ', bold=True), task['policy'] or '', '\n',
     ]
     if full:
         lines += [
             click.style(' Status: ', bold=True),
             task['status'] or '', '\n',
             click.style(' Priority: ', bold=True),
             task['priority'] or '', '\n',
         ]
     lines += [
         click.style(' Args:\n', bold=True),
         pretty_print_list(task['arguments']['args'], indent=4),
         click.style(' Keyword args:\n', bold=True),
         pretty_print_dict(task['arguments']['kwargs'], indent=4),
     ]
     return ''.join(lines)


 @click.group(context_settings=CONTEXT_SETTINGS)
 @click.option('--config-file', '-C', default=None,
               type=click.Path(exists=True, dir_okay=False,),
               help="Configuration file.")
 @click.option('--database', '-d', default=None,
               help="Scheduling database DSN (implies cls is 'local')")
 @click.option('--url', '-u', default=None,
               help="Scheduler's url access (implies cls is 'remote')")
 @click.option('--log-level', '-l', default='INFO',
               type=click.Choice(logging._nameToLevel.keys()),
               help="Log level (default to INFO)")
 @click.option('--no-stdout', is_flag=True, default=False,
               help="Do NOT output logs on the console")
 @click.pass_context
 def cli(ctx, config_file, database, url, log_level, no_stdout):
     """Software Heritage Scheduler CLI interface

     Defaults to using the local scheduler instance (plugged to the main
     scheduler db).
     """
     from swh.scheduler.celery_backend.config import setup_log_handler
     log_level = setup_log_handler(
         loglevel=log_level, colorize=False,
         format='[%(levelname)s] %(name)s -- %(message)s',
         log_console=not no_stdout)

     ctx.ensure_object(dict)

     logger = logging.getLogger(__name__)
     scheduler = None
     conf = config.read(config_file, DEFAULT_CONFIG)
     if 'scheduler' not in conf:
         raise ValueError("missing 'scheduler' configuration")

     if database:
         conf['scheduler']['cls'] = 'local'
         conf['scheduler']['args']['db'] = database
     elif url:
         conf['scheduler']['cls'] = 'remote'
         conf['scheduler']['args'] = {'url': url}
     sched_conf = conf['scheduler']
     try:
         logger.debug('Instantiating scheduler with %s' % (
             sched_conf))
         scheduler = get_scheduler(**sched_conf)
     except ValueError:
         # it's up to the subcommand to decide whether not having a proper
         # scheduler instance is a problem.
         pass

     ctx.obj['scheduler'] = scheduler
     ctx.obj['config'] = conf
     ctx.obj['loglevel'] = log_level


 @cli.group('task')
 @click.pass_context
 def task(ctx):
     """Manipulate tasks."""
     pass


 @task.command('schedule')
 @click.option('--columns', '-c', multiple=True,
               default=['type', 'args', 'kwargs', 'next_run'],
               type=click.Choice([
                   'type', 'args', 'kwargs', 'policy', 'next_run']),
               help='columns present in the CSV file')
 @click.option('--delimiter', '-d', default=',')
 @click.argument('file', type=click.File(encoding='utf-8'))
 @click.pass_context
 def schedule_tasks(ctx, columns, delimiter, file):
     """Schedule tasks from a CSV input file.
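
     For example, a ';'-delimited input line using the default columns
     could look like this (the values below are illustrative only):

     swh-test-ping;["arg1", "arg2"];{"key": "value"};2019-02-21T13:52:35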

     The following columns are expected, and can be set through the -c option:

     - type: the type of the task to be scheduled (mandatory)
     - args: the arguments passed to the task (JSON list, defaults to an
       empty list)
     - kwargs: the keyword arguments passed to the task (JSON object,
       defaults to an empty dict)
     - next_run: the date at which the task should run (datetime, defaults
       to now)

     The CSV can be read either from a named file, or from stdin (use '-' as
     filename).

     Usage sample:

     cat scheduling-task.txt | \
         python3 -m swh.scheduler.cli \
             --database 'service=swh-scheduler-dev' \
             task schedule \
                 --columns type --columns kwargs --columns policy \
                 --delimiter ';' -
     """
     tasks = []
     now = arrow.utcnow()
     scheduler = ctx.obj['scheduler']
     if not scheduler:
         raise ValueError('Scheduler class (local/remote) must be instantiated')

     reader = csv.reader(file, delimiter=delimiter)
     for line in reader:
         task = dict(zip(columns, line))
         args = json.loads(task.pop('args', '[]'))
         kwargs = json.loads(task.pop('kwargs', '{}'))
         task['arguments'] = {
             'args': args,
             'kwargs': kwargs,
         }
         task['next_run'] = DATETIME.convert(task.get('next_run', now),
                                             None, None)
         tasks.append(task)

     created = scheduler.create_tasks(tasks)

     output = [
         'Created %d tasks\n' % len(created),
     ]
     for task in created:
         output.append(pretty_print_task(task))

     click.echo_via_pager('\n'.join(output))


 @task.command('add')
 @click.argument('type', nargs=1, required=True)
 @click.argument('options', nargs=-1)
 @click.option('--policy', '-p', default='recurring',
               type=click.Choice(['recurring', 'oneshot']))
 @click.option('--priority', '-P', default=None,
               type=click.Choice(['low', 'normal', 'high']))
 @click.option('--next-run', '-n', default=None)
 @click.pass_context
 def schedule_task(ctx, type, options, policy, priority, next_run):
     """Schedule one task from arguments.

     Usage sample:

     swh-scheduler --database 'service=swh-scheduler' \
         task add swh-lister-pypi

     swh-scheduler --database 'service=swh-scheduler' \
         task add swh-lister-debian --policy=oneshot distribution=stretch

     Note: if the priority is not given, the task won't have the priority
     set, which is considered as the lowest priority level.
     """
     scheduler = ctx.obj['scheduler']
     if not scheduler:
         raise ValueError('Scheduler class (local/remote) must be instantiated')

     now = arrow.utcnow()

     args = [x for x in options if '=' not in x]
     kw = dict(x.split('=', 1) for x in options if '=' in x)
     task = {'type': type,
             'policy': policy,
             'priority': priority,
             'arguments': {
                 'args': args,
                 'kwargs': kw,
             },
             'next_run': DATETIME.convert(next_run or now, None, None),
             }
     created = scheduler.create_tasks([task])

     output = [
         'Created %d tasks\n' % len(created),
     ]
     for task in created:
         output.append(pretty_print_task(task))

     click.echo('\n'.join(output))


 @task.command('list-pending')
 @click.argument('task-types', required=True, nargs=-1)
 @click.option('--limit', '-l', required=False, type=click.INT,
               help='The maximum number of tasks to fetch')
 @click.option('--before', '-b', required=False, type=DATETIME,
               help='List all jobs supposed to run before the given date')
 @click.pass_context
 def list_pending_tasks(ctx, task_types, limit, before):
     """List the tasks that are going to be run.
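
     Usage sample (the task type name here is illustrative):

     swh-scheduler --database 'service=swh-scheduler' \
         task list-pending swh-test-ping --limit 5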

     You can override the number of tasks to fetch.
     """
     scheduler = ctx.obj['scheduler']
     if not scheduler:
         raise ValueError('Scheduler class (local/remote) must be instantiated')
     num_tasks, num_tasks_priority = compute_nb_tasks_from(limit)

     output = []
     for task_type in task_types:
         pending = scheduler.peek_ready_tasks(
             task_type, timestamp=before,
             num_tasks=num_tasks, num_tasks_priority=num_tasks_priority)
         output.append('Found %d %s tasks\n' % (
             len(pending), task_type))

         for task in pending:
             output.append(pretty_print_task(task))

     click.echo('\n'.join(output))


 @task.command('list')
 @click.option('--task-id', '-i', default=None, multiple=True, metavar='ID',
               help='List only tasks whose id is ID.')
 @click.option('--task-type', '-t', default=None, multiple=True,
               metavar='TYPE', help='List only tasks of type TYPE')
 @click.option('--limit', '-l', required=False, type=click.INT,
               help='The maximum number of tasks to fetch.')
 @click.option('--status', '-s', multiple=True, metavar='STATUS',
               default=None, help='List tasks whose status is STATUS.')
 @click.option('--policy', '-p', default=None,
               type=click.Choice(['recurring', 'oneshot']),
               help='List tasks whose policy is POLICY.')
 @click.option('--priority', '-P', default=None, multiple=True,
               type=click.Choice(['all', 'low', 'normal', 'high']),
               help='List tasks whose priority is PRIORITY.')
 @click.option('--before', '-b', required=False, type=DATETIME,
               metavar='DATETIME',
               help='Limit to tasks supposed to run before the given date.')
 @click.option('--after', '-a', required=False, type=DATETIME,
               metavar='DATETIME',
               help='Limit to tasks supposed to run after the given date.')
 @click.option('--list-runs', '-r', is_flag=True, default=False,
               help='Also list past executions of each task.')
 @click.pass_context
 def list_tasks(ctx, task_id, task_type, limit, status, policy, priority,
                before, after, list_runs):
     """List tasks.
     """
     scheduler = ctx.obj['scheduler']
     if not scheduler:
         raise ValueError('Scheduler class (local/remote) must be instantiated')
     if not task_type:
         task_type = [x['type'] for x in scheduler.get_task_types()]

     # if task_id is not given, default value for status is
     # 'next_run_not_scheduled'
     # if task_id is given, default status is 'all'
     if task_id is None and status is None:
         status = ['next_run_not_scheduled']
     if status and 'all' in status:
         status = None
     if priority and 'all' in priority:
         priority = None

     output = []
     tasks = scheduler.search_tasks(
         task_id=task_id,
         task_type=task_type,
         status=status, priority=priority, policy=policy,
         before=before, after=after,
         limit=limit)
     if list_runs:
         runs = {t['id']: [] for t in tasks}
         for r in scheduler.get_task_runs([task['id'] for task in tasks]):
             runs[r['task']].append(r)
     else:
         runs = {}

     output.append('Found %d tasks\n' % (
         len(tasks)))
     for task in tasks:
         output.append(pretty_print_task(task, full=True))
         if runs.get(task['id']):
             output.append(click.style(' Executions:', bold=True))
             for run in runs[task['id']]:
                 output.append(pretty_print_run(run, indent=4))

     click.echo('\n'.join(output))


 @task.command('respawn')
 @click.argument('task-ids', required=True, nargs=-1)
 @click.option('--next-run', '-n', required=False, type=DATETIME,
               metavar='DATETIME', default=None,
               help='Respawn the selected tasks at this date')
 @click.pass_context
 def respawn_tasks(ctx, task_ids, next_run):
     """Respawn tasks.

     Respawn tasks given by their ids (see the 'task list' command to find
     task ids) at the given date (immediately by default).

     Eg.
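
     swh-scheduler task respawn --next-run 2019-06-01 42

     to respawn task 42 (the id and date are illustrative) on the given
     date, or simply: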

     swh-scheduler task respawn 1 3 12
     """
     scheduler = ctx.obj['scheduler']
     if not scheduler:
         raise ValueError('Scheduler class (local/remote) must be instantiated')
     if next_run is None:
         next_run = arrow.utcnow()
     output = []

     scheduler.set_status_tasks(
         task_ids, status='next_run_not_scheduled', next_run=next_run)
     output.append('Respawn tasks %s\n' % (task_ids,))

     click.echo('\n'.join(output))


 @task.command('archive')
 @click.option('--before', '-b', default=None,
               help='''Tasks whose ended date is earlier than this will be
                       archived. Defaults to the current month's first
                       day.''')
 @click.option('--after', '-a', default=None,
               help='''Tasks whose ended date is later than this will be
                       archived. Defaults to the prior month's first
                       day.''')
 @click.option('--batch-index', default=1000, type=click.INT,
               help='Batch size of tasks to read from db to archive')
 @click.option('--bulk-index', default=200, type=click.INT,
               help='Batch size of tasks to bulk index')
 @click.option('--batch-clean', default=1000, type=click.INT,
               help='Batch size of tasks to clean after archival')
 @click.option('--dry-run/--no-dry-run', is_flag=True, default=False,
               help='Only list what would be archived.')
 @click.option('--verbose', is_flag=True, default=False,
               help='Verbose mode')
 @click.option('--cleanup/--no-cleanup', is_flag=True, default=True,
               help='Clean up archived tasks (default)')
 @click.option('--start-from', type=click.INT, default=-1,
               help='(Optional) Task id to start from. Defaults to -1.')
 @click.pass_context
 def archive_tasks(ctx, before, after, batch_index, bulk_index, batch_clean,
                   dry_run, verbose, cleanup, start_from):
     """Archive task/task_run whose (task_type is 'oneshot' and task_status
     is 'completed') or (task_type is 'recurring' and task_status is
     'disabled').

     With the --dry-run flag set, only list those.

     """
     scheduler = ctx.obj['scheduler']
     if not scheduler:
         raise ValueError('Scheduler class (local/remote) must be instantiated')
     es_client = SWHElasticSearchClient()
     logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO)
     log = logging.getLogger('swh.scheduler.cli.archive')
     logging.getLogger('urllib3').setLevel(logging.WARN)
     logging.getLogger('elasticsearch').setLevel(logging.WARN)
     if dry_run:
         log.info('**DRY-RUN** (only reading db)')
     if not cleanup:
         log.info('**NO CLEANUP**')

     now = arrow.utcnow()

     # Default to archive tasks from a rolling month starting the week
     # prior to the current one
     if not before:
         before = now.shift(weeks=-1).format('YYYY-MM-DD')

     if not after:
         after = now.shift(weeks=-1).shift(months=-1).format('YYYY-MM-DD')

     log.debug('index: %s; cleanup: %s; period: [%s ; %s]' % (
         not dry_run, not dry_run and cleanup, after, before))

     def group_by_index_name(data, es_client=es_client):
         """Given a data record, determine the index's name through its
            ending date. This varies greatly depending on the task_run's
            status.
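
            For example (hypothetical record): a run started on 2019-02-15
            is grouped under the index computed for (2019, 2); a run that
            never started falls back to its 'scheduled' date instead.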
""" date = data.get('started') if not date: date = data['scheduled'] return es_client.compute_index_name(date.year, date.month) def index_data(before, last_id, batch_index): tasks_in = scheduler.filter_task_to_archive( after, before, last_id=last_id, limit=batch_index) for index_name, tasks_group in itertools.groupby( tasks_in, key=group_by_index_name): log.debug('Index tasks to %s' % index_name) if dry_run: for task in tasks_group: yield task continue yield from es_client.streaming_bulk( index_name, tasks_group, source=['task_id', 'task_run_id'], chunk_size=bulk_index, log=log) gen = index_data(before, last_id=start_from, batch_index=batch_index) if cleanup: for task_ids in utils.grouper(gen, n=batch_clean): task_ids = list(task_ids) log.info('Clean up %s tasks: [%s, ...]' % ( len(task_ids), task_ids[0])) if dry_run: # no clean up continue ctx.obj['scheduler'].delete_archived_tasks(task_ids) else: for task_ids in utils.grouper(gen, n=batch_index): task_ids = list(task_ids) log.info('Indexed %s tasks: [%s, ...]' % ( len(task_ids), task_ids[0])) @cli.command('runner') @click.option('--period', '-p', default=0, help=('Period (in s) at witch pending tasks are checked and ' 'executed. Set to 0 (default) for a one shot.')) @click.pass_context def runner(ctx, period): """Starts a swh-scheduler runner service. This process is responsible for checking for ready-to-run tasks and schedule them.""" from swh.scheduler.celery_backend.runner import run_ready_tasks from swh.scheduler.celery_backend.config import build_app app = build_app(ctx.obj['config'].get('celery')) app.set_current() logger = logging.getLogger(__name__ + '.runner') scheduler = ctx.obj['scheduler'] logger.debug('Scheduler %s' % scheduler) try: while True: logger.debug('Run ready tasks') try: ntasks = len(run_ready_tasks(scheduler, app)) if ntasks: logger.info('Scheduled %s tasks', ntasks) except Exception: logger.exception('Unexpected error in run_ready_tasks()') if not period: break time.sleep(period) except KeyboardInterrupt: ctx.exit(0) @cli.command('listener') @click.pass_context def listener(ctx): """Starts a swh-scheduler listener service. This service is responsible for listening at task lifecycle events and handle their workflow status in the database.""" scheduler = ctx.obj['scheduler'] if not scheduler: raise ValueError('Scheduler class (local/remote) must be instantiated') from swh.scheduler.celery_backend.config import build_app app = build_app(ctx.obj['config'].get('celery')) app.set_current() from swh.scheduler.celery_backend.listener import event_monitor event_monitor(app, backend=scheduler) @cli.command('api-server') @click.option('--host', default='0.0.0.0', help="Host to run the scheduler server api") @click.option('--port', default=5008, type=click.INT, help="Binding port of the server") @click.option('--debug/--nodebug', default=None, help=("Indicates if the server should run in debug mode. " "Defaults to True if log-level is DEBUG, False otherwise.") ) @click.pass_context def api_server(ctx, host, port, debug): """Starts a swh-scheduler API HTTP server. 
""" if ctx.obj['config']['scheduler']['cls'] == 'remote': click.echo("The API server can only be started with a 'local' " "configuration", err=True) ctx.exit(1) from swh.scheduler.api import server server.app.config.update(ctx.obj['config']) if debug is None: debug = ctx.obj['loglevel'] <= logging.DEBUG server.app.run(host, port=port, debug=bool(debug)) @cli.group('task-type') @click.pass_context def task_type(ctx): """Manipulate task types.""" pass @task_type.command('list') @click.option('--verbose', '-v', is_flag=True, default=False, help='Verbose mode') @click.option('--task_type', '-t', multiple=True, default=None, help='List task types of given type') @click.option('--task_name', '-n', multiple=True, default=None, help='List task types of given backend task name') @click.pass_context def list_task_types(ctx, verbose, task_type, task_name): click.echo("Known task types:") if verbose: tmpl = click.style('{type}: ', bold=True) + '''{backend_name} {description} interval: {default_interval} [{min_interval}, {max_interval}] backoff_factor: {backoff_factor} max_queue_length: {max_queue_length} num_retries: {num_retries} retry_delay: {retry_delay} ''' else: tmpl = '{type}:\n {description}' for tasktype in sorted(ctx.obj['scheduler'].get_task_types(), key=lambda x: x['type']): if task_type and tasktype['type'] not in task_type: continue if task_name and tasktype['backend_name'] not in task_name: continue click.echo(tmpl.format(**tasktype)) @task_type.command('add') @click.argument('type', required=1) @click.argument('task-name', required=1) @click.argument('description', required=1) @click.option('--default-interval', '-i', default='90 days', help='Default interval ("90 days" by default)') @click.option('--min-interval', default=None, help='Minimum interval (default interval if not set)') @click.option('--max-interval', '-i', default=None, help='Maximal interval (default interval if not set)') @click.option('--backoff-factor', '-f', type=float, default=1, help='Backoff factor') @click.pass_context def add_task_type(ctx, type, task_name, description, default_interval, min_interval, max_interval, backoff_factor): """Create a new task type """ scheduler = ctx.obj['scheduler'] if not scheduler: raise ValueError('Scheduler class (local/remote) must be instantiated') task_type = dict( type=type, backend_name=task_name, description=description, default_interval=default_interval, min_interval=min_interval, max_interval=max_interval, backoff_factor=backoff_factor, max_queue_length=None, num_retries=None, retry_delay=None, ) scheduler.create_task_type(task_type) click.echo('OK') @cli.command('updater') @click.option('--verbose/--no-verbose', '-v', default=False, help='Verbose mode') @click.pass_context def updater(ctx, verbose): """Insert tasks in the scheduler from the scheduler-updater's events """ from swh.scheduler.updater.writer import UpdaterWriter UpdaterWriter(**ctx.obj['config']).run() @cli.command('ghtorrent') @click.option('--verbose/--no-verbose', '-v', default=False, help='Verbose mode') @click.pass_context def ghtorrent(ctx, verbose): """Consume events from ghtorrent and write them to cache. 
""" from swh.scheduler.updater.ghtorrent import GHTorrentConsumer from swh.scheduler.updater.backend import SchedulerUpdaterBackend ght_config = ctx.obj['config'].get('ghtorrent', {}) back_config = ctx.obj['config'].get('scheduler_updater', {}) backend = SchedulerUpdaterBackend(**back_config) GHTorrentConsumer(backend, **ght_config).run() def main(): return cli(auto_envvar_prefix='SWH_SCHEDULER') if __name__ == '__main__': main() diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py index d761f9a..1cfde95 100644 --- a/swh/scheduler/tests/conftest.py +++ b/swh/scheduler/tests/conftest.py @@ -1,93 +1,93 @@ import os import pytest import glob from datetime import timedelta from swh.core.utils import numfile_sortkey as sortkey from swh.scheduler import get_scheduler from swh.scheduler.tests import SQL_DIR # make sure we are not fooled by CELERY_ config environment vars for var in [x for x in os.environ.keys() if x.startswith('CELERY')]: os.environ.pop(var) import swh.scheduler.celery_backend.config # noqa # this import is needed here to enforce creation of the celery current app # BEFORE the swh_app fixture is called, otherwise the Celery app instance from # celery_backend.config becomes the celery.current_app DUMP_FILES = os.path.join(SQL_DIR, '*.sql') # celery tasks for testing purpose; tasks themselves should be -# in swh/scheduler/tests/celery_tasks.py +# in swh/scheduler/tests/tasks.py TASK_NAMES = ['ping', 'multiping', 'add', 'error'] @pytest.fixture(scope='session') def celery_enable_logging(): return True @pytest.fixture(scope='session') def celery_includes(): return [ 'swh.scheduler.tests.tasks', ] @pytest.fixture(scope='session') def celery_parameters(): return { 'task_cls': 'swh.scheduler.task:SWHTask', } @pytest.fixture(scope='session') def celery_config(): return { 'accept_content': ['application/x-msgpack', 'application/json'], 'task_serializer': 'msgpack', 'result_serializer': 'msgpack', } # override the celery_session_app fixture to monkeypatch the 'main' # swh.scheduler.celery_backend.config.app Celery application # with the test application. 
 @pytest.fixture(scope='session')
 def swh_app(celery_session_app):
     swh.scheduler.celery_backend.config.app = celery_session_app
     yield celery_session_app


 @pytest.fixture
 def swh_scheduler(request, postgresql_proc, postgresql):
     scheduler_config = {
         'db': 'postgresql://{user}@{host}:{port}/{dbname}'.format(
             host=postgresql_proc.host,
             port=postgresql_proc.port,
             user='postgres',
             dbname='tests')
     }

     all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey)

     cursor = postgresql.cursor()
     for fname in all_dump_files:
         with open(fname) as fobj:
             cursor.execute(fobj.read())
     postgresql.commit()

     scheduler = get_scheduler('local', scheduler_config)
     for taskname in TASK_NAMES:
         scheduler.create_task_type({
             'type': 'swh-test-{}'.format(taskname),
             'description': 'The {} testing task'.format(taskname),
             'backend_name': 'swh.scheduler.tests.tasks.{}'.format(taskname),
             'default_interval': timedelta(days=1),
             'min_interval': timedelta(hours=6),
             'max_interval': timedelta(days=12),
         })

     return scheduler
diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py
new file mode 100644
index 0000000..7e2a819
--- /dev/null
+++ b/swh/scheduler/tests/test_cli.py
@@ -0,0 +1,191 @@
+# Copyright (C) 2019  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import datetime
+import re
+import tempfile
+from unittest.mock import patch
+
+from click.testing import CliRunner
+
+from swh.scheduler.cli import cli
+from swh.scheduler.utils import create_task_dict
+
+
+CLI_CONFIG = '''
+scheduler:
+    cls: foo
+    args: {}
+'''
+
+
+def invoke(scheduler, catch_exceptions, args):
+    runner = CliRunner()
+    with patch('swh.scheduler.cli.get_scheduler') as get_scheduler_mock, \
+            tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd:
+        config_fd.write(CLI_CONFIG)
+        config_fd.seek(0)
+        get_scheduler_mock.return_value = scheduler
+        result = runner.invoke(cli, ['-C' + config_fd.name] + args)
+    if not catch_exceptions and result.exception:
+        print(result.output)
+        raise result.exception
+    return result
+
+
+def test_schedule_tasks(swh_scheduler):
+    csv_data = (
+        b'swh-test-ping;[["arg1", "arg2"]];{"key": "value"};' +
+        datetime.datetime.utcnow().isoformat().encode() + b'\n' +
+        b'swh-test-ping;[["arg3", "arg4"]];{"key": "value"};' +
+        datetime.datetime.utcnow().isoformat().encode() + b'\n')
+    with tempfile.NamedTemporaryFile(suffix='.csv') as csv_fd:
+        csv_fd.write(csv_data)
+        csv_fd.seek(0)
+        result = invoke(swh_scheduler, False, [
+            'task', 'schedule',
+            '-d', ';',
+            csv_fd.name
+        ])
+    expected = r'''
+\[INFO\] swh.core.config -- Loading config file .*
+Created 2 tasks
+
+Task 1
+ Next run: just now \(.*\)
+ Interval: 1 day, 0:00:00
+ Type: swh-test-ping
+ Policy: recurring
+ Args:
+    \['arg1', 'arg2'\]
+ Keyword args:
+    key: value
+
+Task 2
+ Next run: just now \(.*\)
+ Interval: 1 day, 0:00:00
+ Type: swh-test-ping
+ Policy: recurring
+ Args:
+    \['arg3', 'arg4'\]
+ Keyword args:
+    key: value
+
+'''.lstrip()
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+
+def test_schedule_tasks_columns(swh_scheduler):
+    with tempfile.NamedTemporaryFile(suffix='.csv') as csv_fd:
+        csv_fd.write(
+            b'swh-test-ping;oneshot;["arg1", "arg2"];{"key": "value"}\n')
+        csv_fd.seek(0)
+        result = invoke(swh_scheduler, False, [
+            'task', 'schedule',
+            '-c', 'type', '-c', 'policy', '-c', 'args', '-c',
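+            # each -c names one CSV column, in file order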
+            'kwargs',
+            '-d', ';',
+            csv_fd.name
+        ])
+    expected = r'''
+\[INFO\] swh.core.config -- Loading config file .*
+Created 1 tasks
+
+Task 1
+ Next run: just now \(.*\)
+ Interval: 1 day, 0:00:00
+ Type: swh-test-ping
+ Policy: oneshot
+ Args:
+    arg1
+    arg2
+ Keyword args:
+    key: value
+
+'''.lstrip()
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+
+def test_schedule_task(swh_scheduler):
+    result = invoke(swh_scheduler, False, [
+        'task', 'add',
+        'swh-test-ping', 'arg1', 'arg2', 'key=value',
+    ])
+    expected = r'''
+\[INFO\] swh.core.config -- Loading config file .*
+Created 1 tasks
+
+Task 1
+ Next run: just now \(.*\)
+ Interval: 1 day, 0:00:00
+ Type: swh-test-ping
+ Policy: recurring
+ Args:
+    arg1
+    arg2
+ Keyword args:
+    key: value
+
+'''.lstrip()
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+
+def test_list_pending_tasks_none(swh_scheduler):
+    result = invoke(swh_scheduler, False, [
+        'task', 'list-pending', 'swh-test-ping',
+    ])
+
+    expected = r'''
+\[INFO\] swh.core.config -- Loading config file .*
+Found 0 swh-test-ping tasks
+
+'''.lstrip()
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+
+def test_list_pending_tasks_one(swh_scheduler):
+    task = create_task_dict('swh-test-ping', 'oneshot', key='value')
+    swh_scheduler.create_tasks([task])
+
+    result = invoke(swh_scheduler, False, [
+        'task', 'list-pending', 'swh-test-ping',
+    ])
+
+    expected = r'''
+\[INFO\] swh.core.config -- Loading config file .*
+Found 1 swh-test-ping tasks
+
+Task 1
+ Next run: just now \(.*\)
+ Interval: 1 day, 0:00:00
+ Type: swh-test-ping
+ Policy: oneshot
+ Args:
+ Keyword args:
+    key: value
+
+'''.lstrip()
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+
+def test_list_pending_tasks_one_filter(swh_scheduler):
+    task = create_task_dict('swh-test-multiping', 'oneshot', key='value')
+    swh_scheduler.create_tasks([task])
+
+    result = invoke(swh_scheduler, False, [
+        'task', 'list-pending', 'swh-test-ping',
+    ])
+
+    expected = r'''
+\[INFO\] swh.core.config -- Loading config file .*
+Found 0 swh-test-ping tasks
+
+'''.lstrip()
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
diff --git a/tox.ini b/tox.ini
index 0578fc8..8495db2 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,30 +1,30 @@
 [tox]
 envlist=flake8,py3

 [testenv:py3]
 deps =
   .[testing]
   pytest-cov
   pifpaf
 setenv =
   LC_ALL=C.UTF-8
   LC_CTYPE=C.UTF-8
   LANG=C.UTF-8
 commands =
-  pifpaf run postgresql -- pytest --hypothesis-profile=fast --cov=swh --cov-branch {posargs}
+  pifpaf run postgresql -- pytest --doctest-modules --hypothesis-profile=fast --cov=swh --cov-branch {posargs}

 [testenv:py3-slow]
 deps =
   .[testing]
   pytest-cov
   pifpaf
 commands =
-  pifpaf run postgresql -- pytest --hypothesis-profile=slow --cov=swh --cov-branch {posargs}
+  pifpaf run postgresql -- pytest --doctest-modules --hypothesis-profile=slow --cov=swh --cov-branch {posargs}

 [testenv:flake8]
 skip_install = true
 deps =
   flake8
 commands =
   {envpython} -m flake8
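
Note: the doctest-enabled run can be reproduced outside tox with the same command as above (assuming pifpaf and a local PostgreSQL are installed):

    pifpaf run postgresql -- pytest --doctest-modules --hypothesis-profile=fast --cov=swh --cov-branch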