diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -15,6 +15,7 @@ rev: v1.16.0 hooks: - id: codespell + args: [-L simpy, -L hist] - repo: local hooks: diff --git a/docs/index.rst b/docs/index.rst --- a/docs/index.rst +++ b/docs/index.rst @@ -166,4 +166,5 @@ :maxdepth: 2 cli + simulator /apidoc/swh.scheduler diff --git a/docs/simulator.rst b/docs/simulator.rst new file mode 100644 --- /dev/null +++ b/docs/simulator.rst @@ -0,0 +1,55 @@ +Software Heritage Scheduler Simulator +===================================== + +This component simulates the interaction between the scheduling and loading +infrastructure of Software Heritage. This allows quick(er) development of new +task scheduling policies without having to wait for the actual infrastructure +to perform (heavy) loading tasks. + +Simulator components +-------------------- + +- real instance of the scheduler database +- simulated task queues +- simulated workers +- simulated load tasks +- simulated archive -> scheduler feedback loop + +Installing the simulator +------------------------ + +The simulator depends on SimPy and other specific libraries. To install them, +please use: + +.. code-block:: bash + + pip install 'swh.scheduler[simulator]' + +Running the simulator +--------------------- + +The simulator uses a real instance of the scheduler database, which is (at +least for now) persistent across runs of the simulator. You need to set that up +beforehand: + +.. code-block:: bash + + # if you want to use a temporary instance of postgresql + eval `pifpaf run postgresql` + + # Set this variable for the simulator to know which db to connect to. pifpaf + # sets other variables like PGPORT, PGHOST, ... + export PGDATABASE=swh-scheduler + + # Create/initialize the scheduler database + swh db create scheduler -d $PGDATABASE + swh db init scheduler -d $PGDATABASE + + # This generates some data in the scheduler database. 
You can also feed the
+   # database with more realistic data, e.g. from a lister or from a dump of the
+   # production database.
+   swh scheduler simulator fill-test-data
+
+   # Run the simulator itself, interacting with the scheduler database you've
+   # just seeded.
+   swh scheduler simulator run --scheduler origin_scheduler
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -26,6 +26,9 @@
 [mypy-pika.*]
 ignore_missing_imports = True

+[mypy-plotille.*]
+ignore_missing_imports = True
+
 [mypy-pkg_resources.*]
 ignore_missing_imports = True

@@ -37,3 +40,6 @@

 [mypy-pytest_postgresql.*]
 ignore_missing_imports = True
+
+[mypy-simpy.*]
+ignore_missing_imports = True
diff --git a/requirements-simulator.txt b/requirements-simulator.txt
new file mode 100644
--- /dev/null
+++ b/requirements-simulator.txt
@@ -0,0 +1,2 @@
+plotille
+simpy>=3,<4
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -16,22 +16,23 @@
     long_description = f.read()


-def parse_requirements(name=None):
-    if name:
-        reqf = "requirements-%s.txt" % name
-    else:
-        reqf = "requirements.txt"
-
+def parse_requirements(*names):
     requirements = []
-    if not path.exists(reqf):
-        return requirements
+    for name in names:
+        if name:
+            reqf = "requirements-%s.txt" % name
+        else:
+            reqf = "requirements.txt"
+
+        if not path.exists(reqf):
+            continue  # skip missing files, keep processing the other names

-    with open(reqf) as f:
-        for line in f.readlines():
-            line = line.strip()
-            if not line or line.startswith("#"):
-                continue
-            requirements.append(line)
+        with open(reqf) as f:
+            for line in f.readlines():
+                line = line.strip()
+                if not line or line.startswith("#"):
+                    continue
+                requirements.append(line)

     return requirements

@@ -47,10 +48,11 @@
     packages=find_packages(),
     setup_requires=["setuptools-scm"],
     use_scm_version=True,
