Page Menu · Home · Software Heritage
Paste P1218

Command-Line Input
Active · Public

Authored by olasd on Nov 16 2021, 2:16 PM.
diff --git a/swh/scheduler/celery_backend/recurrent_visits.py b/swh/scheduler/celery_backend/recurrent_visits.py
index 8f43d8e..eacf8de 100644
--- a/swh/scheduler/celery_backend/recurrent_visits.py
+++ b/swh/scheduler/celery_backend/recurrent_visits.py
@@ -23,6 +23,7 @@ from typing import TYPE_CHECKING, Any, Dict, List, Tuple
from kombu.utils.uuid import uuid
+from swh.core.statsd import statsd
from swh.scheduler.celery_backend.config import get_available_slots
if TYPE_CHECKING:
@@ -82,9 +83,6 @@ def grab_next_visits_policy_weights(
policies used to pull the next tasks, and what proportion of the available
num_visits they take.
- This function emits a warning if the ratio of retrieved origins is off of
- the requested ratio by more than 5%.
-
Returns:
at most ``num_visits`` :py:class:`~swh.scheduler.model.ListedOrigin` objects
"""
@@ -102,34 +100,38 @@ def grab_next_visits_policy_weights(
for policy, ratio in policy_ratio.items():
num_tasks_to_send = int(num_visits * ratio)
- fetched_origins[policy] = scheduler.grab_next_visits(
+ fetched_origins[policy] = origins = scheduler.grab_next_visits(
visit_type, num_tasks_to_send, policy=policy
)
+ statsd.increment(
+ "swh_scheduler_recurrent_visits_origins_fetched_total",
+ len(origins),
+ tags={"visit_type": visit_type, "policy": policy},
+ )
+
all_origins: List[ListedOrigin] = list(
chain.from_iterable(fetched_origins.values())
)
+
if not all_origins:
return []
- # Check whether the ratios of origins fetched are skewed with respect to the
- # ones we requested
- fetched_origin_ratios = {
- policy: len(origins) / len(all_origins)
- for policy, origins in fetched_origins.items()
- }
-
- for policy, expected_ratio in policy_ratio.items():
- # 5% of skew with respect to request
- if abs(fetched_origin_ratios[policy] - expected_ratio) / expected_ratio > 0.05:
- logger.info(
- "Skewed fetch for visit type %s with policy %s: fetched %s, "
- "requested %s",
- visit_type,
- policy,
- fetched_origin_ratios[policy],
- expected_ratio,
- )
+ if logger.isEnabledFor(logging.DEBUG):
+ # Check whether the ratios of origins fetched are skewed with respect to the
+ # ones we requested
+ for policy, expected_ratio in policy_ratio.items():
+ fetched_ratio = len(fetched_origins[policy]) / len(all_origins)
+ # Flag a relative deviation of more than 5% from the requested ratio
+ if abs(fetched_ratio - expected_ratio) / expected_ratio > 0.05:
+ logger.debug(
+ "Skewed fetch for visit type %s with policy %s: fetched %s, "
+ "requested %s",
+ visit_type,
+ policy,
+ fetched_ratio,
+ expected_ratio,
+ )
return all_origins
@@ -204,6 +206,12 @@ def send_visits_for_visit_type(
queue=queue_name,
)
+ statsd.increment(
+ "swh_scheduler_recurrent_visits_scheduled_task_total",
+ len(origins),
+ tags={"visit_type": visit_type, "task_type": queue_name},
+ )
+
logger.info(
"%s: %s visits scheduled in queue %s", visit_type, len(origins), queue_name,
)