diff --git a/swh/scheduler/simulator/common.py b/swh/scheduler/simulator/common.py
index 7a26341..65f4b18 100644
--- a/swh/scheduler/simulator/common.py
+++ b/swh/scheduler/simulator/common.py
@@ -1,132 +1,146 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from dataclasses import dataclass, field
 from datetime import datetime, timedelta
 import textwrap
-from typing import Dict, List, Tuple
+from typing import Dict, List, Optional, Tuple
 import uuid

 import plotille
 from simpy import Environment as _Environment
 from simpy import Store

 from swh.model.model import OriginVisitStatus
 from swh.scheduler.interface import SchedulerInterface
 from swh.scheduler.model import SchedulerMetrics


 @dataclass
 class SimulationReport:
     DURATION_THRESHOLD = 3600
     """Max duration for histograms"""

     total_visits: int = 0
     """Total count of finished visits"""

     visit_runtimes: Dict[Tuple[str, bool], List[float]] = field(default_factory=dict)
     """Collected visit runtimes for each (status, eventful) tuple"""

     metrics: List[Tuple[datetime, List[SchedulerMetrics]]] = field(default_factory=list)
     """Collected scheduler metrics for every timestamp"""

-    def record_visit(self, duration: float, eventful: bool, status: str) -> None:
+    latest_snapshots: Dict[Tuple[str, str], bytes] = field(default_factory=dict)
+    """Latest snapshot seen for each (visit_type, origin_url) pair"""
+
+    def record_visit(
+        self,
+        origin: Tuple[str, str],
+        duration: float,
+        status: str,
+        snapshot: Optional[bytes] = None,
+    ) -> None:
+        eventful = False
+        if status == "full":
+            eventful = snapshot != self.latest_snapshots.get(origin)
+            self.latest_snapshots[origin] = snapshot
+
         self.total_visits += 1
         self.visit_runtimes.setdefault((status, eventful), []).append(duration)

     def record_metrics(self, timestamp: datetime, metrics: List[SchedulerMetrics]):
         self.metrics.append((timestamp, metrics))

     @property
     def useless_visits(self):
         """Number of uneventful, full visits"""
         return len(self.visit_runtimes.get(("full", False), []))

     def runtime_histogram(self, status: str, eventful: bool) -> str:
         runtimes = self.visit_runtimes.get((status, eventful), [])
         return plotille.hist(
             [runtime for runtime in runtimes if runtime <= self.DURATION_THRESHOLD]
         )

     def metrics_plot(self) -> str:
         timestamps, metric_lists = zip(*self.metrics)
         known = [sum(m.origins_known for m in metrics) for metrics in metric_lists]
         never_visited = [
             sum(m.origins_never_visited for m in metrics) for metrics in metric_lists
         ]

         figure = plotille.Figure()
         figure.x_label = "simulated time"
         figure.y_label = "origins"
         figure.scatter(timestamps, known, label="Known origins")
         figure.scatter(timestamps, never_visited, label="Origins never visited")

         return figure.show(legend=True)

     def format(self):
         full_visits = self.visit_runtimes.get(("full", True), [])
         histogram = self.runtime_histogram("full", True)
         plot = self.metrics_plot()
         long_tasks = sum(runtime > self.DURATION_THRESHOLD for runtime in full_visits)

         return (
             textwrap.dedent(
                 f"""\
                 Total visits: {self.total_visits}
                 Useless visits: {self.useless_visits}
                 Eventful visits: {len(full_visits)}
                 Very long running tasks: {long_tasks}
                 Visit time histogram for eventful visits:
                 """
             )
             + histogram
             + "\n"
             + textwrap.dedent(
                 """\
                 Metrics over time:
                 """
             )
             + plot
         )


 @dataclass
 class Task:
     visit_type: str
     origin: str
     backend_id: uuid.UUID = field(default_factory=uuid.uuid4)


 @dataclass
 class TaskEvent:
     task: Task
     status: OriginVisitStatus
     eventful: bool = field(default=False)


 class Queue(Store):
     """Model a queue of objects to be passed between processes."""

     def __len__(self):
         return len(self.items or [])

     def slots_remaining(self):
         return self.capacity - len(self)


 class Environment(_Environment):
     report: SimulationReport
     scheduler: SchedulerInterface

     def __init__(self, start_time: datetime):
         if start_time.tzinfo is None:
             raise ValueError("start_time must have timezone information")
         self.start_time = start_time
         super().__init__()

     @property
     def time(self):
         """Get the current simulated wall clock time"""
         return self.start_time + timedelta(seconds=self.now)
