Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/recurrent_visits.py
Show All 18 Lines | |||||
import random | import random | ||||
from threading import Thread | from threading import Thread | ||||
import time | import time | ||||
from typing import TYPE_CHECKING, Any, Dict, List, Tuple | from typing import TYPE_CHECKING, Any, Dict, List, Tuple | ||||
from kombu.utils.uuid import uuid | from kombu.utils.uuid import uuid | ||||
from swh.scheduler.celery_backend.config import get_available_slots | from swh.scheduler.celery_backend.config import get_available_slots | ||||
from swh.scheduler.utils import create_origin_task_dict | from swh.scheduler.utils import create_origin_task_dicts | ||||
if TYPE_CHECKING: | if TYPE_CHECKING: | ||||
from ..interface import SchedulerInterface | from ..interface import SchedulerInterface | ||||
from ..model import ListedOrigin | from ..model import ListedOrigin | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
▲ Show 20 Lines • Show All 192 Lines • ▼ Show 20 Lines | ) -> float: | ||||
if not origins: | if not origins: | ||||
logger.debug("No origins to visit for type %s", visit_type) | logger.debug("No origins to visit for type %s", visit_type) | ||||
return current_iteration_start + NO_ORIGINS_SCHEDULED_BACKOFF | return current_iteration_start + NO_ORIGINS_SCHEDULED_BACKOFF | ||||
# Try to smooth the ingestion load, origins pulled by different | # Try to smooth the ingestion load, origins pulled by different | ||||
# scheduling policies have different resource usage patterns | # scheduling policies have different resource usage patterns | ||||
random.shuffle(origins) | random.shuffle(origins) | ||||
for origin in origins: | for task_dict in create_origin_task_dicts(origins, scheduler): | ||||
task_dict = create_origin_task_dict(origin) | |||||
app.send_task( | app.send_task( | ||||
queue_name, | queue_name, | ||||
task_id=uuid(), | task_id=uuid(), | ||||
args=task_dict["arguments"]["args"], | args=task_dict["arguments"]["args"], | ||||
kwargs=task_dict["arguments"]["kwargs"], | kwargs=task_dict["arguments"]["kwargs"], | ||||
queue=queue_name, | queue=queue_name, | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 121 Lines • Show Last 20 Lines |