swh/scheduler/cli/task.py
 # Copyright (C) 2016-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import datetime
 import json
 import itertools
 import locale
 import logging

 import arrow
 import csv
 import click

+from typing import Any, Dict
+
 from . import cli

 locale.setlocale(locale.LC_ALL, '')
 ARROW_LOCALE = locale.getlocale(locale.LC_TIME)[0]


 class DateTimeType(click.ParamType):
[... 454 lines not shown ...]
 @click.option('--batch-clean', default=1000, type=click.INT,
               help='Batch size of task to clean after archival')
 @click.option('--dry-run/--no-dry-run', is_flag=True, default=False,
               help='Default to list only what would be archived.')
 @click.option('--verbose', is_flag=True, default=False,
               help='Verbose mode')
 @click.option('--cleanup/--no-cleanup', is_flag=True, default=True,
               help='Clean up archived tasks (default)')
-@click.option('--start-from', type=click.INT, default=-1,
-              help='(Optional) default task id to start from. Default is -1.')
+@click.option('--start-from', type=click.STRING, default=None,
+              help='(Optional) default page to start from.')
 @click.pass_context
 def archive_tasks(ctx, before, after, batch_index, bulk_index, batch_clean,
                   dry_run, verbose, cleanup, start_from):
     """Archive task/task_run whose (task_type is 'oneshot' and task_status
        is 'completed') or (task_type is 'recurring' and task_status is
        'disabled').

        With --dry-run flag set (default), only list those.

     """
     from swh.core.utils import grouper
-    from swh.scheduler.backend_es import SWHElasticSearchClient
+    from swh.scheduler.backend_es import ElasticSearchBackend

+    config = ctx.obj['config']
     scheduler = ctx.obj['scheduler']
     if not scheduler:
         raise ValueError('Scheduler class (local/remote) must be instantiated')

-    es_client = SWHElasticSearchClient()
     logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO)
-    log = logging.getLogger('swh.scheduler.cli.archive')
+    logger = logging.getLogger(__name__)
     logging.getLogger('urllib3').setLevel(logging.WARN)
-    logging.getLogger('elasticsearch').setLevel(logging.WARN)
+    logging.getLogger('elasticsearch').setLevel(logging.ERROR)
     if dry_run:
-        log.info('**DRY-RUN** (only reading db)')
+        logger.info('**DRY-RUN** (only reading db)')
     if not cleanup:
-        log.info('**NO CLEANUP**')
+        logger.info('**NO CLEANUP**')

+    es_storage = ElasticSearchBackend(**config)
     now = arrow.utcnow()

     # Default to archive tasks from a rolling month starting the week
     # prior to the current one
     if not before:
         before = now.shift(weeks=-1).format('YYYY-MM-DD')

     if not after:
         after = now.shift(weeks=-1).shift(months=-1).format('YYYY-MM-DD')
-    log.debug('index: %s; cleanup: %s; period: [%s ; %s]' % (
+    logger.debug('index: %s; cleanup: %s; period: [%s ; %s]' % (
         not dry_run, not dry_run and cleanup, after, before))
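
The rolling window above is easiest to see with concrete dates; a small worked example (dates picked arbitrarily, not from this diff):

    import arrow

    now = arrow.get('2019-06-15')
    # --before defaults to one week before "now"...
    before = now.shift(weeks=-1).format('YYYY-MM-DD')                  # '2019-06-08'
    # ...and --after to one month before that: a one-month window.
    after = now.shift(weeks=-1).shift(months=-1).format('YYYY-MM-DD')  # '2019-05-08'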
-    def group_by_index_name(data, es_client=es_client):
+    def get_index_name(data: Dict[str, Any],
+                       es_storage: ElasticSearchBackend = es_storage) -> str:

vlorentz (inline): I know it's off-topic, but this function should just be named `get_index_name`... it doesn't do any grouping.

         """Given a data record, determine the index's name through its ending
            date. This varies greatly depending on the task_run's
            status.

         """
         date = data.get('started')
         if not date:
             date = data['scheduled']
-        return es_client.compute_index_name(date.year, date.month)
+        return es_storage.compute_index_name(date.year, date.month)
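
A small sketch of the fallback this helper implements. The exact string `compute_index_name` returns is internal to `ElasticSearchBackend` and only assumed here (one index per year/month bucket):

    import datetime

    record = {
        'task_id': 12,
        'started': None,                             # run never started...
        'scheduled': datetime.datetime(2019, 5, 3),  # ...so this date is used
    }
    # get_index_name(record) resolves to
    # es_storage.compute_index_name(2019, 5), i.e. the May 2019 index.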
     def index_data(before, page_token, batch_index):
-        while page_token is not None:
+        while True:
             result = scheduler.filter_task_to_archive(
                 after, before, page_token=page_token, limit=batch_index)
-            tasks_in = result['tasks']
-            for index_name, tasks_group in itertools.groupby(
-                    tasks_in, key=group_by_index_name):
-                log.debug('Index tasks to %s' % index_name)
+            tasks_sorted = sorted(result['tasks'], key=get_index_name)
+            groups = itertools.groupby(tasks_sorted, key=get_index_name)
+            for index_name, tasks_group in groups:
+                logger.debug('Index tasks to %s' % index_name)
                 if dry_run:
                     for task in tasks_group:
                         yield task
                     continue

-                yield from es_client.streaming_bulk(
+                yield from es_storage.streaming_bulk(
                     index_name, tasks_group, source=['task_id', 'task_run_id'],
-                    chunk_size=bulk_index, log=log)
+                    chunk_size=bulk_index)

             page_token = result.get('next_page_token')
+            if page_token is None:
+                break
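
One note on the new `sorted()` call: `itertools.groupby` only merges *consecutive* items with equal keys, so without sorting by index name first, the same index could yield several partial groups. A standalone illustration:

    import itertools

    names = ['2019-05', '2019-04', '2019-05']
    # Unsorted: the two '2019-05' entries end up in separate groups.
    [(k, len(list(g))) for k, g in itertools.groupby(names)]
    # -> [('2019-05', 1), ('2019-04', 1), ('2019-05', 1)]
    [(k, len(list(g))) for k, g in itertools.groupby(sorted(names))]
    # -> [('2019-04', 1), ('2019-05', 2)]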
     gen = index_data(before, page_token=start_from, batch_index=batch_index)
     if cleanup:
         for task_ids in grouper(gen, n=batch_clean):
             task_ids = list(task_ids)
-            log.info('Clean up %s tasks: [%s, ...]' % (
+            logger.info('Clean up %s tasks: [%s, ...]' % (
                 len(task_ids), task_ids[0]))
             if dry_run:  # no clean up
                 continue
             ctx.obj['scheduler'].delete_archived_tasks(task_ids)
     else:
         for task_ids in grouper(gen, n=batch_index):
             task_ids = list(task_ids)
-            log.info('Indexed %s tasks: [%s, ...]' % (
+            logger.info('Indexed %s tasks: [%s, ...]' % (
                 len(task_ids), task_ids[0]))

+    logger.debug('Done!')
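
`grouper` comes from `swh.core.utils` and batches the generator's output so tasks are deleted (or merely counted) in chunks. A rough standard-library equivalent, for reference only; the real helper may differ in details:

    import itertools

    def grouper(iterable, n):
        """Yield successive lists of at most n items from iterable."""
        it = iter(iterable)
        while True:
            chunk = list(itertools.islice(it, n))
            if not chunk:
                return
            yield chunk

    assert list(grouper(range(5), n=2)) == [[0, 1], [2, 3], [4]]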