diff --git a/docs/simulator.rst b/docs/simulator.rst
index ba10aa6..368deea 100644
--- a/docs/simulator.rst
+++ b/docs/simulator.rst
@@ -1,55 +1,65 @@
+.. _swh-scheduler-simulator:
+
 Software Heritage Scheduler Simulator
 =====================================

 This component simulates the interaction between the scheduling and loading
 infrastructure of Software Heritage. This allows quick(er) development of new
 task scheduling policies without having to wait for the actual infrastructure
 to perform (heavy) loading tasks.

 Simulator components
 --------------------

 - real instance of the scheduler database
-- simulated task queues
-- simulated workers
-- simulated load tasks
-- simulated archive -> scheduler feedback loop
+- simulated task queues: replaces RabbitMQ with simple in-memory structures
+- simulated workers: replaces Celery with simple while loops
+- simulated load tasks: replaces loaders with no-ops that take a certain time,
+  and generate synthetic OriginVisitStatus objects
+- simulated archive -> scheduler feedback loop: OriginVisitStatus objects are
+  pushed to a simple queue which gets processed by the scheduler journal
+  client's process function directly (instead of going through swh.storage and
+  swh.journal (kafka))
+
+In short, only the scheduler database and the scheduler logic are kept; every
+other component (RabbitMQ, Celery, Kafka, SWH loaders, SWH storage) is either
+replaced with a barebones in-process utility, or removed entirely.

 Installing the simulator
 ------------------------

 The simulator depends on SimPy and a few other simulation-specific libraries.
 To install them, please use:

 .. code-block:: bash

    pip install 'swh.scheduler[simulator]'

 Running the simulator
 ---------------------

 The simulator uses a real instance of the scheduler database, which is (at
 least for now) persistent across runs of the simulator. You need to set that
 up beforehand:

 .. code-block:: bash

    # if you want to use a temporary instance of postgresql
    eval `pifpaf run postgresql`

    # Set this variable for the simulator to know which db to connect to. pifpaf
    # sets other variables like PGPORT, PGHOST, ...
    export PGDATABASE=swh-scheduler

    # Create/initialize the scheduler database
    swh db create scheduler -d $PGDATABASE
    swh db init scheduler -d $PGDATABASE

    # This generates some data in the scheduler database. You can also feed the
    # database with more realistic data, e.g. from a lister or from a dump of the
    # production database.
    swh scheduler simulator fill-test-data

    # Run the simulator itself, interacting with the scheduler database you've
    # just seeded.
    swh scheduler simulator run --scheduler origin_scheduler
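The component list above maps almost one-to-one onto SimPy primitives. As a
minimal, self-contained sketch of that pattern (hypothetical code, not part of
swh.scheduler): a `simpy.Store` plays the role of an in-memory task queue
standing in for RabbitMQ, and a while-loop process stands in for a Celery
worker:

    import simpy

    def worker(env: simpy.Environment, name: str, task_queue: simpy.Store):
        """A stand-in for a Celery worker: loop forever, pulling tasks."""
        while True:
            origin = yield task_queue.get()  # blocks until a task is available
            print(f"t={env.now}: {name} starts loading {origin}")
            yield env.timeout(1.5)  # pretend the load takes 1.5 simulated seconds
            print(f"t={env.now}: {name} finished {origin}")

    env = simpy.Environment()
    queue = simpy.Store(env, capacity=10)  # bounded, like the simulator's queues
    env.process(worker(env, "worker-git-0", queue))
    for i in range(3):
        queue.put(f"https://example.com/{i:04d}.git")
    env.run(until=10)  # advance 10 units of simulated time, near-instantly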
diff --git a/swh/scheduler/cli/simulator.py b/swh/scheduler/cli/simulator.py
index 154e55c..d3834bd 100644
--- a/swh/scheduler/cli/simulator.py
+++ b/swh/scheduler/cli/simulator.py
@@ -1,57 +1,61 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import time

 import click

 from . import cli


 @cli.group("simulator")
 def simulator():
     """Scheduler simulator."""
     pass


 @simulator.command("fill-test-data")
 def fill_test_data_command():
     """Fill the scheduler with test data for simulation purposes."""
     from swh.scheduler.simulator import fill_test_data

     click.echo("Filling test data...")
     start = time.monotonic()
     fill_test_data()
     runtime = time.monotonic() - start
     click.echo(f"Completed in {runtime:.2f} seconds")


 @simulator.command("run")
 @click.option(
     "--scheduler",
     "-s",
     type=click.Choice(["task_scheduler", "origin_scheduler"]),
     default="origin_scheduler",
     help="Scheduler to simulate",
 )
 @click.option(
     "--policy",
     "-p",
     type=click.Choice(["oldest_scheduled_first"]),
     default="oldest_scheduled_first",
     help="Scheduling policy to simulate (only for origin_scheduler)",
 )
 @click.option("--runtime", "-t", type=float, help="Simulated runtime")
 def run_command(scheduler, policy, runtime):
     """Run the scheduler simulator.

     By default, the simulation runs forever. You can cap the simulated runtime
     with the --runtime option, and you can always press Ctrl+C to interrupt the
     running simulation.
+
+    'task_scheduler' is the "classic" task-based scheduler; 'origin_scheduler'
+    is the new origin-visit-aware scheduler. The latter uses --policy to decide
+    which origins to schedule first based on information from listers.
     """
     from swh.scheduler.simulator import run

     policy = policy if scheduler == "origin_scheduler" else None
     run(scheduler=scheduler, policy=policy, runtime=runtime)
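For reference, `run_command` is a thin wrapper around
`swh.scheduler.simulator.run`: it only nulls out the policy for
`task_scheduler` before delegating. A hypothetical Python equivalent of
`swh scheduler simulator run -s origin_scheduler -t 3600` (assuming a
scheduler database reachable through the PG* environment variables, as set up
in the docs above):

    from swh.scheduler.simulator import run

    # --policy is only meaningful for origin_scheduler; for task_scheduler the
    # CLI passes policy=None, mirroring the checks in setup().
    run(scheduler="origin_scheduler", policy="oldest_scheduled_first", runtime=3600)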
diff --git a/swh/scheduler/simulator/__init__.py b/swh/scheduler/simulator/__init__.py
index 138b74b..098bcfe 100644
--- a/swh/scheduler/simulator/__init__.py
+++ b/swh/scheduler/simulator/__init__.py
@@ -1,136 +1,144 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

+"""This package runs the scheduler in a simulated environment, to evaluate
+various metrics. See :ref:`swh-scheduler-simulator`.
+
+This module orchestrates the simulator by initializing processes and
+connecting them together; these processes are defined in modules in the
+package and simulate/call specific components."""
+
 from datetime import datetime, timedelta, timezone
 import logging
 from typing import Dict, Generator, Optional

 from simpy import Event

 from swh.scheduler import get_scheduler
 from swh.scheduler.model import ListedOrigin

 from . import origin_scheduler, task_scheduler
 from .common import Environment, Queue, SimulationReport, Task
 from .origins import load_task_process

 logger = logging.getLogger(__name__)


 def worker_process(
     env: Environment, name: str, task_queue: Queue, status_queue: Queue
 ) -> Generator[Event, Task, None]:
     """A worker which consumes tasks from the input task_queue. Tasks
     themselves send OriginVisitStatus objects to the status_queue."""
     logger.debug("%s worker %s: Start", env.time, name)
     while True:
         task = yield task_queue.get()
         logger.debug(
             "%s worker %s: Run task %s origin=%s",
             env.time,
             name,
             task.visit_type,
             task.origin,
         )
         yield env.process(load_task_process(env, task, status_queue=status_queue))


 def setup(
     env: Environment,
     scheduler: str,
     policy: Optional[str],
     workers_per_type: Dict[str, int],
     task_queue_capacity: int,
     min_batch_size: int,
 ):
     # We expect PGHOST, PGPORT, ... set externally
     task_queues = {
         visit_type: Queue(env, capacity=task_queue_capacity)
         for visit_type in workers_per_type
     }
     status_queue = Queue(env)

     if scheduler == "origin_scheduler":
         if policy is None:
             raise ValueError("origin_scheduler needs a scheduling policy")
         env.process(
             origin_scheduler.scheduler_runner_process(
                 env, task_queues, policy, min_batch_size=min_batch_size
             )
         )
         env.process(
             origin_scheduler.scheduler_journal_client_process(env, status_queue)
         )
     elif scheduler == "task_scheduler":
         if policy is not None:
             raise ValueError("task_scheduler doesn't support a scheduling policy")
         env.process(
             task_scheduler.scheduler_runner_process(
                 env, task_queues, min_batch_size=min_batch_size
             )
         )
         env.process(task_scheduler.scheduler_listener_process(env, status_queue))
     else:
         raise ValueError(f"Unknown scheduler: {scheduler}")

     for visit_type, num_workers in workers_per_type.items():
         task_queue = task_queues[visit_type]
         for i in range(num_workers):
             worker_name = f"worker-{visit_type}-{i}"
             env.process(worker_process(env, worker_name, task_queue, status_queue))


 def fill_test_data(num_origins: int = 100000):
+    """Fills the database with mock data to test the simulator."""
     scheduler = get_scheduler(cls="local", db="")

     stored_lister = scheduler.get_or_create_lister(name="example")
     assert stored_lister.id is not None

     origins = [
         ListedOrigin(
             lister_id=stored_lister.id,
             url=f"https://example.com/{i:04d}.git",
             visit_type="git",
             last_update=datetime(2020, 6, 15, 16, 0, 0, i, tzinfo=timezone.utc),
         )
         for i in range(num_origins)
     ]
     scheduler.record_listed_origins(origins)

     scheduler.create_tasks(
         [
             {
                 **origin.as_task_dict(),
                 "policy": "recurring",
                 "next_run": origin.last_update,
                 "interval": timedelta(days=64),
             }
             for origin in origins
         ]
     )


 def run(scheduler: str, policy: Optional[str], runtime: Optional[int]):
     NUM_WORKERS = 48
     start_time = datetime.now(tz=timezone.utc)
     env = Environment(start_time=start_time)
     env.scheduler = get_scheduler(cls="local", db="")
     env.report = SimulationReport()
     setup(
         env,
         scheduler=scheduler,
         policy=policy,
         workers_per_type={"git": NUM_WORKERS},
         task_queue_capacity=10000,
         min_batch_size=1000,
     )
     try:
         env.run(until=runtime)
     except KeyboardInterrupt:
         pass
     finally:
         end_time = env.time
         print("total time:", end_time - start_time)
         print(env.report.format())
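Putting `fill_test_data` and `run` together, a scaled-down end-to-end session
could look like the following sketch (the 1000-origin figure is an arbitrary
choice to keep the run short; the default is 100000):

    from swh.scheduler.simulator import fill_test_data, run

    # Seed the scheduler database with synthetic git origins; last_update uses
    # the loop index as microseconds, so every origin gets a distinct timestamp.
    fill_test_data(num_origins=1000)

    # task_scheduler accepts no policy (setup() raises otherwise), and runtime
    # caps the simulation at 600 units of simulated time.
    run(scheduler="task_scheduler", policy=None, runtime=600)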
"""Returns a random 'average time between two commits' of this origin, + used to estimate the run time of a load task, and how much the loading + architecture is lagging behind origin updates.""" n_bytes = 2 num_buckets = 2 ** (8 * n_bytes) + + # Deterministic seed to generate "random" characteristics of this origin bucket = int.from_bytes( hashlib.md5(self.origin.encode()).digest()[0:n_bytes], "little" ) + # minimum: 1 second (bucket == 0) # max: 10 years (bucket == num_buckets - 1) ten_y = 10 * 365 * 24 * 3600 return ten_y ** (bucket / num_buckets) # return 1 + (ten_y - 1) * (bucket / (num_buckets - 1)) def load_task_characteristics( self, env: Environment, stats: Optional[OriginVisitStats] ) -> Tuple[float, bool, str]: """Returns the (run_time, eventfulness, end_status) of the next origin visit.""" if stats and stats.last_eventful: time_since_last_successful_run = env.time - stats.last_eventful else: time_since_last_successful_run = timedelta(days=365) seconds_between_commits = self.seconds_between_commits() logger.debug( "Interval between commits: %s", timedelta(seconds=seconds_between_commits) ) seconds_since_last_successful = time_since_last_successful_run.total_seconds() if seconds_since_last_successful < seconds_between_commits: # No commits since last visit => uneventful return (self.MIN_RUN_TIME, False, "full") else: n_commits = seconds_since_last_successful / seconds_between_commits run_time = self.MIN_RUN_TIME + self.PER_COMMIT_RUN_TIME * n_commits if run_time > self.MAX_RUN_TIME: return (self.MAX_RUN_TIME, False, "partial") else: return (run_time, True, "full") def load_task_process( env: Environment, task: Task, status_queue: Queue ) -> Iterator[Event]: """A loading task. This pushes OriginVisitStatus objects to the status_queue to simulate the visible outcomes of the task. Uses the `load_task_duration` function to determine its run time. """ # This is cheating; actual tasks access the state from the storage, not the # scheduler stats = env.scheduler.origin_visit_stats_get(task.origin, task.visit_type) last_snapshot = stats.last_snapshot if stats else None status = OriginVisitStatus( origin=task.origin, visit=42, type=task.visit_type, status="created", date=env.time, snapshot=None, ) origin_model = OriginModel(task.visit_type, task.origin) (run_time, eventful, end_status) = origin_model.load_task_characteristics( env, stats ) logger.debug("%s task %s origin=%s: Start", env.time, task.visit_type, task.origin) yield status_queue.put(TaskEvent(task=task, status=status)) yield env.timeout(run_time) logger.debug("%s task %s origin=%s: End", env.time, task.visit_type, task.origin) new_snapshot = os.urandom(20) if eventful else last_snapshot yield status_queue.put( TaskEvent( task=task, status=attr.evolve( status, status=end_status, date=env.time, snapshot=new_snapshot ), eventful=eventful, ) ) env.report.record_visit(run_time, eventful, end_status)