diff --git a/swh/scheduler/cli/simulator.py b/swh/scheduler/cli/simulator.py
index 669bc34..beb7a8c 100644
--- a/swh/scheduler/cli/simulator.py
+++ b/swh/scheduler/cli/simulator.py
@@ -1,82 +1,87 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import time
 
 import click
 
 from . import cli
 
 
 @cli.group("simulator")
 def simulator():
     """Scheduler simulator."""
     pass
 
 
 @simulator.command("fill-test-data")
 @click.option(
     "--num-origins",
     "-n",
     type=int,
     default=100000,
     help="Number of listed origins to add",
 )
 @click.pass_context
 def fill_test_data_command(ctx, num_origins):
     """Fill the scheduler with test data for simulation purposes."""
     from swh.scheduler.simulator import fill_test_data
 
     click.echo(f"Filling {num_origins:,} listed origins data...")
     start = time.monotonic()
     fill_test_data(ctx.obj["scheduler"], num_origins=num_origins)
     runtime = time.monotonic() - start
     click.echo(f"Completed in {runtime:.2f} seconds")
 
 
 @simulator.command("run")
 @click.option(
     "--scheduler",
     "-s",
     type=click.Choice(["task_scheduler", "origin_scheduler"]),
     default="origin_scheduler",
     help="Scheduler to simulate",
 )
 @click.option(
     "--policy",
     "-p",
     default="oldest_scheduled_first",
     help="Scheduling policy to simulate (only for origin_scheduler)",
 )
 @click.option("--runtime", "-t", type=float, help="Simulated runtime")
 @click.option(
     "--plots/--no-plots",
     "-P",
     "showplots",
     help="Show results as plots (with plotille)",
 )
+@click.option(
+    "--csv", "-o", "csvfile", type=click.File("w"), help="Export results in a CSV file"
+)
 @click.pass_context
-def run_command(ctx, scheduler, policy, runtime, showplots):
+def run_command(ctx, scheduler, policy, runtime, showplots, csvfile):
     """Run the scheduler simulator.
 
     By default, the simulation runs forever. You can cap the simulated runtime with
     the --runtime option, and you can always press Ctrl+C to interrupt the running
     simulation.
 
     'task_scheduler' is the "classic" task-based scheduler; 'origin_scheduler'
     is the new origin-visit-aware simulator. The latter uses --policy to decide
     which origins to schedule first based on information from listers.
     """
     from swh.scheduler.simulator import run
 
     policy = policy if scheduler == "origin_scheduler" else None
     report = run(
         scheduler=ctx.obj["scheduler"],
         scheduler_type=scheduler,
         policy=policy,
         runtime=runtime,
     )
 
     print(report.format(with_plots=showplots))
+    if csvfile is not None:
+        report.metrics_csv(csvfile)
diff --git a/swh/scheduler/simulator/common.py b/swh/scheduler/simulator/common.py
index 749d424..c421d36 100644
--- a/swh/scheduler/simulator/common.py
+++ b/swh/scheduler/simulator/common.py
@@ -1,161 +1,202 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import csv
 from dataclasses import dataclass, field
 from datetime import datetime, timedelta
 import textwrap
-from typing import Dict, List, Optional, Tuple
+from typing import Dict, List, Optional, TextIO, Tuple
 import uuid
 
 import plotille
 from simpy import Environment as _Environment
 from simpy import Store
 
 from swh.model.model import OriginVisitStatus
 from swh.scheduler.interface import SchedulerInterface
 from swh.scheduler.model import SchedulerMetrics
 
 
 @dataclass
 class SimulationReport:
     DURATION_THRESHOLD = 3600
     """Max duration for histograms"""
 
     total_visits: int = 0
     """Total count of finished visits"""
 
     visit_runtimes: Dict[Tuple[str, bool], List[float]] = field(default_factory=dict)
     """Collected visit runtimes for each (status, eventful) tuple"""
 
     scheduler_metrics: List[Tuple[datetime, List[SchedulerMetrics]]] = field(
         default_factory=list
     )
