Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/cli/task.py
Show First 20 Lines • Show All 530 Lines • ▼ Show 20 Lines | def group_by_index_name(data, es_client=es_client): | ||||
status. | status. | ||||
""" | """ | ||||
date = data.get('started') | date = data.get('started') | ||||
if not date: | if not date: | ||||
date = data['scheduled'] | date = data['scheduled'] | ||||
return es_client.compute_index_name(date.year, date.month) | return es_client.compute_index_name(date.year, date.month) | ||||
def index_data(before, last_id, batch_index): | def index_data(before, last_id, batch_index): | ||||
tasks_in = scheduler.filter_task_to_archive( | while last_id is not None: | ||||
result = scheduler.filter_task_to_archive( | |||||
after, before, last_id=last_id, limit=batch_index) | after, before, last_id=last_id, limit=batch_index) | ||||
tasks_in = result['tasks'] | |||||
for index_name, tasks_group in itertools.groupby( | for index_name, tasks_group in itertools.groupby( | ||||
tasks_in, key=group_by_index_name): | tasks_in, key=group_by_index_name): | ||||
log.debug('Index tasks to %s' % index_name) | log.debug('Index tasks to %s' % index_name) | ||||
if dry_run: | if dry_run: | ||||
for task in tasks_group: | for task in tasks_group: | ||||
yield task | yield task | ||||
continue | continue | ||||
yield from es_client.streaming_bulk( | yield from es_client.streaming_bulk( | ||||
index_name, tasks_group, source=['task_id', 'task_run_id'], | index_name, tasks_group, source=['task_id', 'task_run_id'], | ||||
chunk_size=bulk_index, log=log) | chunk_size=bulk_index, log=log) | ||||
last_id = result.get('next_task_id') | |||||
vlorentz: The `last_id` as a loop variable should be renamed `page_token`, because it should be an opaque… | |||||
Done Inline Actions — We do need it; I'm trying to avoid repeating the while truc loop. gen = index_data... if cleanup: # do something with gen else: # do something else with gen ardumont: We do need it; I'm trying to avoid repeating the while truc loop.
Take a look below
```
gen =… | |||||
Done Inline Actions — I meant `while True` ;) ardumont: I meant `while True` ;) | |||||
gen = index_data(before, last_id=start_from, batch_index=batch_index) | gen = index_data(before, last_id=start_from, batch_index=batch_index) | ||||
if cleanup: | if cleanup: | ||||
for task_ids in grouper(gen, n=batch_clean): | for task_ids in grouper(gen, n=batch_clean): | ||||
task_ids = list(task_ids) | task_ids = list(task_ids) | ||||
log.info('Clean up %s tasks: [%s, ...]' % ( | log.info('Clean up %s tasks: [%s, ...]' % ( | ||||
len(task_ids), task_ids[0])) | len(task_ids), task_ids[0])) | ||||
if dry_run: # no clean up | if dry_run: # no clean up | ||||
continue | continue | ||||
ctx.obj['scheduler'].delete_archived_tasks(task_ids) | ctx.obj['scheduler'].delete_archived_tasks(task_ids) | ||||
else: | else: | ||||
for task_ids in grouper(gen, n=batch_index): | for task_ids in grouper(gen, n=batch_index): | ||||
task_ids = list(task_ids) | task_ids = list(task_ids) | ||||
log.info('Indexed %s tasks: [%s, ...]' % ( | log.info('Indexed %s tasks: [%s, ...]' % ( | ||||
len(task_ids), task_ids[0])) | len(task_ids), task_ids[0])) | ||||
Not Done Inline Actions — You're still not using it as an opaque identifier if it comes from a CLI option documented as "task id to start from". But I guess there is no easy way other than this one. vlorentz: You're still not using it as an opaque identifier if it comes from a CLI option documented as… |
The `last_id` loop variable should be renamed `page_token`, because it should be an opaque identifier.
As for its use as an argument of `index_data`, I don't know what to do with it. Maybe add an extra parameter to `filter_task_to_archive` that is ignored if `page_token` is not None? Or just remove it altogether if we don't actually need it?