diff --git a/swh/scheduler/simulator/origins.py b/swh/scheduler/simulator/origins.py
index b66a39c..a0ec081 100644
--- a/swh/scheduler/simulator/origins.py
+++ b/swh/scheduler/simulator/origins.py
@@ -1,214 +1,227 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 """This module implements a model of the frequency of updates of an origin
 and how long it takes to load it.

 For each origin, a commit frequency is chosen deterministically based on the
 hash of its URL, and we assume all origins were created at an arbitrary
 epoch. From these we compute a number of commits (the product of the commit
 frequency and the time since the epoch), and the run time of a load task is
 approximated as proportional to the number of commits since the previous
 visit of the origin (possibly 0)."""

 from datetime import datetime, timedelta, timezone
 import hashlib
 import logging
-import os
-from typing import Generator, Iterator, List, Optional, Tuple
+from typing import Dict, Generator, Iterator, List, Optional, Tuple
 import uuid

 import attr
 from simpy import Event

 from swh.model.model import OriginVisitStatus
-from swh.scheduler.model import ListedOrigin, OriginVisitStats
+from swh.scheduler.model import ListedOrigin

 from .common import Environment, Queue, Task, TaskEvent

 logger = logging.getLogger(__name__)

 _nb_generated_origins = 0
+_visit_times: Dict[Tuple[str, str], datetime] = {}
+"""Cache of the time of the last visit of (visit_type, origin_url),
+to spare an SQL query (high latency)."""


 def generate_listed_origin(
     lister_id: uuid.UUID, now: Optional[datetime] = None
 ) -> ListedOrigin:
     """Returns a globally unique new origin. Seeds the `last_update` value
     according to the OriginModel and the passed timestamp.

     Arguments:
       lister_id: id of the lister that generated this origin
       now: time of listing, to emulate last_update (defaults to
         :func:`datetime.now`)
     """
     global _nb_generated_origins
     _nb_generated_origins += 1
     assert _nb_generated_origins < 10 ** 6, "Too many origins!"
     if now is None:
         now = datetime.now(tz=timezone.utc)
-    url = f"https://example.com/{_nb_generated_origins:6d}.git"
+    url = f"https://example.com/{_nb_generated_origins:06d}.git"
     visit_type = "git"
     origin = OriginModel(visit_type, url)
     return ListedOrigin(
         lister_id=lister_id,
         url=url,
         visit_type=visit_type,
         last_update=origin.get_last_update(now),
     )


 class OriginModel:
     MIN_RUN_TIME = 0.5
     """Minimal run time for a visit (retrieved from production data)"""

     MAX_RUN_TIME = 7200
     """Max run time for a visit"""

     PER_COMMIT_RUN_TIME = 0.1
     """Run time per commit"""

     EPOCH = datetime(2015, 9, 1, 0, 0, 0, tzinfo=timezone.utc)
     """The origin of all origins (at least according to Software Heritage)"""

     def __init__(self, type: str, origin: str):
         self.type = type
         self.origin = origin

     def seconds_between_commits(self):
         """Returns a random 'average time between two commits' of this origin,
         used to estimate the run time of a load task, and how much the loading
         architecture is lagging behind origin updates."""
         n_bytes = 2
         num_buckets = 2 ** (8 * n_bytes)

         # Deterministic seed to generate "random" characteristics of this origin
         bucket = int.from_bytes(
             hashlib.md5(self.origin.encode()).digest()[0:n_bytes], "little"
         )

         # minimum: 1 second (bucket == 0)
         # max: 10 years (bucket == num_buckets - 1)
         ten_y = 10 * 365 * 24 * 3600

         return ten_y ** (bucket / num_buckets)
         # return 1 + (ten_y - 1) * (bucket / (num_buckets - 1))

-    def get_last_update(self, now: datetime):
+    def get_last_update(self, now: datetime) -> datetime:
         """Get the last_update value for this origin.

         We assume that the origin had its first commit at `EPOCH`, and that one
         commit happened every `self.seconds_between_commits()`. This returns
         the last commit date before or equal to `now`.
         """
         _, time_since_last_commit = divmod(
             (now - self.EPOCH).total_seconds(), self.seconds_between_commits()
         )
         return now - timedelta(seconds=time_since_last_commit)

