diff --git a/swh/scheduler/cli/simulator.py b/swh/scheduler/cli/simulator.py
index bb3fbd6..669bc34 100644
--- a/swh/scheduler/cli/simulator.py
+++ b/swh/scheduler/cli/simulator.py
@@ -1,74 +1,82 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import time
 
 import click
 
 from . import cli
 
 
 @cli.group("simulator")
 def simulator():
     """Scheduler simulator."""
     pass
 
 
 @simulator.command("fill-test-data")
 @click.option(
     "--num-origins",
     "-n",
     type=int,
     default=100000,
     help="Number of listed origins to add",
 )
 @click.pass_context
 def fill_test_data_command(ctx, num_origins):
     """Fill the scheduler with test data for simulation purposes."""
     from swh.scheduler.simulator import fill_test_data
 
     click.echo(f"Filling {num_origins:,} listed origins data...")
     start = time.monotonic()
     fill_test_data(ctx.obj["scheduler"], num_origins=num_origins)
     runtime = time.monotonic() - start
     click.echo(f"Completed in {runtime:.2f} seconds")
 
 
 @simulator.command("run")
 @click.option(
     "--scheduler",
     "-s",
     type=click.Choice(["task_scheduler", "origin_scheduler"]),
     default="origin_scheduler",
     help="Scheduler to simulate",
 )
 @click.option(
     "--policy",
     "-p",
     default="oldest_scheduled_first",
     help="Scheduling policy to simulate (only for origin_scheduler)",
 )
 @click.option("--runtime", "-t", type=float, help="Simulated runtime")
+@click.option(
+    "--plots/--no-plots",
+    "-P",
+    "showplots",
+    help="Show results as plots (with plotille)",
+)
 @click.pass_context
-def run_command(ctx, scheduler, policy, runtime):
+def run_command(ctx, scheduler, policy, runtime, showplots):
     """Run the scheduler simulator.
 
     By default, the simulation runs forever. You can cap the simulated runtime with
     the --runtime option, and you can always press Ctrl+C to interrupt the running
     simulation.
 
     'task_scheduler' is the "classic" task-based scheduler; 'origin_scheduler'
     is the new origin-visit-aware simulator. The latter uses --policy to decide
     which origins to schedule first based on information from listers.
     """
     from swh.scheduler.simulator import run
 
     policy = policy if scheduler == "origin_scheduler" else None
