diff --git a/docs/simulator.rst b/docs/simulator.rst index 1405ea3..ba10aa6 100644 --- a/docs/simulator.rst +++ b/docs/simulator.rst @@ -1,55 +1,55 @@ Software Heritage Scheduler Simulator ===================================== This component simulates the interaction between the scheduling and loading infrastructure of Software Heritage. This allows quick(er) development of new task scheduling policies without having to wait for the actual infrastructure to perform (heavy) loading tasks. Simulator components -------------------- - real instance of the scheduler database - simulated task queues - simulated workers - simulated load tasks - simulated archive -> scheduler feedback loop Installing the simulator ------------------------ The simulator depends on SimPy and other specific libraries. To install them, please use: .. code-block:: bash pip install 'swh.scheduler[simulator]' Running the simulator --------------------- The simulator uses a real instance of the scheduler database, which is (at least for now) persistent across runs of the simulator. You need to set that up beforehand: .. code-block:: bash # if you want to use a temporary instance of postgresql eval `pifpaf run postgresql` # Set this variable for the simulator to know which db to connect to. pifpaf # sets other variables like PGPORT, PGHOST, ... export PGDATABASE=swh-scheduler # Create/initialize the scheduler database swh db create scheduler -d $PGDATABASE swh db init scheduler -d $PGDATABASE # This generates some data in the scheduler database. You can also feed the # database with more realistic data, e.g. from a lister or from a dump of the # production database. - python3 -m swh.scheduler.simulator fill_test_data + swh scheduler simulator fill-test-data # Run the simulator itself, interacting with the scheduler database you've # just seeded. - python3 -m swh.scheduler.simulator run + swh scheduler simulator run --scheduler origin_scheduler diff --git a/swh/scheduler/cli/__init__.py b/swh/scheduler/cli/__init__.py index 05c6d14..9a2ab19 100644 --- a/swh/scheduler/cli/__init__.py +++ b/swh/scheduler/cli/__init__.py @@ -1,99 +1,99 @@ # Copyright (C) 2016-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import logging import click from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup from swh.core.cli import swh as swh_cli_group # If you're looking for subcommand imports, they are further down this file to # avoid a circular import! @swh_cli_group.group( name="scheduler", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup ) @click.option( "--config-file", "-C", default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.", ) @click.option( "--database", "-d", default=None, help="Scheduling database DSN (imply cls is 'local')", ) @click.option( "--url", "-u", default=None, help="Scheduler's url access (imply cls is 'remote')" ) @click.option( "--no-stdout", is_flag=True, default=False, help="Do NOT output logs on the console" ) @click.pass_context def cli(ctx, config_file, database, url, no_stdout): """Software Heritage Scheduler tools. Use a local scheduler instance by default (plugged to the main scheduler db). """ try: from psycopg2 import OperationalError except ImportError: class OperationalError(Exception): pass from swh.core import config from swh.scheduler import DEFAULT_CONFIG, get_scheduler ctx.ensure_object(dict) logger = logging.getLogger(__name__) scheduler = None conf = config.read(config_file, DEFAULT_CONFIG) if "scheduler" not in conf: raise ValueError("missing 'scheduler' configuration") if database: conf["scheduler"]["cls"] = "local" conf["scheduler"]["db"] = database elif url: conf["scheduler"]["cls"] = "remote" conf["scheduler"]["url"] = url sched_conf = conf["scheduler"] try: logger.debug("Instantiating scheduler with %s", sched_conf) scheduler = get_scheduler(**sched_conf) except (ValueError, OperationalError): # it's the subcommand to decide whether not having a proper # scheduler instance is a problem. pass ctx.obj["scheduler"] = scheduler ctx.obj["config"] = conf -from . import admin, celery_monitor, journal, origin, task, task_type # noqa +from . import admin, celery_monitor, journal, origin, simulator, task, task_type # noqa def main(): import click.core click.core.DEPRECATED_HELP_NOTICE = """ DEPRECATED! Please use the command 'swh scheduler'.""" cli.deprecated = True return cli(auto_envvar_prefix="SWH_SCHEDULER") if __name__ == "__main__": main() diff --git a/swh/scheduler/cli/simulator.py b/swh/scheduler/cli/simulator.py new file mode 100644 index 0000000..154e55c --- /dev/null +++ b/swh/scheduler/cli/simulator.py @@ -0,0 +1,57 @@ +# Copyright (C) 2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import time + +import click + +from . import cli + + +@cli.group("simulator") +def simulator(): + """Scheduler simulator.""" + pass + + +@simulator.command("fill-test-data") +def fill_test_data_command(): + """Fill the scheduler with test data for simulation purposes.""" + from swh.scheduler.simulator import fill_test_data + + click.echo("Filling test data...") + start = time.monotonic() + fill_test_data() + runtime = time.monotonic() - start + click.echo(f"Completed in {runtime:.2f} seconds") + + +@simulator.command("run") +@click.option( + "--scheduler", + "-s", + type=click.Choice(["task_scheduler", "origin_scheduler"]), + default="origin_scheduler", + help="Scheduler to simulate", +) +@click.option( + "--policy", + "-p", + type=click.Choice(["oldest_scheduled_first"]), + default="oldest_scheduled_first", + help="Scheduling policy to simulate (only for origin_scheduler)", +) +@click.option("--runtime", "-t", type=float, help="Simulated runtime") +def run_command(scheduler, policy, runtime): + """Run the scheduler simulator. + + By default, the simulation runs forever. You can cap the simulated runtime + with the --runtime option, and you can always press Ctrl+C to interrupt the + running simulation. + """ + from swh.scheduler.simulator import run + + policy = policy if scheduler == "origin_scheduler" else None + run(scheduler=scheduler, policy=policy, runtime=runtime) diff --git a/swh/scheduler/simulator/__init__.py b/swh/scheduler/simulator/__init__.py index 07b6e5f..6c30d2b 100644 --- a/swh/scheduler/simulator/__init__.py +++ b/swh/scheduler/simulator/__init__.py @@ -1,110 +1,111 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta, timezone import logging from typing import Dict, Generator, Optional from simpy import Event from swh.scheduler import get_scheduler from swh.scheduler.model import ListedOrigin from .common import Environment, Queue, SimulationReport, Task from .origin_scheduler import scheduler_journal_client_process, scheduler_runner_process from .origins import load_task_process logger = logging.getLogger(__name__) def worker_process( env: Environment, name: str, task_queue: Queue, status_queue: Queue ) -> Generator[Event, Task, None]: """A worker which consumes tasks from the input task_queue. Tasks themselves send OriginVisitStatus objects to the status_queue.""" logger.debug("%s worker %s: Start", env.time, name) while True: task = yield task_queue.get() logger.debug( "%s worker %s: Run task %s origin=%s", env.time, name, task.visit_type, task.origin, ) yield env.process(load_task_process(env, task, status_queue=status_queue)) def setup( env: Environment, + scheduler: str, + policy: str, workers_per_type: Dict[str, int], task_queue_capacity: int, - policy: str, ): # We expect PGHOST, PGPORT, ... set externally task_queues = { visit_type: Queue(env, capacity=task_queue_capacity) for visit_type in workers_per_type } status_queue = Queue(env) env.process(scheduler_runner_process(env, task_queues, policy)) env.process(scheduler_journal_client_process(env, status_queue)) for visit_type, num_workers in workers_per_type.items(): task_queue = task_queues[visit_type] for i in range(num_workers): worker_name = f"worker-{visit_type}-{i}" env.process(worker_process(env, worker_name, task_queue, status_queue)) def fill_test_data(): scheduler = get_scheduler(cls="local", db="") stored_lister = scheduler.get_or_create_lister(name="example") origins = [ ListedOrigin( lister_id=stored_lister.id, url=f"https://example.com/{i:04d}.git", visit_type="git", last_update=datetime(2020, 6, 15, 16, 0, 0, i, tzinfo=timezone.utc), ) for i in range(100000) ] scheduler.record_listed_origins(origins) scheduler.create_tasks( [ { **origin.as_task_dict(), "policy": "recurring", "next_run": origin.last_update, "interval": timedelta(days=64), } for origin in origins ] ) -def run(runtime: Optional[int]): - logging.basicConfig(level=logging.INFO) +def run(scheduler: str, policy: str, runtime: Optional[int]): NUM_WORKERS = 48 start_time = datetime.now(tz=timezone.utc) env = Environment(start_time=start_time) env.scheduler = get_scheduler(cls="local", db="") env.report = SimulationReport() setup( env, + scheduler=scheduler, + policy=policy, workers_per_type={"git": NUM_WORKERS}, task_queue_capacity=10000, - policy="oldest_scheduled_first", ) try: env.run(until=runtime) except KeyboardInterrupt: pass finally: end_time = env.time print("total time:", end_time - start_time) print(env.report.format()) diff --git a/swh/scheduler/simulator/__main__.py b/swh/scheduler/simulator/__main__.py deleted file mode 100644 index ddf5031..0000000 --- a/swh/scheduler/simulator/__main__.py +++ /dev/null @@ -1,33 +0,0 @@ -# Copyright (C) 2021 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import sys - -from . import fill_test_data, run - - -def usage(): - print(f"Usage: {sys.argv[0]} fill_test_data/run []") - sys.exit(2) - - -def main(argv): - try: - myself, arg, *args = argv - except ValueError: - usage() - - if arg == "run": - run(int(args[0]) if args else None) - elif arg == "fill_test_data": - if args: - usage() - fill_test_data() - else: - usage() - - -if __name__ == "__main__": - main(sys.argv)