Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/cli/admin.py
| Show All 22 Lines | help=( | ||||
| "executed. Set to 0 (default) for a one shot." | "executed. Set to 0 (default) for a one shot." | ||||
| ), | ), | ||||
| ) | ) | ||||
| @click.option( | @click.option( | ||||
| "--task-type", | "--task-type", | ||||
| "task_type_names", | "task_type_names", | ||||
| multiple=True, | multiple=True, | ||||
| default=[], | default=[], | ||||
| help="Task type names (e.g load-git, load-hg, list-github-full, ...) to schedule.", | help=( | ||||
| "Task types to schedule. If not provided, this iterates over every " | |||||
| "task types referenced in the scheduler backend." | |||||
| ), | |||||
| ) | ) | ||||
| @click.option( | @click.option( | ||||
| "--with-priority/--without-priority", | "--with-priority / --without-priority", | ||||
| is_flag=True, | is_flag=True, | ||||
| default=False, | default=False, | ||||
| help=( | help=( | ||||
| "Determine if those tasks should be the ones with priority or not." | "Determine if those tasks should be the ones with priority or not." | ||||
| "By default, this deals with tasks without any priority." | "By default, this deals with tasks without any priority." | ||||
| ), | ), | ||||
| ) | ) | ||||
| @click.pass_context | @click.pass_context | ||||
| def runner(ctx, period, task_type_names, with_priority): | def runner(ctx, period, task_type_names, with_priority): | ||||
| """Starts a swh-scheduler runner service. | """Starts a swh-scheduler runner service. | ||||
| This process is responsible for checking for ready-to-run tasks and | This process is responsible for checking for ready-to-run tasks and | ||||
| schedule them.""" | schedule them.""" | ||||
| from swh.scheduler.celery_backend.config import build_app | from swh.scheduler.celery_backend.config import build_app | ||||
| from swh.scheduler.celery_backend.runner import run_ready_tasks | from swh.scheduler.celery_backend.runner import run_ready_tasks | ||||
| config = ctx.obj["config"] | config = ctx.obj["config"] | ||||
| app = build_app(config.get("celery")) | app = build_app(config.get("celery")) | ||||
| app.set_current() | app.set_current() | ||||
| logger = logging.getLogger(__name__ + ".runner") | logger = logging.getLogger(__name__ + ".runner") | ||||
| scheduler = ctx.obj["scheduler"] | scheduler = ctx.obj["scheduler"] | ||||
| logger.debug("Scheduler %s" % scheduler) | logger.debug("Scheduler %s" % scheduler) | ||||
| task_types = [] | task_types = [] | ||||
| for task_type_name in task_type_names: | for task_type_name in task_type_names: | ||||
| task_type = scheduler.get_task_type(task_type_name) | task_type = scheduler.get_task_type(task_type_name) | ||||
| if not task_type: | if not task_type: | ||||
| raise ValueError(f"Unknown {task_type_name}") | raise ValueError(f"Unknown {task_type_name}") | ||||
| task_types.append(task_type) | task_types.append(task_type) | ||||
| try: | try: | ||||
| while True: | while True: | ||||
| if not task_types: | |||||
| task_types = scheduler.get_task_types() | |||||
| logger.debug("Run ready tasks") | logger.debug("Run ready tasks") | ||||
| try: | try: | ||||
| ntasks = len(run_ready_tasks(scheduler, app, task_types, with_priority)) | ntasks = len(run_ready_tasks(scheduler, app, task_types, with_priority)) | ||||
| if ntasks: | if ntasks: | ||||
| logger.info("Scheduled %s tasks", ntasks) | logger.info("Scheduled %s tasks", ntasks) | ||||
| except Exception: | except Exception: | ||||
| logger.exception("Unexpected error in run_ready_tasks()") | logger.exception("Unexpected error in run_ready_tasks()") | ||||
| if not period: | if not period: | ||||
| ▲ Show 20 Lines • Show All 60 Lines • Show Last 20 Lines | |||||