diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -15,6 +15,7 @@
   rev: v1.16.0
   hooks:
   - id: codespell
+    args: [-L simpy]
 
 - repo: local
   hooks:
diff --git a/docs/index.rst b/docs/index.rst
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -166,4 +166,5 @@
    :maxdepth: 2
 
    cli
+   simulator
    /apidoc/swh.scheduler
diff --git a/docs/simulator.rst b/docs/simulator.rst
new file mode 100644
--- /dev/null
+++ b/docs/simulator.rst
@@ -0,0 +1,55 @@
+Software Heritage Scheduler Simulator
+=====================================
+
+This component simulates the interaction between the scheduling and loading
+infrastructure of Software Heritage. This allows quick(er) development of new
+task scheduling policies without having to wait for the actual infrastructure
+to perform (heavy) loading tasks.
+
+Simulator components
+--------------------
+
+- real instance of the scheduler database
+- simulated task queues
+- simulated workers
+- simulated load tasks
+- simulated archive -> scheduler feedback loop
+
+Installing the simulator
+------------------------
+
+The simulator depends on SimPy and other specific libraries. To install them,
+please use:
+
+.. code-block:: bash
+
+   pip install 'swh.scheduler[simulator]'
+
+Running the simulator
+---------------------
+
+The simulator uses a real instance of the scheduler database, which is (at
+least for now) persistent across runs of the simulator. You need to set that up
+beforehand:
+
+.. code-block:: bash
+
+   # if you want to use a temporary instance of postgresql
+   eval `pifpaf run postgresql`
+
+   # Set this variable for the simulator to know which db to connect to. pifpaf
+   # sets other variables like PGPORT, PGHOST, ...
+   export PGDATABASE=swh-scheduler
+
+   # Create/initialize the scheduler database
+   swh db create scheduler -d $PGDATABASE
+   swh db init scheduler -d $PGDATABASE
+
+   # This generates some data in the scheduler database. You can also feed the
+   # database with more realistic data, e.g. from a lister or from a dump of the
+   # production database.
+   python3 -m swh.scheduler.simulator fill_test_data
+
+   # Run the simulator itself, interacting with the scheduler database you've
+   # just seeded.
+   python3 -m swh.scheduler.simulator run
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -37,3 +37,6 @@
 
 [mypy-pytest_postgresql.*]
 ignore_missing_imports = True
+
+[mypy-simpy.*]
+ignore_missing_imports = True
diff --git a/requirements-simulator.txt b/requirements-simulator.txt
new file mode 100644
--- /dev/null
+++ b/requirements-simulator.txt
@@ -0,0 +1 @@
+simpy>=3,<4
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -16,22 +16,23 @@
     long_description = f.read()
 
 
-def parse_requirements(name=None):
-    if name:
-        reqf = "requirements-%s.txt" % name
-    else:
-        reqf = "requirements.txt"
-
+def parse_requirements(*names):
     requirements = []
-    if not path.exists(reqf):
-        return requirements
+    for name in names:
+        if name:
+            reqf = "requirements-%s.txt" % name
+        else:
+            reqf = "requirements.txt"
+
+        if not path.exists(reqf):
+            return requirements
 
-    with open(reqf) as f:
-        for line in f.readlines():
-            line = line.strip()
-            if not line or line.startswith("#"):
-                continue
-            requirements.append(line)
+        with open(reqf) as f:
+            for line in f.readlines():
+                line = line.strip()
+                if not line or line.startswith("#"):
+                    continue
+                requirements.append(line)
     return requirements
 
 
@@ -47,10 +48,11 @@
     packages=find_packages(),
     setup_requires=["setuptools-scm"],
     use_scm_version=True,
-    install_requires=parse_requirements() + parse_requirements("swh"),
+    install_requires=parse_requirements(None, "swh"),
     extras_require={
-        "testing": parse_requirements("test") + parse_requirements("journal"),
+        "testing": parse_requirements("test", "journal", "simulator"),
         "journal": parse_requirements("journal"),
+        "simulator": parse_requirements("simulator"),
     },
     include_package_data=True,
     entry_points="""
diff --git a/swh/scheduler/simulator/__init__.py b/swh/scheduler/simulator/__init__.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/simulator/__init__.py
@@ -0,0 +1,144 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from datetime import datetime, timezone
+import logging
+from typing import Dict, Generator, Iterator, Tuple
+
+from simpy import Environment, Event, Store
+
+from swh.scheduler import get_scheduler
+from swh.scheduler.model import ListedOrigin
+
+logger = logging.getLogger(__name__)
+
+
+class Queue(Store):
+    """Model a queue of objects to be passed between processes."""
+
+    def __len__(self):
+        return len(self.items or [])
+
+    def slots_remaining(self):
+        return self.capacity - len(self)
+
+
+def load_task_duration(visit_type: str, origin: str) -> float:
+    return 101.0
+
+
+def worker_process(
+    env: Environment, name: str, task_queue: Queue, status_queue: Queue
+) -> Generator[Event, Tuple[str, str], None]:
+    """A worker which consumes tasks from the input task_queue. Tasks
+    themselves send OriginVisitStatus objects to the status_queue."""
+    logger.debug("%s worker %s: Start", env.now, name)
+    while True:
+        visit_type, origin = yield task_queue.get()
+        logger.debug(
+            "%s worker %s: Run task %s origin=%s", env.now, name, visit_type, origin
+        )
+        yield env.process(
+            load_task_process(env, visit_type, origin, status_queue=status_queue)
+        )
+
+
+def load_task_process(
+    env: Environment, visit_type: str, origin: str, status_queue: Queue
+) -> Iterator[Event]:
+    """A loading task. This pushes OriginVisitStatus objects to the
+    status_queue to simulate the visible outcomes of the task.
+
+    Uses the `load_task_duration` function to determine its run time.
+    """
+    # TODO: OVS generation
+    task_duration = load_task_duration(visit_type, origin)
+    logger.debug("%s task %s origin=%s: Start", env.now, visit_type, origin)
+    yield env.timeout(task_duration)
+    logger.debug("%s task %s origin=%s: End", env.now, visit_type, origin)
+
+
+def scheduler_runner_process(
+    env: Environment, scheduler, task_queues: Dict[str, Queue], policy: str,
+) -> Iterator[Event]:
+    """Scheduler runner. Grabs next visits from the database according to the
+    scheduling policy, and fills the task_queues accordingly."""
+
+    while True:
+        for visit_type, queue in task_queues.items():
+            logger.debug("%s runner: processing %s", env.now, visit_type)
+            min_batch_size = max(queue.capacity // 10, 1)
+            remaining = queue.slots_remaining()
+            if remaining < min_batch_size:
+                logger.debug(
+                    "%s runner: not enough slots in %s: %s",
+                    env.now,
+                    visit_type,
+                    remaining,
+                )
+                continue
+            next_origins = scheduler.grab_next_visits(
+                visit_type, remaining, policy=policy
+            )
+            logger.debug(
+                "%s runner: running %s %s tasks", env.now, visit_type, len(next_origins)
+            )
+            for origin in next_origins:
+                yield queue.put((origin.visit_type, origin.url))
+
+        yield env.timeout(10.0)
+
+
+def setup(
+    env: Environment,
+    workers_per_type: Dict[str, int],
+    task_queue_capacity: int,
+    policy: str,
+):
+    # We expect PGHOST, PGPORT, ... set externally
+    scheduler = get_scheduler(cls="local", db="")
+
+    task_queues = {
+        visit_type: Queue(env, capacity=task_queue_capacity)
+        for visit_type in workers_per_type
+    }
+    status_queue = Queue(env)
+
+    env.process(scheduler_runner_process(env, scheduler, task_queues, policy))
+
+    for visit_type, num_workers in workers_per_type.items():
+        task_queue = task_queues[visit_type]
+        for i in range(num_workers):
+            worker_name = f"worker-{visit_type}-{i}"
+            env.process(worker_process(env, worker_name, task_queue, status_queue))
+
+
+def fill_test_data():
+    scheduler = get_scheduler(cls="local", db="")
+    stored_lister = scheduler.get_or_create_lister(name="example")
+    scheduler.record_listed_origins(
+        [
+            ListedOrigin(
+                lister_id=stored_lister.id,
+                url=f"https://example.com/{i:04d}.git",
+                visit_type="git",
+                last_update=datetime(2020, 6, 15, 16, 0, 0, i, tzinfo=timezone.utc),
+            )
+            for i in range(1000)
+        ]
+    )
+
+
+def run():
+    logging.basicConfig(level=logging.DEBUG)
+    NUM_WORKERS = 2
+    env = Environment()
+    setup(
+        env,
+        workers_per_type={"git": NUM_WORKERS},
+        task_queue_capacity=100,
+        policy="oldest_scheduled_first",
+    )
+    env.run(until=10000)
diff --git a/swh/scheduler/simulator/__main__.py b/swh/scheduler/simulator/__main__.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/simulator/__main__.py
@@ -0,0 +1,31 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import sys
+
+from . import fill_test_data, run
+
+
+def usage():
+    print(f"Usage: {sys.argv[0]} fill_test_data/run")
+    sys.exit(2)
+
+
+def main(argv):
+    try:
+        myself, arg = argv
+    except ValueError:
+        usage()
+
+    if arg == "run":
+        run()
+    elif arg == "fill_test_data":
+        fill_test_data()
+    else:
+        usage()
+
+
+if __name__ == "__main__":
+    main(sys.argv)
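For reference, the two subcommands exposed by the new ``swh/scheduler/simulator/__main__.py`` map directly onto the ``fill_test_data()`` and ``run()`` functions added in ``swh/scheduler/simulator/__init__.py``. A minimal sketch of driving them from Python instead of the command line, assuming the scheduler database has already been created, initialized and pointed to via ``PGDATABASE`` as described in ``docs/simulator.rst``:

.. code-block:: python

   # Minimal sketch, not part of the patch itself: it only calls the functions
   # this diff adds. Assumes PGDATABASE (and PGHOST/PGPORT if needed) point at
   # an initialized scheduler database, as set up in docs/simulator.rst.
   from swh.scheduler.simulator import fill_test_data, run

   fill_test_data()  # seed the scheduler database with 1000 example git origins
   run()             # run the SimPy simulation for 10000 simulated time units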