Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9345377
D4856.id17192.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
8 KB
Subscribers
None
D4856.id17192.diff
View Options
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -15,6 +15,7 @@
rev: v1.16.0
hooks:
- id: codespell
+ args: [-L simpy]
- repo: local
hooks:
diff --git a/docs/index.rst b/docs/index.rst
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -166,4 +166,5 @@
:maxdepth: 2
cli
+ simulator
/apidoc/swh.scheduler
diff --git a/docs/simulator.rst b/docs/simulator.rst
new file mode 100644
--- /dev/null
+++ b/docs/simulator.rst
@@ -0,0 +1,55 @@
+Software Heritage Scheduler Simulator
+=====================================
+
+This component simulates the interaction between the scheduling and loading
+infrastructure of Software Heritage. This allows quick(er) development of new
+task scheduling policies without having to wait for the actual infrastructure
+to perform (heavy) loading tasks.
+
+Simulator components
+--------------------
+
+- real instance of the scheduler database
+- simulated task queues
+- simulated workers
+- simulated load tasks
+- simulated archive -> scheduler feedback loop
+
+Installing the simulator
+------------------------
+
+The simulator depends on SimPy and other specific libraries. To install them,
+please use:
+
+.. code-block:: bash
+
+ pip install 'swh.scheduler[simulator]'
+
+Running the simulator
+---------------------
+
+The simulator uses a real instance of the scheduler database, which is (at
+least for now) persistent across runs of the simulator. You need to set that up
+beforehand:
+
+.. code-block:: bash
+
+ # if you want to use a temporary instance of postgresql
+ eval `pifpaf run postgresql`
+
+ # Set this variable for the simulator to know which db to connect to. pifpaf
+ # sets other variables like PGPORT, PGHOST, ...
+ export PGDATABASE=swh-scheduler
+
+ # Create/initialize the scheduler database
+ swh db create scheduler -d $PGDATABASE
+ swh db init scheduler -d $PGDATABASE
+
+ # This generates some data in the scheduler database. You can also feed the
+ # database with more realistic data, e.g. from a lister or from a dump of the
+ # production database.
+ python3 -m swh.scheduler.simulator fill_test_data
+
+ # Run the simulator itself, interacting with the scheduler database you've
+ # just seeded.
+ python3 -m swh.scheduler.simulator run
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -34,3 +34,6 @@
[mypy-pytest_postgresql.*]
ignore_missing_imports = True
+
+[mypy-simpy.*]
+ignore_missing_imports = True
diff --git a/requirements-simulator.txt b/requirements-simulator.txt
new file mode 100644
--- /dev/null
+++ b/requirements-simulator.txt
@@ -0,0 +1 @@
+simpy>=3,<4
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -48,7 +48,10 @@
setup_requires=["setuptools-scm"],
use_scm_version=True,
install_requires=parse_requirements() + parse_requirements("swh"),
- extras_require={"testing": parse_requirements("test")},
+ extras_require={
+ "testing": parse_requirements("test", "simulator"),
+ "simulator": parse_requirements("simulator"),
+ },
include_package_data=True,
entry_points="""
[swh.cli.subcommands]
diff --git a/swh/scheduler/simulator/__init__.py b/swh/scheduler/simulator/__init__.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/simulator/__init__.py
@@ -0,0 +1,144 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from datetime import datetime, timezone
+import logging
+from typing import Dict, Generator, Iterator, Tuple
+
+from simpy import Environment, Event, Store
+
+from swh.scheduler import get_scheduler
+from swh.scheduler.model import ListedOrigin
+
+logger = logging.getLogger(__name__)
+
+
+class Queue(Store):
+ """Model a queue of objects to be passed between processes."""
+
+ def __len__(self):
+ return len(self.items or [])
+
+ def slots_remaining(self):
+ return self.capacity - len(self)
+
+
+def load_task_duration(visit_type: str, origin: str) -> float:
+ return 101.0
+
+
+def worker_process(
+ env: Environment, name: str, task_queue: Queue, status_queue: Queue
+) -> Generator[Event, Tuple[str, str], None]:
+ """A worker which consumes tasks from the input task_queue. Tasks
+ themselves send OriginVisitStatus objects to the status_queue."""
+ logger.debug("%s worker %s: Start", env.now, name)
+ while True:
+ visit_type, origin = yield task_queue.get()
+ logger.debug(
+ "%s worker %s: Run task %s origin=%s", env.now, name, visit_type, origin
+ )
+ yield env.process(
+ load_task_process(env, visit_type, origin, status_queue=status_queue)
+ )
+
+
+def load_task_process(
+ env: Environment, visit_type: str, origin: str, status_queue: Queue
+) -> Iterator[Event]:
+ """A loading task. This pushes OriginVisitStatus objects to the
+ status_queue to simulate the visible outcomes of the task.
+
+ Uses the `load_task_duration` function to determine its run time.
+ """
+ # TODO: OriginVisitStatus (OVS) generation
+ task_duration = load_task_duration(visit_type, origin)
+ logger.debug("%s task %s origin=%s: Start", env.now, visit_type, origin)
+ yield env.timeout(task_duration)
+ logger.debug("%s task %s origin=%s: End", env.now, visit_type, origin)
+
+
+def scheduler_runner_process(
+ env: Environment, scheduler, task_queues: Dict[str, Queue], policy: str,
+) -> Iterator[Event]:
+ """Scheduler runner. Grabs next visits from the database according to the
+ scheduling policy, and fills the task_queues accordingly."""
+
+ while True:
+ for visit_type, queue in task_queues.items():
+ logger.debug("%s runner: processing %s", env.now, visit_type)
+ min_batch_size = max(queue.capacity // 10, 1)
+ remaining = queue.slots_remaining()
+ if remaining < min_batch_size:
+ logger.debug(
+ "%s runner: not enough slots in %s: %s",
+ env.now,
+ visit_type,
+ remaining,
+ )
+ continue
+ next_origins = scheduler.grab_next_visits(
+ visit_type, remaining, policy=policy
+ )
+ logger.debug(
+ "%s runner: running %s %s tasks", env.now, visit_type, len(next_origins)
+ )
+ for origin in next_origins:
+ yield queue.put((origin.visit_type, origin.url))
+
+ yield env.timeout(10.0)
+
+
+def setup(
+ env: Environment,
+ workers_per_type: Dict[str, int],
+ task_queue_capacity: int,
+ policy: str,
+):
+ # We expect PGHOST, PGPORT, ... to be set externally
+ scheduler = get_scheduler(cls="local", db="")
+
+ task_queues = {
+ visit_type: Queue(env, capacity=task_queue_capacity)
+ for visit_type in workers_per_type
+ }
+ status_queue = Queue(env)
+
+ env.process(scheduler_runner_process(env, scheduler, task_queues, policy))
+
+ for visit_type, num_workers in workers_per_type.items():
+ task_queue = task_queues[visit_type]
+ for i in range(num_workers):
+ worker_name = f"worker-{visit_type}-{i}"
+ env.process(worker_process(env, worker_name, task_queue, status_queue))
+
+
+def fill_test_data():
+ scheduler = get_scheduler(cls="local", db="")
+ stored_lister = scheduler.get_or_create_lister(name="example")
+ scheduler.record_listed_origins(
+ [
+ ListedOrigin(
+ lister_id=stored_lister.id,
+ url=f"https://example.com/{i:04d}.git",
+ visit_type="git",
+ last_update=datetime(2020, 6, 15, 16, 0, 0, i, tzinfo=timezone.utc),
+ )
+ for i in range(1000)
+ ]
+ )
+
+
+def run():
+ logging.basicConfig(level=logging.DEBUG)
+ NUM_WORKERS = 2
+ env = Environment()
+ setup(
+ env,
+ workers_per_type={"git": NUM_WORKERS},
+ task_queue_capacity=100,
+ policy="oldest_scheduled_first",
+ )
+ env.run(until=10000)
diff --git a/swh/scheduler/simulator/__main__.py b/swh/scheduler/simulator/__main__.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/simulator/__main__.py
@@ -0,0 +1,26 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import sys
+
+from . import fill_test_data, run
+
+
+def usage():
+ print(f"Usage: {sys.argv[0]} fill_test_data/run")
+ sys.exit(2)
+
+
+try:
+ myself, arg = sys.argv
+except ValueError:
+ usage()
+
+if arg == "run":
+ run()
+elif arg == "fill_test_data":
+ fill_test_data()
+else:
+ usage()
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Jul 3, 3:19 PM (6 d, 7 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3229989
Attached To
D4856: Introduce scaffolding for a scheduler simulator
Event Timeline
Log In to Comment