-    """Collected scheduler metrics for every timestamp"""
+    """Collected scheduler metrics
+
+    This is a list of couples (timestamp, [SchedulerMetrics,]): the list of
+    scheduler metrics collected at given timestamp.
+    """
 
     visit_metrics: List[Tuple[datetime, int]] = field(default_factory=list)
     """Collected visit metrics over time"""
 
     latest_snapshots: Dict[Tuple[str, str], bytes] = field(default_factory=dict)
     """Collected latest snapshots for origins"""
 
     def record_visit(
         self,
         origin: Tuple[str, str],
         duration: float,
         status: str,
         snapshot=Optional[bytes],
     ) -> None:
         eventful = False
         if status == "full":
             eventful = snapshot != self.latest_snapshots.get(origin)
             self.latest_snapshots[origin] = snapshot
 
         self.total_visits += 1
         self.visit_runtimes.setdefault((status, eventful), []).append(duration)
 
     def record_metrics(
         self, timestamp: datetime, scheduler_metrics: List[SchedulerMetrics]
     ):
         self.scheduler_metrics.append((timestamp, scheduler_metrics))
         self.visit_metrics.append((timestamp, self.total_visits))
 
     @property
     def uneventful_visits(self):
         """Number of uneventful, full visits"""
         return len(self.visit_runtimes.get(("full", False), []))
 
     def runtime_histogram(self, status: str, eventful: bool) -> str:
         runtimes = self.visit_runtimes.get((status, eventful), [])
         return plotille.hist(
             [runtime for runtime in runtimes if runtime <= self.DURATION_THRESHOLD]
         )
 
     def metrics_plot(self) -> str:
         timestamps, metric_lists = zip(*self.scheduler_metrics)
         known = [sum(m.origins_known for m in metrics) for metrics in metric_lists]
         never_visited = [
             sum(m.origins_never_visited for m in metrics) for metrics in metric_lists
         ]
 
         figure = plotille.Figure()
         figure.x_label = "simulated time"
         figure.y_label = "origins"
         figure.scatter(timestamps, known, label="Known origins")
         figure.scatter(timestamps, never_visited, label="Origins never visited")
 
         visit_timestamps, n_visits = zip(*self.visit_metrics)
         figure.scatter(visit_timestamps, n_visits, label="Visits over time")
 
         return figure.show(legend=True)
 
+    def metrics_csv(self, fobj: TextIO) -> None:
+        """Export scheduling metrics as CSV
+
+        A header row is always written, followed by one row per entry of
+        scheduler_metrics; each row holds the timestamp and, for every
+        exported counter, its sum over the SchedulerMetrics collected at
+        that timestamp.
+
+        Args:
+            fobj: writable text stream receiving the CSV data
+        """
+        csv_writer = csv.writer(fobj)
+        csv_writer.writerow(
+            [
+                "timestamp",
+                "known_origins",
+                "enabled_origins",
+                "never_visited_origins",
+                "origins_with_pending_changes",
+            ]
+        )
+
+        # Iterate over the records directly: unpacking
+        # zip(*self.scheduler_metrics) raises ValueError when no metrics have
+        # been recorded yet, whereas this writes just the header.
+        csv_writer.writerows(
+            (
+                timestamp,
+                sum(m.origins_known for m in metrics),
+                sum(m.origins_enabled for m in metrics),
+                sum(m.origins_never_visited for m in metrics),
+                sum(m.origins_with_pending_changes for m in metrics),
+            )
+            for timestamp, metrics in self.scheduler_metrics
+        )
+
     def format(self, with_plots=True):
         full_visits = self.visit_runtimes.get(("full", True), [])
         long_tasks = sum(runtime > self.DURATION_THRESHOLD for runtime in full_visits)
 
         output = textwrap.dedent(
             f"""\
             Total visits: {self.total_visits}
             Uneventful visits: {self.uneventful_visits}
             Eventful visits: {len(full_visits)}
             Very long running tasks: {long_tasks}
             """
         )
 
         if with_plots:
             histogram = self.runtime_histogram("full", True)
             plot = self.metrics_plot()
             output += (
                 "Visit time histogram for eventful visits:"
                 + histogram
                 + "\n"
                 + textwrap.dedent(
                     """\
                     Metrics over time:
                     """
                 )
                 + plot
             )
 
         return output
 
 
 @dataclass
 class Task:
     visit_type: str
     origin: str
     backend_id: uuid.UUID = field(default_factory=uuid.uuid4)
 
 
 @dataclass
 class TaskEvent:
     task: Task
     status: OriginVisitStatus
     eventful: bool = field(default=False)
 
 
 class Queue(Store):
     """Model a queue of objects to be passed between processes."""
 
     def __len__(self):
         return len(self.items or [])
 
     def slots_remaining(self):
         return self.capacity - len(self)
 
 
 class Environment(_Environment):
     report: SimulationReport
     scheduler: SchedulerInterface
 
     def __init__(self, start_time: datetime):
         if start_time.tzinfo is None:
             raise ValueError("start_time must have timezone information")
         self.start_time = start_time
         super().__init__()
 
     @property
     def time(self):
         """Get the current simulated wall clock time"""
         return self.start_time + timedelta(seconds=self.now)
