diff --git a/swh/scheduler/simulator/__init__.py b/swh/scheduler/simulator/__init__.py
index 1cde89f..ef7f7f4 100644
--- a/swh/scheduler/simulator/__init__.py
+++ b/swh/scheduler/simulator/__init__.py
@@ -1,167 +1,159 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 """This package runs the scheduler in a simulated environment, to evaluate
 various metrics. See :ref:`swh-scheduler-simulator`.
 
 This module orchestrates the simulator by initializing processes and connecting
 them together; these processes are defined in modules in the package and
 simulate/call specific components."""
 
 from datetime import datetime, timedelta, timezone
 import logging
 from typing import Dict, Generator, Optional
 
 from simpy import Event
 
 from swh.scheduler.interface import SchedulerInterface
-from swh.scheduler.model import ListedOrigin
 
 from . import origin_scheduler, task_scheduler
 from .common import Environment, Queue, SimulationReport, Task
-from .origins import load_task_process
+from .origins import generate_listed_origin, load_task_process
 
 logger = logging.getLogger(__name__)
 
 
 def update_metrics_process(
     env: Environment, update_interval: int
 ) -> Generator[Event, None, None]:
     """Update the scheduler metrics every `update_interval` (simulated) seconds,
     and add them to the SimulationReport
     """
     t0 = env.time
     while True:
         metrics = env.scheduler.update_metrics(timestamp=env.time)
         env.report.record_metrics(env.time, metrics)
         dt = env.time - t0
         logger.info("time:%s visits:%s", dt, env.report.total_visits)
         yield env.timeout(update_interval)
 
 
 def worker_process(
     env: Environment, name: str, task_queue: Queue, status_queue: Queue
 ) -> Generator[Event, Task, None]:
     """A worker which consumes tasks from the input task_queue.
     Tasks themselves send OriginVisitStatus objects to the status_queue."""
     logger.debug("%s worker %s: Start", env.time, name)
     while True:
         task = yield task_queue.get()
         logger.debug(
             "%s worker %s: Run task %s origin=%s",
             env.time,
             name,
             task.visit_type,
             task.origin,
         )
         yield env.process(load_task_process(env, task, status_queue=status_queue))
 
 
 def setup(
     env: Environment,
     scheduler_type: str,
     policy: Optional[str],
     workers_per_type: Dict[str, int],
     task_queue_capacity: int,
     min_batch_size: int,
     metrics_update_interval: int,
 ):
     task_queues = {
         visit_type: Queue(env, capacity=task_queue_capacity)
         for visit_type in workers_per_type
     }
     status_queue = Queue(env)
 
     if scheduler_type == "origin_scheduler":
         if policy is None:
             raise ValueError("origin_scheduler needs a scheduling policy")
         env.process(
             origin_scheduler.scheduler_runner_process(
                 env, task_queues, policy, min_batch_size=min_batch_size
             )
         )
         env.process(
             origin_scheduler.scheduler_journal_client_process(env, status_queue)
         )
     elif scheduler_type == "task_scheduler":
         if policy is not None:
             raise ValueError("task_scheduler doesn't support a scheduling policy")
         env.process(
             task_scheduler.scheduler_runner_process(
                 env, task_queues, min_batch_size=min_batch_size
             )
         )
         env.process(task_scheduler.scheduler_listener_process(env, status_queue))
     else:
         raise ValueError(f"Unknown scheduler type to simulate: {scheduler_type}")
 
     env.process(update_metrics_process(env, metrics_update_interval))
 
     for visit_type, num_workers in workers_per_type.items():
         task_queue = task_queues[visit_type]
         for i in range(num_workers):
             worker_name = f"worker-{visit_type}-{i}"
             env.process(worker_process(env, worker_name, task_queue, status_queue))
 
 
 def fill_test_data(scheduler: SchedulerInterface, num_origins: int = 100000):
     """Fills the database with mock data to test the simulator."""
     stored_lister = scheduler.get_or_create_lister(name="example")
     assert stored_lister.id is not None
 
