diff --git a/swh/scheduler/simulator/__init__.py b/swh/scheduler/simulator/__init__.py --- a/swh/scheduler/simulator/__init__.py +++ b/swh/scheduler/simulator/__init__.py @@ -17,11 +17,10 @@ from simpy import Event from swh.scheduler.interface import SchedulerInterface -from swh.scheduler.model import ListedOrigin from . import origin_scheduler, task_scheduler from .common import Environment, Queue, SimulationReport, Task -from .origins import load_task_process +from .origins import generate_listed_origin, lister_process, load_task_process logger = logging.getLogger(__name__) @@ -101,21 +100,18 @@ worker_name = f"worker-{visit_type}-{i}" env.process(worker_process(env, worker_name, task_queue, status_queue)) + lister = env.scheduler.get_or_create_lister(name="example") + assert lister.id + env.process(lister_process(env, lister.id)) + def fill_test_data(scheduler: SchedulerInterface, num_origins: int = 100000): """Fills the database with mock data to test the simulator.""" stored_lister = scheduler.get_or_create_lister(name="example") assert stored_lister.id is not None - origins = [ - ListedOrigin( - lister_id=stored_lister.id, - url=f"https://example.com/{i:04d}.git", - visit_type="git", - last_update=datetime(2020, 6, 15, 16, 0, 0, i, tzinfo=timezone.utc), - ) - for i in range(num_origins) - ] + # Generate 'num_origins' new origins + origins = [generate_listed_origin(stored_lister.id) for _ in range(num_origins)] scheduler.record_listed_origins(origins) scheduler.create_tasks( diff --git a/swh/scheduler/simulator/origins.py b/swh/scheduler/simulator/origins.py --- a/swh/scheduler/simulator/origins.py +++ b/swh/scheduler/simulator/origins.py @@ -6,23 +6,56 @@ """This module implements a model of the frequency of updates of an origin and how long it takes to load it.""" -from datetime import timedelta +from datetime import datetime, timedelta, timezone import hashlib import logging import os -from typing import Iterator, Optional, Tuple +from typing import 
# Number of origins handed out so far by generate_listed_origin(); it is used
# to build a distinct URL for each generated origin.
_nb_generated_origins = 0


def generate_listed_origin(
    lister_id: uuid.UUID, now: Optional[datetime] = None
) -> ListedOrigin:
    """Return a new listed origin with a URL not generated before.

    The URL is derived from a module-level counter, so it is unique among the
    origins generated by this module within the current process. The
    `last_update` value is seeded according to the OriginModel of that URL and
    the passed timestamp.

    Arguments:
      lister_id: id of the lister that generated this origin
      now: time of listing, to emulate last_update (defaults to
        :func:`datetime.now`, in UTC)

    Returns:
      a ListedOrigin of visit type "git" attached to `lister_id`
    """
    global _nb_generated_origins
    _nb_generated_origins += 1
    # The URL template below only has room for six digits.
    assert _nb_generated_origins < 10 ** 6, "Too many origins!"

    if now is None:
        now = datetime.now(tz=timezone.utc)

    # Zero-pad the counter: a bare width (`:6d`) pads with spaces, which would
    # put blanks inside the URL.
    url = f"https://example.com/{_nb_generated_origins:06d}.git"
    visit_type = "git"
    origin = OriginModel(visit_type, url)

    return ListedOrigin(
        lister_id=lister_id,
        url=url,
        visit_type=visit_type,
        last_update=origin.get_last_update(now),
    )


# Method of OriginModel (the class statement itself is outside this span); it
# reads `self.EPOCH` and calls `self.seconds_between_commits()`.
def get_last_update(self, now: datetime) -> datetime:
    """Get the last_update value for this origin.

    We assume that the origin had its first commit at `EPOCH`, and that one
    commit happened every `self.seconds_between_commits()`. This returns
    the last commit date before or equal to `now`.

    Arguments:
      now: the (timezone-aware) date for which to compute the last commit date

    Returns:
      `now` minus the time elapsed since the most recent commit
    """
    seconds_since_epoch = (now - self.EPOCH).total_seconds()
    # Only the remainder matters: it is the time elapsed since the latest
    # whole multiple of the commit period.
    time_since_last_commit = seconds_since_epoch % self.seconds_between_commits()

    return now - timedelta(seconds=time_since_last_commit)


def lister_process(
    env: Environment, lister_id: uuid.UUID
) -> Generator[Event, Event, None]:
    """Every hour, generate new origins and update the `last_update` field for
    existing ones.

    Each round refreshes `last_update` on every origin generated so far, adds
    `NUM_NEW_ORIGINS` new origins, records the whole list in the scheduler,
    then waits for `env.timeout(3600)` (one hour, assuming the simulation
    clock counts seconds). The process never terminates by itself.

    Arguments:
      env: simulation environment; provides `time`, `scheduler` and `timeout`
      lister_id: id of the lister the generated origins are attached to
    """
    NUM_NEW_ORIGINS = 100

    origins: List[ListedOrigin] = []

    while True:
        now = env.time

        # Rebuild the known origins with their `last_update` as of this round.
        origins = [
            attr.evolve(
                origin,
                last_update=OriginModel(
                    origin.visit_type, origin.url
                ).get_last_update(now),
            )
            for origin in origins
        ]
        origins.extend(
            generate_listed_origin(lister_id, now=now) for _ in range(NUM_NEW_ORIGINS)
        )

        env.scheduler.record_listed_origins(origins)

        yield env.timeout(3600)