Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/cli/__init__.py
- This file was moved from swh/scheduler/cli.py.
# Copyright (C) 2016-2018 The Software Heritage developers | # Copyright (C) 2016-2018 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import arrow | import arrow | ||||
import click | import click | ||||
import csv | import csv | ||||
import itertools | import itertools | ||||
import json | import json | ||||
import locale | import locale | ||||
import logging | import logging | ||||
import time | import time | ||||
import datetime | import datetime | ||||
from swh.core import utils, config | |||||
from swh.storage import get_storage | |||||
from swh.storage.algos.origin import iter_origins | |||||
from . import compute_nb_tasks_from | |||||
from .backend_es import SWHElasticSearchClient | |||||
from . import get_scheduler, DEFAULT_CONFIG | |||||
from .cli_utils import parse_options, schedule_origin_batches | |||||
locale.setlocale(locale.LC_ALL, '') | locale.setlocale(locale.LC_ALL, '') | ||||
ARROW_LOCALE = locale.getlocale(locale.LC_TIME)[0] | ARROW_LOCALE = locale.getlocale(locale.LC_TIME)[0] | ||||
class DateTimeType(click.ParamType): | class DateTimeType(click.ParamType): | ||||
name = 'time and date' | name = 'time and date' | ||||
▲ Show 20 Lines • Show All 118 Lines • ▼ Show 20 Lines | |||||
@click.group(context_settings=CONTEXT_SETTINGS) | @click.group(context_settings=CONTEXT_SETTINGS) | ||||
@click.option('--config-file', '-C', default=None, | @click.option('--config-file', '-C', default=None, | ||||
type=click.Path(exists=True, dir_okay=False,), | type=click.Path(exists=True, dir_okay=False,), | ||||
help="Configuration file.") | help="Configuration file.") | ||||
@click.option('--database', '-d', default=None, | @click.option('--database', '-d', default=None, | ||||
help="Scheduling database DSN (imply cls is 'local')") | help="Scheduling database DSN (imply cls is 'local')") | ||||
@click.option('--url', '-u', default=None, | @click.option('--url', '-u', default=None, | ||||
help="Scheduler's url access (imply cls is 'remote')") | help="Scheduler's url access (imply cls is 'remote')") | ||||
@click.option('--log-level', '-l', default='INFO', | |||||
type=click.Choice(logging._nameToLevel.keys()), | |||||
help="Log level (default to INFO)") | |||||
@click.option('--no-stdout', is_flag=True, default=False, | @click.option('--no-stdout', is_flag=True, default=False, | ||||
help="Do NOT output logs on the console") | help="Do NOT output logs on the console") | ||||
@click.pass_context | @click.pass_context | ||||
def cli(ctx, config_file, database, url, log_level, no_stdout): | def cli(ctx, config_file, database, url, no_stdout): | ||||
"""Software Heritage Scheduler CLI interface | """Scheduler CLI interface. | ||||
Default to use the the local scheduler instance (plugged to the | Default to use the the local scheduler instance (plugged to the | ||||
main scheduler db). | main scheduler db). | ||||
""" | """ | ||||
from swh.core import config | |||||
from swh.scheduler.celery_backend.config import setup_log_handler | from swh.scheduler.celery_backend.config import setup_log_handler | ||||
log_level = setup_log_handler( | from swh.scheduler import get_scheduler, DEFAULT_CONFIG | ||||
loglevel=log_level, colorize=False, | |||||
setup_log_handler( | |||||
loglevel=ctx.obj['log_level'], colorize=False, | |||||
format='[%(levelname)s] %(name)s -- %(message)s', | format='[%(levelname)s] %(name)s -- %(message)s', | ||||
log_console=not no_stdout) | log_console=not no_stdout) | ||||
ctx.ensure_object(dict) | ctx.ensure_object(dict) | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
scheduler = None | scheduler = None | ||||
conf = config.read(config_file, DEFAULT_CONFIG) | conf = config.read(config_file, DEFAULT_CONFIG) | ||||
Show All 13 Lines | try: | ||||
scheduler = get_scheduler(**sched_conf) | scheduler = get_scheduler(**sched_conf) | ||||
except ValueError: | except ValueError: | ||||
# it's the subcommand to decide whether not having a proper | # it's the subcommand to decide whether not having a proper | ||||
# scheduler instance is a problem. | # scheduler instance is a problem. | ||||
pass | pass | ||||
ctx.obj['scheduler'] = scheduler | ctx.obj['scheduler'] = scheduler | ||||
ctx.obj['config'] = conf | ctx.obj['config'] = conf | ||||
ctx.obj['loglevel'] = log_level | |||||
@cli.group('task') | @cli.group('task') | ||||
@click.pass_context | @click.pass_context | ||||
def task(ctx): | def task(ctx): | ||||
"""Manipulate tasks.""" | """Manipulate tasks.""" | ||||
pass | pass | ||||
▲ Show 20 Lines • Show All 88 Lines • ▼ Show 20 Lines | swh-scheduler --database 'service=swh-scheduler' \ | ||||
task add swh-lister-pypi | task add swh-lister-pypi | ||||
swh-scheduler --database 'service=swh-scheduler' \ | swh-scheduler --database 'service=swh-scheduler' \ | ||||
task add swh-lister-debian --policy=oneshot distribution=stretch | task add swh-lister-debian --policy=oneshot distribution=stretch | ||||
Note: if the priority is not given, the task won't have the priority set, | Note: if the priority is not given, the task won't have the priority set, | ||||
which is considered as the lowest priority level. | which is considered as the lowest priority level. | ||||
""" | """ | ||||
from .utils import parse_options | |||||
scheduler = ctx.obj['scheduler'] | scheduler = ctx.obj['scheduler'] | ||||
if not scheduler: | if not scheduler: | ||||
raise ValueError('Scheduler class (local/remote) must be instantiated') | raise ValueError('Scheduler class (local/remote) must be instantiated') | ||||
now = arrow.utcnow() | now = arrow.utcnow() | ||||
(args, kw) = parse_options(options) | (args, kw) = parse_options(options) | ||||
task = {'type': type, | task = {'type': type, | ||||
▲ Show 20 Lines • Show All 44 Lines • ▼ Show 20 Lines | def schedule_origin_metadata_index( | ||||
keyword argument(s) of the task in the form key=value, where value is | keyword argument(s) of the task in the form key=value, where value is | ||||
in YAML format. | in YAML format. | ||||
Usage sample: | Usage sample: | ||||
swh-scheduler --database 'service=swh-scheduler' \ | swh-scheduler --database 'service=swh-scheduler' \ | ||||
task schedule_origins indexer_origin_metadata | task schedule_origins indexer_origin_metadata | ||||
""" | """ | ||||
from swh.storage import get_storage | |||||
from swh.storage.algos.origin import iter_origins | |||||
from .utils import parse_options, schedule_origin_batches | |||||
scheduler = ctx.obj['scheduler'] | scheduler = ctx.obj['scheduler'] | ||||
storage = get_storage('remote', {'url': storage_url}) | storage = get_storage('remote', {'url': storage_url}) | ||||
if dry_run: | if dry_run: | ||||
scheduler = None | scheduler = None | ||||
(args, kw) = parse_options(options) | (args, kw) = parse_options(options) | ||||
if args: | if args: | ||||
raise click.ClickException('Only keywords arguments are allowed.') | raise click.ClickException('Only keywords arguments are allowed.') | ||||
Show All 13 Lines | @click.option('--before', '-b', required=False, type=DATETIME, | ||||
help='List all jobs supposed to run before the given date') | help='List all jobs supposed to run before the given date') | ||||
@click.pass_context | @click.pass_context | ||||
def list_pending_tasks(ctx, task_types, limit, before): | def list_pending_tasks(ctx, task_types, limit, before): | ||||
"""List the tasks that are going to be run. | """List the tasks that are going to be run. | ||||
You can override the number of tasks to fetch | You can override the number of tasks to fetch | ||||
""" | """ | ||||
from swh.scheduler import compute_nb_tasks_from | |||||
scheduler = ctx.obj['scheduler'] | scheduler = ctx.obj['scheduler'] | ||||
if not scheduler: | if not scheduler: | ||||
raise ValueError('Scheduler class (local/remote) must be instantiated') | raise ValueError('Scheduler class (local/remote) must be instantiated') | ||||
num_tasks, num_tasks_priority = compute_nb_tasks_from(limit) | num_tasks, num_tasks_priority = compute_nb_tasks_from(limit) | ||||
output = [] | output = [] | ||||
for task_type in task_types: | for task_type in task_types: | ||||
pending = scheduler.peek_ready_tasks( | pending = scheduler.peek_ready_tasks( | ||||
task_type, timestamp=before, | task_type, timestamp=before, | ||||
num_tasks=num_tasks, num_tasks_priority=num_tasks_priority) | num_tasks=num_tasks, num_tasks_priority=num_tasks_priority) | ||||
output.append('Found %d %s tasks\n' % ( | output.append('Found %d %s tasks\n' % ( | ||||
▲ Show 20 Lines • Show All 134 Lines • ▼ Show 20 Lines | def archive_tasks(ctx, before, after, batch_index, bulk_index, batch_clean, | ||||
dry_run, verbose, cleanup, start_from): | dry_run, verbose, cleanup, start_from): | ||||
"""Archive task/task_run whose (task_type is 'oneshot' and task_status | """Archive task/task_run whose (task_type is 'oneshot' and task_status | ||||
is 'completed') or (task_type is 'recurring' and task_status is | is 'completed') or (task_type is 'recurring' and task_status is | ||||
'disabled'). | 'disabled'). | ||||
With --dry-run flag set (default), only list those. | With --dry-run flag set (default), only list those. | ||||
""" | """ | ||||
from swh.core.utils import grouper | |||||
from .backend_es import SWHElasticSearchClient | |||||
scheduler = ctx.obj['scheduler'] | scheduler = ctx.obj['scheduler'] | ||||
if not scheduler: | if not scheduler: | ||||
raise ValueError('Scheduler class (local/remote) must be instantiated') | raise ValueError('Scheduler class (local/remote) must be instantiated') | ||||
es_client = SWHElasticSearchClient() | es_client = SWHElasticSearchClient() | ||||
logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO) | logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO) | ||||
log = logging.getLogger('swh.scheduler.cli.archive') | log = logging.getLogger('swh.scheduler.cli.archive') | ||||
logging.getLogger('urllib3').setLevel(logging.WARN) | logging.getLogger('urllib3').setLevel(logging.WARN) | ||||
Show All 39 Lines | def index_data(before, last_id, batch_index): | ||||
continue | continue | ||||
yield from es_client.streaming_bulk( | yield from es_client.streaming_bulk( | ||||
index_name, tasks_group, source=['task_id', 'task_run_id'], | index_name, tasks_group, source=['task_id', 'task_run_id'], | ||||
chunk_size=bulk_index, log=log) | chunk_size=bulk_index, log=log) | ||||
gen = index_data(before, last_id=start_from, batch_index=batch_index) | gen = index_data(before, last_id=start_from, batch_index=batch_index) | ||||
if cleanup: | if cleanup: | ||||
for task_ids in utils.grouper(gen, n=batch_clean): | for task_ids in grouper(gen, n=batch_clean): | ||||
task_ids = list(task_ids) | task_ids = list(task_ids) | ||||
log.info('Clean up %s tasks: [%s, ...]' % ( | log.info('Clean up %s tasks: [%s, ...]' % ( | ||||
len(task_ids), task_ids[0])) | len(task_ids), task_ids[0])) | ||||
if dry_run: # no clean up | if dry_run: # no clean up | ||||
continue | continue | ||||
ctx.obj['scheduler'].delete_archived_tasks(task_ids) | ctx.obj['scheduler'].delete_archived_tasks(task_ids) | ||||
else: | else: | ||||
for task_ids in utils.grouper(gen, n=batch_index): | for task_ids in grouper(gen, n=batch_index): | ||||
task_ids = list(task_ids) | task_ids = list(task_ids) | ||||
log.info('Indexed %s tasks: [%s, ...]' % ( | log.info('Indexed %s tasks: [%s, ...]' % ( | ||||
len(task_ids), task_ids[0])) | len(task_ids), task_ids[0])) | ||||
@cli.command('runner') | @cli.command('runner') | ||||
@click.option('--period', '-p', default=0, | @click.option('--period', '-p', default=0, | ||||
help=('Period (in s) at witch pending tasks are checked and ' | help=('Period (in s) at witch pending tasks are checked and ' | ||||
▲ Show 20 Lines • Show All 64 Lines • ▼ Show 20 Lines | def api_server(ctx, host, port, debug): | ||||
if ctx.obj['config']['scheduler']['cls'] == 'remote': | if ctx.obj['config']['scheduler']['cls'] == 'remote': | ||||
click.echo("The API server can only be started with a 'local' " | click.echo("The API server can only be started with a 'local' " | ||||
"configuration", err=True) | "configuration", err=True) | ||||
ctx.exit(1) | ctx.exit(1) | ||||
from swh.scheduler.api import server | from swh.scheduler.api import server | ||||
server.app.config.update(ctx.obj['config']) | server.app.config.update(ctx.obj['config']) | ||||
if debug is None: | if debug is None: | ||||
debug = ctx.obj['loglevel'] <= logging.DEBUG | debug = ctx.obj['log_level'] <= logging.DEBUG | ||||
server.app.run(host, port=port, debug=bool(debug)) | server.app.run(host, port=port, debug=bool(debug)) | ||||
@cli.group('task-type') | @cli.group('task-type') | ||||
@click.pass_context | @click.pass_context | ||||
def task_type(ctx): | def task_type(ctx): | ||||
"""Manipulate task types.""" | """Manipulate task types.""" | ||||
pass | pass | ||||
▲ Show 20 Lines • Show All 104 Lines • Show Last 20 Lines |