Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/cli/origin.py
Show First 20 Lines • Show All 139 Lines • ▼ Show 20 Lines | def schedule_next(ctx, policy: str, type: str, count: int): | ||||
output = ["Created %d tasks\n" % len(created)] | output = ["Created %d tasks\n" % len(created)] | ||||
for task in created: | for task in created: | ||||
output.append(pretty_print_task(task)) | output.append(pretty_print_task(task)) | ||||
click.echo_via_pager("\n".join(output)) | click.echo_via_pager("\n".join(output)) | ||||
@origin.command("send-to-celery") | |||||
@click.option( | |||||
"--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy" | |||||
) | |||||
@click.option( | |||||
"--queue", "-q", help="Target celery queue", type=str, | |||||
) | |||||
@click.option( | |||||
"--tablesample", help="Table sampling percentage", type=float, | |||||
) | |||||
@click.argument("type", type=str) | |||||
@click.pass_context | |||||
def send_to_celery( | |||||
ctx, policy: str, queue: Optional[str], tablesample: Optional[float], type: str | |||||
): | |||||
"""Send the next origin visits of the TYPE loader to celery, filling the queue.""" | |||||
from kombu.utils.uuid import uuid | |||||
from swh.scheduler.celery_backend.config import app, get_available_slots | |||||
scheduler = ctx.obj["scheduler"] | |||||
task_type = scheduler.get_task_type(f"load-{type}") | |||||
task_name = task_type["backend_name"] | |||||
queue_name = queue or task_name | |||||
num_tasks = get_available_slots(app, queue_name, task_type["max_queue_length"]) | |||||
print(num_tasks, "slots available in celery queue") | |||||
origins = scheduler.grab_next_visits( | |||||
type, num_tasks, policy=policy, tablesample=tablesample | |||||
) | |||||
print(len(origins), "visits to send to celery") | |||||
for origin in origins: | |||||
task_dict = origin.as_task_dict() | |||||
app.send_task( | |||||
task_name, | |||||
task_id=uuid(), | |||||
args=task_dict["arguments"]["args"], | |||||
kwargs=task_dict["arguments"]["kwargs"], | |||||
queue=queue_name, | |||||
) | |||||
@origin.command("update-metrics") | @origin.command("update-metrics") | ||||
@click.option("--lister", default=None, help="Only update metrics for this lister") | @click.option("--lister", default=None, help="Only update metrics for this lister") | ||||
@click.option( | @click.option( | ||||
"--instance", default=None, help="Only update metrics for this lister instance" | "--instance", default=None, help="Only update metrics for this lister instance" | ||||
) | ) | ||||
@click.pass_context | @click.pass_context | ||||
def update_metrics(ctx, lister: Optional[str], instance: Optional[str]): | def update_metrics(ctx, lister: Optional[str], instance: Optional[str]): | ||||
"""Update the scheduler metrics on listed origins. | """Update the scheduler metrics on listed origins. | ||||
Show All 27 Lines |