Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/cli/task.py
Show First 20 Lines • Show All 492 Lines • ▼ Show 20 Lines | """Archive task/task_run whose (task_type is 'oneshot' and task_status | ||||
'disabled'). | 'disabled'). | ||||
With --dry-run flag set (default), only list those. | With --dry-run flag set (default), only list those. | ||||
""" | """ | ||||
from swh.core.utils import grouper | from swh.core.utils import grouper | ||||
from swh.scheduler.backend_es import SWHElasticSearchClient | from swh.scheduler.backend_es import SWHElasticSearchClient | ||||
config = ctx.obj['config'] | |||||
scheduler = ctx.obj['scheduler'] | scheduler = ctx.obj['scheduler'] | ||||
if not scheduler: | if not scheduler: | ||||
raise ValueError('Scheduler class (local/remote) must be instantiated') | raise ValueError('Scheduler class (local/remote) must be instantiated') | ||||
es_client = SWHElasticSearchClient() | es_client = SWHElasticSearchClient(**config) | ||||
logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO) | logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO) | ||||
log = logging.getLogger('swh.scheduler.cli.archive') | log = logging.getLogger('swh.scheduler.cli.archive') | ||||
logging.getLogger('urllib3').setLevel(logging.WARN) | logging.getLogger('urllib3').setLevel(logging.WARN) | ||||
logging.getLogger('elasticsearch').setLevel(logging.WARN) | logging.getLogger('elasticsearch').setLevel(logging.WARN) | ||||
if dry_run: | if dry_run: | ||||
log.info('**DRY-RUN** (only reading db)') | log.info('**DRY-RUN** (only reading db)') | ||||
if not cleanup: | if not cleanup: | ||||
log.info('**NO CLEANUP**') | log.info('**NO CLEANUP**') | ||||
now = arrow.utcnow() | now = arrow.utcnow() | ||||
# Default to archive tasks from a rolling month starting the week | # Default to archive tasks from a rolling month starting the week | ||||
# prior to the current one | # prior to the current one | ||||
if not before: | if not before: | ||||
before = now.shift(weeks=-1).format('YYYY-MM-DD') | before = now.shift(weeks=-1).format('YYYY-MM-DD') | ||||
if not after: | if not after: | ||||
after = now.shift(weeks=-1).shift(months=-1).format('YYYY-MM-DD') | after = now.shift(weeks=-1).shift(months=-1).format('YYYY-MM-DD') | ||||
log.debug('index: %s; cleanup: %s; period: [%s ; %s]' % ( | log.debug('index: %s; cleanup: %s; period: [%s ; %s]' % ( | ||||
not dry_run, not dry_run and cleanup, after, before)) | not dry_run, not dry_run and cleanup, after, before)) | ||||
def group_by_index_name(data, es_client=es_client): | def group_by_index_name(data, es_client=es_client): | ||||
vlorentz: I know it's off-topic, but this function should just be named `get_index_name`... it doesn't do any grouping. | |||||
"""Given a data record, determine the index's name through its ending | """Given a data record, determine the index's name through its ending | ||||
date. This varies greatly depending on the task_run's | date. This varies greatly depending on the task_run's | ||||
status. | status. | ||||
""" | """ | ||||
date = data.get('started') | date = data.get('started') | ||||
if not date: | if not date: | ||||
date = data['scheduled'] | date = data['scheduled'] | ||||
return es_client.compute_index_name(date.year, date.month) | return es_client.compute_index_name(date.year, date.month) | ||||
def index_data(before, last_id, batch_index): | def index_data(before, last_id, batch_index): | ||||
while last_id is not None: | while last_id is not None: | ||||
result = scheduler.filter_task_to_archive( | result = scheduler.filter_task_to_archive( | ||||
after, before, last_id=last_id, limit=batch_index) | after, before, last_id=last_id, limit=batch_index) | ||||
tasks_in = result['tasks'] | tasks_sorted = sorted(result['tasks'], key=group_by_index_name) | ||||
for index_name, tasks_group in itertools.groupby( | groups = itertools.groupby(tasks_sorted, key=group_by_index_name) | ||||
tasks_in, key=group_by_index_name): | for index_name, tasks_group in groups: | ||||
log.debug('Index tasks to %s' % index_name) | log.debug('Index tasks to %s' % index_name) | ||||
if dry_run: | if dry_run: | ||||
for task in tasks_group: | for task in tasks_group: | ||||
yield task | yield task | ||||
continue | continue | ||||
yield from es_client.streaming_bulk( | yield from es_client.streaming_bulk( | ||||
index_name, tasks_group, source=['task_id', 'task_run_id'], | index_name, tasks_group, source=['task_id', 'task_run_id'], | ||||
Show All 18 Lines |
I know it's off-topic, but this function should just be named `get_index_name`... it doesn't do any grouping.