diff --git a/docs/simulator.rst b/docs/simulator.rst
index 923d71a..979a389 100644
--- a/docs/simulator.rst
+++ b/docs/simulator.rst
@@ -1,65 +1,80 @@
 .. _swh-scheduler-simulator:
 
 Software Heritage Scheduler Simulator
 =====================================
 
 This component simulates the interaction between the scheduling and loading
 infrastructure of Software Heritage. This allows quick(er) development of new
 task scheduling policies without having to wait for the actual infrastructure
 to perform (heavy) loading tasks.
 
 Simulator components
 --------------------
 
 - real instance of the scheduler database
 - simulated task queues: replaces RabbitMQ with simple in-memory structures
 - simulated workers: replaces Celery with simple while loops
 - simulated load tasks: replaces loaders with noops that take a certain time,
   and generate synthetic OriginVisitStatus objects
 - simulated archive -> scheduler feedback loop: OriginVisitStatus objects are
   pushed to a simple queue which gets processed by the scheduler journal
   client's process function directly (instead of going through swh.storage
   and swh.journal (kafka))
 
 In short, only the scheduler database and scheduler logic are kept; every
 other component (RabbitMQ, Celery, Kafka, SWH loaders, SWH storage) is either
 replaced with a barebones in-process utility, or removed entirely.
 
 Installing the simulator
 ------------------------
 
 The simulator depends on SimPy and other specific libraries. To install them,
 please use:
 
 .. code-block:: bash
 
    pip install 'swh.scheduler[simulator]'
 
 Running the simulator
 ---------------------
 
 The simulator uses a real instance of the scheduler database, which is (at
 least for now) persistent across runs of the simulator. You need to set that
 up beforehand:
 
 .. code-block:: bash
 
    # if you want to use a temporary instance of postgresql
    eval `pifpaf run postgresql`
 
    # Set this variable for the simulator to know which db to connect to. pifpaf
    # sets other variables like PGPORT, PGHOST, ...
    export PGDATABASE=swh-scheduler
 
    # Create/initialize the scheduler database
    swh db create scheduler -d $PGDATABASE
    swh db init scheduler -d $PGDATABASE
 
    # This generates some data in the scheduler database. You can also feed the
    # database with more realistic data, e.g. from a lister or from a dump of the
    # production database.
    swh scheduler -d "dbname=$PGDATABASE" simulator fill-test-data
 
    # Run the simulator itself, interacting with the scheduler database you've
    # just seeded.
    swh scheduler -d "dbname=$PGDATABASE" simulator run --scheduler origin_scheduler
+
+
+Origin model
+------------
+
+The origin model is how we represent the behavior of origins: when they are
+created/discovered, how many commits they get and when, and when loading them
+fails.
+
+For now it is only a simple approximation designed to exercise simple cases:
+origin creation/discovery, a continuous stream of commits, and failure when an
+origin has too many commits to load at once.
+For details, see :py:mod:`swh.scheduler.simulator.origins`.
+
+To keep the simulation fast enough, each origin's state is kept in memory, so
+the memory usage of the simulator process grows linearly as it runs.
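+
+For instance, the model derives each origin's average time between two commits
+deterministically from a hash of its URL. A minimal standalone sketch of that
+computation (mirroring ``OriginModel.seconds_between_commits``; the function
+name here is made up for the example):
+
+.. code-block:: python
+
+   import hashlib
+   from datetime import timedelta
+
+   def estimate_seconds_between_commits(origin_url: str) -> float:
+       # Hash the URL into one of 2**16 buckets, then map the bucket onto a
+       # geometric scale ranging from 1 second (bucket 0) to ~10 years.
+       num_buckets = 2 ** 16
+       bucket = int.from_bytes(
+           hashlib.md5(origin_url.encode()).digest()[:2], "little"
+       )
+       ten_years = 10 * 365 * 24 * 3600
+       return ten_years ** (bucket / num_buckets)
+
+   interval = estimate_seconds_between_commits("https://example.com/000001.git")
+   print(timedelta(seconds=interval))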
diff --git a/swh/scheduler/simulator/__init__.py b/swh/scheduler/simulator/__init__.py
index ef7f7f4..fa2228b 100644
--- a/swh/scheduler/simulator/__init__.py
+++ b/swh/scheduler/simulator/__init__.py
@@ -1,159 +1,163 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 """This package runs the scheduler in a simulated environment, to evaluate
 various metrics. See :ref:`swh-scheduler-simulator`.
 
 This module orchestrates the simulator by initializing processes and
 connecting them together; these processes are defined in modules in the
 package and simulate/call specific components."""
 
 from datetime import datetime, timedelta, timezone
 import logging
 from typing import Dict, Generator, Optional
 
 from simpy import Event
 
 from swh.scheduler.interface import SchedulerInterface
 
 from . import origin_scheduler, task_scheduler
 from .common import Environment, Queue, SimulationReport, Task
-from .origins import generate_listed_origin, load_task_process
+from .origins import generate_listed_origin, lister_process, load_task_process
 
 logger = logging.getLogger(__name__)
 
 
 def update_metrics_process(
     env: Environment, update_interval: int
 ) -> Generator[Event, None, None]:
     """Update the scheduler metrics every `update_interval` (simulated)
     seconds, and add them to the SimulationReport"""
     t0 = env.time
     while True:
         metrics = env.scheduler.update_metrics(timestamp=env.time)
         env.report.record_metrics(env.time, metrics)
         dt = env.time - t0
         logger.info("time:%s visits:%s", dt, env.report.total_visits)
         yield env.timeout(update_interval)
 
 
 def worker_process(
     env: Environment, name: str, task_queue: Queue, status_queue: Queue
 ) -> Generator[Event, Task, None]:
     """A worker which consumes tasks from the input task_queue. Tasks
     themselves send OriginVisitStatus objects to the status_queue."""
     logger.debug("%s worker %s: Start", env.time, name)
     while True:
         task = yield task_queue.get()
         logger.debug(
             "%s worker %s: Run task %s origin=%s",
             env.time,
             name,
             task.visit_type,
             task.origin,
         )
         yield env.process(load_task_process(env, task, status_queue=status_queue))
 
 
 def setup(
     env: Environment,
     scheduler_type: str,
     policy: Optional[str],
     workers_per_type: Dict[str, int],
     task_queue_capacity: int,
     min_batch_size: int,
     metrics_update_interval: int,
 ):
     task_queues = {
         visit_type: Queue(env, capacity=task_queue_capacity)
         for visit_type in workers_per_type
     }
     status_queue = Queue(env)
 
     if scheduler_type == "origin_scheduler":
         if policy is None:
             raise ValueError("origin_scheduler needs a scheduling policy")
         env.process(
             origin_scheduler.scheduler_runner_process(
                 env, task_queues, policy, min_batch_size=min_batch_size
             )
         )
         env.process(
             origin_scheduler.scheduler_journal_client_process(env, status_queue)
         )
     elif scheduler_type == "task_scheduler":
         if policy is not None:
             raise ValueError("task_scheduler doesn't support a scheduling policy")
         env.process(
             task_scheduler.scheduler_runner_process(
                 env, task_queues, min_batch_size=min_batch_size
             )
         )
         env.process(task_scheduler.scheduler_listener_process(env, status_queue))
     else:
         raise ValueError(f"Unknown scheduler type to simulate: {scheduler_type}")
 
     env.process(update_metrics_process(env, metrics_update_interval))
 
     for visit_type, num_workers in workers_per_type.items():
         task_queue = task_queues[visit_type]
         for i in range(num_workers):
             worker_name = f"worker-{visit_type}-{i}"
             env.process(worker_process(env, worker_name, task_queue, status_queue))
 
+    lister = env.scheduler.get_or_create_lister(name="example")
+    assert lister.id
+    env.process(lister_process(env, lister.id))
+
 
 def fill_test_data(scheduler: SchedulerInterface, num_origins: int = 100000):
     """Fills the database with mock data to test the simulator."""
     stored_lister = scheduler.get_or_create_lister(name="example")
     assert stored_lister.id is not None
 
     # Generate 'num_origins' new origins
     origins = [generate_listed_origin(stored_lister.id) for _ in range(num_origins)]
     scheduler.record_listed_origins(origins)
 
     scheduler.create_tasks(
         [
             {
                 **origin.as_task_dict(),
                 "policy": "recurring",
                 "next_run": origin.last_update,
                 "interval": timedelta(days=64),
             }
             for origin in origins
         ]
     )
 
 
 def run(
     scheduler: SchedulerInterface,
     scheduler_type: str,
     policy: Optional[str],
     runtime: Optional[int],
 ):
     NUM_WORKERS = 48
     start_time = datetime.now(tz=timezone.utc)
     env = Environment(start_time=start_time)
     env.scheduler = scheduler
     env.report = SimulationReport()
     setup(
         env,
         scheduler_type=scheduler_type,
         policy=policy,
         workers_per_type={"git": NUM_WORKERS},
         task_queue_capacity=10000,
         min_batch_size=1000,
         metrics_update_interval=3600,
     )
     try:
         env.run(until=runtime)
     except KeyboardInterrupt:
         pass
     finally:
         end_time = env.time
         print("total simulated time:", end_time - start_time)
         metrics = env.scheduler.update_metrics(timestamp=end_time)
         env.report.record_metrics(end_time, metrics)
         print(env.report.format())
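All the pieces wired together above (scheduler runner, journal client, metrics
updater, workers) are SimPy processes communicating through queues and driven
by ``env.run()``. For readers unfamiliar with SimPy, here is a self-contained
toy version of the same producer/consumer pattern, using a plain
``simpy.Store`` in place of the simulator's ``Queue`` (all names are
illustrative):

.. code-block:: python

   import simpy

   def producer(env, store):
       for i in range(3):
           yield env.timeout(10)         # wait 10 simulated seconds
           yield store.put(f"task-{i}")  # enqueue a task

   def consumer(env, store):
       while True:
           task = yield store.get()      # blocks until a task is available
           print(f"t={env.now}: running {task}")

   env = simpy.Environment()
   store = simpy.Store(env, capacity=100)
   env.process(producer(env, store))
   env.process(consumer(env, store))
   env.run(until=60)                     # run 60 simulated seconds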
information """This module implements a model of the frequency of updates of an origin -and how long it takes to load it.""" +and how long it takes to load it. + +For each origin, a commit frequency is chosen deterministically based on the +hash of its URL and assume all origins were created on an arbitrary epoch. +From this we compute a number of commits, that is the product of these two. + +And the run time of a load task is approximated as proportional to the number +of commits since the previous visit of the origin (possibly 0).""" from datetime import datetime, timedelta, timezone import hashlib import logging import os -from typing import Iterator, Optional, Tuple +from typing import Generator, Iterator, List, Optional, Tuple import uuid import attr from simpy import Event from swh.model.model import OriginVisitStatus from swh.scheduler.model import ListedOrigin, OriginVisitStats from .common import Environment, Queue, Task, TaskEvent logger = logging.getLogger(__name__) _nb_generated_origins = 0 def generate_listed_origin( lister_id: uuid.UUID, now: Optional[datetime] = None ) -> ListedOrigin: """Returns a globally unique new origin. Seed the `last_update` value according to the OriginModel and the passed timestamp. Arguments: lister: instance of the lister that generated this origin now: time of listing, to emulate last_update (defaults to :func:`datetime.now`) """ global _nb_generated_origins _nb_generated_origins += 1 assert _nb_generated_origins < 10 ** 6, "Too many origins!" if now is None: now = datetime.now(tz=timezone.utc) url = f"https://example.com/{_nb_generated_origins:6d}.git" visit_type = "git" origin = OriginModel(visit_type, url) return ListedOrigin( lister_id=lister_id, url=url, visit_type=visit_type, last_update=origin.get_last_update(now), ) class OriginModel: MIN_RUN_TIME = 0.5 """Minimal run time for a visit (retrieved from production data)""" MAX_RUN_TIME = 7200 """Max run time for a visit""" PER_COMMIT_RUN_TIME = 0.1 """Run time per commit""" EPOCH = datetime(2015, 9, 1, 0, 0, 0, tzinfo=timezone.utc) """The origin of all origins (at least according to Software Heritage)""" def __init__(self, type: str, origin: str): self.type = type self.origin = origin def seconds_between_commits(self): """Returns a random 'average time between two commits' of this origin, used to estimate the run time of a load task, and how much the loading architecture is lagging behind origin updates.""" n_bytes = 2 num_buckets = 2 ** (8 * n_bytes) # Deterministic seed to generate "random" characteristics of this origin bucket = int.from_bytes( hashlib.md5(self.origin.encode()).digest()[0:n_bytes], "little" ) # minimum: 1 second (bucket == 0) # max: 10 years (bucket == num_buckets - 1) ten_y = 10 * 365 * 24 * 3600 return ten_y ** (bucket / num_buckets) # return 1 + (ten_y - 1) * (bucket / (num_buckets - 1)) def get_last_update(self, now: datetime): """Get the last_update value for this origin. We assume that the origin had its first commit at `EPOCH`, and that one commit happened every `self.seconds_between_commits()`. This returns the last commit date before or equal to `now`. 
""" _, time_since_last_commit = divmod( (now - self.EPOCH).total_seconds(), self.seconds_between_commits() ) return now - timedelta(seconds=time_since_last_commit) def load_task_characteristics( self, env: Environment, stats: Optional[OriginVisitStats] ) -> Tuple[float, bool, str]: """Returns the (run_time, eventfulness, end_status) of the next origin visit.""" if stats and stats.last_eventful: time_since_last_successful_run = env.time - stats.last_eventful else: time_since_last_successful_run = timedelta(days=365) seconds_between_commits = self.seconds_between_commits() logger.debug( "Interval between commits: %s", timedelta(seconds=seconds_between_commits) ) seconds_since_last_successful = time_since_last_successful_run.total_seconds() if seconds_since_last_successful < seconds_between_commits: # No commits since last visit => uneventful return (self.MIN_RUN_TIME, False, "full") else: n_commits = seconds_since_last_successful / seconds_between_commits run_time = self.MIN_RUN_TIME + self.PER_COMMIT_RUN_TIME * n_commits if run_time > self.MAX_RUN_TIME: return (self.MAX_RUN_TIME, False, "partial") else: return (run_time, True, "full") +def lister_process( + env: Environment, lister_id: uuid.UUID +) -> Generator[Event, Event, None]: + """Every hour, generate new origins and update the `last_update` field for + the ones this process generated in the past""" + NUM_NEW_ORIGINS = 100 + + origins: List[ListedOrigin] = [] + + while True: + updated_origins = [] + for origin in origins: + model = OriginModel(origin.visit_type, origin.url) + updated_origins.append( + attr.evolve(origin, last_update=model.get_last_update(env.time)) + ) + + origins = updated_origins + origins.extend( + generate_listed_origin(lister_id, now=env.time) + for _ in range(NUM_NEW_ORIGINS) + ) + + env.scheduler.record_listed_origins(origins) + + yield env.timeout(3600) + + def load_task_process( env: Environment, task: Task, status_queue: Queue ) -> Iterator[Event]: """A loading task. This pushes OriginVisitStatus objects to the status_queue to simulate the visible outcomes of the task. Uses the `load_task_duration` function to determine its run time. """ # This is cheating; actual tasks access the state from the storage, not the # scheduler pk = task.origin, task.visit_type visit_stats = env.scheduler.origin_visit_stats_get([pk]) stats: Optional[OriginVisitStats] = visit_stats[0] if len(visit_stats) > 0 else None last_snapshot = stats.last_snapshot if stats else None status = OriginVisitStatus( origin=task.origin, visit=42, type=task.visit_type, status="created", date=env.time, snapshot=None, ) origin_model = OriginModel(task.visit_type, task.origin) (run_time, eventful, end_status) = origin_model.load_task_characteristics( env, stats ) logger.debug("%s task %s origin=%s: Start", env.time, task.visit_type, task.origin) yield status_queue.put(TaskEvent(task=task, status=status)) yield env.timeout(run_time) logger.debug("%s task %s origin=%s: End", env.time, task.visit_type, task.origin) new_snapshot = os.urandom(20) if eventful else last_snapshot yield status_queue.put( TaskEvent( task=task, status=attr.evolve( status, status=end_status, date=env.time, snapshot=new_snapshot ), eventful=eventful, ) ) env.report.record_visit(run_time, eventful, end_status)