diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -316,22 +316,13 @@
         count: int,
         policy: str,
         timestamp: Optional[datetime.datetime] = None,
+        scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7),
+        failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14),
+        notfound_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31),
+        tablesample: Optional[float] = None,
         db=None,
         cur=None,
     ) -> List[ListedOrigin]:
-        """Get at most the `count` next origins that need to be visited with
-        the `visit_type` loader according to the given scheduling `policy`.
-
-        This will mark the origins as scheduled in the origin_visit_stats
-        table, to avoid scheduling multiple visits to the same origin.
-
-        Arguments:
-          visit_type: type of visits to schedule
-          count: number of visits to schedule
-          policy: the scheduling policy used to select which visits to schedule
-          timestamp: the mocked timestamp at which we're recording that the visits are
-            being scheduled (defaults to the current time)
-        """
         if timestamp is None:
             timestamp = utcnow()
 
@@ -348,21 +339,41 @@
         where_clauses.append("visit_type = %s")
         query_args.append(visit_type)
 
-        # Don't re-schedule visits if they're already scheduled but we haven't
-        # recorded a result yet, unless they've been scheduled more than a week
-        # ago (it probably means we've lost them in flight somewhere).
-        where_clauses.append(
-            """origin_visit_stats.last_scheduled IS NULL
-            OR origin_visit_stats.last_scheduled < GREATEST(
-              %s - '7 day'::interval,
-              origin_visit_stats.last_eventful,
-              origin_visit_stats.last_uneventful,
-              origin_visit_stats.last_failed,
-              origin_visit_stats.last_notfound
+        if scheduled_cooldown:
+            # Don't re-schedule visits if they're already scheduled but we haven't
+            # recorded a result yet, unless they were scheduled more than
+            # `scheduled_cooldown` ago (we probably lost them in flight somewhere).
+            where_clauses.append(
+                """origin_visit_stats.last_scheduled IS NULL
+                OR origin_visit_stats.last_scheduled < GREATEST(
+                  %s - %s,
+                  origin_visit_stats.last_eventful,
+                  origin_visit_stats.last_uneventful,
+                  origin_visit_stats.last_failed,
+                  origin_visit_stats.last_notfound
+                )
+            """
             )
-        """
-        )
-        query_args.append(timestamp)
+            query_args.append(timestamp)
+            query_args.append(scheduled_cooldown)
+
+        if failed_cooldown:
+            # Don't retry failed origins too often
+            where_clauses.append(
+                "origin_visit_stats.last_failed is null "
+                "or origin_visit_stats.last_failed < %s - %s"
+            )
+            query_args.append(timestamp)
+            query_args.append(failed_cooldown)
+
+        if notfound_cooldown:
+            # Don't retry not found origins too often
+            where_clauses.append(
+                "origin_visit_stats.last_notfound is null "
+                "or origin_visit_stats.last_notfound < %s - %s"
+            )
+            query_args.append(timestamp)
+            query_args.append(notfound_cooldown)
 
         if policy == "oldest_scheduled_first":
             order_by = "origin_visit_stats.last_scheduled NULLS FIRST"
@@ -373,6 +384,15 @@
             # order by increasing last_update (oldest first)
             where_clauses.append("listed_origins.last_update IS NOT NULL")
             order_by = "listed_origins.last_update"
+        elif policy == "never_visited_unknown_last_update":
+            # never visited origins have a NULL last_snapshot
+            where_clauses.append("origin_visit_stats.last_snapshot IS NULL")
+
+            # Unknown last update
+            where_clauses.append("listed_origins.last_update IS NULL")
+
+            # Try to get at the oldest origins first
+            order_by = "listed_origins.first_seen"
         elif policy == "already_visited_order_by_lag":
             # TODO: store "visit lag" in a materialized view?
 
@@ -403,11 +423,17 @@
         else:
             raise UnknownPolicy(f"Unknown scheduling policy {policy}")
 
+        if tablesample:
+            table = "listed_origins tablesample SYSTEM (%s)"
+            query_args.insert(0, tablesample)
+        else:
+            table = "listed_origins"
+
         select_query = f"""
             SELECT
               {origin_select_cols}
             FROM
-              listed_origins
+              {table}
             LEFT JOIN
               origin_visit_stats USING (url, visit_type)
             WHERE
diff --git a/swh/scheduler/celery_backend/config.py b/swh/scheduler/celery_backend/config.py
--- a/swh/scheduler/celery_backend/config.py
+++ b/swh/scheduler/celery_backend/config.py
@@ -8,7 +8,7 @@
 import os
 from time import monotonic as _monotonic
 import traceback
-from typing import Any, Dict
+from typing import Any, Dict, Optional
 import urllib.parse
 
 from celery import Celery
@@ -225,6 +225,36 @@
         return stats.get("messages")
 
 
+MAX_NUM_TASKS = 10000
+
+
+def get_available_slots(app, queue_name: str, max_length: Optional[int]):
+    """Get the number of tasks that can be sent to `queue_name`, when
+    the queue is limited to `max_length`.
+
+    Returns `MAX_NUM_TASKS` when the queue is unbounded (`max_length` is
+    `None` or 0) or when its current length cannot be determined; otherwise
+    returns the remaining capacity, clamped to `[0, MAX_NUM_TASKS]`.
+    """
+
+    if not max_length:
+        return MAX_NUM_TASKS
+
+    try:
+        queue_length = get_queue_length(app, queue_name)
+    except ValueError:
+        queue_length = None
+
+    if queue_length is None:
+        # Unknown queue length (e.g. running without RabbitMQ, probably a
+        # test environment): just schedule all the tasks
+        return MAX_NUM_TASKS
+
+    # Never report a negative number of slots when the queue is over-full,
+    # and clamp the return value to MAX_NUM_TASKS
+    return max(0, min(max_length - queue_length, MAX_NUM_TASKS))
+
+
 def register_task_class(app, name, cls):
     """Register a class-based task under the given name"""
     if name in app.tasks:
diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py
--- a/swh/scheduler/celery_backend/runner.py
+++ b/swh/scheduler/celery_backend/runner.py
@@ -10,14 +10,12 @@
 
 from swh.core.statsd import statsd
 from swh.scheduler import get_scheduler
+from swh.scheduler.celery_backend.config import MAX_NUM_TASKS, get_available_slots
 from swh.scheduler.interface import SchedulerInterface
 from swh.scheduler.utils import utcnow
 
 logger = logging.getLogger(__name__)
 
-# Max batch size for tasks
-MAX_NUM_TASKS = 10000
-
 
 def run_ready_tasks(backend: SchedulerInterface, app) -> List[Dict]:
     """Schedule tasks ready to be scheduled.