+    def get_current_snapshot_id(self, now: datetime) -> bytes:
+        """Get the current snapshot id for this origin.
+
+        To generate a snapshot id, we calculate the number of commits since the
+        EPOCH, and hash it alongside the origin type and url.
+ """ + commits_since_epoch, _ = divmod( + (now - self.EPOCH).total_seconds(), self.seconds_between_commits() + ) + return hashlib.sha1( + f"{self.type} {self.origin} {commits_since_epoch}".encode() + ).digest() + def load_task_characteristics( - self, env: Environment, stats: Optional[OriginVisitStats] - ) -> Tuple[float, bool, str]: - """Returns the (run_time, eventfulness, end_status) of the next + self, now: datetime + ) -> Tuple[float, str, Optional[bytes]]: + """Returns the (run_time, end_status, snapshot id) of the next origin visit.""" - if stats and stats.last_eventful: - time_since_last_successful_run = env.time - stats.last_eventful - else: - time_since_last_successful_run = timedelta(days=365) + current_snapshot = self.get_current_snapshot_id(now) + + key = (self.type, self.origin) + last_visit = _visit_times.get(key, now - timedelta(days=365)) + time_since_last_successful_run = now - last_visit + + _visit_times[key] = now seconds_between_commits = self.seconds_between_commits() + seconds_since_last_successful = time_since_last_successful_run.total_seconds() + n_commits = int(seconds_since_last_successful / seconds_between_commits) logger.debug( - "Interval between commits: %s", timedelta(seconds=seconds_between_commits) + "%s characteristics %s origin=%s: Interval: %s, n_commits: %s", + now, + self.type, + self.origin, + timedelta(seconds=seconds_between_commits), + n_commits, ) - seconds_since_last_successful = time_since_last_successful_run.total_seconds() - if seconds_since_last_successful < seconds_between_commits: - # No commits since last visit => uneventful - return (self.MIN_RUN_TIME, False, "full") + run_time = self.MIN_RUN_TIME + self.PER_COMMIT_RUN_TIME * n_commits + if run_time > self.MAX_RUN_TIME: + # Long visits usually fail + return (self.MAX_RUN_TIME, "partial", None) else: - n_commits = seconds_since_last_successful / seconds_between_commits - run_time = self.MIN_RUN_TIME + self.PER_COMMIT_RUN_TIME * n_commits - if run_time > self.MAX_RUN_TIME: - return (self.MAX_RUN_TIME, False, "partial") - else: - return (run_time, True, "full") + return (run_time, "full", current_snapshot) def lister_process( env: Environment, lister_id: uuid.UUID ) -> Generator[Event, Event, None]: """Every hour, generate new origins and update the `last_update` field for the ones this process generated in the past""" NUM_NEW_ORIGINS = 100 origins: List[ListedOrigin] = [] while True: updated_origins = [] for origin in origins: model = OriginModel(origin.visit_type, origin.url) updated_origins.append( attr.evolve(origin, last_update=model.get_last_update(env.time)) ) origins = updated_origins origins.extend( generate_listed_origin(lister_id, now=env.time) for _ in range(NUM_NEW_ORIGINS) ) env.scheduler.record_listed_origins(origins) yield env.timeout(3600) def load_task_process( env: Environment, task: Task, status_queue: Queue ) -> Iterator[Event]: """A loading task. This pushes OriginVisitStatus objects to the status_queue to simulate the visible outcomes of the task. Uses the `load_task_duration` function to determine its run time. 
""" - # This is cheating; actual tasks access the state from the storage, not the - # scheduler - pk = task.origin, task.visit_type - visit_stats = env.scheduler.origin_visit_stats_get([pk]) - stats: Optional[OriginVisitStats] = visit_stats[0] if len(visit_stats) > 0 else None - last_snapshot = stats.last_snapshot if stats else None - status = OriginVisitStatus( origin=task.origin, visit=42, type=task.visit_type, status="created", date=env.time, snapshot=None, ) - origin_model = OriginModel(task.visit_type, task.origin) - (run_time, eventful, end_status) = origin_model.load_task_characteristics( - env, stats - ) logger.debug("%s task %s origin=%s: Start", env.time, task.visit_type, task.origin) yield status_queue.put(TaskEvent(task=task, status=status)) + + origin_model = OriginModel(task.visit_type, task.origin) + (run_time, end_status, snapshot) = origin_model.load_task_characteristics(env.time) yield env.timeout(run_time) + logger.debug("%s task %s origin=%s: End", env.time, task.visit_type, task.origin) - new_snapshot = os.urandom(20) if eventful else last_snapshot yield status_queue.put( TaskEvent( task=task, status=attr.evolve( - status, status=end_status, date=env.time, snapshot=new_snapshot + status, status=end_status, date=env.time, snapshot=snapshot ), - eventful=eventful, ) ) - env.report.record_visit(run_time, eventful, end_status) + env.report.record_visit( + (task.visit_type, task.origin), run_time, end_status, snapshot + )