swh/scheduler/simulator/origins.py
Show All 10 Lines
From this we compute a number of commits, that is the product of these two.
And the run time of a load task is approximated as proportional to the number
of commits since the previous visit of the origin (possibly 0)."""

from datetime import datetime, timedelta, timezone
import hashlib
import logging
-import os
-from typing import Generator, Iterator, List, Optional, Tuple
+from typing import Dict, Generator, Iterator, List, Optional, Tuple
import uuid

import attr
from simpy import Event

from swh.model.model import OriginVisitStatus
-from swh.scheduler.model import ListedOrigin, OriginVisitStats
+from swh.scheduler.model import ListedOrigin

from .common import Environment, Queue, Task, TaskEvent

logger = logging.getLogger(__name__)

_nb_generated_origins = 0
douardda: what is this global var for?

+_visit_times: Dict[Tuple[str, str], datetime] = {}
+"""Cache of the time of the last visit of (visit_type, origin_url),
+to spare an SQL query (high latency)."""

def generate_listed_origin(
    lister_id: uuid.UUID, now: Optional[datetime] = None
) -> ListedOrigin:
    """Returns a globally unique new origin. Seed the `last_update` value
    according to the OriginModel and the passed timestamp.

    Arguments:
      lister: instance of the lister that generated this origin
      now: time of listing, to emulate last_update (defaults to :func:`datetime.now`)
    """
    global _nb_generated_origins
    _nb_generated_origins += 1
    assert _nb_generated_origins < 10 ** 6, "Too many origins!"
    if now is None:
        now = datetime.now(tz=timezone.utc)

-    url = f"https://example.com/{_nb_generated_origins:6d}.git"
+    url = f"https://example.com/{_nb_generated_origins:06d}.git"
    visit_type = "git"
    origin = OriginModel(visit_type, url)
    return ListedOrigin(
        lister_id=lister_id,
        url=url,
        visit_type=visit_type,
        last_update=origin.get_last_update(now),
Show All 31 Lines (context: def seconds_between_commits(self))

        # minimum: 1 second (bucket == 0)
        # max: 10 years (bucket == num_buckets - 1)
        ten_y = 10 * 365 * 24 * 3600
        return ten_y ** (bucket / num_buckets)
        # return 1 + (ten_y - 1) * (bucket / (num_buckets - 1))
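
The exponential formula above spreads the per-origin commit interval between the two extremes in the comments: bucket 0 maps to 1 second, and each further bucket multiplies the interval by a constant factor so the top buckets approach the ten-year cap. A standalone sketch, assuming a made-up num_buckets (the real constant and the per-origin bucket choice live in the folded lines):

ten_y = 10 * 365 * 24 * 3600
num_buckets = 24  # assumption, not the value used in the changeset
for bucket in (0, num_buckets // 2, num_buckets - 1):
    # bucket 0 -> 1 second; each step multiplies by ten_y ** (1 / num_buckets)
    print(bucket, ten_y ** (bucket / num_buckets))

The commented-out alternative would instead space the buckets linearly between 1 second and ten years.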

-    def get_last_update(self, now: datetime):
+    def get_last_update(self, now: datetime) -> datetime:
        """Get the last_update value for this origin.

        We assume that the origin had its first commit at `EPOCH`, and that one
        commit happened every `self.seconds_between_commits()`. This returns
        the last commit date before or equal to `now`.
        """
        _, time_since_last_commit = divmod(
            (now - self.EPOCH).total_seconds(), self.seconds_between_commits()
        )
        return now - timedelta(seconds=time_since_last_commit)
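
The divmod call is snapping `now` down onto the commit grid anchored at EPOCH: the remainder is the time elapsed since the latest simulated commit. A minimal sketch with made-up values (the real EPOCH and the per-origin interval come from the class, folded away above):

from datetime import datetime, timedelta, timezone

EPOCH = datetime(2015, 9, 1, tzinfo=timezone.utc)  # assumption, not the real constant
interval = 7 * 24 * 3600  # assume one commit per week
now = datetime(2021, 3, 10, 12, 0, tzinfo=timezone.utc)
_, since_last = divmod((now - EPOCH).total_seconds(), interval)
last_update = now - timedelta(seconds=since_last)  # latest grid point at or before now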
douardda (marked done): shouldn't it be `get_current_snapshot_id` instead? to me, the name suggests it returns a Snapshot object.

+    def get_current_snapshot_id(self, now: datetime) -> bytes:
+        """Get the current snapshot for this origin.
+
+        To generate a snapshot id, we calculate the number of commits since the
+        EPOCH, and hash it alongside the origin type and url.
+        """
+        commits_since_epoch, _ = divmod(
+            (now - self.EPOCH).total_seconds(), self.seconds_between_commits()
+        )
+        return hashlib.sha1(
+            f"{self.type} {self.origin} {commits_since_epoch}".encode()
+        ).digest()
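
A consequence worth noting for the simulator: the snapshot id is a pure function of (visit type, origin url, number of commits since EPOCH), so repeated visits within the same commit interval observe the same id, and it changes once another simulated commit lands. An illustrative check reusing the classes defined in this file (not part of the changeset):

from datetime import datetime, timedelta, timezone

model = OriginModel("git", "https://example.com/000001.git")
now = datetime.now(tz=timezone.utc)
step = timedelta(seconds=model.seconds_between_commits())
assert model.get_current_snapshot_id(now) == model.get_current_snapshot_id(now)
assert model.get_current_snapshot_id(now) != model.get_current_snapshot_id(now + step)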

    def load_task_characteristics(
-        self, env: Environment, stats: Optional[OriginVisitStats]
-    ) -> Tuple[float, bool, str]:
-        """Returns the (run_time, eventfulness, end_status) of the next
+        self, now: datetime
+    ) -> Tuple[float, str, Optional[bytes]]:
+        """Returns the (run_time, end_status, snapshot id) of the next
        origin visit."""
-        if stats and stats.last_eventful:
-            time_since_last_successful_run = env.time - stats.last_eventful
-        else:
-            time_since_last_successful_run = timedelta(days=365)
+        current_snapshot = self.get_current_snapshot_id(now)
+        key = (self.type, self.origin)
+        last_visit = _visit_times.get(key, now - timedelta(days=365))
+        time_since_last_successful_run = now - last_visit
+        _visit_times[key] = now
        seconds_between_commits = self.seconds_between_commits()
+        seconds_since_last_successful = time_since_last_successful_run.total_seconds()
+        n_commits = int(seconds_since_last_successful / seconds_between_commits)
        logger.debug(
-            "Interval between commits: %s", timedelta(seconds=seconds_between_commits)
+            "%s characteristics %s origin=%s: Interval: %s, n_commits: %s",
+            now,
+            self.type,
+            self.origin,
+            timedelta(seconds=seconds_between_commits),
+            n_commits,
        )
-        seconds_since_last_successful = time_since_last_successful_run.total_seconds()
-        if seconds_since_last_successful < seconds_between_commits:
-            # No commits since last visit => uneventful
-            return (self.MIN_RUN_TIME, False, "full")
-        else:
-            n_commits = seconds_since_last_successful / seconds_between_commits
-            run_time = self.MIN_RUN_TIME + self.PER_COMMIT_RUN_TIME * n_commits
-            if run_time > self.MAX_RUN_TIME:
-                return (self.MAX_RUN_TIME, False, "partial")
-            else:
-                return (run_time, True, "full")
+        run_time = self.MIN_RUN_TIME + self.PER_COMMIT_RUN_TIME * n_commits
+        if run_time > self.MAX_RUN_TIME:
+            # Long visits usually fail
+            return (self.MAX_RUN_TIME, "partial", None)
+        else:
+            return (run_time, "full", current_snapshot)
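
To make the clamping concrete, a tiny numeric sketch with assumed constants (the actual MIN_RUN_TIME, PER_COMMIT_RUN_TIME and MAX_RUN_TIME are defined in the folded part of the class):

MIN_RUN_TIME, PER_COMMIT_RUN_TIME, MAX_RUN_TIME = 30.0, 0.01, 7200.0  # assumptions
for n_commits in (0, 10_000, 10_000_000):
    run_time = MIN_RUN_TIME + PER_COMMIT_RUN_TIME * n_commits
    status = "partial" if run_time > MAX_RUN_TIME else "full"
    # 0 commits -> 30.0 "full"; 10_000 -> 130.0 "full"; 10_000_000 -> capped at 7200.0 "partial"
    print(n_commits, min(run_time, MAX_RUN_TIME), status)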

def lister_process(
    env: Environment, lister_id: uuid.UUID
) -> Generator[Event, Event, None]:
    """Every hour, generate new origins and update the `last_update` field for
    the ones this process generated in the past"""
    NUM_NEW_ORIGINS = 100

Show All 22 Lines

def load_task_process(
    env: Environment, task: Task, status_queue: Queue
) -> Iterator[Event]:
    """A loading task. This pushes OriginVisitStatus objects to the
    status_queue to simulate the visible outcomes of the task.

    Uses the `load_task_duration` function to determine its run time.
    """
-    # This is cheating; actual tasks access the state from the storage, not the
-    # scheduler
-    pk = task.origin, task.visit_type
-    visit_stats = env.scheduler.origin_visit_stats_get([pk])
-    stats: Optional[OriginVisitStats] = visit_stats[0] if len(visit_stats) > 0 else None
-    last_snapshot = stats.last_snapshot if stats else None
    status = OriginVisitStatus(
        origin=task.origin,
        visit=42,
        type=task.visit_type,
        status="created",
        date=env.time,
        snapshot=None,
    )
-    origin_model = OriginModel(task.visit_type, task.origin)
-    (run_time, eventful, end_status) = origin_model.load_task_characteristics(
-        env, stats
-    )
    logger.debug("%s task %s origin=%s: Start", env.time, task.visit_type, task.origin)
    yield status_queue.put(TaskEvent(task=task, status=status))
+    origin_model = OriginModel(task.visit_type, task.origin)
+    (run_time, end_status, snapshot) = origin_model.load_task_characteristics(env.time)
    yield env.timeout(run_time)
    logger.debug("%s task %s origin=%s: End", env.time, task.visit_type, task.origin)
-    new_snapshot = os.urandom(20) if eventful else last_snapshot
    yield status_queue.put(
        TaskEvent(
            task=task,
            status=attr.evolve(
-                status, status=end_status, date=env.time, snapshot=new_snapshot
+                status, status=end_status, date=env.time, snapshot=snapshot
            ),
-            eventful=eventful,
        )
    )
-    env.report.record_visit(run_time, eventful, end_status)
+    env.report.record_visit(
+        (task.visit_type, task.origin), run_time, end_status, snapshot
+    )
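
Taken together, the generator and the model can be exercised outside the simpy loop; a minimal sketch (the lister id is made up, and the first visit assumes a year of backlog because of the _visit_times default):

from datetime import datetime, timezone
import uuid

origin = generate_listed_origin(lister_id=uuid.uuid4())
model = OriginModel(origin.visit_type, origin.url)

now = datetime.now(tz=timezone.utc)
run_time, end_status, snapshot = model.load_task_characteristics(now)
# "full" with the current snapshot id, or "partial" with no snapshot if the
# backlog pushes run_time past MAX_RUN_TIME
print(origin.url, run_time, end_status, snapshot.hex() if snapshot else None)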