-    run(
+    report = run(
         scheduler=ctx.obj["scheduler"],
         scheduler_type=scheduler,
         policy=policy,
         runtime=runtime,
     )
+
+    click.echo(report.format(with_plots=showplots))
diff --git a/swh/scheduler/simulator/__init__.py b/swh/scheduler/simulator/__init__.py
index fa2228b..7de91c8 100644
--- a/swh/scheduler/simulator/__init__.py
+++ b/swh/scheduler/simulator/__init__.py
@@ -1,163 +1,169 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 """This package runs the scheduler in a simulated environment, to evaluate
 various metrics. See :ref:`swh-scheduler-simulator`.
 
-This module orchestrates of the simulator by initializing processes and connecting
+This module orchestrates the simulator by initializing processes and connecting
 them together; these processes are defined in modules in the package and
 simulate/call specific components."""
 
 from datetime import datetime, timedelta, timezone
 import logging
 from typing import Dict, Generator, Optional
 
 from simpy import Event
 
 from swh.scheduler.interface import SchedulerInterface
 
 from . import origin_scheduler, task_scheduler
 from .common import Environment, Queue, SimulationReport, Task
 from .origins import generate_listed_origin, lister_process, load_task_process
 
 logger = logging.getLogger(__name__)
 
 
 def update_metrics_process(
     env: Environment, update_interval: int
 ) -> Generator[Event, None, None]:
     """Update the scheduler metrics every `update_interval` (simulated) seconds,
     and add them to the SimulationReport
     """
     t0 = env.time
     while True:
         metrics = env.scheduler.update_metrics(timestamp=env.time)
         env.report.record_metrics(env.time, metrics)
         dt = env.time - t0
         logger.info("time:%s visits:%s", dt, env.report.total_visits)
         yield env.timeout(update_interval)
 
 
 def worker_process(
     env: Environment, name: str, task_queue: Queue, status_queue: Queue
 ) -> Generator[Event, Task, None]:
     """A worker which consumes tasks from the input task_queue. Tasks
     themselves send OriginVisitStatus objects to the status_queue."""
     logger.debug("%s worker %s: Start", env.time, name)
     while True:
         task = yield task_queue.get()
         logger.debug(
             "%s worker %s: Run task %s origin=%s",
             env.time,
             name,
             task.visit_type,
             task.origin,
         )
         yield env.process(load_task_process(env, task, status_queue=status_queue))
 
 
 def setup(
     env: Environment,
     scheduler_type: str,
     policy: Optional[str],
     workers_per_type: Dict[str, int],
     task_queue_capacity: int,
     min_batch_size: int,
     metrics_update_interval: int,
 ):
     task_queues = {
         visit_type: Queue(env, capacity=task_queue_capacity)
         for visit_type in workers_per_type
     }
     status_queue = Queue(env)
 
     if scheduler_type == "origin_scheduler":
         if policy is None:
             raise ValueError("origin_scheduler needs a scheduling policy")
         env.process(
             origin_scheduler.scheduler_runner_process(
                 env, task_queues, policy, min_batch_size=min_batch_size
             )
         )
         env.process(
             origin_scheduler.scheduler_journal_client_process(env, status_queue)
         )
     elif scheduler_type == "task_scheduler":
         if policy is not None:
             raise ValueError("task_scheduler doesn't support a scheduling policy")
         env.process(
             task_scheduler.scheduler_runner_process(
                 env, task_queues, min_batch_size=min_batch_size
             )
         )
         env.process(task_scheduler.scheduler_listener_process(env, status_queue))
     else:
         raise ValueError(f"Unknown scheduler type to simulate: {scheduler_type}")
 
     env.process(update_metrics_process(env, metrics_update_interval))
 
     for visit_type, num_workers in workers_per_type.items():
         task_queue = task_queues[visit_type]
         for i in range(num_workers):
             worker_name = f"worker-{visit_type}-{i}"
             env.process(worker_process(env, worker_name, task_queue, status_queue))
 
     lister = env.scheduler.get_or_create_lister(name="example")
     assert lister.id
     env.process(lister_process(env, lister.id))
 
 
 def fill_test_data(scheduler: SchedulerInterface, num_origins: int = 100000):
     """Fills the database with mock data to test the simulator."""
     stored_lister = scheduler.get_or_create_lister(name="example")
     assert stored_lister.id is not None
 
     # Generate 'num_origins' new origins
     origins = [generate_listed_origin(stored_lister.id) for _ in range(num_origins)]
     scheduler.record_listed_origins(origins)
 
     scheduler.create_tasks(
         [
             {
                 **origin.as_task_dict(),
                 "policy": "recurring",
                 "next_run": origin.last_update,
                 "interval": timedelta(days=64),
             }
             for origin in origins
         ]
     )
 
 
 def run(
     scheduler: SchedulerInterface,
     scheduler_type: str,
     policy: Optional[str],
     runtime: Optional[int],
 ):
+    """Run the simulation and return the collected SimulationReport.
+
+    The simulation is capped at `runtime` simulated seconds, or runs until
+    interrupted with Ctrl+C when `runtime` is None; final scheduler metrics are
+    recorded in the report before it is returned.
+    """
     NUM_WORKERS = 48
     start_time = datetime.now(tz=timezone.utc)
     env = Environment(start_time=start_time)
     env.scheduler = scheduler
     env.report = SimulationReport()
     setup(
         env,
         scheduler_type=scheduler_type,
         policy=policy,
         workers_per_type={"git": NUM_WORKERS},
         task_queue_capacity=10000,
         min_batch_size=1000,
         metrics_update_interval=3600,
     )
     try:
         env.run(until=runtime)
     except KeyboardInterrupt:
         pass
     finally:
         end_time = env.time
         print("total simulated time:", end_time - start_time)
         metrics = env.scheduler.update_metrics(timestamp=end_time)
         env.report.record_metrics(end_time, metrics)
-        print(env.report.format())
+    return env.report
diff --git a/swh/scheduler/simulator/common.py b/swh/scheduler/simulator/common.py
index 9b2e9b0..749d424 100644
--- a/swh/scheduler/simulator/common.py
+++ b/swh/scheduler/simulator/common.py
@@ -1,157 +1,166 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from dataclasses import dataclass, field
 from datetime import datetime, timedelta
 import textwrap
 from typing import Dict, List, Optional, Tuple
 import uuid
 
 import plotille
 from simpy import Environment as _Environment
 from simpy import Store
 
 from swh.model.model import OriginVisitStatus
 from swh.scheduler.interface import SchedulerInterface
 from swh.scheduler.model import SchedulerMetrics
 
 
 @dataclass
 class SimulationReport:
     DURATION_THRESHOLD = 3600
     """Max duration for histograms"""
 
     total_visits: int = 0
     """Total count of finished visits"""
 
     visit_runtimes: Dict[Tuple[str, bool], List[float]] = field(default_factory=dict)
     """Collected visit runtimes for each (status, eventful) tuple"""
 
     scheduler_metrics: List[Tuple[datetime, List[SchedulerMetrics]]] = field(
         default_factory=list
     )
     """Collected scheduler metrics for every timestamp"""
 
     visit_metrics: List[Tuple[datetime, int]] = field(default_factory=list)
     """Collected visit metrics over time"""
 
-    latest_snapshots: Dict[Tuple[str, str], bytes] = field(default_factory=dict)
+    latest_snapshots: Dict[Tuple[str, str], Optional[bytes]] = field(
+        default_factory=dict
+    )
     """Collected latest snapshots for origins"""
 
     def record_visit(
         self,
         origin: Tuple[str, str],
         duration: float,
         status: str,
-        snapshot=Optional[bytes],
+        snapshot: Optional[bytes] = None,
     ) -> None:
         eventful = False
         if status == "full":
             eventful = snapshot != self.latest_snapshots.get(origin)
             self.latest_snapshots[origin] = snapshot
 
         self.total_visits += 1
         self.visit_runtimes.setdefault((status, eventful), []).append(duration)
 
     def record_metrics(
         self, timestamp: datetime, scheduler_metrics: List[SchedulerMetrics]
     ):
         self.scheduler_metrics.append((timestamp, scheduler_metrics))
         self.visit_metrics.append((timestamp, self.total_visits))
 
     @property
     def uneventful_visits(self):
         """Number of uneventful, full visits"""
         return len(self.visit_runtimes.get(("full", False), []))
 
     def runtime_histogram(self, status: str, eventful: bool) -> str:
         runtimes = self.visit_runtimes.get((status, eventful), [])
         return plotille.hist(
             [runtime for runtime in runtimes if runtime <= self.DURATION_THRESHOLD]
         )
 
     def metrics_plot(self) -> str:
         timestamps, metric_lists = zip(*self.scheduler_metrics)
         known = [sum(m.origins_known for m in metrics) for metrics in metric_lists]
         never_visited = [
             sum(m.origins_never_visited for m in metrics) for metrics in metric_lists
         ]
 
         figure = plotille.Figure()
         figure.x_label = "simulated time"
         figure.y_label = "origins"
         figure.scatter(timestamps, known, label="Known origins")
         figure.scatter(timestamps, never_visited, label="Origins never visited")
 
         visit_timestamps, n_visits = zip(*self.visit_metrics)
         figure.scatter(visit_timestamps, n_visits, label="Visits over time")
 
         return figure.show(legend=True)
 
-    def format(self):
+    def format(self, with_plots=True):
+        """Render the collected statistics as a human-readable report.
+
+        The textual summary is always produced; when `with_plots` is true, the
+        eventful-visit runtime histogram and the metrics plot are appended.
+        """
         full_visits = self.visit_runtimes.get(("full", True), [])
-        histogram = self.runtime_histogram("full", True)
-        plot = self.metrics_plot()
         long_tasks = sum(runtime > self.DURATION_THRESHOLD for runtime in full_visits)
 
-        return (
-            textwrap.dedent(
-                f"""\
+        output = textwrap.dedent(
+            f"""\
         Total visits: {self.total_visits}
         Uneventful visits: {self.uneventful_visits}
         Eventful visits: {len(full_visits)}
         Very long running tasks: {long_tasks}
-        Visit time histogram for eventful visits:
         """
-            )
-            + histogram
-            + "\n"
-            + textwrap.dedent(
-                """\
-        Metrics over time:
-        """
-            )
-            + plot
         )
+        if with_plots:
+            histogram = self.runtime_histogram("full", True)
+            plot = self.metrics_plot()
+            output += (
+                "Visit time histogram for eventful visits:\n"
+                + histogram
+                + "\n"
+                + textwrap.dedent(
+                    """\
+            Metrics over time:
+            """
+                )
+                + plot
+            )
+        return output
 
 
 @dataclass
 class Task:
     visit_type: str
     origin: str
     backend_id: uuid.UUID = field(default_factory=uuid.uuid4)
 
 
 @dataclass
 class TaskEvent:
     task: Task
     status: OriginVisitStatus
     eventful: bool = field(default=False)
 
 
 class Queue(Store):
     """Model a queue of objects to be passed between processes."""
 
     def __len__(self):
         return len(self.items or [])
 
     def slots_remaining(self):
         return self.capacity - len(self)
 
 
 class Environment(_Environment):
     report: SimulationReport
     scheduler: SchedulerInterface
 
     def __init__(self, start_time: datetime):
         if start_time.tzinfo is None:
             raise ValueError("start_time must have timezone information")
         self.start_time = start_time
         super().__init__()
 
     @property
     def time(self):
         """Get the current simulated wall clock time"""
         return self.start_time + timedelta(seconds=self.now)