#!/usr/bin/env python3

# Copyright (C) 2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""Feed a celery queue from stdin while keeping its length under a threshold.

The script repeatedly checks the length of a queue, sends at most enough
tasks to bring that queue back up to the threshold, then waits before
checking again.

"""

import sys
import time
from itertools import islice

import click

try:
    from swh.indexer.producer import gen_sha1
except ImportError:
    # The indexer package is optional: only the 'indexer' queue needs it.
    # Keep the name defined so that its absence can be reported clearly.
    gen_sha1 = None

from swh.scheduler.celery_backend.config import app as main_app


# Max batch size for tasks
MAX_NUM_TASKS = 10000
# Default number of seconds to wait between two queue length checks
MAX_WAITING_TIME = 10


def stdin_to_svn_tasks():
    """Generates from stdin the proper task argument for the loader-svn
       worker.

    Each non-blank line read from stdin is split on single spaces; the
    first field is used as the origin and the second one as the path.
    The visit date is a fixed value.

    Yields:
        dict with an 'arguments' key holding the positional ('args') and
        keyword ('kwargs') arguments of one task.

    Raises:
        IndexError: if a non-blank line holds fewer than two fields.

    """
    visit_date = 'Tue, 3 May 2016 17:16:32 +0200'
    for line in sys.stdin:
        line = line.rstrip()
        if not line:
            # a blank line (e.g. trailing newline) carries no task
            continue
        values = line.split(' ')
        origin = values[0]
        path = values[1]
        yield {
            'arguments': {
                'args': [path, origin, visit_date],
                'kwargs': {
                    'start_from_scratch': True,
                }
            },
        }


def stdin_to_index_tasks():
    """Generates from stdin the proper task argument for the orchestrator.

    Every batch produced by ``gen_sha1(batch=1000)`` becomes the single
    positional argument of one task.
    NOTE(review): gen_sha1 presumably reads the sha1s from stdin - confirm
    against swh.indexer.producer.

    Yields:
        dict with an 'arguments' key holding the positional ('args') and
        keyword ('kwargs') arguments of one task.

    Raises:
        ImportError: if swh.indexer.producer could not be imported.

    """
    if gen_sha1 is None:
        raise ImportError(
            "swh.indexer.producer.gen_sha1 is required to schedule "
            "indexer tasks but could not be imported")

    for sha1s in gen_sha1(batch=1000):
        yield {
            'arguments': {
                'args': [sha1s],
                'kwargs': {}
            },
        }


# Supported values for --queue-name. For each of them:
# - 'task_name': name of the task used for scheduling
# - 'queue_to_check' (optional): name of the task whose queue length is
#   checked; defaults to 'task_name'
# - 'task_fn_generator': function generating the tasks to send
QUEUES = {
    'svndump': {  # for svn, we use the same queue for length checking
                  # and scheduling
        'task_name': 'swh.loader.svn.tasks.MountAndLoadSvnRepositoryTsk',
        # to_task the function to use to transform the input in task
        'task_fn_generator': stdin_to_svn_tasks,
    },
    'indexer': {  # for indexer, we schedule using the orchestrator's queue
                  # we check the length on the mimetype queue though
        'task_name': 'swh.indexer.tasks.SWHOrchestratorAllContentsTask',
        'queue_to_check': 'swh.indexer.tasks.SWHContentMimetypeTask',
        'task_fn_generator': stdin_to_index_tasks,
    }
}


@click.command(help='Read from stdin and send message to queue ')
@click.option('--queue-name', help='Queue concerned')
@click.option('--threshold', help='Threshold for the queue',
              type=click.INT,
              default=MAX_NUM_TASKS)
@click.option('--waiting-time', help='Waiting time between checks',
              type=click.INT,
              default=MAX_WAITING_TIME)
def main(queue_name, threshold, waiting_time, app=main_app):
    """Send tasks to a queue without letting it grow over a threshold.

    Args:
        queue_name: key of QUEUES selecting the task to schedule, the
            queue to check and the task generator to use
        threshold: queue length at and above which nothing is sent
        waiting_time: seconds to sleep when the queue is full or after a
            full batch has been sent
        app: celery application used to resolve the tasks and to read
            the queue length

    Raises:
        ValueError: if queue_name is not one of the QUEUES keys

    """
    if queue_name not in QUEUES:
        # Only list the names: QUEUES values hold nested dicts and
        # function objects which make the message unreadable.
        raise ValueError("Unsupported %s, possible values: %s" % (
            queue_name, ', '.join(sorted(QUEUES))))

    # The tasks are only registered in app.tasks once their modules are
    # imported.
    for module in app.conf.CELERY_IMPORTS:
        __import__(module)

    queue_information = QUEUES[queue_name]
    task_name = queue_information['task_name']
    scheduling_task = app.tasks[task_name]

    queue_to_check = queue_information.get('queue_to_check', task_name)
    checking_task = app.tasks[queue_to_check]
    checking_queue_name = checking_task.task_queue

    # One generator for the whole run; islice below takes from it exactly
    # what the queue can accept and leaves the rest for the next round.
    tasks = queue_information['task_fn_generator']()

    while True:
        queue_length = app.get_queue_length(checking_queue_name)
        nb_tasks_to_send = threshold - queue_length

        if nb_tasks_to_send <= 0:
            # Queue is full: wait for the workers to consume some tasks
            # instead of exiting with input left unread.
            time.sleep(waiting_time)
            continue

        pending_tasks = list(islice(tasks, nb_tasks_to_send))

        for _task in pending_tasks:
            args = _task['arguments']['args']
            kwargs = _task['arguments']['kwargs']
            scheduling_task.delay(*args, **kwargs)
            print(_task['arguments'])

        if len(pending_tasks) < nb_tasks_to_send:
            # the generator ran dry before filling the batch: no more
            # data, we break to exit
            break

        # A full batch was sent, the queue is back at the threshold.
        time.sleep(waiting_time)


if __name__ == '__main__':
    main()