diff --git a/requirements-swh.txt b/requirements-swh.txt --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,2 +1,2 @@ -swh.core >= 0.0.56 +swh.core[db,http] >= 0.0.59 swh.storage >= 0.0.129 diff --git a/setup.py b/setup.py --- a/setup.py +++ b/setup.py @@ -53,6 +53,8 @@ entry_points=''' [console_scripts] swh-scheduler=swh.scheduler.cli:main + [swh.cli.subcommands] + scheduler=swh.scheduler.cli:cli ''', classifiers=[ "Programming Language :: Python :: 3", diff --git a/swh/scheduler/cli.py b/swh/scheduler/cli/__init__.py rename from swh/scheduler/cli.py rename to swh/scheduler/cli/__init__.py --- a/swh/scheduler/cli.py +++ b/swh/scheduler/cli/__init__.py @@ -13,15 +13,6 @@ import time import datetime -from swh.core import utils, config -from swh.storage import get_storage -from swh.storage.algos.origin import iter_origins - -from . import compute_nb_tasks_from -from .backend_es import SWHElasticSearchClient -from . import get_scheduler, DEFAULT_CONFIG -from .cli_utils import parse_options, schedule_origin_batches - locale.setlocale(locale.LC_ALL, '') ARROW_LOCALE = locale.getlocale(locale.LC_TIME)[0] @@ -156,22 +147,22 @@ help="Scheduling database DSN (imply cls is 'local')") @click.option('--url', '-u', default=None, help="Scheduler's url access (imply cls is 'remote')") -@click.option('--log-level', '-l', default='INFO', - type=click.Choice(logging._nameToLevel.keys()), - help="Log level (default to INFO)") @click.option('--no-stdout', is_flag=True, default=False, help="Do NOT output logs on the console") @click.pass_context -def cli(ctx, config_file, database, url, log_level, no_stdout): - """Software Heritage Scheduler CLI interface +def cli(ctx, config_file, database, url, no_stdout): + """Scheduler CLI interface. Default to use the the local scheduler instance (plugged to the main scheduler db). """ + from swh.core import config from swh.scheduler.celery_backend.config import setup_log_handler - log_level = setup_log_handler( - loglevel=log_level, colorize=False, + from swh.scheduler import get_scheduler, DEFAULT_CONFIG + + setup_log_handler( + loglevel=ctx.obj['log_level'], colorize=False, format='[%(levelname)s] %(name)s -- %(message)s', log_console=not no_stdout) @@ -201,7 +192,6 @@ ctx.obj['scheduler'] = scheduler ctx.obj['config'] = conf - ctx.obj['loglevel'] = log_level @cli.group('task') @@ -306,6 +296,8 @@ Note: if the priority is not given, the task won't have the priority set, which is considered as the lowest priority level. """ + from .utils import parse_options + scheduler = ctx.obj['scheduler'] if not scheduler: raise ValueError('Scheduler class (local/remote) must be instantiated') @@ -366,6 +358,10 @@ swh-scheduler --database 'service=swh-scheduler' \ task schedule_origins indexer_origin_metadata """ + from swh.storage import get_storage + from swh.storage.algos.origin import iter_origins + from .utils import parse_options, schedule_origin_batches + scheduler = ctx.obj['scheduler'] storage = get_storage('remote', {'url': storage_url}) if dry_run: @@ -395,10 +391,10 @@ You can override the number of tasks to fetch """ + from swh.scheduler import compute_nb_tasks_from scheduler = ctx.obj['scheduler'] if not scheduler: raise ValueError('Scheduler class (local/remote) must be instantiated') - num_tasks, num_tasks_priority = compute_nb_tasks_from(limit) output = [] @@ -549,6 +545,9 @@ With --dry-run flag set (default), only list those. """ + from swh.core.utils import grouper + from .backend_es import SWHElasticSearchClient + scheduler = ctx.obj['scheduler'] if not scheduler: raise ValueError('Scheduler class (local/remote) must be instantiated') @@ -604,7 +603,7 @@ gen = index_data(before, last_id=start_from, batch_index=batch_index) if cleanup: - for task_ids in utils.grouper(gen, n=batch_clean): + for task_ids in grouper(gen, n=batch_clean): task_ids = list(task_ids) log.info('Clean up %s tasks: [%s, ...]' % ( len(task_ids), task_ids[0])) @@ -612,7 +611,7 @@ continue ctx.obj['scheduler'].delete_archived_tasks(task_ids) else: - for task_ids in utils.grouper(gen, n=batch_index): + for task_ids in grouper(gen, n=batch_index): task_ids = list(task_ids) log.info('Indexed %s tasks: [%s, ...]' % ( len(task_ids), task_ids[0])) @@ -693,7 +692,7 @@ from swh.scheduler.api import server server.app.config.update(ctx.obj['config']) if debug is None: - debug = ctx.obj['loglevel'] <= logging.DEBUG + debug = ctx.obj['log_level'] <= logging.DEBUG server.app.run(host, port=port, debug=bool(debug)) diff --git a/swh/scheduler/cli_utils.py b/swh/scheduler/cli/utils.py copy from swh/scheduler/cli_utils.py copy to swh/scheduler/cli/utils.py --- a/swh/scheduler/cli_utils.py +++ b/swh/scheduler/cli/utils.py @@ -8,7 +8,7 @@ import click import yaml -from .utils import create_task_dict +from swh.scheduler.utils import create_task_dict TASK_BATCH_SIZE = 1000 # Number of tasks per query to the scheduler diff --git a/swh/scheduler/cli_utils.py b/swh/scheduler/cli_utils.py --- a/swh/scheduler/cli_utils.py +++ b/swh/scheduler/cli_utils.py @@ -1,87 +1,2 @@ -# Copyright (C) 2019 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import itertools - -import click -import yaml - -from .utils import create_task_dict - -TASK_BATCH_SIZE = 1000 # Number of tasks per query to the scheduler - - -def schedule_origin_batches( - scheduler, task_type, origins, origin_batch_size, kwargs): - nb_origins = 0 - nb_tasks = 0 - - while True: - task_batch = [] - for _ in range(TASK_BATCH_SIZE): - # Group origins - origin_batch = [] - for origin in itertools.islice(origins, origin_batch_size): - origin_batch.append(origin) - nb_origins += len(origin_batch) - if not origin_batch: - break - - # Create a task for these origins - args = [origin_batch] - task_dict = create_task_dict(task_type, 'oneshot', *args, **kwargs) - task_batch.append(task_dict) - - # Schedule a batch of tasks - if not task_batch: - break - nb_tasks += len(task_batch) - if scheduler: - scheduler.create_tasks(task_batch) - click.echo('Scheduled %d tasks (%d origins).' % (nb_tasks, nb_origins)) - - # Print final status. - if nb_tasks: - click.echo('Done.') - else: - click.echo('Nothing to do (no origin metadata matched the criteria).') - - -def parse_argument(option): - try: - return yaml.safe_load(option) - except Exception: - raise click.ClickException('Invalid argument: {}'.format(option)) - - -def parse_options(options): - """Parses options from a CLI as YAML and turns it into Python - args and kwargs. - - >>> parse_options([]) - ([], {}) - >>> parse_options(['foo', 'bar']) - (['foo', 'bar'], {}) - >>> parse_options(['[foo, bar]']) - ([['foo', 'bar']], {}) - >>> parse_options(['"foo"', '"bar"']) - (['foo', 'bar'], {}) - >>> parse_options(['foo="bar"']) - ([], {'foo': 'bar'}) - >>> parse_options(['"foo"', 'bar="baz"']) - (['foo'], {'bar': 'baz'}) - >>> parse_options(['42', 'bar=False']) - ([42], {'bar': False}) - >>> parse_options(['42', 'bar=false']) - ([42], {'bar': False}) - >>> parse_options(['42', '"foo']) - Traceback (most recent call last): - ... - click.exceptions.ClickException: Invalid argument: "foo - """ - kw_pairs = [x.split('=', 1) for x in options if '=' in x] - args = [parse_argument(x) for x in options if '=' not in x] - kw = {k: parse_argument(v) for (k, v) in kw_pairs} - return (args, kw) +# for BW compat +from swh.scheduler.cli.utils import * # noqa diff --git a/swh/scheduler/tests/test_cli.py b/swh/scheduler/tests/test_cli.py --- a/swh/scheduler/tests/test_cli.py +++ b/swh/scheduler/tests/test_cli.py @@ -7,6 +7,7 @@ import re import tempfile from unittest.mock import patch +import logging from click.testing import CliRunner import pytest @@ -26,13 +27,13 @@ def invoke(scheduler, catch_exceptions, args): runner = CliRunner() - with patch('swh.scheduler.cli.get_scheduler') as get_scheduler_mock, \ + with patch('swh.scheduler.get_scheduler') as get_scheduler_mock, \ tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd: config_fd.write(CLI_CONFIG) config_fd.seek(0) get_scheduler_mock.return_value = scheduler - args = ['-C' + config_fd.name, '-l', 'WARNING'] + args - result = runner.invoke(cli, args) + args = ['-C' + config_fd.name, ] + args + result = runner.invoke(cli, args, obj={'log_level': logging.WARNING}) if not catch_exceptions and result.exception: print(result.output) raise result.exception @@ -578,12 +579,12 @@ """An instance of swh.storage.in_memory.Storage that gets injected into the CLI functions.""" storage = Storage() - with patch('swh.scheduler.cli.get_storage') as get_storage_mock: + with patch('swh.storage.get_storage') as get_storage_mock: get_storage_mock.return_value = storage yield storage -@patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3) +@patch('swh.scheduler.cli.utils.TASK_BATCH_SIZE', 3) def test_task_schedule_origins_dry_run( swh_scheduler, storage): """Tests the scheduling when origin_batch_size*task_batch_size is a @@ -610,7 +611,7 @@ assert len(tasks) == 0 -@patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3) +@patch('swh.scheduler.cli.utils.TASK_BATCH_SIZE', 3) def test_task_schedule_origins(swh_scheduler, storage): """Tests the scheduling when neither origin_batch_size or task_batch_size is a divisor of nb_origins."""