@@ -56,23 +54,11 @@
         for task_type in backend.get_task_types():
             task_type_name = task_type["type"]
             task_types[task_type_name] = task_type
-            max_queue_length = task_type["max_queue_length"]
-            if max_queue_length is None:
-                max_queue_length = 0
+
+            max_queue_length = task_type["max_queue_length"] or 0
             backend_name = task_type["backend_name"]
-            if max_queue_length:
-                try:
-                    queue_length = app.get_queue_length(backend_name)
-                except ValueError:
-                    queue_length = None
-
-                if queue_length is None:
-                    # Running without RabbitMQ (probably a test env).
-                    num_tasks = MAX_NUM_TASKS
-                else:
-                    num_tasks = min(max_queue_length - queue_length, MAX_NUM_TASKS)
-            else:
-                num_tasks = MAX_NUM_TASKS
+            num_tasks = get_available_slots(app, backend_name, max_queue_length)
+
             # only pull tasks if the buffer is at least 1/5th empty (= 80%
             # full), to help postgresql use properly indexed queries.
             if num_tasks > min(MAX_NUM_TASKS, max_queue_length) // 5:
diff --git a/swh/scheduler/cli/origin.py b/swh/scheduler/cli/origin.py
--- a/swh/scheduler/cli/origin.py
+++ b/swh/scheduler/cli/origin.py
@@ -145,6 +145,52 @@
     click.echo_via_pager("\n".join(output))
 
 
+@origin.command("send-to-celery")
+@click.option(
+    "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy"
+)
+@click.option(
+    "--queue", "-q", help="Target celery queue", type=str,
+)
+@click.option(
+    "--tablesample", help="Table sampling percentage", type=float,
+)
+@click.argument("type", type=str)
+@click.pass_context
+def send_to_celery(
+    ctx, policy: str, queue: Optional[str], tablesample: Optional[float], type: str
+):
+    """Send the next origin visits of the TYPE loader to celery, filling the queue."""
+    from kombu.utils.uuid import uuid
+
+    from swh.scheduler.celery_backend.config import app, get_available_slots
+
+    scheduler = ctx.obj["scheduler"]
+
+    task_type = scheduler.get_task_type(f"load-{type}")
+
+    task_name = task_type["backend_name"]
+    queue_name = queue or task_name
+
+    num_tasks = get_available_slots(app, queue_name, task_type["max_queue_length"])
+
+    print(num_tasks, "slots available in celery queue")
+    origins = scheduler.grab_next_visits(
+        type, num_tasks, policy=policy, tablesample=tablesample
+    )
+
+    print(len(origins), "visits to send to celery")
+    for origin in origins:
+        task_dict = origin.as_task_dict()
+        app.send_task(
+            task_name,
+            task_id=uuid(),
+            args=task_dict["arguments"]["args"],
+            kwargs=task_dict["arguments"]["kwargs"],
+            queue=queue_name,
+        )
+
+
 @origin.command("update-metrics")
 @click.option("--lister", default=None, help="Only update metrics for this lister")
 @click.option(
diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py
--- a/swh/scheduler/interface.py
+++ b/swh/scheduler/interface.py
@@ -389,6 +389,10 @@
         count: int,
         policy: str,
         timestamp: Optional[datetime.datetime] = None,
+        scheduled_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=7),
+        failed_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=14),
+        notfound_cooldown: Optional[datetime.timedelta] = datetime.timedelta(days=31),
+        tablesample: Optional[float] = None,
     ) -> List[ListedOrigin]:
         """Get at most the `count` next origins that need to be visited with
         the `visit_type` loader according to the given scheduling `policy`.
@@ -402,6 +406,14 @@
           policy: the scheduling policy used to select which visits to schedule
           timestamp: the mocked timestamp at which we're recording that the visits are
             being scheduled (defaults to the current time)
+          scheduled_cooldown: the minimal interval before which we can schedule
+            the same origin again
+          failed_cooldown: the minimal interval before which we can reschedule a
+            failed origin
+          notfound_cooldown: the minimal interval before which we can reschedule a
+            notfound origin
+          tablesample: the percentage of the table on which we run the query
+            (None: no sampling)
         """
         ...
 