diff --git a/docs/simulator.rst b/docs/simulator.rst
index 979a389..35708b6 100644
--- a/docs/simulator.rst
+++ b/docs/simulator.rst
@@ -1,80 +1,80 @@
 .. _swh-scheduler-simulator:

 Software Heritage Scheduler Simulator
 =====================================

 This component simulates the interaction between the scheduling and loading
 infrastructure of Software Heritage. This allows quick(er) development of new
 task scheduling policies without having to wait for the actual infrastructure
 to perform (heavy) loading tasks.

 Simulator components
 --------------------

 - real instance of the scheduler database
 - simulated task queues: replaces RabbitMQ with simple in-memory structures
 - simulated workers: replaces Celery with simple while loops
 - simulated load tasks: replaces loaders with noops that take a certain time,
   and generate synthetic OriginVisitStatus objects
 - simulated archive -> scheduler feedback loop: OriginVisitStatus objects are
   pushed to a simple queue which gets processed by the scheduler journal
   client's process function directly (instead of going through swh.storage
   and swh.journal (kafka))

 In short, only the scheduler database and scheduler logic are kept; every
 other component (RabbitMQ, Celery, Kafka, SWH loaders, SWH storage) is either
 replaced with a barebones in-process utility or removed entirely.

 Installing the simulator
 ------------------------

 The simulator depends on SimPy and other specific libraries. To install them,
 please use:

 .. code-block:: bash

    pip install 'swh.scheduler[simulator]'

 Running the simulator
 ---------------------

 The simulator uses a real instance of the scheduler database, which is (at
 least for now) persistent across runs of the simulator. You need to set that
 up beforehand:

 .. code-block:: bash

    # if you want to use a temporary instance of postgresql
    eval `pifpaf run postgresql`

    # Set this variable for the simulator to know which db to connect to.
    # pifpaf sets other variables like PGPORT, PGHOST, ...
    export PGDATABASE=swh-scheduler

    # Create/initialize the scheduler database
    swh db create scheduler -d $PGDATABASE
    swh db init scheduler -d $PGDATABASE

    # This generates some data in the scheduler database. You can also feed
    # the database with more realistic data, e.g. from a lister or from a
    # dump of the production database.
    swh scheduler -d "dbname=$PGDATABASE" simulator fill-test-data

    # Run the simulator itself, interacting with the scheduler database you've
    # just seeded.
    swh scheduler -d "dbname=$PGDATABASE" simulator run --scheduler origin_scheduler


 Origin model
 ------------

 The origin model is how we represent the behaviors of origins: when they are
 created/discovered, how many commits they get and when, and when they fail to
 load.

 For now it is only a simple approximation designed to exercise simple cases:
 origin creation/discovery, a continuous stream of commits, and failure if
 they have too many commits to load at once.

-For details, see :py:`swh.scheduler.simulator.origins`.
+For details, see :py:mod:`swh.scheduler.simulator.origins`.

 To keep the simulation fast enough, each origin's state is kept in memory, so
 the simulator process will linearly increase in memory usage as it runs.
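The "simulated task queues" and "simulated workers" components described above boil down to plain SimPy processes exchanging items through an in-memory store. The following sketch illustrates that pattern only; it is not code from ``swh.scheduler.simulator``. The queue capacity, the 10-unit production interval and the ``load_duration`` range are made-up values, and the real simulator uses its own ``Environment``/``Queue`` wrappers.

.. code-block:: python

   # Minimal sketch of the "simulated worker" pattern: a producer filling an
   # in-memory queue and a worker loop that replaces an actual loader with a
   # timed no-op. Illustration only; all numbers and names are made up.
   import random

   import simpy


   def producer(env, task_queue):
       """Stand-in for the scheduler runner: enqueue one task every 10 time units."""
       n = 0
       while True:
           yield task_queue.put(f"origin-{n}")  # blocks if the queue is full
           n += 1
           yield env.timeout(10.0)


   def worker(env, task_queue, name):
       """Simulated worker: a while loop replacing Celery, running a timed noop
       instead of a real loader."""
       while True:
           task = yield task_queue.get()              # wait for a queued task
           load_duration = random.uniform(1.0, 60.0)  # synthetic loading time
           yield env.timeout(load_duration)           # "load" the origin
           print(f"{env.now:8.1f} {name}: finished {task}")


   env = simpy.Environment()
   queue = simpy.Store(env, capacity=100)  # in-memory replacement for RabbitMQ
   env.process(producer(env, queue))
   env.process(worker(env, queue, "worker-1"))
   env.run(until=200)  # simulated time units, not wall-clock time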
diff --git a/swh/scheduler/simulator/origin_scheduler.py b/swh/scheduler/simulator/origin_scheduler.py
index ca16912..682e12f 100644
--- a/swh/scheduler/simulator/origin_scheduler.py
+++ b/swh/scheduler/simulator/origin_scheduler.py
@@ -1,68 +1,68 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 """Agents using the new origin-aware scheduler."""

 import logging
 from typing import Any, Dict, Generator, Iterator, List

 from simpy import Event

 from swh.scheduler.journal_client import process_journal_objects

 from .common import Environment, Queue, Task, TaskEvent

 logger = logging.getLogger(__name__)


 def scheduler_runner_process(
     env: Environment, task_queues: Dict[str, Queue], policy: str, min_batch_size: int
 ) -> Iterator[Event]:
     """Scheduler runner. Grabs next visits from the database according to the
     scheduling policy, and fills the task_queues accordingly."""
     while True:
         for visit_type, queue in task_queues.items():
             remaining = queue.slots_remaining()
             if remaining < min_batch_size:
                 continue
             next_origins = env.scheduler.grab_next_visits(
                 visit_type, remaining, policy=policy, timestamp=env.time
             )
             logger.debug(
                 "%s runner: running %s %s tasks",
                 env.time,
                 visit_type,
                 len(next_origins),
             )
             for origin in next_origins:
                 yield queue.put(Task(visit_type=origin.visit_type, origin=origin.url))

         yield env.timeout(10.0)


 def scheduler_journal_client_process(
     env: Environment, status_queue: Queue
 ) -> Generator[Event, TaskEvent, None]:
     """Scheduler journal client. Every once in a while, pulls
-    `OriginVisitStatus`es from the status_queue to update the scheduler
-    origin_visit_stats table."""
+    :class:`OriginVisitStatuses <swh.model.model.OriginVisitStatus>`
+    from the status_queue to update the scheduler origin_visit_stats table."""
     BATCH_SIZE = 100
     statuses: List[Dict[str, Any]] = []

     while True:
         task_event = yield status_queue.get()
         statuses.append(task_event.status.to_dict())
         if len(statuses) < BATCH_SIZE:
             continue

         logger.debug(
             "%s journal client: processing %s statuses", env.time, len(statuses)
         )
         process_journal_objects(
             {"origin_visit_status": statuses}, scheduler=env.scheduler
         )

         statuses = []
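``scheduler_journal_client_process`` above is the in-process replacement for the archive -> scheduler feedback loop: simulated load tasks push visit statuses onto ``status_queue`` and the journal client drains them in batches. Here is a stripped-down sketch of that same batching shape using plain SimPy primitives; ``fake_loader``, the dict-shaped statuses and the batch size of 10 are stand-ins, not the real ``Queue``/``TaskEvent`` types from ``swh.scheduler.simulator.common``.

.. code-block:: python

   # Toy version of the feedback loop: a fake loader pushes synthetic visit
   # statuses onto a queue, and a consumer only processes them once a full
   # batch has accumulated. Illustration only; names and sizes are made up.
   import simpy


   def fake_loader(env, status_queue):
       """Stand-in for a simulated load task: report one visit status per visit."""
       visit = 0
       while True:
           yield env.timeout(5.0)  # simulated visit duration
           yield status_queue.put({"origin": f"origin-{visit}", "status": "full"})
           visit += 1


   def batching_consumer(env, status_queue, batch_size=10):
       """Same shape as scheduler_journal_client_process: accumulate statuses,
       then handle them as one batch (here we just print the batch size)."""
       statuses = []
       while True:
           status = yield status_queue.get()
           statuses.append(status)
           if len(statuses) < batch_size:
               continue
           print(f"{env.now:8.1f} processing a batch of {len(statuses)} statuses")
           statuses = []


   env = simpy.Environment()
   status_queue = simpy.Store(env)
   env.process(fake_loader(env, status_queue))
   env.process(batching_consumer(env, status_queue))
   env.run(until=300)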
diff --git a/swh/scheduler/task.py b/swh/scheduler/task.py
index 46bbe81..0db6204 100644
--- a/swh/scheduler/task.py
+++ b/swh/scheduler/task.py
@@ -1,81 +1,85 @@
 # Copyright (C) 2015-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from datetime import datetime

 from celery import current_app
 import celery.app.task
 from celery.utils.log import get_task_logger

 from swh.core.statsd import Statsd


 def ts():
     return int(datetime.utcnow().timestamp())


 class SWHTask(celery.app.task.Task):
     """a schedulable task (abstract class)

     Current implementation is based on Celery.
     See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html
     for how to use tasks once instantiated

     """

     _statsd = None
     _log = None

+    reject_on_worker_lost = None
+    """Inherited from :class:`celery.app.task.Task`, but we need to override
+    its docstring because it uses a custom ReST role"""
+
     @property
     def statsd(self):
         if self._statsd:
             return self._statsd
         worker_name = current_app.conf.get("worker_name")
         if worker_name:
             self._statsd = Statsd(
                 constant_tags={"task": self.name, "worker": worker_name,}
             )
             return self._statsd
         else:
             statsd = Statsd(
                 constant_tags={"task": self.name, "worker": "unknown worker",}
             )
             return statsd

     def __call__(self, *args, **kwargs):
         self.statsd.increment("swh_task_called_count")
         self.statsd.gauge("swh_task_start_ts", ts())
         with self.statsd.timed("swh_task_duration_seconds"):
             result = super().__call__(*args, **kwargs)
             try:
                 status = result["status"]
                 if status == "success":
                     status = "eventful" if result.get("eventful") else "uneventful"
             except Exception:
                 status = "eventful" if result else "uneventful"

             self.statsd.gauge("swh_task_end_ts", ts(), tags={"status": status})
             return result

     def on_failure(self, exc, task_id, args, kwargs, einfo):
         self.statsd.increment("swh_task_failure_count")

     def on_success(self, retval, task_id, args, kwargs):
         self.statsd.increment("swh_task_success_count")
         # this is a swh specific event. Used to attach the retval to the
         # task_run
         self.send_event("task-result", result=retval)

     @property
     def log(self):
         if self._log is None:
             self._log = get_task_logger(self.name)
         return self._log

     def run(self, *args, **kwargs):
         self.log.debug("%s: args=%s, kwargs=%s", self.name, args, kwargs)
         ret = super().run(*args, **kwargs)
         self.log.debug("%s: OK => %s", self.name, ret)
         return ret
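``SWHTask`` only takes effect once a Celery application registers tasks with it as their base class; ``__call__``, ``on_success`` and ``on_failure`` then wrap every execution with the ``swh_task_*`` statsd metrics. The sketch below shows one possible wiring using only the public Celery API; the app name, broker URL, ``worker_name`` value, task name and ``ping`` body are all hypothetical and are not Software Heritage's actual task declarations.

.. code-block:: python

   # Hypothetical example of declaring a task with SWHTask as its base class.
   from celery import Celery

   from swh.scheduler.task import SWHTask

   app = Celery("swh.example", broker="memory://")
   # Read by SWHTask.statsd to tag metrics; hypothetical value.
   app.conf.update(worker_name="worker-example")


   @app.task(base=SWHTask, bind=True, name="swh.example.ping")
   def ping(self, **kwargs):
       # `self` is the SWHTask instance: its statsd client and task logger are
       # available as self.statsd and self.log.
       self.log.debug("ping called with %s", kwargs)
       return {"status": "success", "eventful": False}


   if __name__ == "__main__":
       # Calling the task directly still goes through SWHTask.__call__, so the
       # swh_task_called_count / swh_task_duration_seconds metrics are emitted.
       print(ping(x=1))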