diff --git a/swh/scheduler/simulator/__init__.py b/swh/scheduler/simulator/__init__.py
index 483f9a9..138b74b 100644
--- a/swh/scheduler/simulator/__init__.py
+++ b/swh/scheduler/simulator/__init__.py
@@ -1,126 +1,136 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from datetime import datetime, timedelta, timezone
 import logging
 from typing import Dict, Generator, Optional
 
 from simpy import Event
 
 from swh.scheduler import get_scheduler
 from swh.scheduler.model import ListedOrigin
 
 from . import origin_scheduler, task_scheduler
 from .common import Environment, Queue, SimulationReport, Task
 from .origins import load_task_process
 
 logger = logging.getLogger(__name__)
 
 
 def worker_process(
     env: Environment, name: str, task_queue: Queue, status_queue: Queue
 ) -> Generator[Event, Task, None]:
     """A worker which consumes tasks from the input task_queue. Tasks
     themselves send OriginVisitStatus objects to the status_queue."""
     logger.debug("%s worker %s: Start", env.time, name)
     while True:
         task = yield task_queue.get()
         logger.debug(
             "%s worker %s: Run task %s origin=%s",
             env.time,
             name,
             task.visit_type,
             task.origin,
         )
         yield env.process(load_task_process(env, task, status_queue=status_queue))
 
 
 def setup(
     env: Environment,
     scheduler: str,
     policy: Optional[str],
     workers_per_type: Dict[str, int],
     task_queue_capacity: int,
+    min_batch_size: int,
 ):
     # We expect PGHOST, PGPORT, ... set externally
     task_queues = {
         visit_type: Queue(env, capacity=task_queue_capacity)
         for visit_type in workers_per_type
     }
     status_queue = Queue(env)
 
     if scheduler == "origin_scheduler":
         if policy is None:
             raise ValueError("origin_scheduler needs a scheduling policy")
-        env.process(origin_scheduler.scheduler_runner_process(env, task_queues, policy))
+        env.process(
+            origin_scheduler.scheduler_runner_process(
+                env, task_queues, policy, min_batch_size=min_batch_size
+            )
+        )
         env.process(
             origin_scheduler.scheduler_journal_client_process(env, status_queue)
         )
     elif scheduler == "task_scheduler":
         if policy is not None:
             raise ValueError("task_scheduler doesn't support a scheduling policy")
-        env.process(task_scheduler.scheduler_runner_process(env, task_queues))
+        env.process(
+            task_scheduler.scheduler_runner_process(
+                env, task_queues, min_batch_size=min_batch_size
+            )
+        )
         env.process(task_scheduler.scheduler_listener_process(env, status_queue))
     else:
         raise ValueError(f"Unknown scheduler: {scheduler}")
 
     for visit_type, num_workers in workers_per_type.items():
         task_queue = task_queues[visit_type]
         for i in range(num_workers):
             worker_name = f"worker-{visit_type}-{i}"
             env.process(worker_process(env, worker_name, task_queue, status_queue))
 
 
 def fill_test_data(num_origins: int = 100000):
     scheduler = get_scheduler(cls="local", db="")
     stored_lister = scheduler.get_or_create_lister(name="example")
     assert stored_lister.id is not None
 
     origins = [
         ListedOrigin(
             lister_id=stored_lister.id,
             url=f"https://example.com/{i:04d}.git",
             visit_type="git",
             last_update=datetime(2020, 6, 15, 16, 0, 0, i, tzinfo=timezone.utc),
         )
         for i in range(num_origins)
     ]
     scheduler.record_listed_origins(origins)
 
     scheduler.create_tasks(
         [
             {
                 **origin.as_task_dict(),
                 "policy": "recurring",
                 "next_run": origin.last_update,
                 "interval": timedelta(days=64),
             }
             for origin in origins
         ]
     )
 
 
 def run(scheduler: str, policy: Optional[str], runtime: Optional[int]):
     NUM_WORKERS = 48
     start_time = datetime.now(tz=timezone.utc)
     env = Environment(start_time=start_time)
     env.scheduler = get_scheduler(cls="local", db="")
     env.report = SimulationReport()
     setup(
         env,
         scheduler=scheduler,
         policy=policy,
         workers_per_type={"git": NUM_WORKERS},
         task_queue_capacity=10000,
+        min_batch_size=1000,
     )
     try:
         env.run(until=runtime)
     except KeyboardInterrupt:
         pass
     finally:
         end_time = env.time
         print("total time:", end_time - start_time)
         print(env.report.format())
diff --git a/swh/scheduler/simulator/origin_scheduler.py b/swh/scheduler/simulator/origin_scheduler.py
index 7af1263..3b9d59a 100644
--- a/swh/scheduler/simulator/origin_scheduler.py
+++ b/swh/scheduler/simulator/origin_scheduler.py
@@ -1,69 +1,68 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 """Agents using the new origin-aware scheduler."""
 
 import logging
 from typing import Any, Dict, Generator, Iterator, List
 
 from simpy import Event
 
 from swh.scheduler.journal_client import process_journal_objects
 
 from .common import Environment, Queue, Task, TaskEvent
 
 logger = logging.getLogger(__name__)
 
 
 def scheduler_runner_process(
-    env: Environment, task_queues: Dict[str, Queue], policy: str,
+    env: Environment, task_queues: Dict[str, Queue], policy: str, min_batch_size: int
 ) -> Iterator[Event]:
     """Scheduler runner. Grabs next visits from the database according to the
     scheduling policy, and fills the task_queues accordingly."""
     while True:
         for visit_type, queue in task_queues.items():
-            min_batch_size = max(queue.capacity // 10, 1)
             remaining = queue.slots_remaining()
             if remaining < min_batch_size:
                 continue
             next_origins = env.scheduler.grab_next_visits(
                 visit_type, remaining, policy=policy
             )
             logger.debug(
                 "%s runner: running %s %s tasks",
                 env.time,
                 visit_type,
                 len(next_origins),
             )
             for origin in next_origins:
                 yield queue.put(Task(visit_type=origin.visit_type, origin=origin.url))
 
         yield env.timeout(10.0)
 
 
 def scheduler_journal_client_process(
     env: Environment, status_queue: Queue
 ) -> Generator[Event, TaskEvent, None]:
     """Scheduler journal client. Every once in a while, pulls
     `OriginVisitStatus`es from the status_queue to update the scheduler
     origin_visit_stats table."""
     BATCH_SIZE = 100
     statuses: List[Dict[str, Any]] = []
     while True:
         task_event = yield status_queue.get()
         statuses.append(task_event.status.to_dict())
         if len(statuses) < BATCH_SIZE:
             continue
 
         logger.debug(
             "%s journal client: processing %s statuses", env.time, len(statuses)
         )
         process_journal_objects(
             {"origin_visit_status": statuses}, scheduler=env.scheduler
         )
 
         statuses = []
diff --git a/swh/scheduler/simulator/task_scheduler.py b/swh/scheduler/simulator/task_scheduler.py
index 6289cd9..a86a962 100644
--- a/swh/scheduler/simulator/task_scheduler.py
+++ b/swh/scheduler/simulator/task_scheduler.py
@@ -1,77 +1,76 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 """Agents using the "old" task-based scheduler."""
 
 import logging
 from typing import Dict, Generator, Iterator
 
 from simpy import Event
 
 from .common import Environment, Queue, Task, TaskEvent
 
 logger = logging.getLogger(__name__)
 
 
 def scheduler_runner_process(
-    env: Environment, task_queues: Dict[str, Queue],
+    env: Environment, task_queues: Dict[str, Queue], min_batch_size: int,
 ) -> Iterator[Event]:
     """Scheduler runner. Grabs next visits from the database according to the
     scheduling policy, and fills the task_queues accordingly."""
     while True:
         for visit_type, queue in task_queues.items():
-            min_batch_size = max(queue.capacity // 10, 1)
             remaining = queue.slots_remaining()
             if remaining < min_batch_size:
                 continue
             next_tasks = env.scheduler.grab_ready_tasks(
                 f"load-{visit_type}", num_tasks=remaining, timestamp=env.time
             )
             logger.debug(
                 "%s runner: running %s %s tasks",
                 env.time,
                 visit_type,
                 len(next_tasks),
             )
 
             sim_tasks = [
                 Task(visit_type=visit_type, origin=task["arguments"]["kwargs"]["url"])
                 for task in next_tasks
             ]
 
             env.scheduler.mass_schedule_task_runs(
                 [
                     {
                         "task": task["id"],
                         "scheduled": env.time,
                         "backend_id": str(sim_task.backend_id),
                     }
                     for task, sim_task in zip(next_tasks, sim_tasks)
                 ]
             )
 
             for sim_task in sim_tasks:
                 yield queue.put(sim_task)
 
         yield env.timeout(10.0)
 
 
 def scheduler_listener_process(
     env: Environment, status_queue: Queue
 ) -> Generator[Event, TaskEvent, None]:
     """Scheduler listener. In the real world this would listen to celery
     events, but we listen to the status_queue and simulate celery events
     from that."""
     while True:
         event = yield status_queue.get()
         if event.status.status == "ongoing":
             env.scheduler.start_task_run(event.task.backend_id, timestamp=env.time)
         else:
             if event.status.status == "full":
                 status = "eventful" if event.eventful else "uneventful"
             else:
                 status = "failed"
             env.scheduler.end_task_run(
                 str(event.task.backend_id), status=status, timestamp=env.time
             )
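Usage note for the change above: min_batch_size was previously recomputed inside each runner loop as max(queue.capacity // 10, 1); this patch hoists it into an explicit parameter threaded from setup() down to both runner processes, so the polling threshold can be tuned independently of queue capacity. Below is a minimal sketch of driving the patched setup() directly, mirroring what run() does in the patch; the "oldest_scheduled_first" policy name and the 3600-unit horizon are illustrative assumptions, not part of the patch:

    from datetime import datetime, timezone

    from swh.scheduler import get_scheduler
    from swh.scheduler.simulator import setup
    from swh.scheduler.simulator.common import Environment, SimulationReport

    # Build the simulation environment the same way run() does.
    env = Environment(start_time=datetime.now(tz=timezone.utc))
    env.scheduler = get_scheduler(cls="local", db="")  # PGHOST, PGPORT, ... set externally
    env.report = SimulationReport()

    setup(
        env,
        scheduler="origin_scheduler",
        policy="oldest_scheduled_first",  # assumed policy name, for illustration
        workers_per_type={"git": 48},
        task_queue_capacity=10000,
        # 1000 reproduces the previously derived value, max(10000 // 10, 1);
        # any other value now varies the batch threshold without touching capacity.
        min_batch_size=1000,
    )
    env.run(until=3600)  # assumed horizon: 3600 simulated time units

Keeping the threshold out of the per-queue derivation means queue capacity and batching behaviour can be varied as separate knobs in simulation runs.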