Page Menu · Home · Software Heritage

D5809.id21156.diff
No One · Temporary

D5809.id21156.diff

diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -316,22 +316,13 @@
count: int,
policy: str,
timestamp: Optional[datetime.datetime] = None,
+ scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7),
+ failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14),
+ notfound_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31),
+ tablesample: Optional[float] = None,
db=None,
cur=None,
) -> List[ListedOrigin]:
- """Get at most the `count` next origins that need to be visited with
- the `visit_type` loader according to the given scheduling `policy`.
-
- This will mark the origins as scheduled in the origin_visit_stats
- table, to avoid scheduling multiple visits to the same origin.
-
- Arguments:
- visit_type: type of visits to schedule
- count: number of visits to schedule
- policy: the scheduling policy used to select which visits to schedule
- timestamp: the mocked timestamp at which we're recording that the visits are
- being scheduled (defaults to the current time)
- """
if timestamp is None:
timestamp = utcnow()
@@ -348,21 +339,41 @@
where_clauses.append("visit_type = %s")
query_args.append(visit_type)
- # Don't re-schedule visits if they're already scheduled but we haven't
- # recorded a result yet, unless they've been scheduled more than a week
- # ago (it probably means we've lost them in flight somewhere).
- where_clauses.append(
- """origin_visit_stats.last_scheduled IS NULL
- OR origin_visit_stats.last_scheduled < GREATEST(
- %s - '7 day'::interval,
- origin_visit_stats.last_eventful,
- origin_visit_stats.last_uneventful,
- origin_visit_stats.last_failed,
- origin_visit_stats.last_notfound
+ if scheduled_cooldown:
+ # Don't re-schedule visits if they're already scheduled but we haven't
+ # recorded a result yet, unless they've been scheduled more than a week
+ # ago (it probably means we've lost them in flight somewhere).
+ where_clauses.append(
+ """origin_visit_stats.last_scheduled IS NULL
+ OR origin_visit_stats.last_scheduled < GREATEST(
+ %s - %s,
+ origin_visit_stats.last_eventful,
+ origin_visit_stats.last_uneventful,
+ origin_visit_stats.last_failed,
+ origin_visit_stats.last_notfound
+ )
+ """
)
- """
- )
- query_args.append(timestamp)
+ query_args.append(timestamp)
+ query_args.append(scheduled_cooldown)
+
+ if failed_cooldown:
+ # Don't retry failed origins too often
+ where_clauses.append(
+ "origin_visit_stats.last_failed is null "
+ "or origin_visit_stats.last_failed < %s - %s"
+ )
+ query_args.append(timestamp)
+ query_args.append(failed_cooldown)
+
+ if notfound_cooldown:
+ # Don't retry not found origins too often
+ where_clauses.append(
+ "origin_visit_stats.last_notfound is null "
+ "or origin_visit_stats.last_notfound < %s - %s"
+ )
+ query_args.append(timestamp)
+ query_args.append(notfound_cooldown)
if policy == "oldest_scheduled_first":
order_by = "origin_visit_stats.last_scheduled NULLS FIRST"
@@ -373,6 +384,15 @@
# order by increasing last_update (oldest first)
where_clauses.append("listed_origins.last_update IS NOT NULL")
order_by = "listed_origins.last_update"
+ elif policy == "never_visited_unknown_last_update":
+ # never visited origins have a NULL last_snapshot
+ where_clauses.append("origin_visit_stats.last_snapshot IS NULL")
+
+ # Unknown last update
+ where_clauses.append("listed_origins.last_update IS NULL")
+
+ # Try to get at the oldest origins first
+ order_by = "listed_origins.first_seen"
elif policy == "already_visited_order_by_lag":
# TODO: store "visit lag" in a materialized view?
@@ -403,11 +423,17 @@
else:
raise UnknownPolicy(f"Unknown scheduling policy {policy}")
+ if tablesample:
+ table = "listed_origins tablesample SYSTEM (%s)"
+ query_args.insert(0, tablesample)
+ else:
+ table = "listed_origins"
+
select_query = f"""
SELECT
{origin_select_cols}
FROM
- listed_origins
+ {table}
LEFT JOIN
origin_visit_stats USING (url, visit_type)
WHERE
diff --git a/swh/scheduler/cli/origin.py b/swh/scheduler/cli/origin.py
--- a/swh/scheduler/cli/origin.py
+++ b/swh/scheduler/cli/origin.py
@@ -145,6 +145,52 @@
click.echo_via_pager("\n".join(output))
+@origin.command("send-to-celery")
+@click.option(
+ "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy"
+)
+@click.option(
+ "--queue", "-q", help="Target celery queue", type=str,
+)
+@click.option(
+ "--tablesample", help="Table sampling percentage", type=float,
+)
+@click.argument("type", type=str)
+@click.pass_context
+def send_to_celery(
+ ctx, policy: str, queue: Optional[str], tablesample: Optional[float], type: str
+):
+ """Send the next origin visits of the TYPE loader to celery, filling the queue."""
+ from kombu.utils.uuid import uuid
+
+ from swh.scheduler.celery_backend.config import app, get_available_slots
+
+ scheduler = ctx.obj["scheduler"]
+
+ task_type = scheduler.get_task_type(f"load-{type}")
+
+ task_name = task_type["backend_name"]
+ queue_name = queue or task_name
+
+ num_tasks = get_available_slots(app, queue_name, task_type["max_queue_length"])
+
+ print(num_tasks, "slots available in celery queue")
+ origins = scheduler.grab_next_visits(
+ type, num_tasks, policy=policy, tablesample=tablesample
+ )
+
+ print(len(origins), "visits to send to celery")
+ for origin in origins:
+ task_dict = origin.as_task_dict()
+ app.send_task(
+ task_name,
+ task_id=uuid(),
+ args=task_dict["arguments"]["args"],
+ kwargs=task_dict["arguments"]["kwargs"],
+ queue=queue_name,
+ )
+
+
@origin.command("update-metrics")
@click.option("--lister", default=None, help="Only update metrics for this lister")
@click.option(
diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py
--- a/swh/scheduler/interface.py
+++ b/swh/scheduler/interface.py
@@ -389,6 +389,10 @@
count: int,
policy: str,
timestamp: Optional[datetime.datetime] = None,
+ scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7),
+ failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14),
+ notfound_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31),
+ tablesample: Optional[float] = None,
) -> List[ListedOrigin]:
"""Get at most the `count` next origins that need to be visited with
the `visit_type` loader according to the given scheduling `policy`.
@@ -402,6 +406,14 @@
policy: the scheduling policy used to select which visits to schedule
timestamp: the mocked timestamp at which we're recording that the visits are
being scheduled (defaults to the current time)
+ scheduled_cooldown: the minimal interval before which we can schedule
+ the same origin again
+ failed_cooldown: the minimal interval before which we can reschedule a
+ failed origin
+ notfound_cooldown: the minimal interval before which we can reschedule a
+ notfound origin
+ tablesample: the percentage of the table on which we run the query
+ (None: no sampling)
"""
...

File Metadata

Mime Type
text/plain
Expires
Jul 3 2025, 6:13 PM (4 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3231248

Event Timeline