Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/recurrent_visits.py
| Show All 18 Lines | |||||
| import random | import random | ||||
| from threading import Thread | from threading import Thread | ||||
| import time | import time | ||||
| from typing import TYPE_CHECKING, Any, Dict, List, Tuple | from typing import TYPE_CHECKING, Any, Dict, List, Tuple | ||||
| from kombu.utils.uuid import uuid | from kombu.utils.uuid import uuid | ||||
| from swh.scheduler.celery_backend.config import get_available_slots | from swh.scheduler.celery_backend.config import get_available_slots | ||||
| from swh.scheduler.utils import create_origin_task_dict | from swh.scheduler.utils import create_origin_task_dicts | ||||
| if TYPE_CHECKING: | if TYPE_CHECKING: | ||||
| from ..interface import SchedulerInterface | from ..interface import SchedulerInterface | ||||
| from ..model import ListedOrigin | from ..model import ListedOrigin | ||||
| logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
| ▲ Show 20 Lines • Show All 192 Lines • ▼ Show 20 Lines | ) -> float: | ||||
| if not origins: | if not origins: | ||||
| logger.debug("No origins to visit for type %s", visit_type) | logger.debug("No origins to visit for type %s", visit_type) | ||||
| return current_iteration_start + NO_ORIGINS_SCHEDULED_BACKOFF | return current_iteration_start + NO_ORIGINS_SCHEDULED_BACKOFF | ||||
| # Try to smooth the ingestion load, origins pulled by different | # Try to smooth the ingestion load, origins pulled by different | ||||
| # scheduling policies have different resource usage patterns | # scheduling policies have different resource usage patterns | ||||
| random.shuffle(origins) | random.shuffle(origins) | ||||
| for origin in origins: | for task_dict in create_origin_task_dicts(origins, scheduler): | ||||
| task_dict = create_origin_task_dict(origin) | |||||
| app.send_task( | app.send_task( | ||||
| queue_name, | queue_name, | ||||
| task_id=uuid(), | task_id=uuid(), | ||||
| args=task_dict["arguments"]["args"], | args=task_dict["arguments"]["args"], | ||||
| kwargs=task_dict["arguments"]["kwargs"], | kwargs=task_dict["arguments"]["kwargs"], | ||||
| queue=queue_name, | queue=queue_name, | ||||
| ) | ) | ||||
| ▲ Show 20 Lines • Show All 121 Lines • Show Last 20 Lines | |||||