Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/celery_backend/runner.py
Show All 14 Lines | |||||
from swh.scheduler.utils import utcnow | from swh.scheduler.utils import utcnow | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
def run_ready_tasks( | def run_ready_tasks( | ||||
backend: SchedulerInterface, | backend: SchedulerInterface, | ||||
app, | app, | ||||
task_types: List[Dict] = [], | task_types_list: List[Dict] = [], | ||||
douardda: i don't really see the purpose of renaming this variable (since it's properly type annotated)… | |||||
with_priority: bool = False, | with_priority: bool = False, | ||||
) -> List[Dict]: | ) -> List[Dict]: | ||||
"""Schedule tasks ready to be scheduled. | """Schedule tasks ready to be scheduled. | ||||
This lookups any tasks per task type and mass schedules those accordingly (send | This lookups any tasks per task type and mass schedules those accordingly (send | ||||
messages to rabbitmq and mark as scheduled equivalent tasks in the scheduler | messages to rabbitmq and mark as scheduled equivalent tasks in the scheduler | ||||
backend). | backend). | ||||
Show All 22 Lines | Returns: | ||||
backend_tasks = run_ready_tasks(self.scheduler, app) | backend_tasks = run_ready_tasks(self.scheduler, app) | ||||
for task in backend_tasks: | for task in backend_tasks: | ||||
AsyncResult(id=task['backend_id']).get() | AsyncResult(id=task['backend_id']).get() | ||||
""" | """ | ||||
all_backend_tasks: List[Dict] = [] | all_backend_tasks: List[Dict] = [] | ||||
while True: | while True: | ||||
if not task_types: | if not task_types_list: | ||||
Done Inline ActionsThis made sense for an old implementation which no longer is the case here. ardumont: This made sense for an old implementation which no longer is the case here.
I'll revert as well. | |||||
task_types = backend.get_task_types() | task_types_list = backend.get_task_types() | ||||
task_types_d = {} | task_types_d = {} | ||||
pending_tasks = [] | pending_tasks = [] | ||||
for task_type in task_types: | for task_type in task_types_list: | ||||
task_type_name = task_type["type"] | task_type_name = task_type["type"] | ||||
task_types_d[task_type_name] = task_type | task_types_d[task_type_name] = task_type | ||||
max_queue_length = task_type["max_queue_length"] or 0 | max_queue_length = task_type["max_queue_length"] or 0 | ||||
backend_name = task_type["backend_name"] | backend_name = task_type["backend_name"] | ||||
if with_priority: | if with_priority: | ||||
# grab max_queue_length (or 10) potential tasks with any priority for | # grab max_queue_length (or 10) potential tasks with any priority for | ||||
# the same type (limit the result to avoid too long running queries) | # the same type (limit the result to avoid too long running queries) | ||||
▲ Show 20 Lines • Show All 90 Lines • Show Last 20 Lines |
i don't really see the purpose of renaming this variable (since it's properly type annotated), but meh