diff --git a/docs/simulator.rst b/docs/simulator.rst index 368deea..923d71a 100644 --- a/docs/simulator.rst +++ b/docs/simulator.rst @@ -1,65 +1,65 @@ .. _swh-scheduler-simulator: Software Heritage Scheduler Simulator ===================================== This component simulates the interaction between the scheduling and loading infrastructure of Software Heritage. This allows quick(er) development of new task scheduling policies without having to wait for the actual infrastructure to perform (heavy) loading tasks. Simulator components -------------------- - real instance of the scheduler database - simulated task queues: replaces RabbitMQ with simple in-memory structures - simulated workers: replaces Celery with simple while loops - simulated load tasks: replaces loaders with noops that take a certain time, and generate synthetic OriginVisitStatus objects - simulated archive -> scheduler feedback loop: OriginVisitStatus objects are pushed to a simple queue which gets processed by the scheduler journal client's process function directly (instead of going through swh.storage and swh.journal (kafka)) In short, only the scheduler database and scheduler logic are kept; every other component (RabbitMQ, Celery, Kafka, SWH loaders, SWH storage) is either replaced with a barebones in-process utility, or removed entirely. Installing the simulator ------------------------ The simulator depends on SimPy and other specific libraries. To install them, please use: .. code-block:: bash pip install 'swh.scheduler[simulator]' Running the simulator --------------------- The simulator uses a real instance of the scheduler database, which is (at least for now) persistent across runs of the simulator. You need to set that up beforehand: .. code-block:: bash # if you want to use a temporary instance of postgresql eval `pifpaf run postgresql` # Set this variable for the simulator to know which db to connect to. pifpaf # sets other variables like PGPORT, PGHOST, ... 
export PGDATABASE=swh-scheduler # Create/initialize the scheduler database swh db create scheduler -d $PGDATABASE swh db init scheduler -d $PGDATABASE # This generates some data in the scheduler database. You can also feed the # database with more realistic data, e.g. from a lister or from a dump of the # production database. - swh scheduler simulator fill-test-data + swh scheduler -d "dbname=$PGDATABASE" simulator fill-test-data # Run the simulator itself, interacting with the scheduler database you've # just seeded. - swh scheduler simulator run --scheduler origin_scheduler + swh scheduler -d "dbname=$PGDATABASE" simulator run --scheduler origin_scheduler diff --git a/swh/scheduler/cli/simulator.py b/swh/scheduler/cli/simulator.py index d3834bd..bade0d4 100644 --- a/swh/scheduler/cli/simulator.py +++ b/swh/scheduler/cli/simulator.py @@ -1,61 +1,68 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import time import click from . 
import cli @cli.group("simulator") def simulator(): """Scheduler simulator.""" pass @simulator.command("fill-test-data") -def fill_test_data_command(): +@click.pass_context +def fill_test_data_command(ctx): """Fill the scheduler with test data for simulation purposes.""" from swh.scheduler.simulator import fill_test_data click.echo("Filling test data...") start = time.monotonic() - fill_test_data() + fill_test_data(ctx.obj["scheduler"]) runtime = time.monotonic() - start click.echo(f"Completed in {runtime:.2f} seconds") @simulator.command("run") @click.option( "--scheduler", "-s", type=click.Choice(["task_scheduler", "origin_scheduler"]), default="origin_scheduler", help="Scheduler to simulate", ) @click.option( "--policy", "-p", type=click.Choice(["oldest_scheduled_first"]), default="oldest_scheduled_first", help="Scheduling policy to simulate (only for origin_scheduler)", ) @click.option("--runtime", "-t", type=float, help="Simulated runtime") -def run_command(scheduler, policy, runtime): +@click.pass_context +def run_command(ctx, scheduler, policy, runtime): """Run the scheduler simulator. By default, the simulation runs forever. You can cap the simulated runtime with the --runtime option, and you can always press Ctrl+C to interrupt the running simulation. 'task_scheduler' is the "classic" task-based scheduler; 'origin_scheduler' is the new origin-visit-aware simulator. The latter uses --policy to decide which origins to schedule first based on information from listers. 
""" from swh.scheduler.simulator import run policy = policy if scheduler == "origin_scheduler" else None - run(scheduler=scheduler, policy=policy, runtime=runtime) + run( + scheduler=ctx.obj["scheduler"], + scheduler_type=scheduler, + policy=policy, + runtime=runtime, + ) diff --git a/swh/scheduler/simulator/__init__.py b/swh/scheduler/simulator/__init__.py index 098bcfe..cc5a196 100644 --- a/swh/scheduler/simulator/__init__.py +++ b/swh/scheduler/simulator/__init__.py @@ -1,144 +1,146 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """This package runs the scheduler in a simulated environment, to evaluate various metrics. See :ref:`swh-scheduler-simulator`. This module orchestrates of the simulator by initializing processes and connecting them together; these processes are defined in modules in the package and simulate/call specific components.""" from datetime import datetime, timedelta, timezone import logging from typing import Dict, Generator, Optional from simpy import Event -from swh.scheduler import get_scheduler +from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import ListedOrigin from . import origin_scheduler, task_scheduler from .common import Environment, Queue, SimulationReport, Task from .origins import load_task_process logger = logging.getLogger(__name__) def worker_process( env: Environment, name: str, task_queue: Queue, status_queue: Queue ) -> Generator[Event, Task, None]: """A worker which consumes tasks from the input task_queue. 
Tasks themselves send OriginVisitStatus objects to the status_queue.""" logger.debug("%s worker %s: Start", env.time, name) while True: task = yield task_queue.get() logger.debug( "%s worker %s: Run task %s origin=%s", env.time, name, task.visit_type, task.origin, ) yield env.process(load_task_process(env, task, status_queue=status_queue)) def setup( env: Environment, - scheduler: str, + scheduler_type: str, policy: Optional[str], workers_per_type: Dict[str, int], task_queue_capacity: int, min_batch_size: int, ): - # We expect PGHOST, PGPORT, ... set externally task_queues = { visit_type: Queue(env, capacity=task_queue_capacity) for visit_type in workers_per_type } status_queue = Queue(env) - if scheduler == "origin_scheduler": + if scheduler_type == "origin_scheduler": if policy is None: raise ValueError("origin_scheduler needs a scheduling policy") env.process( origin_scheduler.scheduler_runner_process( env, task_queues, policy, min_batch_size=min_batch_size ) ) env.process( origin_scheduler.scheduler_journal_client_process(env, status_queue) ) - elif scheduler == "task_scheduler": + elif scheduler_type == "task_scheduler": if policy is not None: raise ValueError("task_scheduler doesn't support a scheduling policy") env.process( task_scheduler.scheduler_runner_process( env, task_queues, min_batch_size=min_batch_size ) ) env.process(task_scheduler.scheduler_listener_process(env, status_queue)) else: - raise ValueError(f"Unknown scheduler: {scheduler}") + raise ValueError(f"Unknown scheduler type to simulate: {scheduler_type}") for visit_type, num_workers in workers_per_type.items(): task_queue = task_queues[visit_type] for i in range(num_workers): worker_name = f"worker-{visit_type}-{i}" env.process(worker_process(env, worker_name, task_queue, status_queue)) -def fill_test_data(num_origins: int = 100000): +def fill_test_data(scheduler: SchedulerInterface, num_origins: int = 100000): """Fills the database with mock data to test the simulator.""" - scheduler = 
get_scheduler(cls="local", db="") - stored_lister = scheduler.get_or_create_lister(name="example") assert stored_lister.id is not None origins = [ ListedOrigin( lister_id=stored_lister.id, url=f"https://example.com/{i:04d}.git", visit_type="git", last_update=datetime(2020, 6, 15, 16, 0, 0, i, tzinfo=timezone.utc), ) for i in range(num_origins) ] scheduler.record_listed_origins(origins) scheduler.create_tasks( [ { **origin.as_task_dict(), "policy": "recurring", "next_run": origin.last_update, "interval": timedelta(days=64), } for origin in origins ] ) -def run(scheduler: str, policy: Optional[str], runtime: Optional[int]): +def run( + scheduler: SchedulerInterface, + scheduler_type: str, + policy: Optional[str], + runtime: Optional[int], +): NUM_WORKERS = 48 start_time = datetime.now(tz=timezone.utc) env = Environment(start_time=start_time) - env.scheduler = get_scheduler(cls="local", db="") + env.scheduler = scheduler env.report = SimulationReport() setup( env, - scheduler=scheduler, + scheduler_type=scheduler_type, policy=policy, workers_per_type={"git": NUM_WORKERS}, task_queue_capacity=10000, min_batch_size=1000, ) try: env.run(until=runtime) except KeyboardInterrupt: pass finally: end_time = env.time print("total time:", end_time - start_time) print(env.report.format()) diff --git a/swh/scheduler/tests/test_simulator.py b/swh/scheduler/tests/test_simulator.py index 97eb1bd..a93542e 100644 --- a/swh/scheduler/tests/test_simulator.py +++ b/swh/scheduler/tests/test_simulator.py @@ -1,45 +1,53 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest import swh.scheduler.simulator as simulator from swh.scheduler.tests.common import TASK_TYPES NUM_ORIGINS = 42 TEST_RUNTIME = 1000 -@pytest.fixture -def monkeypatch_get_scheduler(swh_scheduler, monkeypatch): - def 
get_scheduler(*args, **kwargs): - return swh_scheduler - - monkeypatch.setattr(simulator, "get_scheduler", get_scheduler) - +def test_fill_test_data(swh_scheduler): for task_type in TASK_TYPES.values(): swh_scheduler.create_task_type(task_type) - -def test_fill_test_data(swh_scheduler, monkeypatch_get_scheduler): - simulator.fill_test_data(num_origins=NUM_ORIGINS) + simulator.fill_test_data(swh_scheduler, num_origins=NUM_ORIGINS) res = swh_scheduler.get_listed_origins() assert len(res.origins) == NUM_ORIGINS assert res.next_page_token is None res = swh_scheduler.search_tasks() assert len(res) == NUM_ORIGINS @pytest.mark.parametrize("policy", ("oldest_scheduled_first",)) -def test_run_origin_scheduler(swh_scheduler, monkeypatch_get_scheduler, policy): - simulator.fill_test_data(num_origins=NUM_ORIGINS) - simulator.run("origin_scheduler", policy=policy, runtime=TEST_RUNTIME) +def test_run_origin_scheduler(swh_scheduler, policy): + for task_type in TASK_TYPES.values(): + swh_scheduler.create_task_type(task_type) + simulator.fill_test_data(swh_scheduler, num_origins=NUM_ORIGINS) + simulator.run( + swh_scheduler, + scheduler_type="origin_scheduler", + policy=policy, + runtime=TEST_RUNTIME, + ) + + +def test_run_task_scheduler(swh_scheduler): + for task_type in TASK_TYPES.values(): + swh_scheduler.create_task_type(task_type) -def test_run_task_scheduler(swh_scheduler, monkeypatch_get_scheduler): - simulator.fill_test_data(num_origins=NUM_ORIGINS) - simulator.run("task_scheduler", policy=None, runtime=TEST_RUNTIME) + simulator.fill_test_data(swh_scheduler, num_origins=NUM_ORIGINS) + simulator.run( + swh_scheduler, + scheduler_type="task_scheduler", + policy=None, + runtime=TEST_RUNTIME, + )