-    origins = [
-        ListedOrigin(
-            lister_id=stored_lister.id,
-            url=f"https://example.com/{i:04d}.git",
-            visit_type="git",
-            last_update=datetime(2020, 6, 15, 16, 0, 0, i, tzinfo=timezone.utc),
-        )
-        for i in range(num_origins)
-    ]
+    # Generate 'num_origins' new origins
+    origins = [generate_listed_origin(stored_lister.id) for _ in range(num_origins)]
     scheduler.record_listed_origins(origins)
 
     scheduler.create_tasks(
         [
             {
                 **origin.as_task_dict(),
                 "policy": "recurring",
                 "next_run": origin.last_update,
                 "interval": timedelta(days=64),
             }
             for origin in origins
         ]
     )
 
 
 def run(
     scheduler: SchedulerInterface,
     scheduler_type: str,
     policy: Optional[str],
     runtime: Optional[int],
 ):
     NUM_WORKERS = 48
     start_time = datetime.now(tz=timezone.utc)
 
     env = Environment(start_time=start_time)
     env.scheduler = scheduler
     env.report = SimulationReport()
 
     setup(
         env,
         scheduler_type=scheduler_type,
         policy=policy,
         workers_per_type={"git": NUM_WORKERS},
         task_queue_capacity=10000,
         min_batch_size=1000,
         metrics_update_interval=3600,
     )
     try:
         env.run(until=runtime)
     except KeyboardInterrupt:
         pass
     finally:
         end_time = env.time
         print("total simulated time:", end_time - start_time)
         metrics = env.scheduler.update_metrics(timestamp=end_time)
         env.report.record_metrics(end_time, metrics)
         print(env.report.format())
diff --git a/swh/scheduler/simulator/origins.py b/swh/scheduler/simulator/origins.py
index 3320790..7d1ae3c 100644
--- a/swh/scheduler/simulator/origins.py
+++ b/swh/scheduler/simulator/origins.py
@@ -1,130 +1,179 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 """This module implements a model of the frequency of updates of an origin
 and how long it takes to load it."""
 
-from datetime import timedelta
+from datetime import datetime, timedelta, timezone
 import hashlib
 import logging
 import os
 from typing import Iterator, Optional, Tuple
+import uuid
 
 import attr
 from simpy import Event
 
 from swh.model.model import OriginVisitStatus
-from swh.scheduler.model import OriginVisitStats
+from swh.scheduler.model import ListedOrigin, OriginVisitStats
 
 from .common import Environment, Queue, Task, TaskEvent
 
 logger = logging.getLogger(__name__)
 
+_nb_generated_origins = 0
+
+
+def generate_listed_origin(
+    lister_id: uuid.UUID, now: Optional[datetime] = None
+) -> ListedOrigin:
+    """Returns a globally unique new origin. Seeds the `last_update` value
+    according to the OriginModel and the passed timestamp.
+
+    Arguments:
+      lister_id: id of the lister that generated this origin
+      now: time of listing, to emulate last_update (defaults to :func:`datetime.now`)
+    """
+    global _nb_generated_origins
+    _nb_generated_origins += 1
+    assert _nb_generated_origins < 10 ** 6, "Too many origins!"
+
+    if now is None:
+        now = datetime.now(tz=timezone.utc)
+
+    url = f"https://example.com/{_nb_generated_origins:06d}.git"
+    visit_type = "git"
+    origin = OriginModel(visit_type, url)
+
+    return ListedOrigin(
+        lister_id=lister_id,
+        url=url,
+        visit_type=visit_type,
+        last_update=origin.get_last_update(now),
+    )
+
+
 class OriginModel:
     MIN_RUN_TIME = 0.5
     """Minimal run time for a visit (retrieved from production data)"""
 
     MAX_RUN_TIME = 7200
     """Max run time for a visit"""
 
     PER_COMMIT_RUN_TIME = 0.1
     """Run time per commit"""
 