diff --git a/swh/scheduler/tests/test_simulator.py b/swh/scheduler/tests/test_simulator.py
index aa1b172..95085a5 100644
--- a/swh/scheduler/tests/test_simulator.py
+++ b/swh/scheduler/tests/test_simulator.py
@@ -1,64 +1,80 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import io
+
 import pytest
 
 from swh.core.api.classes import stream_results
 import swh.scheduler.simulator as simulator
+from swh.scheduler.simulator.common import SimulationReport
 from swh.scheduler.tests.common import TASK_TYPES
 
 NUM_ORIGINS = 42
 TEST_RUNTIME = 1000
 
 
 def test_fill_test_data(swh_scheduler):
     for task_type in TASK_TYPES.values():
         swh_scheduler.create_task_type(task_type)
 
     simulator.fill_test_data(swh_scheduler, num_origins=NUM_ORIGINS)
 
     origins = list(stream_results(swh_scheduler.get_listed_origins))
     assert len(origins) == NUM_ORIGINS
 
     res = swh_scheduler.search_tasks()
     assert len(res) == NUM_ORIGINS
 
 
 @pytest.mark.parametrize(
     "policy",
     (
         "oldest_scheduled_first",
         "never_visited_oldest_update_first",
         "already_visited_order_by_lag",
     ),
 )
 def test_run_origin_scheduler(swh_scheduler, policy):
     for task_type in TASK_TYPES.values():
         swh_scheduler.create_task_type(task_type)
 
     simulator.fill_test_data(swh_scheduler, num_origins=NUM_ORIGINS)
     simulator.run(
         swh_scheduler,
         scheduler_type="origin_scheduler",
         policy=policy,
         runtime=TEST_RUNTIME,
     )
 
 
 def test_run_task_scheduler(swh_scheduler):
     for task_type in TASK_TYPES.values():
         swh_scheduler.create_task_type(task_type)
 
     simulator.fill_test_data(swh_scheduler, num_origins=NUM_ORIGINS)
     report = simulator.run(
         swh_scheduler,
         scheduler_type="task_scheduler",
         policy=None,
         runtime=TEST_RUNTIME,
     )
 
     # just check these SimulationReport methods do not crash
     assert report.format(with_plots=True)
     assert report.format(with_plots=False)
+    fobj = io.StringIO()
+    report.metrics_csv(fobj=fobj)
+    assert fobj.getvalue()
+
+
+def test_metrics_csv_without_metrics():
+    """A report without any recorded metrics exports only the CSV header"""
+    fobj = io.StringIO()
+    SimulationReport().metrics_csv(fobj)
+    assert fobj.getvalue().splitlines() == [
+        "timestamp,known_origins,enabled_origins,never_visited_origins,"
+        "origins_with_pending_changes"
+    ]