diff --git a/swh/scheduler/celery_backend/recurrent_visits.py b/swh/scheduler/celery_backend/recurrent_visits.py index 8f43d8e..eacf8de 100644 --- a/swh/scheduler/celery_backend/recurrent_visits.py +++ b/swh/scheduler/celery_backend/recurrent_visits.py @@ -23,6 +23,7 @@ from typing import TYPE_CHECKING, Any, Dict, List, Tuple from kombu.utils.uuid import uuid +from swh.core.statsd import statsd from swh.scheduler.celery_backend.config import get_available_slots if TYPE_CHECKING: @@ -82,9 +83,6 @@ def grab_next_visits_policy_weights( policies used to pull the next tasks, and what proportion of the available num_visits they take. - This function emits a warning if the ratio of retrieved origins is off of - the requested ratio by more than 5%. - Returns: at most ``num_visits`` :py:class:`~swh.scheduler.model.ListedOrigin` objects """ @@ -102,34 +100,38 @@ def grab_next_visits_policy_weights( for policy, ratio in policy_ratio.items(): num_tasks_to_send = int(num_visits * ratio) - fetched_origins[policy] = scheduler.grab_next_visits( + fetched_origins[policy] = origins = scheduler.grab_next_visits( visit_type, num_tasks_to_send, policy=policy ) + statsd.increment( + "swh_scheduler_recurrent_visits_origins_fetched_total", + len(origins), + tags={"visit_type": visit_type, "policy": policy}, + ) + all_origins: List[ListedOrigin] = list( chain.from_iterable(fetched_origins.values()) ) + if not all_origins: return [] - # Check whether the ratios of origins fetched are skewed with respect to the - # ones we requested - fetched_origin_ratios = { - policy: len(origins) / len(all_origins) - for policy, origins in fetched_origins.items() - } - - for policy, expected_ratio in policy_ratio.items(): - # 5% of skew with respect to request - if abs(fetched_origin_ratios[policy] - expected_ratio) / expected_ratio > 0.05: - logger.info( - "Skewed fetch for visit type %s with policy %s: fetched %s, " - "requested %s", - visit_type, - policy, - fetched_origin_ratios[policy], - expected_ratio, - ) + if logger.isEnabledFor(logging.DEBUG): + # Check whether the ratios of origins fetched are skewed with respect to the + # ones we requested + for policy, expected_ratio in policy_ratio.items(): + fetched_ratio = len(fetched_origins[policy]) / len(all_origins) + # 5% of skew with respect to request + if abs(fetched_ratio - expected_ratio) / expected_ratio > 0.05: + logger.debug( + "Skewed fetch for visit type %s with policy %s: fetched %s, " + "requested %s", + visit_type, + policy, + fetched_ratio, + expected_ratio, + ) return all_origins @@ -204,6 +206,12 @@ def send_visits_for_visit_type( queue=queue_name, ) + statsd.increment( + "swh_scheduler_recurrent_visits_scheduled_task_total", + len(origins), + tags={"visit_type": visit_type, "task_type": queue_name}, + ) + logger.info( "%s: %s visits scheduled in queue %s", visit_type, len(origins), queue_name, )