Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/cli/origin.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from __future__ import annotations | from __future__ import annotations | ||||
from typing import TYPE_CHECKING, Iterable, List, Optional | from typing import TYPE_CHECKING, Iterable, List, Optional | ||||
import click | import click | ||||
from . import cli | from . import cli | ||||
from ..utils import create_origin_task_dict | from ..utils import create_origin_task_dicts | ||||
if TYPE_CHECKING: | if TYPE_CHECKING: | ||||
from uuid import UUID | from uuid import UUID | ||||
from ..interface import SchedulerInterface | from ..interface import SchedulerInterface | ||||
from ..model import ListedOrigin | from ..model import ListedOrigin | ||||
▲ Show 20 Lines • Show All 103 Lines • ▼ Show 20 Lines | def schedule_next(ctx, policy: str, type: str, count: int): | ||||
scheduler = ctx.obj["scheduler"] | scheduler = ctx.obj["scheduler"] | ||||
origins = scheduler.grab_next_visits(type, count, policy=policy) | origins = scheduler.grab_next_visits(type, count, policy=policy) | ||||
created = scheduler.create_tasks( | created = scheduler.create_tasks( | ||||
[ | [ | ||||
{ | { | ||||
**create_origin_task_dict(origin), | **task_dict, | ||||
"policy": "oneshot", | "policy": "oneshot", | ||||
"next_run": utcnow(), | "next_run": utcnow(), | ||||
"retries_left": 1, | "retries_left": 1, | ||||
} | } | ||||
for origin in origins | for task_dict in create_origin_task_dicts(origins, scheduler) | ||||
] | ] | ||||
) | ) | ||||
output = ["Created %d tasks\n" % len(created)] | output = ["Created %d tasks\n" % len(created)] | ||||
for task in created: | for task in created: | ||||
output.append(pretty_print_task(task)) | output.append(pretty_print_task(task)) | ||||
click.echo_via_pager("\n".join(output)) | click.echo_via_pager("\n".join(output)) | ||||
▲ Show 20 Lines • Show All 59 Lines • ▼ Show 20 Lines | origins = scheduler.grab_next_visits( | ||||
num_tasks, | num_tasks, | ||||
policy=policy, | policy=policy, | ||||
tablesample=tablesample, | tablesample=tablesample, | ||||
enabled=enabled, | enabled=enabled, | ||||
lister_uuid=lister_uuid, | lister_uuid=lister_uuid, | ||||
) | ) | ||||
click.echo(f"{len(origins)} visits to send to celery") | click.echo(f"{len(origins)} visits to send to celery") | ||||
for origin in origins: | for task_dict in create_origin_task_dicts(origins, scheduler): | ||||
task_dict = create_origin_task_dict(origin) | |||||
app.send_task( | app.send_task( | ||||
task_name, | task_name, | ||||
task_id=uuid(), | task_id=uuid(), | ||||
args=task_dict["arguments"]["args"], | args=task_dict["arguments"]["args"], | ||||
kwargs=task_dict["arguments"]["kwargs"], | kwargs=task_dict["arguments"]["kwargs"], | ||||
queue=queue_name, | queue=queue_name, | ||||
) | ) | ||||
Show All 36 Lines |