diff --git a/swh/scheduler/simulator/common.py b/swh/scheduler/simulator/common.py
--- a/swh/scheduler/simulator/common.py
+++ b/swh/scheduler/simulator/common.py
@@ -6,7 +6,7 @@
 from dataclasses import dataclass, field
 from datetime import datetime, timedelta
 import textwrap
-from typing import Dict, List, Tuple
+from typing import Dict, List, Optional, Tuple
 import uuid
 
 import plotille
@@ -32,7 +32,21 @@
     metrics: List[Tuple[datetime, List[SchedulerMetrics]]] = field(default_factory=list)
     """Collected scheduler metrics for every timestamp"""
 
-    def record_visit(self, duration: float, eventful: bool, status: str) -> None:
+    latest_snapshots: Dict[Tuple[str, str], bytes] = field(default_factory=dict)
+    """Latest snapshot collected for each origin"""
+
+    def record_visit(
+        self,
+        origin: Tuple[str, str],
+        duration: float,
+        status: str,
+        snapshot: Optional[bytes],
+    ) -> None:
+        eventful = False
+        if status == "full":
+            eventful = snapshot != self.latest_snapshots.get(origin)
+            self.latest_snapshots[origin] = snapshot
+
         self.total_visits += 1
         self.visit_runtimes.setdefault((status, eventful), []).append(duration)
 
diff --git a/swh/scheduler/simulator/origins.py b/swh/scheduler/simulator/origins.py
--- a/swh/scheduler/simulator/origins.py
+++ b/swh/scheduler/simulator/origins.py
@@ -16,15 +16,14 @@
 from datetime import datetime, timedelta, timezone
 import hashlib
 import logging
-import os
-from typing import Generator, Iterator, List, Optional, Tuple
+from typing import Dict, Generator, Iterator, List, Optional, Tuple
 import uuid
 
 import attr
 from simpy import Event
 
 from swh.model.model import OriginVisitStatus
-from swh.scheduler.model import ListedOrigin, OriginVisitStats
+from swh.scheduler.model import ListedOrigin
 
 from .common import Environment, Queue, Task, TaskEvent
 
@@ -32,6 +31,7 @@
 
 _nb_generated_origins = 0
+_visit_times: Dict[Tuple[str, str], datetime] = {}
 
 
 def generate_listed_origin(
@@ -51,7 +51,7 @@
     if now is None:
         now = datetime.now(tz=timezone.utc)
 
-    url = f"https://example.com/{_nb_generated_origins:6d}.git"
+    url = f"https://example.com/{_nb_generated_origins:06d}.git"
     visit_type = "git"
     origin = OriginModel(visit_type, url)
 
@@ -99,7 +99,7 @@
         return ten_y ** (bucket / num_buckets)
         # return 1 + (ten_y - 1) * (bucket / (num_buckets - 1))
 
-    def get_last_update(self, now: datetime):
+    def get_last_update(self, now: datetime) -> datetime:
         """Get the last_update value for this origin.
 
         We assume that the origin had its first commit at `EPOCH`, and that one
@@ -112,32 +112,50 @@
 
         return now - timedelta(seconds=time_since_last_commit)
 
+    def get_current_snapshot(self, now: datetime) -> bytes:
+        """Get the current snapshot for this origin.
+
+        To generate a snapshot id, we calculate the number of commits since the
+        EPOCH, and hash it alongside the origin type and url.
+        """
+        commits_since_epoch, _ = divmod(
+            (now - self.EPOCH).total_seconds(), self.seconds_between_commits()
+        )
+        return hashlib.sha1(
+            f"{self.type} {self.origin} {commits_since_epoch}".encode()
+        ).digest()
+
     def load_task_characteristics(
-        self, env: Environment, stats: Optional[OriginVisitStats]
-    ) -> Tuple[float, bool, str]:
-        """Returns the (run_time, eventfulness, end_status) of the next
+        self, now: datetime
+    ) -> Tuple[float, str, Optional[bytes]]:
+        """Returns the (run_time, end_status, snapshot id) of the next
         origin visit."""
-        if stats and stats.last_eventful:
-            time_since_last_successful_run = env.time - stats.last_eventful
-        else:
-            time_since_last_successful_run = timedelta(days=365)
+        current_snapshot = self.get_current_snapshot(now)
+
+        key = (self.type, self.origin)
+        last_visit = _visit_times.get(key, now - timedelta(days=365))
+        time_since_last_successful_run = now - last_visit
+
+        _visit_times[key] = now
 
         seconds_between_commits = self.seconds_between_commits()
+        seconds_since_last_successful = time_since_last_successful_run.total_seconds()
+        n_commits = int(seconds_since_last_successful / seconds_between_commits)
         logger.debug(
-            "Interval between commits: %s", timedelta(seconds=seconds_between_commits)
+            "%s characteristics %s origin=%s: Interval: %s, n_commits: %s",
+            now,
+            self.type,
+            self.origin,
+            timedelta(seconds=seconds_between_commits),
+            n_commits,
         )
 
-        seconds_since_last_successful = time_since_last_successful_run.total_seconds()
-        if seconds_since_last_successful < seconds_between_commits:
-            # No commits since last visit => uneventful
-            return (self.MIN_RUN_TIME, False, "full")
+        run_time = self.MIN_RUN_TIME + self.PER_COMMIT_RUN_TIME * n_commits
+        if run_time > self.MAX_RUN_TIME:
+            # Long visits usually fail
+            return (self.MAX_RUN_TIME, "partial", None)
         else:
-            n_commits = seconds_since_last_successful / seconds_between_commits
-            run_time = self.MIN_RUN_TIME + self.PER_COMMIT_RUN_TIME * n_commits
-            if run_time > self.MAX_RUN_TIME:
-                return (self.MAX_RUN_TIME, False, "partial")
-            else:
-                return (run_time, True, "full")
+            return (run_time, "full", current_snapshot)
 
 
 def lister_process(
@@ -176,13 +194,6 @@
 
     Uses the `load_task_duration` function to determine its run time.
""" - # This is cheating; actual tasks access the state from the storage, not the - # scheduler - pk = task.origin, task.visit_type - visit_stats = env.scheduler.origin_visit_stats_get([pk]) - stats: Optional[OriginVisitStats] = visit_stats[0] if len(visit_stats) > 0 else None - last_snapshot = stats.last_snapshot if stats else None - status = OriginVisitStatus( origin=task.origin, visit=42, @@ -191,24 +202,24 @@ date=env.time, snapshot=None, ) - origin_model = OriginModel(task.visit_type, task.origin) - (run_time, eventful, end_status) = origin_model.load_task_characteristics( - env, stats - ) logger.debug("%s task %s origin=%s: Start", env.time, task.visit_type, task.origin) yield status_queue.put(TaskEvent(task=task, status=status)) + + origin_model = OriginModel(task.visit_type, task.origin) + (run_time, end_status, snapshot) = origin_model.load_task_characteristics(env.time) yield env.timeout(run_time) + logger.debug("%s task %s origin=%s: End", env.time, task.visit_type, task.origin) - new_snapshot = os.urandom(20) if eventful else last_snapshot yield status_queue.put( TaskEvent( task=task, status=attr.evolve( - status, status=end_status, date=env.time, snapshot=new_snapshot + status, status=end_status, date=env.time, snapshot=snapshot ), - eventful=eventful, ) ) - env.report.record_visit(run_time, eventful, end_status) + env.report.record_visit( + (task.visit_type, task.origin), run_time, end_status, snapshot + )