Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/recurrent_visits.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
"""This schedules the recurrent visits, for listed origins, in Celery. | """This schedules the recurrent visits, for listed origins, in Celery. | ||||
For "oneshot" (save code now, lister) tasks, check the | For "oneshot" (save code now, lister) tasks, check the | ||||
:mod:`swh.scheduler.celery_backend.runner` and | :mod:`swh.scheduler.celery_backend.runner` and | ||||
Show All 9 Lines | |||||
import random | import random | ||||
from threading import Thread | from threading import Thread | ||||
import time | import time | ||||
from typing import TYPE_CHECKING, Any, Dict, List, Tuple | from typing import TYPE_CHECKING, Any, Dict, List, Tuple | ||||
from kombu.utils.uuid import uuid | from kombu.utils.uuid import uuid | ||||
from swh.scheduler.celery_backend.config import get_available_slots | from swh.scheduler.celery_backend.config import get_available_slots | ||||
from swh.scheduler.utils import create_origin_task_dict | |||||
if TYPE_CHECKING: | if TYPE_CHECKING: | ||||
from ..interface import SchedulerInterface | from ..interface import SchedulerInterface | ||||
from ..model import ListedOrigin | from ..model import ListedOrigin | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
▲ Show 20 Lines • Show All 193 Lines • ▼ Show 20 Lines | if not origins: | ||||
logger.debug("No origins to visit for type %s", visit_type) | logger.debug("No origins to visit for type %s", visit_type) | ||||
return current_iteration_start + NO_ORIGINS_SCHEDULED_BACKOFF | return current_iteration_start + NO_ORIGINS_SCHEDULED_BACKOFF | ||||
# Try to smooth the ingestion load, origins pulled by different | # Try to smooth the ingestion load, origins pulled by different | ||||
# scheduling policies have different resource usage patterns | # scheduling policies have different resource usage patterns | ||||
random.shuffle(origins) | random.shuffle(origins) | ||||
for origin in origins: | for origin in origins: | ||||
task_dict = origin.as_task_dict() | task_dict = create_origin_task_dict(origin) | ||||
app.send_task( | app.send_task( | ||||
queue_name, | queue_name, | ||||
task_id=uuid(), | task_id=uuid(), | ||||
args=task_dict["arguments"]["args"], | args=task_dict["arguments"]["args"], | ||||
kwargs=task_dict["arguments"]["kwargs"], | kwargs=task_dict["arguments"]["kwargs"], | ||||
queue=queue_name, | queue=queue_name, | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 121 Lines • Show Last 20 Lines |