Page MenuHomeSoftware Heritage

D4856.id17192.diff
No OneTemporary

D4856.id17192.diff

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -15,6 +15,7 @@
rev: v1.16.0
hooks:
- id: codespell
+ args: [-L simpy]
- repo: local
hooks:
diff --git a/docs/index.rst b/docs/index.rst
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -166,4 +166,5 @@
:maxdepth: 2
cli
+ simulator
/apidoc/swh.scheduler
diff --git a/docs/simulator.rst b/docs/simulator.rst
new file mode 100644
--- /dev/null
+++ b/docs/simulator.rst
@@ -0,0 +1,55 @@
+Software Heritage Scheduler Simulator
+=====================================
+
+This component simulates the interaction between the scheduling and loading
+infrastructure of Software Heritage. This allows quick(er) development of new
+task scheduling policies without having to wait for the actual infrastructure
+to perform (heavy) loading tasks.
+
+Simulator components
+--------------------
+
+- real instance of the scheduler database
+- simulated task queues
+- simulated workers
+- simulated load tasks
+- simulated archive -> scheduler feedback loop
+
+Installing the simulator
+------------------------
+
+The simulator depends on SimPy and other specific libraries. To install them,
+please use:
+
+.. code-block:: bash
+
+ pip install 'swh.scheduler[simulator]'
+
+Running the simulator
+---------------------
+
+The simulator uses a real instance of the scheduler database, which is (at
+least for now) persistent across runs of the simulator. You need to set that up
+beforehand:
+
+.. code-block:: bash
+
+ # if you want to use a temporary instance of postgresql
+ eval `pifpaf run postgresql`
+
+ # Set this variable for the simulator to know which db to connect to. pifpaf
+ # sets other variables like PGPORT, PGHOST, ...
+ export PGDATABASE=swh-scheduler
+
+ # Create/initialize the scheduler database
+ swh db create scheduler -d $PGDATABASE
+ swh db init scheduler -d $PGDATABASE
+
+ # This generates some data in the scheduler database. You can also feed the
+ # database with more realistic data, e.g. from a lister or from a dump of the
+ # production database.
+ python3 -m swh.scheduler.simulator fill_test_data
+
+ # Run the simulator itself, interacting with the scheduler database you've
+ # just seeded.
+ python3 -m swh.scheduler.simulator run
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -34,3 +34,6 @@
[mypy-pytest_postgresql.*]
ignore_missing_imports = True
+
+[mypy-simpy.*]
+ignore_missing_imports = True
diff --git a/requirements-simulator.txt b/requirements-simulator.txt
new file mode 100644
--- /dev/null
+++ b/requirements-simulator.txt
@@ -0,0 +1 @@
+simpy>=3,<4
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -48,7 +48,10 @@
setup_requires=["setuptools-scm"],
use_scm_version=True,
install_requires=parse_requirements() + parse_requirements("swh"),
- extras_require={"testing": parse_requirements("test")},
+ extras_require={
+ "testing": parse_requirements("test", "simulator"),
+ "simulator": parse_requirements("simulator"),
+ },
include_package_data=True,
entry_points="""
[swh.cli.subcommands]
diff --git a/swh/scheduler/simulator/__init__.py b/swh/scheduler/simulator/__init__.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/simulator/__init__.py
@@ -0,0 +1,144 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from datetime import datetime, timezone
+import logging
+from typing import Dict, Generator, Iterator, Tuple
+
+from simpy import Environment, Event, Store
+
+from swh.scheduler import get_scheduler
+from swh.scheduler.model import ListedOrigin
+
+logger = logging.getLogger(__name__)
+
+
class Queue(Store):
    """A SimPy ``Store`` used as a queue of objects handed between processes.

    On top of the store behaviour it supports ``len()`` and can report how
    much room is left before its capacity is reached.
    """

    def __len__(self):
        """Return the number of items currently held (0 when there are none)."""
        pending = self.items
        if not pending:
            return 0
        return len(pending)

    def slots_remaining(self):
        """Return the capacity of the queue minus its current length."""
        used = len(self)
        return self.capacity - used
+
+
def load_task_duration(visit_type: str, origin: str) -> float:
    """Return the simulated run time of a load task.

    Both arguments are currently ignored: every task is given the same
    fixed duration.
    """
    fixed_duration = 101.0
    return fixed_duration
+
+
def worker_process(
    env: Environment, name: str, task_queue: Queue, status_queue: Queue
) -> Generator[Event, Tuple[str, str], None]:
    """Simulated worker: endlessly take tasks from task_queue and run them.

    Each task received from the queue is a ``(visit_type, origin)`` pair. It
    is run as a ``load_task_process``, which is handed the status_queue, and
    the worker only asks for its next task after yielding on that process.
    """
    logger.debug("%s worker %s: Start", env.now, name)
    while True:
        task = yield task_queue.get()
        visit_type, origin = task
        logger.debug(
            "%s worker %s: Run task %s origin=%s", env.now, name, visit_type, origin
        )
        running_task = env.process(
            load_task_process(env, visit_type, origin, status_queue=status_queue)
        )
        # Hand control back to the simulation until the load task is done.
        yield running_task
+
+
def load_task_process(
    env: Environment, visit_type: str, origin: str, status_queue: Queue
) -> Iterator[Event]:
    """Simulated loading task.

    The task is meant to push OriginVisitStatus objects to the status_queue
    to simulate its visible outcomes; that part is not implemented yet (see
    the TODO below), so status_queue is currently unused.

    The run time of the task is given by the `load_task_duration` function.
    """
    # TODO: OVS generation
    logger.debug("%s task %s origin=%s: Start", env.now, visit_type, origin)
    run_time = load_task_duration(visit_type, origin)
    yield env.timeout(run_time)
    logger.debug("%s task %s origin=%s: End", env.now, visit_type, origin)
+
+
def scheduler_runner_process(
    env: Environment, scheduler, task_queues: Dict[str, Queue], policy: str,
) -> Iterator[Event]:
    """Simulated scheduler runner: keep the task queues topped up.

    In an endless loop, every queue of task_queues is examined in turn. A
    queue whose free slots number less than a tenth of its capacity (and at
    least one) is skipped for this round. Otherwise, as many next visits as
    there are free slots are requested from the scheduler under the given
    policy, and a ``(visit_type, url)`` pair is put in the queue for each
    origin returned. After each full round, the runner waits on a timeout of
    10.0 before starting over.
    """
    while True:
        for visit_type, queue in task_queues.items():
            logger.debug("%s runner: processing %s", env.now, visit_type)
            free_slots = queue.slots_remaining()
            batch_threshold = max(queue.capacity // 10, 1)
            if free_slots < batch_threshold:
                # Too few free slots to be worth a trip to the database.
                logger.debug(
                    "%s runner: not enough slots in %s: %s",
                    env.now,
                    visit_type,
                    free_slots,
                )
            else:
                grabbed = scheduler.grab_next_visits(
                    visit_type, free_slots, policy=policy
                )
                logger.debug(
                    "%s runner: running %s %s tasks",
                    env.now,
                    visit_type,
                    len(grabbed),
                )
                for listed_origin in grabbed:
                    task = (listed_origin.visit_type, listed_origin.url)
                    yield queue.put(task)

        yield env.timeout(10.0)
+
+
def setup(
    env: Environment,
    workers_per_type: Dict[str, int],
    task_queue_capacity: int,
    policy: str,
):
    """Register all the simulation processes in env.

    One task queue of task_queue_capacity is created per visit type found in
    workers_per_type, along with a single status queue shared by all workers.
    A scheduler runner process feeding those task queues under the given
    policy is registered, then, for each visit type, as many worker processes
    as workers_per_type asks for.
    """
    # We expect PGHOST, PGPORT, ... set externally
    scheduler = get_scheduler(cls="local", db="")

    task_queues = {}
    for visit_type in workers_per_type:
        task_queues[visit_type] = Queue(env, capacity=task_queue_capacity)
    status_queue = Queue(env)

    runner = scheduler_runner_process(env, scheduler, task_queues, policy)
    env.process(runner)

    for visit_type, num_workers in workers_per_type.items():
        for index in range(num_workers):
            worker = worker_process(
                env,
                f"worker-{visit_type}-{index}",
                task_queues[visit_type],
                status_queue,
            )
            env.process(worker)
+
+
def fill_test_data():
    """Record 1000 example git origins in the scheduler database.

    The origins are attached to a lister named "example", which is fetched or
    created as needed. Their URLs are numbered from 0000 to 0999, and their
    last_update timestamps differ only by their microsecond field, which is
    set to the origin number.
    """
    scheduler = get_scheduler(cls="local", db="")
    stored_lister = scheduler.get_or_create_lister(name="example")

    first_update = datetime(2020, 6, 15, 16, 0, 0, 0, tzinfo=timezone.utc)
    origins = [
        ListedOrigin(
            lister_id=stored_lister.id,
            url=f"https://example.com/{index:04d}.git",
            visit_type="git",
            last_update=first_update.replace(microsecond=index),
        )
        for index in range(1000)
    ]
    scheduler.record_listed_origins(origins)
+
+
def run():
    """Run a simulation with a hard-coded configuration.

    Logging is set to DEBUG level, then a fresh environment is set up with 2
    "git" workers, task queues of capacity 100 and the
    "oldest_scheduled_first" policy, and run until simulation time 10000.
    """
    logging.basicConfig(level=logging.DEBUG)

    git_worker_count = 2
    simulation = Environment()
    setup(
        simulation,
        workers_per_type={"git": git_worker_count},
        task_queue_capacity=100,
        policy="oldest_scheduled_first",
    )
    simulation.run(until=10000)
diff --git a/swh/scheduler/simulator/__main__.py b/swh/scheduler/simulator/__main__.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/simulator/__main__.py
@@ -0,0 +1,26 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import sys
+
+from . import fill_test_data, run
+
+
def usage():
    """Print the usage line for this command, then exit with status 2."""
    program = sys.argv[0]
    print(f"Usage: {program} fill_test_data/run")
    raise SystemExit(2)
+
+
# Exactly one argument is expected after the program name; usage() does not
# return, so nothing below runs on a malformed command line.
if len(sys.argv) != 2:
    usage()
arg = sys.argv[1]

# Dispatch on the requested sub-command.
if arg == "fill_test_data":
    fill_test_data()
elif arg == "run":
    run()
else:
    usage()

File Metadata

Mime Type
text/plain
Expires
Thu, Jul 3, 3:19 PM (5 d, 22 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3229989

Event Timeline