-    install_requires=parse_requirements() + parse_requirements("swh"),
+    install_requires=parse_requirements(None, "swh"),
     extras_require={
-        "testing": parse_requirements("test") + parse_requirements("journal"),
+        "testing": 
parse_requirements("test", "journal", "simulator"),
         "journal": parse_requirements("journal"),
+        "simulator": parse_requirements("simulator"),
     },
     include_package_data=True,
     entry_points="""
diff --git a/swh/scheduler/cli/__init__.py b/swh/scheduler/cli/__init__.py
--- a/swh/scheduler/cli/__init__.py
+++ b/swh/scheduler/cli/__init__.py
@@ -82,7 +82,7 @@
     ctx.obj["config"] = conf


-from . import admin, celery_monitor, origin, task, task_type  # noqa
+from . import admin, celery_monitor, origin, simulator, task, task_type  # noqa


 def main():
diff --git a/swh/scheduler/cli/simulator.py b/swh/scheduler/cli/simulator.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/cli/simulator.py
@@ -0,0 +1,57 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import time
+
+import click
+
+from . import cli
+
+
+@cli.group("simulator")
+def simulator():
+    """Scheduler simulator."""
+    pass
+
+
+@simulator.command("fill-test-data")
+def fill_test_data_command():
+    """Fill the scheduler with test data for simulation purposes."""
+    from swh.scheduler.simulator import fill_test_data  # lazy: optional deps
+
+    click.echo("Filling test data...")
+    start = time.monotonic()
+    fill_test_data()
+    runtime = time.monotonic() - start
+    click.echo(f"Completed in {runtime:.2f} seconds")
+
+
+@simulator.command("run")
+@click.option(
+    "--scheduler",
+    "-s",
+    type=click.Choice(["task_scheduler", "origin_scheduler"]),
+    default="origin_scheduler",
+    help="Scheduler to simulate",
+)
+@click.option(
+    "--policy",
+    "-p",
+    type=click.Choice(["oldest_scheduled_first"]),
+    default="oldest_scheduled_first",
+    help="Scheduling policy to simulate (only for origin_scheduler)",
+)
+@click.option("--runtime", "-t", type=float, help="Simulated runtime")
+def run_command(scheduler, policy, runtime):
+    """Run the scheduler 
simulator.
+
+    By default, the simulation runs forever. You can cap the simulated runtime
+    with the --runtime option, and you can always press Ctrl+C to interrupt the
+    running simulation.
+    """
+    from swh.scheduler.simulator import run  # lazy: optional deps
+
+    policy = policy if scheduler == "origin_scheduler" else None
+    run(scheduler=scheduler, policy=policy, runtime=runtime)
diff --git a/swh/scheduler/simulator/__init__.py b/swh/scheduler/simulator/__init__.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/simulator/__init__.py
@@ -0,0 +1,126 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from datetime import datetime, timedelta, timezone
+import logging
+from typing import Dict, Generator, Optional
+
+from simpy import Event
+
+from swh.scheduler import get_scheduler
+from swh.scheduler.model import ListedOrigin
+
+from . import origin_scheduler, task_scheduler
+from .common import Environment, Queue, SimulationReport, Task
+from .origins import load_task_process
+
+logger = logging.getLogger(__name__)
+
+
+def worker_process(
+    env: Environment, name: str, task_queue: Queue, status_queue: Queue
+) -> Generator[Event, Task, None]:
+    """A worker which consumes tasks from the input task_queue. Tasks themselves
+    send TaskEvents (wrapping OriginVisitStatus objects) to the status_queue."""
+    logger.debug("%s worker %s: Start", env.time, name)
+    while True:
+        task = yield task_queue.get()
+        logger.debug(
+            "%s worker %s: Run task %s origin=%s",
+            env.time,
+            name,
+            task.visit_type,
+            task.origin,
+        )
+        yield env.process(load_task_process(env, task, status_queue=status_queue))
+
+
+def setup(
+    env: Environment,
+    scheduler: str,
+    policy: Optional[str],
+    workers_per_type: Dict[str, int],
+    task_queue_capacity: int,
+):
+    # We expect PGHOST, PGPORT, ... 
set externally
+    task_queues = {
+        visit_type: Queue(env, capacity=task_queue_capacity)
+        for visit_type in workers_per_type
+    }
+    status_queue = Queue(env)  # shared by all workers
+
+    if scheduler == "origin_scheduler":
+        if policy is None:
+            raise ValueError("origin_scheduler needs a scheduling policy")
+        env.process(origin_scheduler.scheduler_runner_process(env, task_queues, policy))
+        env.process(
+            origin_scheduler.scheduler_journal_client_process(env, status_queue)
+        )
+    elif scheduler == "task_scheduler":
+        if policy is not None:
+            raise ValueError("task_scheduler doesn't support a scheduling policy")
+        env.process(task_scheduler.scheduler_runner_process(env, task_queues))
+        env.process(task_scheduler.scheduler_listener_process(env, status_queue))
+    else:
+        raise ValueError(f"Unknown scheduler: {scheduler}")
+
+    for visit_type, num_workers in workers_per_type.items():
+        task_queue = task_queues[visit_type]
+        for i in range(num_workers):
+            worker_name = f"worker-{visit_type}-{i}"
+            env.process(worker_process(env, worker_name, task_queue, status_queue))
+
+
+def fill_test_data(num_origins: int = 100000) -> None:
+    scheduler = get_scheduler(cls="local", db="")  # db settings from PG* env vars
+
+    stored_lister = scheduler.get_or_create_lister(name="example")
+    assert stored_lister.id is not None
+
+    origins = [
+        ListedOrigin(
+            lister_id=stored_lister.id,
+            url=f"https://example.com/{i:04d}.git",
+            visit_type="git",
+            last_update=datetime(2020, 6, 15, 16, 0, 0, i, tzinfo=timezone.utc),
+        )
+        for i in range(num_origins)
+    ]
+    scheduler.record_listed_origins(origins)
+
+    scheduler.create_tasks(
+        [
+            {
+                **origin.as_task_dict(),
+                "policy": "recurring",
+                "next_run": origin.last_update,
+                "interval": timedelta(days=64),
+            }
+            for origin in origins
+        ]
+    )
+
+
+def run(scheduler: str, policy: Optional[str], runtime: Optional[float]) -> None:
+    NUM_WORKERS = 48  # simulated "git" workers
+    start_time = datetime.now(tz=timezone.utc)
+    env = Environment(start_time=start_time)
+    env.scheduler = get_scheduler(cls="local", db="")  # db settings from PG* env vars
+    env.report = SimulationReport()
+    
setup(
+        env,
+        scheduler=scheduler,
+        policy=policy,
+        workers_per_type={"git": NUM_WORKERS},
+        task_queue_capacity=10000,
+    )
+    try:
+        env.run(until=runtime)
+    except KeyboardInterrupt:
+        pass
+    finally:
+        end_time = env.time
+        print("total time:", end_time - start_time)  # simulated duration
+        print(env.report.format())
diff --git a/swh/scheduler/simulator/common.py b/swh/scheduler/simulator/common.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/simulator/common.py
@@ -0,0 +1,102 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from dataclasses import dataclass, field
+from datetime import datetime, timedelta
+import textwrap
+from typing import Dict, List, Tuple
+import uuid
+
+import plotille
+from simpy import Environment as _Environment
+from simpy import Store
+
+from swh.model.model import OriginVisitStatus
+from swh.scheduler.interface import SchedulerInterface
+
+
+@dataclass
+class SimulationReport:
+    DURATION_THRESHOLD = 3600  # simulated seconds
+    """Max duration for histograms"""
+
+    total_visits: int = 0
+    """Total count of finished visits"""
+
+    visit_runtimes: Dict[Tuple[str, bool], List[float]] = field(default_factory=dict)
+    """Collect the visit runtimes for each (status, eventful) tuple"""
+
+    def record_visit(self, duration: float, eventful: bool, status: str) -> None:
+        self.total_visits += 1
+        self.visit_runtimes.setdefault((status, eventful), []).append(duration)
+
+    @property
+    def useless_visits(self):
+        """Number of uneventful, full visits"""
+        return len(self.visit_runtimes.get(("full", False), []))
+
+    def runtime_histogram(self, status: str, eventful: bool) -> str:
+        runtimes = self.visit_runtimes.get((status, eventful), [])
+        return plotille.hist(
+            [runtime for runtime in runtimes if runtime <= self.DURATION_THRESHOLD]
+        )
+
+    def format(self):
+        full_visits = 
self.visit_runtimes.get(("full", True), [])
+        histogram = self.runtime_histogram("full", True)
+        long_tasks = sum(runtime > self.DURATION_THRESHOLD for runtime in full_visits)
+
+        return (
+            textwrap.dedent(
+                f"""\
+                Total visits: {self.total_visits}
+                Useless visits: {self.useless_visits}
+                Eventful visits: {len(full_visits)}
+                Very long running tasks: {long_tasks}
+                Visit time histogram for eventful visits:
+                """
+            )
+            + histogram
+        )
+
+
+@dataclass
+class Task:
+    visit_type: str
+    origin: str
+    backend_id: uuid.UUID = field(default_factory=uuid.uuid4)  # random per-task id
+
+
+@dataclass
+class TaskEvent:
+    task: Task
+    status: OriginVisitStatus
+    eventful: bool = field(default=False)
+
+
+class Queue(Store):
+    """Model a queue of objects to be passed between processes."""
+
+    def __len__(self):
+        return len(self.items or [])
+
+    def slots_remaining(self):
+        return self.capacity - len(self)  # free slots before the queue is full
+
+
+class Environment(_Environment):
+    report: SimulationReport
+    scheduler: SchedulerInterface
+
+    def __init__(self, start_time: datetime):
+        if start_time.tzinfo is None:
+            raise ValueError("start_time must have timezone information")
+        self.start_time = start_time  # date matching simulated time 0 (env.now)
+        super().__init__()
+
+    @property
+    def time(self):
+        """Get the current simulated wall clock time"""
+        return self.start_time + timedelta(seconds=self.now)
diff --git a/swh/scheduler/simulator/origin_scheduler.py b/swh/scheduler/simulator/origin_scheduler.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/simulator/origin_scheduler.py
@@ -0,0 +1,69 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Agents using the new origin-aware scheduler."""
+
+import logging
+from typing import Any, Dict, Generator, Iterator, List
+
+from simpy import Event
+
+from swh.scheduler.journal_client import process_journal_objects
+
+from .common import Environment, Queue, Task, TaskEvent
+
+logger = logging.getLogger(__name__)
+
+
+def scheduler_runner_process(
+    env: Environment, task_queues: Dict[str, Queue], policy: str,
+) -> Iterator[Event]:
+    """Scheduler runner. Grabs next visits from the database according to the
+    scheduling policy, and fills the task_queues accordingly."""
+
+    while True:
+        for visit_type, queue in task_queues.items():
+            min_batch_size = max(queue.capacity // 10, 1)  # 10% of capacity, min 1
+            remaining = queue.slots_remaining()
+            if remaining < min_batch_size:
+                continue  # not enough free slots to refill this queue yet
+            next_origins = env.scheduler.grab_next_visits(
+                visit_type, remaining, policy=policy
+            )
+            logger.debug(
+                "%s runner: running %s %s tasks",
+                env.time,
+                visit_type,
+                len(next_origins),
+            )
+            for origin in next_origins:
+                yield queue.put(Task(visit_type=origin.visit_type, origin=origin.url))
+
+        yield env.timeout(10.0)  # simulated seconds between polls
+
+
+def scheduler_journal_client_process(
+    env: Environment, status_queue: Queue
+) -> Generator[Event, TaskEvent, None]:
+    """Scheduler journal client. 
Every once in a while, pulls
+    `OriginVisitStatus`es from the status_queue to update the scheduler
+    origin_visit_stats table."""
+    BATCH_SIZE = 100
+
+    statuses: List[Dict[str, Any]] = []
+    while True:
+        task_event = yield status_queue.get()
+        statuses.append(task_event.status.to_dict())
+        if len(statuses) < BATCH_SIZE:
+            continue  # keep buffering until the batch is full
+
+        logger.debug(
+            "%s journal client: processing %s statuses", env.time, len(statuses)
+        )
+        process_journal_objects(
+            {"origin_visit_status": statuses}, scheduler=env.scheduler
+        )
+
+        statuses = []
diff --git a/swh/scheduler/simulator/origins.py b/swh/scheduler/simulator/origins.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/simulator/origins.py
@@ -0,0 +1,119 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from datetime import timedelta
+import hashlib
+import logging
+import os
+from typing import Iterator, Optional, Tuple
+
+import attr
+from simpy import Event
+
+from swh.model.model import OriginVisitStatus
+from swh.scheduler.model import OriginVisitStats
+
+from .common import Environment, Queue, Task, TaskEvent
+
+logger = logging.getLogger(__name__)
+
+
+class OriginModel:
+    MIN_RUN_TIME = 0.5  # simulated seconds
+    """Minimal run time for a visit (retrieved from production data)"""
+
+    MAX_RUN_TIME = 7200  # simulated seconds
+    """Max run time for a visit"""
+
+    PER_COMMIT_RUN_TIME = 0.1  # simulated seconds
+    """Run time per commit"""
+
+    def __init__(self, type: str, origin: str):
+        self.type = type
+        self.origin = origin
+
+    def seconds_between_commits(self):
+        n_bytes = 2
+        num_buckets = 2 ** (8 * n_bytes)
+        bucket = int.from_bytes(
+            hashlib.md5(self.origin.encode()).digest()[0:n_bytes], "little"
+        )
+        # minimum: 1 second (bucket == 0)
+        # max: just under 10 years (bucket == num_buckets - 1)
+        ten_y = 10 * 365 * 24 * 3600
+
+        return ten_y ** (bucket / num_buckets)
+        # return 1 
+ (ten_y - 1) * (bucket / (num_buckets - 1))
+
+    def load_task_characteristics(
+        self, env: Environment, stats: Optional[OriginVisitStats]
+    ) -> Tuple[float, bool, str]:
+        """Returns the (run_time, eventfulness, end_status) of the next
+        origin visit."""
+        if stats and stats.last_eventful:
+            time_since_last_successful_run = env.time - stats.last_eventful
+        else:
+            time_since_last_successful_run = timedelta(days=365)  # unknown: assume 1y
+
+        seconds_between_commits = self.seconds_between_commits()
+        logger.debug(
+            "Interval between commits: %s", timedelta(seconds=seconds_between_commits)
+        )
+
+        seconds_since_last_successful = time_since_last_successful_run.total_seconds()
+        if seconds_since_last_successful < seconds_between_commits:
+            # No commits since last visit => uneventful
+            return (self.MIN_RUN_TIME, False, "full")
+        else:
+            n_commits = seconds_since_last_successful / seconds_between_commits
+            run_time = self.MIN_RUN_TIME + self.PER_COMMIT_RUN_TIME * n_commits
+            if run_time > self.MAX_RUN_TIME:
+                return (self.MAX_RUN_TIME, False, "partial")  # run time capped
+            else:
+                return (run_time, True, "full")
+
+
+def load_task_process(
+    env: Environment, task: Task, status_queue: Queue
+) -> Iterator[Event]:
+    """A loading task. This pushes OriginVisitStatus-carrying TaskEvents to the
+    status_queue to simulate the visible outcomes of the task.
+
+    Uses `OriginModel.load_task_characteristics` to determine its run time.
+    """
+    # This is cheating; actual tasks access the state from the storage, not the
+    # scheduler
+    stats = env.scheduler.origin_visit_stats_get(task.origin, task.visit_type)
+    last_snapshot = stats.last_snapshot if stats else None
+
+    status = OriginVisitStatus(
+        origin=task.origin,
+        visit=42,  # same constant visit id for every simulated visit
+        type=task.visit_type,
+        status="created",
+        date=env.time,
+        snapshot=None,
+    )
+    origin_model = OriginModel(task.visit_type, task.origin)
+    (run_time, eventful, end_status) = origin_model.load_task_characteristics(
+        env, stats
+    )
+    logger.debug("%s task %s origin=%s: Start", env.time, task.visit_type, task.origin)
+    yield status_queue.put(TaskEvent(task=task, status=status))
+    yield env.timeout(run_time)
+    logger.debug("%s task %s origin=%s: End", env.time, task.visit_type, task.origin)
+
+    new_snapshot = os.urandom(20) if eventful else last_snapshot  # random 20 bytes
+    yield status_queue.put(
+        TaskEvent(
+            task=task,
+            status=attr.evolve(
+                status, status=end_status, date=env.time, snapshot=new_snapshot
+            ),
+            eventful=eventful,
+        )
+    )
+
+    env.report.record_visit(run_time, eventful, end_status)
diff --git a/swh/scheduler/simulator/task_scheduler.py b/swh/scheduler/simulator/task_scheduler.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/simulator/task_scheduler.py
@@ -0,0 +1,77 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Agents using the "old" task-based scheduler."""
+
+import logging
+from typing import Dict, Generator, Iterator
+
+from simpy import Event
+
+from .common import Environment, Queue, Task, TaskEvent
+
+logger = logging.getLogger(__name__)
+
+
+def scheduler_runner_process(
+    env: Environment, task_queues: Dict[str, Queue],
+) -> Iterator[Event]:
+    """Scheduler runner. 
Grabs ready tasks from the database, records a
+    scheduled run for each, and fills the task_queues accordingly."""
+
+    while True:
+        for visit_type, queue in task_queues.items():
+            min_batch_size = max(queue.capacity // 10, 1)  # 10% of capacity, min 1
+            remaining = queue.slots_remaining()
+            if remaining < min_batch_size:
+                continue  # not enough free slots to refill this queue yet
+            next_tasks = env.scheduler.grab_ready_tasks(
+                f"load-{visit_type}", num_tasks=remaining, timestamp=env.time
+            )
+            logger.debug(
+                "%s runner: running %s %s tasks", env.time, visit_type, len(next_tasks),
+            )
+
+            sim_tasks = [
+                Task(visit_type=visit_type, origin=task["arguments"]["kwargs"]["url"])
+                for task in next_tasks
+            ]
+
+            env.scheduler.mass_schedule_task_runs(
+                [
+                    {
+                        "task": task["id"],
+                        "scheduled": env.time,
+                        "backend_id": str(sim_task.backend_id),
+                    }
+                    for task, sim_task in zip(next_tasks, sim_tasks)
+                ]
+            )
+
+            for sim_task in sim_tasks:
+                yield queue.put(sim_task)
+
+        yield env.timeout(10.0)  # simulated seconds between polls
+
+
+def scheduler_listener_process(
+    env: Environment, status_queue: Queue
+) -> Generator[Event, TaskEvent, None]:
+    """Scheduler listener. 
In the real world this would listen to celery
+    events, but we listen to the status_queue and simulate celery events from
+    that."""
+    while True:
+        event = yield status_queue.get()
+        backend_id = str(event.task.backend_id)
+        visit_status = event.status.status
+        if visit_status in ("created", "ongoing"):
+            # the visit has only just begun: start the run, don't end it
+            env.scheduler.start_task_run(backend_id, timestamp=env.time)
+            continue
+        if visit_status == "full":
+            status = "eventful" if event.eventful else "uneventful"
+        else:
+            status = "failed"
+        env.scheduler.end_task_run(backend_id, status=status, timestamp=env.time)
diff --git a/swh/scheduler/tests/test_simulator.py b/swh/scheduler/tests/test_simulator.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/tests/test_simulator.py
@@ -0,0 +1,45 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import pytest
+
+import swh.scheduler.simulator as simulator
+from swh.scheduler.tests.common import TASK_TYPES
+
+NUM_ORIGINS = 42
+TEST_RUNTIME = 1000
+
+
+@pytest.fixture
+def monkeypatch_get_scheduler(swh_scheduler, monkeypatch):
+    def get_scheduler(*args, **kwargs):
+        return swh_scheduler
+
+    monkeypatch.setattr(simulator, "get_scheduler", get_scheduler)
+
+    for task_type in TASK_TYPES.values():
+        swh_scheduler.create_task_type(task_type)
+
+
+def test_fill_test_data(swh_scheduler, monkeypatch_get_scheduler):
+    simulator.fill_test_data(num_origins=NUM_ORIGINS)
+
+    res = swh_scheduler.get_listed_origins()
+    assert len(res.origins) == NUM_ORIGINS
+    assert res.next_page_token is None
+
+    res = swh_scheduler.search_tasks()
+    assert len(res) == NUM_ORIGINS
+
+
+@pytest.mark.parametrize("policy", ("oldest_scheduled_first",))
+def test_run_origin_scheduler(swh_scheduler, monkeypatch_get_scheduler, policy):
+    simulator.fill_test_data(num_origins=NUM_ORIGINS)
+    simulator.run("origin_scheduler", policy=policy, 
runtime=TEST_RUNTIME)
+
+
+def test_run_task_scheduler(swh_scheduler, monkeypatch_get_scheduler):
+    simulator.fill_test_data(num_origins=NUM_ORIGINS)
+    simulator.run("task_scheduler", policy=None, runtime=TEST_RUNTIME)  # smoke test