Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9348110
D5809.id21156.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
8 KB
Subscribers
None
D5809.id21156.diff
View Options
diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -316,22 +316,13 @@
count: int,
policy: str,
timestamp: Optional[datetime.datetime] = None,
+ scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7),
+ failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14),
+ notfound_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31),
+ tablesample: Optional[float] = None,
db=None,
cur=None,
) -> List[ListedOrigin]:
- """Get at most the `count` next origins that need to be visited with
- the `visit_type` loader according to the given scheduling `policy`.
-
- This will mark the origins as scheduled in the origin_visit_stats
- table, to avoid scheduling multiple visits to the same origin.
-
- Arguments:
- visit_type: type of visits to schedule
- count: number of visits to schedule
- policy: the scheduling policy used to select which visits to schedule
- timestamp: the mocked timestamp at which we're recording that the visits are
- being scheduled (defaults to the current time)
- """
if timestamp is None:
timestamp = utcnow()
@@ -348,21 +339,41 @@
where_clauses.append("visit_type = %s")
query_args.append(visit_type)
- # Don't re-schedule visits if they're already scheduled but we haven't
- # recorded a result yet, unless they've been scheduled more than a week
- # ago (it probably means we've lost them in flight somewhere).
- where_clauses.append(
- """origin_visit_stats.last_scheduled IS NULL
- OR origin_visit_stats.last_scheduled < GREATEST(
- %s - '7 day'::interval,
- origin_visit_stats.last_eventful,
- origin_visit_stats.last_uneventful,
- origin_visit_stats.last_failed,
- origin_visit_stats.last_notfound
+ if scheduled_cooldown:
+ # Don't re-schedule visits if they're already scheduled but we haven't
+ # recorded a result yet, unless they were scheduled longer ago than
+ # `scheduled_cooldown` (it probably means we've lost them in flight somewhere).
+ where_clauses.append(
+ """origin_visit_stats.last_scheduled IS NULL
+ OR origin_visit_stats.last_scheduled < GREATEST(
+ %s - %s,
+ origin_visit_stats.last_eventful,
+ origin_visit_stats.last_uneventful,
+ origin_visit_stats.last_failed,
+ origin_visit_stats.last_notfound
+ )
+ """
)
- """
- )
- query_args.append(timestamp)
+ query_args.append(timestamp)
+ query_args.append(scheduled_cooldown)
+
+ if failed_cooldown:
+ # Don't retry failed origins too often
+ where_clauses.append(
+ "origin_visit_stats.last_failed is null "
+ "or origin_visit_stats.last_failed < %s - %s"
+ )
+ query_args.append(timestamp)
+ query_args.append(failed_cooldown)
+
+ if notfound_cooldown:
+ # Don't retry not found origins too often
+ where_clauses.append(
+ "origin_visit_stats.last_notfound is null "
+ "or origin_visit_stats.last_notfound < %s - %s"
+ )
+ query_args.append(timestamp)
+ query_args.append(notfound_cooldown)
if policy == "oldest_scheduled_first":
order_by = "origin_visit_stats.last_scheduled NULLS FIRST"
@@ -373,6 +384,15 @@
# order by increasing last_update (oldest first)
where_clauses.append("listed_origins.last_update IS NOT NULL")
order_by = "listed_origins.last_update"
+ elif policy == "never_visited_unknown_last_update":
+ # never visited origins have a NULL last_snapshot
+ where_clauses.append("origin_visit_stats.last_snapshot IS NULL")
+
+ # Unknown last update
+ where_clauses.append("listed_origins.last_update IS NULL")
+
+ # Try to get at the oldest origins first
+ order_by = "listed_origins.first_seen"
elif policy == "already_visited_order_by_lag":
# TODO: store "visit lag" in a materialized view?
@@ -403,11 +423,17 @@
else:
raise UnknownPolicy(f"Unknown scheduling policy {policy}")
+ if tablesample:
+ table = "listed_origins tablesample SYSTEM (%s)"
+ query_args.insert(0, tablesample)
+ else:
+ table = "listed_origins"
+
select_query = f"""
SELECT
{origin_select_cols}
FROM
- listed_origins
+ {table}
LEFT JOIN
origin_visit_stats USING (url, visit_type)
WHERE
diff --git a/swh/scheduler/cli/origin.py b/swh/scheduler/cli/origin.py
--- a/swh/scheduler/cli/origin.py
+++ b/swh/scheduler/cli/origin.py
@@ -145,6 +145,52 @@
click.echo_via_pager("\n".join(output))
+@origin.command("send-to-celery")
+@click.option(
+ "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy"
+)
+@click.option(
+ "--queue", "-q", help="Target celery queue", type=str,
+)
+@click.option(
+ "--tablesample", help="Table sampling percentage", type=float,
+)
+@click.argument("type", type=str)
+@click.pass_context
+def send_to_celery(
+ ctx, policy: str, queue: Optional[str], tablesample: Optional[float], type: str
+):
+ """Send the next origin visits of the TYPE loader to celery, filling the queue."""
+ from kombu.utils.uuid import uuid
+
+ from swh.scheduler.celery_backend.config import app, get_available_slots
+
+ scheduler = ctx.obj["scheduler"]
+
+ task_type = scheduler.get_task_type(f"load-{type}")
+
+ task_name = task_type["backend_name"]
+ queue_name = queue or task_name
+
+ num_tasks = get_available_slots(app, queue_name, task_type["max_queue_length"])
+
+ print(num_tasks, "slots available in celery queue")
+ origins = scheduler.grab_next_visits(
+ type, num_tasks, policy=policy, tablesample=tablesample
+ )
+
+ print(len(origins), "visits to send to celery")
+ for origin in origins:
+ task_dict = origin.as_task_dict()
+ app.send_task(
+ task_name,
+ task_id=uuid(),
+ args=task_dict["arguments"]["args"],
+ kwargs=task_dict["arguments"]["kwargs"],
+ queue=queue_name,
+ )
+
+
@origin.command("update-metrics")
@click.option("--lister", default=None, help="Only update metrics for this lister")
@click.option(
diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py
--- a/swh/scheduler/interface.py
+++ b/swh/scheduler/interface.py
@@ -389,6 +389,10 @@
count: int,
policy: str,
timestamp: Optional[datetime.datetime] = None,
+ scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7),
+ failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14),
+ notfound_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31),
+ tablesample: Optional[float] = None,
) -> List[ListedOrigin]:
"""Get at most the `count` next origins that need to be visited with
the `visit_type` loader according to the given scheduling `policy`.
@@ -402,6 +406,14 @@
policy: the scheduling policy used to select which visits to schedule
timestamp: the mocked timestamp at which we're recording that the visits are
being scheduled (defaults to the current time)
+ scheduled_cooldown: the minimum interval that must elapse before the
+ same origin can be scheduled again
+ failed_cooldown: the minimum interval that must elapse before a failed
+ origin can be rescheduled
+ notfound_cooldown: the minimum interval that must elapse before a
+ not-found origin can be rescheduled
+ tablesample: the percentage of the table on which we run the query
+ (None: no sampling)
"""
...
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Jul 3 2025, 6:13 PM (4 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3231248
Attached To
D5809: Direct scheduling of origin visits in celery
Event Timeline
Log In to Comment