Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/cli/admin.py
# Copyright (C) 2016-2021 The Software Heritage developers | # Copyright (C) 2016-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from __future__ import annotations | |||||
# WARNING: do not import unnecessary things here to keep cli startup time under | # WARNING: do not import unnecessary things here to keep cli startup time under | ||||
# control | # control | ||||
import logging
import time
from typing import List, Optional, Tuple

import click

from . import cli
@cli.command("start-runner") | @cli.command("start-runner") | ||||
@click.option( | @click.option( | ||||
Show All 34 Lines | def runner(ctx, period, task_type_names, with_priority): | ||||
from swh.scheduler.celery_backend.runner import run_ready_tasks | from swh.scheduler.celery_backend.runner import run_ready_tasks | ||||
config = ctx.obj["config"] | config = ctx.obj["config"] | ||||
app = build_app(config.get("celery")) | app = build_app(config.get("celery")) | ||||
app.set_current() | app.set_current() | ||||
logger = logging.getLogger(__name__ + ".runner") | logger = logging.getLogger(__name__ + ".runner") | ||||
scheduler = ctx.obj["scheduler"] | scheduler = ctx.obj["scheduler"] | ||||
logger.debug("Scheduler %s" % scheduler) | logger.debug("Scheduler %s", scheduler) | ||||
task_types = [] | task_types = [] | ||||
for task_type_name in task_type_names: | for task_type_name in task_type_names: | ||||
task_type = scheduler.get_task_type(task_type_name) | task_type = scheduler.get_task_type(task_type_name) | ||||
if not task_type: | if not task_type: | ||||
raise ValueError(f"Unknown {task_type_name}") | raise ValueError(f"Unknown {task_type_name}") | ||||
task_types.append(task_type) | task_types.append(task_type) | ||||
try: | try: | ||||
Show All 33 Lines | def listener(ctx): | ||||
listener = get_listener(broker, "celeryev.listener", scheduler_backend) | listener = get_listener(broker, "celeryev.listener", scheduler_backend) | ||||
try: | try: | ||||
listener.start_consuming() | listener.start_consuming() | ||||
finally: | finally: | ||||
listener.stop_consuming() | listener.stop_consuming() | ||||
@cli.command("schedule-recurrent")
@click.option(
    "--visit-type",
    "visit_types",
    multiple=True,
    default=[],
    help=(
        "Visit types to schedule. If not provided, this iterates over every "
        "corresponding load task types referenced in the scheduler backend."
    ),
)
@click.pass_context
def schedule_recurrent(
    ctx, period: Optional[int] = None, visit_types: List[str] = ()
):
    """Starts the scheduler for recurrent visits.

    This runs one thread for each visit type, which regularly sends new visits
    to celery.

    """
    # Developer notes (kept out of the docstring, which click shows as --help):
    #
    # * ``ctx.obj`` must provide the "config" and "scheduler" entries.
    # * ``period`` is not fed by any click option on this command and is not
    #   read below; it defaults to None so that click can invoke the callback
    #   (a required ``period`` made every invocation fail with a TypeError).
    # * ``visit_types`` comes from the repeatable ``--visit-type`` option; when
    #   empty, every "load-*" task type known to the scheduler is used.
    # * Raises ValueError when an explicitly requested visit type has no
    #   matching "load-<visit type>" task type in the scheduler.
    # * The main loop only ends through SystemExit, after which the process
    #   exits with 1 if some threads could not be terminated, 0 otherwise.
    from queue import Queue

    from swh.scheduler.celery_backend.recurrent_visits import (
        VisitSchedulerThreads,
        logger,
        spawn_visit_scheduler_thread,
        terminate_visit_scheduler_threads,
    )

    config = ctx.obj["config"]
    scheduler = ctx.obj["scheduler"]

    load_prefix = "load-"

    if not visit_types:
        # Figure out which visit types exist in the scheduler: only loading
        # tasks are considered recurring ones, the rest is dismissed. The
        # visit type name is the task type name without its "load-" prefix.
        visit_types = [
            task_type["type"][len(load_prefix):]
            for task_type in scheduler.get_task_types()
            if task_type["type"].startswith(load_prefix)
        ]
    else:
        # Check that the passed visit types exist in the scheduler
        for visit_type in visit_types:
            task_type_name = f"{load_prefix}{visit_type}"
            if not scheduler.get_task_type(task_type_name):
                raise ValueError(f"Unknown task type: {task_type_name}")

    # Child threads report their failure here as (visit type, exception).
    exc_queue: Queue[Tuple[str, BaseException]] = Queue()
    threads: VisitSchedulerThreads = {}

    try:
        # Spawn initial threads
        for visit_type in visit_types:
            spawn_visit_scheduler_thread(threads, exc_queue, config, visit_type)

        # Handle exceptions from child threads
        while True:
            visit_type, exc_info = exc_queue.get(block=True)

            logger.exception(
                "Thread %s died with exception; respawning",
                visit_type,
                exc_info=exc_info,
            )

            # Give the failed thread a short grace period to finish before
            # replacing it.
            dead_thread = threads[visit_type][0]
            dead_thread.join(timeout=1)

            if dead_thread.is_alive():
                # ``Logger.warn`` is a deprecated alias of ``Logger.warning``.
                logger.warning(
                    "The thread for %s is still alive after sending an exception?! "
                    "Respawning anyway.",
                    visit_type,
                )

            spawn_visit_scheduler_thread(threads, exc_queue, config, visit_type)

    except SystemExit:
        remaining_threads = terminate_visit_scheduler_threads(threads)
        # Non-zero exit status when some threads could not be terminated.
        ctx.exit(1 if remaining_threads else 0)
@cli.command("rpc-serve") | @cli.command("rpc-serve") | ||||
@click.option("--host", default="0.0.0.0", help="Host to run the scheduler server api") | @click.option("--host", default="0.0.0.0", help="Host to run the scheduler server api") | ||||
@click.option("--port", default=5008, type=click.INT, help="Binding port of the server") | @click.option("--port", default=5008, type=click.INT, help="Binding port of the server") | ||||
@click.option( | @click.option( | ||||
"--debug/--nodebug", | "--debug/--nodebug", | ||||
default=None, | default=None, | ||||
help=( | help=( | ||||
"Indicates if the server should run in debug mode. " | "Indicates if the server should run in debug mode. " | ||||
Show All 20 Lines |