diff --git a/swh/scheduler/simulator/__init__.py b/swh/scheduler/simulator/__init__.py
index 6c30d2b..5e470c8 100644
--- a/swh/scheduler/simulator/__init__.py
+++ b/swh/scheduler/simulator/__init__.py
@@ -1,111 +1,123 @@
 # Copyright (C) 2021  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from datetime import datetime, timedelta, timezone
 import logging
 from typing import Dict, Generator, Optional
 
 from simpy import Event
 
 from swh.scheduler import get_scheduler
 from swh.scheduler.model import ListedOrigin
 
+from . import origin_scheduler, task_scheduler
 from .common import Environment, Queue, SimulationReport, Task
-from .origin_scheduler import scheduler_journal_client_process, scheduler_runner_process
 from .origins import load_task_process
 
 logger = logging.getLogger(__name__)
 
 
 def worker_process(
     env: Environment, name: str, task_queue: Queue, status_queue: Queue
 ) -> Generator[Event, Task, None]:
     """A worker which consumes tasks from the input task_queue. Tasks
     themselves send OriginVisitStatus objects to the status_queue."""
     logger.debug("%s worker %s: Start", env.time, name)
     while True:
         task = yield task_queue.get()
         logger.debug(
             "%s worker %s: Run task %s origin=%s",
             env.time,
             name,
             task.visit_type,
             task.origin,
         )
         yield env.process(load_task_process(env, task, status_queue=status_queue))
 
 
 def setup(
     env: Environment,
     scheduler: str,
-    policy: str,
+    policy: Optional[str],
     workers_per_type: Dict[str, int],
     task_queue_capacity: int,
 ):
     # We expect PGHOST, PGPORT, ... set externally
     task_queues = {
         visit_type: Queue(env, capacity=task_queue_capacity)
         for visit_type in workers_per_type
     }
     status_queue = Queue(env)
 
-    env.process(scheduler_runner_process(env, task_queues, policy))
-    env.process(scheduler_journal_client_process(env, status_queue))
+    if scheduler == "origin_scheduler":
+        if policy is None:
+            raise ValueError("origin_scheduler needs a scheduling policy")
+        env.process(origin_scheduler.scheduler_runner_process(env, task_queues, policy))
+        env.process(
+            origin_scheduler.scheduler_journal_client_process(env, status_queue)
+        )
+    elif scheduler == "task_scheduler":
+        if policy is not None:
+            raise ValueError("task_scheduler doesn't support a scheduling policy")
+        env.process(task_scheduler.scheduler_runner_process(env, task_queues))
+        env.process(task_scheduler.scheduler_listener_process(env, status_queue))
+    else:
+        raise ValueError(f"Unknown scheduler: {scheduler}")
 
     for visit_type, num_workers in workers_per_type.items():
         task_queue = task_queues[visit_type]
         for i in range(num_workers):
             worker_name = f"worker-{visit_type}-{i}"
             env.process(worker_process(env, worker_name, task_queue, status_queue))
 
 
 def fill_test_data():
     scheduler = get_scheduler(cls="local", db="")
     stored_lister = scheduler.get_or_create_lister(name="example")
     origins = [
         ListedOrigin(
             lister_id=stored_lister.id,
             url=f"https://example.com/{i:04d}.git",
             visit_type="git",
             last_update=datetime(2020, 6, 15, 16, 0, 0, i, tzinfo=timezone.utc),
         )
         for i in range(100000)
     ]
     scheduler.record_listed_origins(origins)
 
     scheduler.create_tasks(
         [
             {
                 **origin.as_task_dict(),
                 "policy": "recurring",
                 "next_run": origin.last_update,
                 "interval": timedelta(days=64),
             }
             for origin in origins
         ]
     )
 
 
-def run(scheduler: str, policy: str, runtime: Optional[int]):
+def run(scheduler: str, policy: Optional[str], runtime: Optional[int]):
     NUM_WORKERS = 48
     start_time = datetime.now(tz=timezone.utc)
     env = Environment(start_time=start_time)
     env.scheduler = get_scheduler(cls="local", db="")
     env.report = SimulationReport()
     setup(
         env,
         scheduler=scheduler,
         policy=policy,
         workers_per_type={"git": NUM_WORKERS},
         task_queue_capacity=10000,
     )
     try:
         env.run(until=runtime)
     except KeyboardInterrupt:
         pass
     finally:
         end_time = env.time
         print("total time:", end_time - start_time)
         print(env.report.format())
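For context, a minimal sketch of driving these entry points end to end. The policy name used below is an assumption for illustration; any policy accepted by `grab_next_visits` works, and `task_scheduler` must be run without one:

```python
# Hypothetical driver; assumes PGHOST, PGPORT, ... point at a scratch
# scheduler database, as noted in setup().
from swh.scheduler.simulator import fill_test_data, run

fill_test_data()  # 100,000 example git origins, one recurring task each

# The origin-aware scheduler requires a scheduling policy
# ("oldest_scheduled_first" is an assumed example)...
run(scheduler="origin_scheduler", policy="oldest_scheduled_first", runtime=3600)

# ...while the task-based scheduler rejects one.
run(scheduler="task_scheduler", policy=None, runtime=3600)
```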
diff --git a/swh/scheduler/simulator/common.py b/swh/scheduler/simulator/common.py
index 30c54f9..1f1e803 100644
--- a/swh/scheduler/simulator/common.py
+++ b/swh/scheduler/simulator/common.py
@@ -1,99 +1,102 @@
 # Copyright (C) 2021  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from dataclasses import dataclass, field
 from datetime import datetime, timedelta
 import textwrap
 from typing import Dict, List, Tuple
+import uuid
 
 import plotille
 from simpy import Environment as _Environment
 from simpy import Store
 
 from swh.model.model import OriginVisitStatus
 from swh.scheduler.interface import SchedulerInterface
 
 
 @dataclass
 class SimulationReport:
     DURATION_THRESHOLD = 3600
     """Max duration for histograms"""
 
     total_visits: int = 0
     """Total count of finished visits"""
 
     visit_runtimes: Dict[Tuple[str, bool], List[float]] = field(default_factory=dict)
     """Collect the visit runtimes for each (status, eventful) tuple"""
 
     def record_visit(self, duration: float, eventful: bool, status: str) -> None:
         self.total_visits += 1
         self.visit_runtimes.setdefault((status, eventful), []).append(duration)
 
     @property
     def useless_visits(self):
         """Number of uneventful, full visits"""
         return len(self.visit_runtimes.get(("full", False), []))
 
     def runtime_histogram(self, status: str, eventful: bool) -> str:
         runtimes = self.visit_runtimes.get((status, eventful), [])
         return plotille.hist(
             [runtime for runtime in runtimes if runtime <= self.DURATION_THRESHOLD]
         )
 
     def format(self):
         full_visits = self.visit_runtimes.get(("full", True), [])
         histogram = self.runtime_histogram("full", True)
         long_tasks = sum(runtime > self.DURATION_THRESHOLD for runtime in full_visits)
 
         return (
             textwrap.dedent(
                 f"""\
                 Total visits: {self.total_visits}
                 Useless visits: {self.useless_visits}
                 Eventful visits: {len(full_visits)}
                 Very long running tasks: {long_tasks}
                 Visit time histogram for eventful visits:
                 """
             )
             + histogram
         )
 
 
 @dataclass
 class Task:
     visit_type: str
     origin: str
+    backend_id: uuid.UUID = field(default_factory=uuid.uuid4)
 
 
 @dataclass
 class TaskEvent:
     task: Task
     status: OriginVisitStatus
+    eventful: bool = field(default=False)
 
 
 class Queue(Store):
     """Model a queue of objects to be passed between processes."""
 
     def __len__(self):
         return len(self.items or [])
 
     def slots_remaining(self):
         return self.capacity - len(self)
 
 
 class Environment(_Environment):
     report: SimulationReport
     scheduler: SchedulerInterface
 
     def __init__(self, start_time: datetime):
         if start_time.tzinfo is None:
             raise ValueError("start_time must have timezone information")
         self.start_time = start_time
         super().__init__()
 
     @property
     def time(self):
         """Get the current simulated wall clock time"""
         return self.start_time + timedelta(seconds=self.now)
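A short sketch of how the `common.py` pieces fit together: one simpy tick is one simulated second, and `Environment.time` projects the tick counter back onto the wall clock:

```python
from datetime import datetime, timezone

from swh.scheduler.simulator.common import Environment, SimulationReport

env = Environment(start_time=datetime(2021, 1, 1, tzinfo=timezone.utc))
env.report = SimulationReport()

def probe(env: Environment):
    # Advance the simulation by 90 seconds of simulated time.
    yield env.timeout(90)
    print(env.time)  # 2021-01-01 00:01:30+00:00

env.process(probe(env))
env.run()

# Visit outcomes are keyed by (status, eventful) for the final report.
env.report.record_visit(duration=12.5, eventful=True, status="full")
print(env.report.format())
```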
diff --git a/swh/scheduler/simulator/origin_scheduler.py b/swh/scheduler/simulator/origin_scheduler.py
index 20235b7..7af1263 100644
--- a/swh/scheduler/simulator/origin_scheduler.py
+++ b/swh/scheduler/simulator/origin_scheduler.py
@@ -1,70 +1,69 @@
 # Copyright (C) 2021  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 """Agents using the new origin-aware scheduler."""
 
 import logging
 from typing import Any, Dict, Generator, Iterator, List
 
 from simpy import Event
 
 from swh.scheduler.journal_client import process_journal_objects
 
-from . import Environment
-from .common import Queue, Task, TaskEvent
+from .common import Environment, Queue, Task, TaskEvent
 
 logger = logging.getLogger(__name__)
 
 
 def scheduler_runner_process(
     env: Environment, task_queues: Dict[str, Queue], policy: str,
 ) -> Iterator[Event]:
     """Scheduler runner. Grabs next visits from the database according to the
     scheduling policy, and fills the task_queues accordingly."""
     while True:
         for visit_type, queue in task_queues.items():
             min_batch_size = max(queue.capacity // 10, 1)
             remaining = queue.slots_remaining()
             if remaining < min_batch_size:
                 continue
             next_origins = env.scheduler.grab_next_visits(
                 visit_type, remaining, policy=policy
             )
             logger.debug(
                 "%s runner: running %s %s tasks",
                 env.time,
                 visit_type,
                 len(next_origins),
             )
             for origin in next_origins:
                 yield queue.put(Task(visit_type=origin.visit_type, origin=origin.url))
 
         yield env.timeout(10.0)
 
 
 def scheduler_journal_client_process(
     env: Environment, status_queue: Queue
 ) -> Generator[Event, TaskEvent, None]:
     """Scheduler journal client. Every once in a while, pulls
     `OriginVisitStatus`es from the status_queue to update the scheduler
     origin_visit_stats table."""
     BATCH_SIZE = 100
     statuses: List[Dict[str, Any]] = []
     while True:
         task_event = yield status_queue.get()
         statuses.append(task_event.status.to_dict())
         if len(statuses) < BATCH_SIZE:
             continue
 
         logger.debug(
             "%s journal client: processing %s statuses", env.time, len(statuses)
         )
         process_journal_objects(
             {"origin_visit_status": statuses}, scheduler=env.scheduler
         )
 
         statuses = []
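The `remaining < min_batch_size` test above is the runner's only form of backpressure; extracted as a standalone predicate (the helper name is made up for illustration), it reads:

```python
def should_poll(capacity: int, used: int) -> bool:
    """Only hit the database when at least 10% of the queue capacity is
    free, so grab_next_visits() gets called with sizable batches instead
    of once per freed slot."""
    min_batch_size = max(capacity // 10, 1)
    remaining = capacity - used
    return remaining >= min_batch_size

assert should_poll(capacity=10000, used=8999)      # 1001 slots free: poll
assert not should_poll(capacity=10000, used=9500)  # only 500 free: wait
```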
diff --git a/swh/scheduler/simulator/origins.py b/swh/scheduler/simulator/origins.py
index 4e630a2..f6b941b 100644
--- a/swh/scheduler/simulator/origins.py
+++ b/swh/scheduler/simulator/origins.py
@@ -1,118 +1,119 @@
 # Copyright (C) 2021  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from datetime import timedelta
 import hashlib
 import logging
 import os
 from typing import Iterator, Optional, Tuple
 
 import attr
 from simpy import Event
 
 from swh.model.model import OriginVisitStatus
 from swh.scheduler.model import OriginVisitStats
 
 from .common import Environment, Queue, Task, TaskEvent
 
 logger = logging.getLogger(__name__)
 
 
 class OriginModel:
     MIN_RUN_TIME = 0.5
     """Minimal run time for a visit (retrieved from production data)"""
 
     MAX_RUN_TIME = 7200
     """Max run time for a visit"""
 
     PER_COMMIT_RUN_TIME = 0.1
     """Run time per commit"""
 
     def __init__(self, type: str, origin: str):
         self.type = type
         self.origin = origin
 
     def seconds_between_commits(self):
         n_bytes = 2
         num_buckets = 2 ** (8 * n_bytes)
         bucket = int.from_bytes(
             hashlib.md5(self.origin.encode()).digest()[0:n_bytes], "little"
         )
         # minimum: 1 second (bucket == 0)
         # max: 10 years (bucket == num_buckets - 1)
         ten_y = 10 * 365 * 24 * 3600
 
         return ten_y ** (bucket / num_buckets)
         # return 1 + (ten_y - 1) * (bucket / (num_buckets - 1))
 
     def load_task_characteristics(
         self, env: Environment, stats: Optional[OriginVisitStats]
     ) -> Tuple[float, bool, str]:
         """Returns the (run_time, eventfulness, end_status) of the next
         origin visit."""
         if stats and stats.last_eventful:
             time_since_last_successful_run = env.time - stats.last_eventful
         else:
             time_since_last_successful_run = timedelta(days=365)
 
         seconds_between_commits = self.seconds_between_commits()
         logger.debug(
             "Interval between commits: %s", timedelta(seconds=seconds_between_commits)
         )
 
         seconds_since_last_successful = time_since_last_successful_run.total_seconds()
         if seconds_since_last_successful < seconds_between_commits:
             # No commits since last visit => uneventful
             return (self.MIN_RUN_TIME, False, "full")
         else:
             n_commits = seconds_since_last_successful / seconds_between_commits
             run_time = self.MIN_RUN_TIME + self.PER_COMMIT_RUN_TIME * n_commits
             if run_time > self.MAX_RUN_TIME:
                 return (self.MAX_RUN_TIME, False, "partial")
             else:
                 return (run_time, True, "full")
 
 
 def load_task_process(
     env: Environment, task: Task, status_queue: Queue
 ) -> Iterator[Event]:
     """A loading task. This pushes OriginVisitStatus objects to the
     status_queue to simulate the visible outcomes of the task.
 
     Uses the `load_task_duration` function to determine its run time.
     """
     # This is cheating; actual tasks access the state from the storage, not the
     # scheduler
     stats = env.scheduler.origin_visit_stats_get(task.origin, task.visit_type)
     last_snapshot = stats.last_snapshot if stats else None
 
     status = OriginVisitStatus(
         origin=task.origin,
         visit=42,
         type=task.visit_type,
         status="created",
         date=env.time,
         snapshot=None,
     )
     origin_model = OriginModel(task.visit_type, task.origin)
     (run_time, eventful, end_status) = origin_model.load_task_characteristics(
         env, stats
     )
     logger.debug("%s task %s origin=%s: Start", env.time, task.visit_type, task.origin)
     yield status_queue.put(TaskEvent(task=task, status=status))
     yield env.timeout(run_time)
     logger.debug("%s task %s origin=%s: End", env.time, task.visit_type, task.origin)
 
     new_snapshot = os.urandom(20) if eventful else last_snapshot
     yield status_queue.put(
         TaskEvent(
             task=task,
             status=attr.evolve(
                 status, status=end_status, date=env.time, snapshot=new_snapshot
             ),
+            eventful=eventful,
         )
     )
 
     env.report.record_visit(run_time, eventful, end_status)
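To make the eventfulness model concrete, here is `seconds_between_commits` restated as a free function: the first two bytes of the origin URL's MD5 pick one of 65536 buckets, which maps exponentially onto the range from 1 second to 10 years:

```python
import hashlib

def seconds_between_commits(origin: str) -> float:
    n_bytes = 2
    num_buckets = 2 ** (8 * n_bytes)  # 65536
    bucket = int.from_bytes(hashlib.md5(origin.encode()).digest()[:n_bytes], "little")
    ten_y = 10 * 365 * 24 * 3600
    # bucket == 0 -> 1 second; bucket == num_buckets - 1 -> ~10 years
    return ten_y ** (bucket / num_buckets)

# Derived from a hash of the URL, so it is deterministic per origin and
# stable across repeated simulated visits.
print(seconds_between_commits("https://example.com/0000.git"))
```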
diff --git a/swh/scheduler/simulator/task_scheduler.py b/swh/scheduler/simulator/task_scheduler.py
new file mode 100644
index 0000000..6289cd9
--- /dev/null
+++ b/swh/scheduler/simulator/task_scheduler.py
@@ -0,0 +1,77 @@
+# Copyright (C) 2021  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Agents using the "old" task-based scheduler."""
+
+import logging
+from typing import Dict, Generator, Iterator
+
+from simpy import Event
+
+from .common import Environment, Queue, Task, TaskEvent
+
+logger = logging.getLogger(__name__)
+
+
+def scheduler_runner_process(
+    env: Environment, task_queues: Dict[str, Queue],
+) -> Iterator[Event]:
+    """Scheduler runner. Grabs ready tasks from the database and fills the
+    task_queues accordingly."""
+
+    while True:
+        for visit_type, queue in task_queues.items():
+            min_batch_size = max(queue.capacity // 10, 1)
+            remaining = queue.slots_remaining()
+            if remaining < min_batch_size:
+                continue
+            next_tasks = env.scheduler.grab_ready_tasks(
+                f"load-{visit_type}", num_tasks=remaining, timestamp=env.time
+            )
+            logger.debug(
+                "%s runner: running %s %s tasks", env.time, visit_type, len(next_tasks),
+            )
+
+            sim_tasks = [
+                Task(visit_type=visit_type, origin=task["arguments"]["kwargs"]["url"])
+                for task in next_tasks
+            ]
+
+            env.scheduler.mass_schedule_task_runs(
+                [
+                    {
+                        "task": task["id"],
+                        "scheduled": env.time,
+                        "backend_id": str(sim_task.backend_id),
+                    }
+                    for task, sim_task in zip(next_tasks, sim_tasks)
+                ]
+            )
+
+            for sim_task in sim_tasks:
+                yield queue.put(sim_task)
+
+        yield env.timeout(10.0)
+
+
+def scheduler_listener_process(
+    env: Environment, status_queue: Queue
+) -> Generator[Event, TaskEvent, None]:
+    """Scheduler listener. In the real world this would listen to celery
+    events, but we listen to the status_queue and simulate celery events from
+    that."""
+    while True:
+        event = yield status_queue.get()
+        if event.status.status == "ongoing":
+            env.scheduler.start_task_run(event.task.backend_id, timestamp=env.time)
+        else:
+            if event.status.status == "full":
+                status = "eventful" if event.eventful else "uneventful"
+            else:
+                status = "failed"
+
+            env.scheduler.end_task_run(
+                str(event.task.backend_id), status=status, timestamp=env.time
+            )
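The listener's translation from visit outcomes to celery-style task-run statuses, restated as a standalone function (a hypothetical helper, for illustration):

```python
def task_run_status(visit_status: str, eventful: bool) -> str:
    """Map a final OriginVisitStatus.status to an end_task_run() status."""
    if visit_status == "full":
        return "eventful" if eventful else "uneventful"
    # "partial" (and any other non-full outcome) counts as a failed task run.
    return "failed"

assert task_run_status("full", True) == "eventful"
assert task_run_status("full", False) == "uneventful"
assert task_run_status("partial", False) == "failed"
```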