Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/cli/admin.py
Show All 22 Lines | help=( | ||||
"executed. Set to 0 (default) for a one shot." | "executed. Set to 0 (default) for a one shot." | ||||
), | ), | ||||
) | ) | ||||
@click.option( | @click.option( | ||||
"--task-type", | "--task-type", | ||||
"task_type_names", | "task_type_names", | ||||
multiple=True, | multiple=True, | ||||
default=[], | default=[], | ||||
help="Task type names (e.g load-git, load-hg, list-github-full, ...) to schedule.", | help=( | ||||
"Task types to schedule. If not provided, this iterates over every " | |||||
"task types referenced in the scheduler backend." | |||||
), | |||||
) | ) | ||||
@click.option( | @click.option( | ||||
"--with-priority/--without-priority", | "--with-priority / --without-priority", | ||||
is_flag=True, | is_flag=True, | ||||
default=False, | default=False, | ||||
help=( | help=( | ||||
"Determine if those tasks should be the ones with priority or not." | "Determine if those tasks should be the ones with priority or not." | ||||
"By default, this deals with tasks without any priority." | "By default, this deals with tasks without any priority." | ||||
), | ), | ||||
) | ) | ||||
@click.pass_context | @click.pass_context | ||||
def runner(ctx, period, task_type_names, with_priority): | def runner(ctx, period, task_type_names, with_priority): | ||||
"""Starts a swh-scheduler runner service. | """Starts a swh-scheduler runner service. | ||||
This process is responsible for checking for ready-to-run tasks and | This process is responsible for checking for ready-to-run tasks and | ||||
schedule them.""" | schedule them.""" | ||||
from swh.scheduler.celery_backend.config import build_app | from swh.scheduler.celery_backend.config import build_app | ||||
from swh.scheduler.celery_backend.runner import run_ready_tasks | from swh.scheduler.celery_backend.runner import run_ready_tasks | ||||
config = ctx.obj["config"] | config = ctx.obj["config"] | ||||
app = build_app(config.get("celery")) | app = build_app(config.get("celery")) | ||||
app.set_current() | app.set_current() | ||||
logger = logging.getLogger(__name__ + ".runner") | logger = logging.getLogger(__name__ + ".runner") | ||||
scheduler = ctx.obj["scheduler"] | scheduler = ctx.obj["scheduler"] | ||||
logger.debug("Scheduler %s" % scheduler) | logger.debug("Scheduler %s" % scheduler) | ||||
task_types = [] | task_types = [] | ||||
for task_type_name in task_type_names: | for task_type_name in task_type_names: | ||||
task_type = scheduler.get_task_type(task_type_name) | task_type = scheduler.get_task_type(task_type_name) | ||||
if not task_type: | if not task_type: | ||||
raise ValueError(f"Unknown {task_type_name}") | raise ValueError(f"Unknown {task_type_name}") | ||||
task_types.append(task_type) | task_types.append(task_type) | ||||
try: | try: | ||||
while True: | while True: | ||||
if not task_types: | |||||
task_types = scheduler.get_task_types() | |||||
logger.debug("Run ready tasks") | logger.debug("Run ready tasks") | ||||
try: | try: | ||||
ntasks = len(run_ready_tasks(scheduler, app, task_types, with_priority)) | ntasks = len(run_ready_tasks(scheduler, app, task_types, with_priority)) | ||||
if ntasks: | if ntasks: | ||||
logger.info("Scheduled %s tasks", ntasks) | logger.info("Scheduled %s tasks", ntasks) | ||||
except Exception: | except Exception: | ||||
logger.exception("Unexpected error in run_ready_tasks()") | logger.exception("Unexpected error in run_ready_tasks()") | ||||
if not period: | if not period: | ||||
▲ Show 20 Lines • Show All 60 Lines • Show Last 20 Lines |