+    EPOCH = datetime(2015, 9, 1, 0, 0, 0, tzinfo=timezone.utc)
+    """The origin of all origins (at least according to Software Heritage)"""
+
     def __init__(self, type: str, origin: str):
         self.type = type
         self.origin = origin
 
     def seconds_between_commits(self):
         """Returns a random 'average time between two commits' of this origin,
         used to estimate the run time of a load task, and how much the loading
         architecture is lagging behind origin updates."""
         n_bytes = 2
         num_buckets = 2 ** (8 * n_bytes)
 
         # Deterministic seed to generate "random" characteristics of this origin
         bucket = int.from_bytes(
             hashlib.md5(self.origin.encode()).digest()[0:n_bytes], "little"
         )
 
         # minimum: 1 second (bucket == 0)
         # max: 10 years (bucket == num_buckets - 1)
         ten_y = 10 * 365 * 24 * 3600
 
         return ten_y ** (bucket / num_buckets)
         # return 1 + (ten_y - 1) * (bucket / (num_buckets - 1))
 
+    def get_last_update(self, now: datetime):
+        """Get the last_update value for this origin.
+
+        We assume that the origin had its first commit at `EPOCH`, and that one
+        commit happened every `self.seconds_between_commits()`. This returns
+        the last commit date before or equal to `now`.
+ """ + _, time_since_last_commit = divmod( + (now - self.EPOCH).total_seconds(), self.seconds_between_commits() + ) + + return now - timedelta(seconds=time_since_last_commit) + def load_task_characteristics( self, env: Environment, stats: Optional[OriginVisitStats] ) -> Tuple[float, bool, str]: """Returns the (run_time, eventfulness, end_status) of the next origin visit.""" if stats and stats.last_eventful: time_since_last_successful_run = env.time - stats.last_eventful else: time_since_last_successful_run = timedelta(days=365) seconds_between_commits = self.seconds_between_commits() logger.debug( "Interval between commits: %s", timedelta(seconds=seconds_between_commits) ) seconds_since_last_successful = time_since_last_successful_run.total_seconds() if seconds_since_last_successful < seconds_between_commits: # No commits since last visit => uneventful return (self.MIN_RUN_TIME, False, "full") else: n_commits = seconds_since_last_successful / seconds_between_commits run_time = self.MIN_RUN_TIME + self.PER_COMMIT_RUN_TIME * n_commits if run_time > self.MAX_RUN_TIME: return (self.MAX_RUN_TIME, False, "partial") else: return (run_time, True, "full") def load_task_process( env: Environment, task: Task, status_queue: Queue ) -> Iterator[Event]: """A loading task. This pushes OriginVisitStatus objects to the status_queue to simulate the visible outcomes of the task. Uses the `load_task_duration` function to determine its run time. """ # This is cheating; actual tasks access the state from the storage, not the # scheduler pk = task.origin, task.visit_type visit_stats = env.scheduler.origin_visit_stats_get([pk]) stats: Optional[OriginVisitStats] = visit_stats[0] if len(visit_stats) > 0 else None last_snapshot = stats.last_snapshot if stats else None status = OriginVisitStatus( origin=task.origin, visit=42, type=task.visit_type, status="created", date=env.time, snapshot=None, ) origin_model = OriginModel(task.visit_type, task.origin) (run_time, eventful, end_status) = origin_model.load_task_characteristics( env, stats ) logger.debug("%s task %s origin=%s: Start", env.time, task.visit_type, task.origin) yield status_queue.put(TaskEvent(task=task, status=status)) yield env.timeout(run_time) logger.debug("%s task %s origin=%s: End", env.time, task.visit_type, task.origin) new_snapshot = os.urandom(20) if eventful else last_snapshot yield status_queue.put( TaskEvent( task=task, status=attr.evolve( status, status=end_status, date=env.time, snapshot=new_snapshot ), eventful=eventful, ) ) env.report.record_visit(run_time, eventful, end_status)