diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -15,6 +15,7 @@
     rev: v1.16.0
     hooks:
       - id: codespell
+        args: [-L simpy, -L hist]
 
 - repo: local
   hooks:
diff --git a/docs/index.rst b/docs/index.rst
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -166,4 +166,5 @@
    :maxdepth: 2
 
    cli
+   simulator
    /apidoc/swh.scheduler
diff --git a/docs/simulator.rst b/docs/simulator.rst
new file mode 100644
--- /dev/null
+++ b/docs/simulator.rst
@@ -0,0 +1,65 @@
+.. _swh-scheduler-simulator:
+
+Software Heritage Scheduler Simulator
+=====================================
+
+This component simulates the interaction between the scheduling and loading
+infrastructure of Software Heritage. This allows quick(er) development of new
+task scheduling policies without having to wait for the actual infrastructure
+to perform (heavy) loading tasks.
+
+Simulator components
+--------------------
+
+- real instance of the scheduler database
+- simulated task queues: replaces RabbitMQ with simple in-memory structures
+- simulated workers: replaces Celery with simple while loops
+- simulated load tasks: replaces loaders with noops that take a certain time,
+  and generate synthetic OriginVisitStatus objects
+- simulated archive -> scheduler feedback loop: OriginVisitStatus objects are
+  pushed to a simple queue which gets processed by the scheduler journal
+  client's process function directly (instead of going through swh.storage and
+  swh.journal (kafka))
+
+In short, only the scheduler database and scheduler logic are kept; every other
+component (RabbitMQ, Celery, Kafka, SWH loaders, SWH storage) is either
+replaced with a barebones in-process utility, or removed entirely.
+
+Installing the simulator
+------------------------
+
+The simulator depends on SimPy and a few other simulation-specific libraries.
+To install them, run:
+
+.. code-block:: bash
+
+   pip install 'swh.scheduler[simulator]'
+
+Running the simulator
+---------------------
+
+The simulator uses a real instance of the scheduler database, which is (at
+least for now) persistent across runs of the simulator. You need to set that up
+beforehand:
+
+.. code-block:: bash
+
+   # if you want to use a temporary instance of postgresql
+   eval `pifpaf run postgresql`
+
+   # Set this variable for the simulator to know which db to connect to. pifpaf
+   # sets other variables like PGPORT, PGHOST, ...
+   export PGDATABASE=swh-scheduler
+
+   # Create/initialize the scheduler database
+   swh db create scheduler -d $PGDATABASE
+   swh db init scheduler -d $PGDATABASE
+
+   # This generates some data in the scheduler database. You can also feed the
+   # database with more realistic data, e.g. from a lister or from a dump of
+   # the production database.
+   swh scheduler simulator fill-test-data
+
+   # Run the simulator itself, interacting with the scheduler database you've
+   # just seeded.
+   swh scheduler simulator run --scheduler origin_scheduler
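+
+The CLI is a thin wrapper around :py:func:`swh.scheduler.simulator.run`. As a
+minimal sketch (assuming the ``PG*`` environment variables above are still
+exported), the same simulation can be driven from Python directly:
+
+.. code-block:: python
+
+   from swh.scheduler.simulator import fill_test_data, run
+
+   fill_test_data(num_origins=1000)  # seed the scheduler database
+   run(
+       scheduler="origin_scheduler",
+       policy="oldest_scheduled_first",
+       runtime=7 * 24 * 3600,  # simulated seconds; None means run forever
+   )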
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -26,6 +26,9 @@
 [mypy-pika.*]
 ignore_missing_imports = True
 
+[mypy-plotille.*]
+ignore_missing_imports = True
+
 [mypy-pkg_resources.*]
 ignore_missing_imports = True
 
@@ -37,3 +40,6 @@
 
 [mypy-pytest_postgresql.*]
 ignore_missing_imports = True
+
+[mypy-simpy.*]
+ignore_missing_imports = True
diff --git a/requirements-simulator.txt b/requirements-simulator.txt
new file mode 100644
--- /dev/null
+++ b/requirements-simulator.txt
@@ -0,0 +1,2 @@
+plotille
+simpy>=3,<4
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -16,22 +16,23 @@
     long_description = f.read()
 
 
-def parse_requirements(name=None):
-    if name:
-        reqf = "requirements-%s.txt" % name
-    else:
-        reqf = "requirements.txt"
-
+def parse_requirements(*names):
     requirements = []
-    if not path.exists(reqf):
-        return requirements
+    for name in names:
+        if name:
+            reqf = "requirements-%s.txt" % name
+        else:
+            reqf = "requirements.txt"
+
+        if not path.exists(reqf):
+            # skip missing files instead of silently dropping the rest
+            continue
 
-    with open(reqf) as f:
-        for line in f.readlines():
-            line = line.strip()
-            if not line or line.startswith("#"):
-                continue
-            requirements.append(line)
+        with open(reqf) as f:
+            for line in f.readlines():
+                line = line.strip()
+                if not line or line.startswith("#"):
+                    continue
+                requirements.append(line)
     return requirements
 
@@ -47,10 +48,11 @@
     packages=find_packages(),
     setup_requires=["setuptools-scm"],
     use_scm_version=True,
-    install_requires=parse_requirements() + parse_requirements("swh"),
+    install_requires=parse_requirements(None, "swh"),
     extras_require={
-        "testing": parse_requirements("test") + parse_requirements("journal"),
+        "testing": parse_requirements("test", "journal", "simulator"),
         "journal": parse_requirements("journal"),
+        "simulator": parse_requirements("simulator"),
     },
     include_package_data=True,
     entry_points="""
diff --git a/sql/updates/24.sql b/sql/updates/24.sql
new file mode 100644
--- /dev/null
+++ b/sql/updates/24.sql
@@ -0,0 +1,56 @@
+insert into dbversion (version, release, description)
+  values (24, now(), 'Work In Progress');
+
+create table scheduler_metrics (
+  lister_id uuid not null references listers(id),
+  visit_type text not null,
+  last_update timestamptz not null,
+  origins_known int not null default 0,
+  origins_enabled int not null default 0,
+  origins_never_visited int not null default 0,
+  origins_with_pending_changes int not null default 0,
+
+  primary key (lister_id, visit_type)
+);
+
+comment on table scheduler_metrics is 'Cache of per-lister metrics for the scheduler, collated between the listed_origins and origin_visit_stats tables.';
+comment on column scheduler_metrics.lister_id is 'Lister instance on which metrics have been aggregated';
+comment on column scheduler_metrics.visit_type is 'Visit type on which metrics have been aggregated';
+comment on column scheduler_metrics.last_update is 'Last update of these metrics';
+comment on column scheduler_metrics.origins_known is 'Number of known (enabled or disabled) origins';
+comment on column scheduler_metrics.origins_enabled is 'Number of origins that were present in the latest listing';
+comment on column scheduler_metrics.origins_never_visited is 'Number of origins that have never been visited';
+comment on column scheduler_metrics.origins_with_pending_changes is 'Number of enabled origins with known activity since our last visit';
+
+
+create or replace function update_metrics(lister_id uuid default NULL, ts timestamptz default now())
+  returns setof scheduler_metrics
+  language sql
+as $$
+  insert into scheduler_metrics (
+    lister_id, visit_type, last_update,
+    origins_known, origins_enabled,
+    origins_never_visited, origins_with_pending_changes
+  )
+    select
+      lo.lister_id, lo.visit_type, coalesce(ts, now()) as last_update,
+      count(*) as origins_known,
+      count(*) filter (where enabled) as origins_enabled,
+      count(*) filter (where
+        enabled and last_snapshot is NULL
+      ) as origins_never_visited,
+      count(*) filter (where
+        enabled and lo.last_update > greatest(ovs.last_eventful, ovs.last_uneventful)
+      ) as origins_with_pending_changes
+    from listed_origins lo
+    left join origin_visit_stats ovs using (url, visit_type)
+    where
+      -- update only for the requested lister
+      update_metrics.lister_id = lo.lister_id
+      -- or for all listers if the function argument is null
+      or update_metrics.lister_id is null
+    group by (lister_id, visit_type)
+  -- refresh the cached row on repeated runs instead of violating the
+  -- (lister_id, visit_type) primary key
+  on conflict (lister_id, visit_type) do update
+    set
+      last_update = excluded.last_update,
+      origins_known = excluded.origins_known,
+      origins_enabled = excluded.origins_enabled,
+      origins_never_visited = excluded.origins_never_visited,
+      origins_with_pending_changes = excluded.origins_with_pending_changes
+  returning *
+$$;
+
+comment on function update_metrics(uuid, timestamptz) is 'Update metrics for the given lister_id';
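+
+-- Illustrative usage (example queries, not executed by this migration):
+--
+--   select * from update_metrics();            -- refresh all listers
+--   select * from update_metrics(:lister_id);  -- refresh a single lister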
diff --git a/swh/scheduler/backend.py b/swh/scheduler/backend.py
--- a/swh/scheduler/backend.py
+++ b/swh/scheduler/backend.py
@@ -3,6 +3,7 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import datetime
 import json
 import logging
 from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
@@ -24,6 +25,7 @@
     Lister,
     OriginVisitStats,
     PaginatedListedOriginList,
+    SchedulerMetrics,
 )
 
 logger = logging.getLogger(__name__)
@@ -833,3 +835,63 @@
             return OriginVisitStats(**row)
         else:
             return None
+
+    @db_transaction()
+    def update_metrics(
+        self,
+        lister_id: Optional[UUID] = None,
+        timestamp: Optional[datetime.datetime] = None,
+        db=None,
+        cur=None,
+    ) -> List[SchedulerMetrics]:
+        """Update the performance metrics of this scheduler instance.
+
+        Returns the updated metrics.
+
+        Args:
+            lister_id: if passed, update the metrics only for this lister instance
+            timestamp: if passed, the date at which we're updating the metrics,
+                defaults to the database NOW()
+        """
+        query = format_query(
+            "SELECT {keys} FROM update_metrics(%s, %s)",
+            SchedulerMetrics.select_columns(),
+        )
+        cur.execute(query, (lister_id, timestamp))
+        return [SchedulerMetrics(**row) for row in cur.fetchall()]
+
+    @db_transaction()
+    def get_metrics(
+        self,
+        lister_id: Optional[UUID] = None,
+        visit_type: Optional[str] = None,
+        db=None,
+        cur=None,
+    ) -> List[SchedulerMetrics]:
+        """Retrieve the performance metrics of this scheduler instance.
+
+        Args:
+            lister_id: filter the metrics for this lister instance only
+            visit_type: filter the metrics for this visit type only
+        """
+
+        where_filters = []
+        where_args = []
+        if lister_id:
+            where_filters.append("lister_id = %s")
+            where_args.append(str(lister_id))
+        if visit_type:
+            where_filters.append("visit_type = %s")
+            where_args.append(visit_type)
+
+        where_clause = ""
+        if where_filters:
+            where_clause = f"where {' and '.join(where_filters)}"
+
+        query = format_query(
+            "SELECT {keys} FROM scheduler_metrics %s" % where_clause,
+            SchedulerMetrics.select_columns(),
+        )
+
+        cur.execute(query, tuple(where_args))
+        return [SchedulerMetrics(**row) for row in cur.fetchall()]
diff --git a/swh/scheduler/cli/__init__.py b/swh/scheduler/cli/__init__.py
--- a/swh/scheduler/cli/__init__.py
+++ b/swh/scheduler/cli/__init__.py
@@ -82,7 +82,7 @@
     ctx.obj["config"] = conf
 
 
-from . import admin, celery_monitor, journal, origin, task, task_type  # noqa
+from . import admin, celery_monitor, journal, origin, simulator, task, task_type  # noqa
 
 
 def main():
diff --git a/swh/scheduler/cli/simulator.py b/swh/scheduler/cli/simulator.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/cli/simulator.py
@@ -0,0 +1,61 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import time
+
+import click
+
+from . import cli
+
+
+@cli.group("simulator")
+def simulator():
+    """Scheduler simulator."""
+    pass
+
+
+@simulator.command("fill-test-data")
+def fill_test_data_command():
+    """Fill the scheduler with test data for simulation purposes."""
+    from swh.scheduler.simulator import fill_test_data
+
+    click.echo("Filling test data...")
+    start = time.monotonic()
+    fill_test_data()
+    runtime = time.monotonic() - start
+    click.echo(f"Completed in {runtime:.2f} seconds")
+
+
+@simulator.command("run")
+@click.option(
+    "--scheduler",
+    "-s",
+    type=click.Choice(["task_scheduler", "origin_scheduler"]),
+    default="origin_scheduler",
+    help="Scheduler to simulate",
+)
+@click.option(
+    "--policy",
+    "-p",
+    type=click.Choice(["oldest_scheduled_first"]),
+    default="oldest_scheduled_first",
+    help="Scheduling policy to simulate (only for origin_scheduler)",
+)
+@click.option("--runtime", "-t", type=float, help="Simulated runtime, in seconds")
+def run_command(scheduler, policy, runtime):
+    """Run the scheduler simulator.
+
+    By default, the simulation runs forever. You can cap the simulated runtime
+    with the --runtime option, and you can always press Ctrl+C to interrupt the
+    running simulation.
+
+    'task_scheduler' is the "classic" task-based scheduler; 'origin_scheduler'
+    is the new origin-visit-aware scheduler. The latter uses --policy to decide
+    which origins to schedule first, based on information from listers.
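+
+    For instance, to simulate 1000 seconds of the classic scheduler's
+    operation:
+
+        swh scheduler simulator run -s task_scheduler --runtime 1000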
+    """
+    from swh.scheduler.simulator import run
+
+    policy = policy if scheduler == "origin_scheduler" else None
+    run(scheduler=scheduler, policy=policy, runtime=runtime)
diff --git a/swh/scheduler/interface.py b/swh/scheduler/interface.py
--- a/swh/scheduler/interface.py
+++ b/swh/scheduler/interface.py
@@ -3,6 +3,7 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import datetime
 from typing import Any, Dict, Iterable, List, Optional
 from uuid import UUID
 
@@ -15,6 +16,7 @@
     Lister,
     OriginVisitStats,
     PaginatedListedOriginList,
+    SchedulerMetrics,
 )
 
 
@@ -340,3 +342,32 @@
     ) -> Optional[OriginVisitStats]:
         """Retrieve the stats for an origin with a given visit type"""
         ...
+
+    @remote_api_endpoint("scheduler_metrics/update")
+    def update_metrics(
+        self,
+        lister_id: Optional[UUID] = None,
+        timestamp: Optional[datetime.datetime] = None,
+    ) -> List[SchedulerMetrics]:
+        """Update the performance metrics of this scheduler instance.
+
+        Returns the updated metrics.
+
+        Args:
+            lister_id: if passed, update the metrics only for this lister instance
+            timestamp: if passed, the date at which we're updating the metrics,
+                defaults to the database NOW()
+        """
+        ...
+
+    @remote_api_endpoint("scheduler_metrics/get")
+    def get_metrics(
+        self, lister_id: Optional[UUID] = None, visit_type: Optional[str] = None
+    ) -> List[SchedulerMetrics]:
+        """Retrieve the performance metrics of this scheduler instance.
+
+        Args:
+            lister_id: filter the metrics for this lister instance only
+            visit_type: filter the metrics for this visit type only
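+
+        Example (an illustrative sketch, assuming a configured ``scheduler``
+        instance and a registered ``lister``)::
+
+            scheduler.update_metrics(lister_id=lister.id)
+            git_metrics = scheduler.get_metrics(
+                lister_id=lister.id, visit_type="git"
+            )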
+        """
+        ...
diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py
--- a/swh/scheduler/model.py
+++ b/swh/scheduler/model.py
@@ -252,3 +252,35 @@
     @last_notfound.validator
     def check_last_notfound(self, attribute, value):
         check_timestamptz(value)
+
+
+@attr.s(frozen=True, slots=True)
+class SchedulerMetrics(BaseSchedulerModel):
+    """Metrics for the scheduler, aggregated by (lister_id, visit_type)"""
+
+    lister_id = attr.ib(
+        type=UUID, validator=[type_validator()], metadata={"primary_key": True}
+    )
+    visit_type = attr.ib(
+        type=str, validator=[type_validator()], metadata={"primary_key": True}
+    )
+
+    last_update = attr.ib(
+        type=Optional[datetime.datetime], validator=[type_validator()], default=None,
+    )
+
+    origins_known = attr.ib(type=int, validator=[type_validator()], default=0)
+    """Number of known (enabled or disabled) origins"""
+
+    origins_enabled = attr.ib(type=int, validator=[type_validator()], default=0)
+    """Number of origins that were present in the latest listings"""
+
+    origins_never_visited = attr.ib(type=int, validator=[type_validator()], default=0)
+    """Number of enabled origins that have never been visited
+    (according to the visit cache)"""
+
+    origins_with_pending_changes = attr.ib(
+        type=int, validator=[type_validator()], default=0
+    )
+    """Number of enabled origins with known activity (recorded by a lister)
+    since our last visit"""
diff --git a/swh/scheduler/simulator/__init__.py b/swh/scheduler/simulator/__init__.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/simulator/__init__.py
@@ -0,0 +1,144 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""This package runs the scheduler in a simulated environment, to evaluate
+various metrics. See :ref:`swh-scheduler-simulator`.
+
+This module orchestrates the simulator by initializing processes and connecting
+them together; these processes are defined in modules in the package and
+simulate/call specific components."""
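+
+# Schematically, data flows through the simulated processes as follows (one
+# task queue per visit type):
+#
+#   scheduler runner --> task queues --> workers --> status queue
+#   status queue --> journal client / listener --> scheduler database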
+
+from datetime import datetime, timedelta, timezone
+import logging
+from typing import Dict, Generator, Optional
+
+from simpy import Event
+
+from swh.scheduler import get_scheduler
+from swh.scheduler.model import ListedOrigin
+
+from . import origin_scheduler, task_scheduler
+from .common import Environment, Queue, SimulationReport, Task
+from .origins import load_task_process
+
+logger = logging.getLogger(__name__)
+
+
+def worker_process(
+    env: Environment, name: str, task_queue: Queue, status_queue: Queue
+) -> Generator[Event, Task, None]:
+    """A worker which consumes tasks from the input task_queue. Tasks
+    themselves send OriginVisitStatus objects to the status_queue."""
+    logger.debug("%s worker %s: Start", env.time, name)
+    while True:
+        task = yield task_queue.get()
+        logger.debug(
+            "%s worker %s: Run task %s origin=%s",
+            env.time,
+            name,
+            task.visit_type,
+            task.origin,
+        )
+        yield env.process(load_task_process(env, task, status_queue=status_queue))
+
+
+def setup(
+    env: Environment,
+    scheduler: str,
+    policy: Optional[str],
+    workers_per_type: Dict[str, int],
+    task_queue_capacity: int,
+    min_batch_size: int,
+):
+    # We expect PGHOST, PGPORT, ... set externally
+    task_queues = {
+        visit_type: Queue(env, capacity=task_queue_capacity)
+        for visit_type in workers_per_type
+    }
+    status_queue = Queue(env)
+
+    if scheduler == "origin_scheduler":
+        if policy is None:
+            raise ValueError("origin_scheduler needs a scheduling policy")
+        env.process(
+            origin_scheduler.scheduler_runner_process(
+                env, task_queues, policy, min_batch_size=min_batch_size
+            )
+        )
+        env.process(
+            origin_scheduler.scheduler_journal_client_process(env, status_queue)
+        )
+    elif scheduler == "task_scheduler":
+        if policy is not None:
+            raise ValueError("task_scheduler doesn't support a scheduling policy")
+        env.process(
+            task_scheduler.scheduler_runner_process(
+                env, task_queues, min_batch_size=min_batch_size
+            )
+        )
+        env.process(task_scheduler.scheduler_listener_process(env, status_queue))
+    else:
+        raise ValueError(f"Unknown scheduler: {scheduler}")
+
+    for visit_type, num_workers in workers_per_type.items():
+        task_queue = task_queues[visit_type]
+        for i in range(num_workers):
+            worker_name = f"worker-{visit_type}-{i}"
+            env.process(worker_process(env, worker_name, task_queue, status_queue))
+
+
+def fill_test_data(num_origins: int = 100000):
+    """Fills the database with mock data to test the simulator."""
+    scheduler = get_scheduler(cls="local", db="")
+
+    stored_lister = scheduler.get_or_create_lister(name="example")
+    assert stored_lister.id is not None
+
+    origins = [
+        ListedOrigin(
+            lister_id=stored_lister.id,
+            url=f"https://example.com/{i:04d}.git",
+            visit_type="git",
+            last_update=datetime(2020, 6, 15, 16, 0, 0, i, tzinfo=timezone.utc),
+        )
+        for i in range(num_origins)
+    ]
+    scheduler.record_listed_origins(origins)
+
+    scheduler.create_tasks(
+        [
+            {
+                **origin.as_task_dict(),
+                "policy": "recurring",
+                "next_run": origin.last_update,
+                "interval": timedelta(days=64),
+            }
+            for origin in origins
+        ]
+    )
+
+
+def run(scheduler: str, policy: Optional[str], runtime: Optional[int]):
+    NUM_WORKERS = 48
+    start_time = datetime.now(tz=timezone.utc)
+    env = Environment(start_time=start_time)
+    env.scheduler = get_scheduler(cls="local", db="")
+    env.report = SimulationReport()
+    setup(
+        env,
+        scheduler=scheduler,
+        policy=policy,
+        workers_per_type={"git": NUM_WORKERS},
+        task_queue_capacity=10000,
+        min_batch_size=1000,
+    )
+    try:
+        env.run(until=runtime)
+    except KeyboardInterrupt:
+        pass
+    finally:
+        end_time = env.time
+        print("total time:", end_time - start_time)
+        print(env.report.format())
diff --git a/swh/scheduler/simulator/common.py b/swh/scheduler/simulator/common.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/simulator/common.py
@@ -0,0 +1,102 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from dataclasses import dataclass, field
+from datetime import datetime, timedelta
+import textwrap
+from typing import Dict, List, Tuple
+import uuid
+
+import plotille
+from simpy import Environment as _Environment
+from simpy import Store
+
+from swh.model.model import OriginVisitStatus
+from swh.scheduler.interface import SchedulerInterface
+
+
+@dataclass
+class SimulationReport:
+    DURATION_THRESHOLD = 3600
+    """Max duration for histograms"""
+
+    total_visits: int = 0
+    """Total count of finished visits"""
+
+    visit_runtimes: Dict[Tuple[str, bool], List[float]] = field(default_factory=dict)
+    """Collect the visit runtimes for each (status, eventful) tuple"""
+
+    def record_visit(self, duration: float, eventful: bool, status: str) -> None:
+        self.total_visits += 1
+        self.visit_runtimes.setdefault((status, eventful), []).append(duration)
+
+    @property
+    def useless_visits(self):
+        """Number of uneventful, full visits"""
+        return len(self.visit_runtimes.get(("full", False), []))
+
+    def runtime_histogram(self, status: str, eventful: bool) -> str:
+        runtimes = self.visit_runtimes.get((status, eventful), [])
+        return plotille.hist(
+            [runtime for runtime in runtimes if runtime <= self.DURATION_THRESHOLD]
+        )
+
+    def format(self):
+        full_visits = self.visit_runtimes.get(("full", True), [])
+        histogram = self.runtime_histogram("full", True)
+        long_tasks = sum(runtime > self.DURATION_THRESHOLD for runtime in full_visits)
+
+        return (
+            textwrap.dedent(
+                f"""\
+                Total visits: {self.total_visits}
+                Useless visits: {self.useless_visits}
+                Eventful visits: {len(full_visits)}
+                Very long running tasks: {long_tasks}
+                Visit time histogram for eventful visits:
+                """
+            )
+            + histogram
+        )
+
+
+@dataclass
+class Task:
+    visit_type: str
+    origin: str
+    backend_id: uuid.UUID = field(default_factory=uuid.uuid4)
+
+
+@dataclass
+class TaskEvent:
+    task: Task
+    status: OriginVisitStatus
+    eventful: bool = field(default=False)
+
+
+class Queue(Store):
+    """Model a queue of objects to be passed between processes."""
+
+    def __len__(self):
+        return len(self.items or [])
+
+    def slots_remaining(self):
+        return self.capacity - len(self)
+
+
+class Environment(_Environment):
+    report: SimulationReport
+    scheduler: SchedulerInterface
+
+    def __init__(self, start_time: datetime):
+        if start_time.tzinfo is None:
+            raise ValueError("start_time must have timezone information")
+        self.start_time = start_time
+        super().__init__()
+
+    @property
+    def time(self):
+        """Get the current simulated wall clock time"""
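+        # self.now counts simulated seconds since the start of the run, so
+        # e.g. start_time=2021-01-01T00:00:00+00:00 and now=90 yield
+        # 2021-01-01T00:01:30+00:00.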
+        return self.start_time + timedelta(seconds=self.now)
diff --git a/swh/scheduler/simulator/origin_scheduler.py b/swh/scheduler/simulator/origin_scheduler.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/simulator/origin_scheduler.py
@@ -0,0 +1,68 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Agents using the new origin-aware scheduler."""
+
+import logging
+from typing import Any, Dict, Generator, Iterator, List
+
+from simpy import Event
+
+from swh.scheduler.journal_client import process_journal_objects
+
+from .common import Environment, Queue, Task, TaskEvent
+
+logger = logging.getLogger(__name__)
+
+
+def scheduler_runner_process(
+    env: Environment, task_queues: Dict[str, Queue], policy: str, min_batch_size: int
+) -> Iterator[Event]:
+    """Scheduler runner. Grabs next visits from the database according to the
+    scheduling policy, and fills the task_queues accordingly."""
+
+    while True:
+        for visit_type, queue in task_queues.items():
+            remaining = queue.slots_remaining()
+            if remaining < min_batch_size:
+                continue
+            next_origins = env.scheduler.grab_next_visits(
+                visit_type, remaining, policy=policy
+            )
+            logger.debug(
+                "%s runner: running %s %s tasks",
+                env.time,
+                visit_type,
+                len(next_origins),
+            )
+            for origin in next_origins:
+                yield queue.put(Task(visit_type=origin.visit_type, origin=origin.url))
+
+        yield env.timeout(10.0)
+
+
+def scheduler_journal_client_process(
+    env: Environment, status_queue: Queue
+) -> Generator[Event, TaskEvent, None]:
+    """Scheduler journal client. Every once in a while, pulls
+    `OriginVisitStatus`es from the status_queue to update the scheduler
+    origin_visit_stats table."""
+    BATCH_SIZE = 100
+
+    statuses: List[Dict[str, Any]] = []
+    while True:
+        task_event = yield status_queue.get()
+        statuses.append(task_event.status.to_dict())
+        if len(statuses) < BATCH_SIZE:
+            continue
+
+        logger.debug(
+            "%s journal client: processing %s statuses", env.time, len(statuses)
+        )
+        process_journal_objects(
+            {"origin_visit_status": statuses}, scheduler=env.scheduler
+        )
+
+        statuses = []
diff --git a/swh/scheduler/simulator/origins.py b/swh/scheduler/simulator/origins.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/simulator/origins.py
@@ -0,0 +1,128 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""This module implements a model of the frequency of updates of an origin
+and how long it takes to load it."""
+
+from datetime import timedelta
+import hashlib
+import logging
+import os
+from typing import Iterator, Optional, Tuple
+
+import attr
+from simpy import Event
+
+from swh.model.model import OriginVisitStatus
+from swh.scheduler.model import OriginVisitStats
+
+from .common import Environment, Queue, Task, TaskEvent
+
+logger = logging.getLogger(__name__)
+
+
+class OriginModel:
+    MIN_RUN_TIME = 0.5
+    """Minimal run time for a visit (retrieved from production data)"""
+
+    MAX_RUN_TIME = 7200
+    """Max run time for a visit"""
+
+    PER_COMMIT_RUN_TIME = 0.1
+    """Run time per commit"""
+
+    def __init__(self, type: str, origin: str):
+        self.type = type
+        self.origin = origin
+
+    def seconds_between_commits(self):
+        """Returns a random 'average time between two commits' of this origin,
+        used to estimate the run time of a load task, and how much the loading
+        architecture is lagging behind origin updates."""
+        n_bytes = 2
+        num_buckets = 2 ** (8 * n_bytes)
+
+        # Deterministic seed to generate "random" characteristics of this origin
+        bucket = int.from_bytes(
+            hashlib.md5(self.origin.encode()).digest()[0:n_bytes], "little"
+        )
+
+        # minimum: 1 second (bucket == 0)
+        # max: 10 years (bucket == num_buckets - 1)
+        ten_y = 10 * 365 * 24 * 3600
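+
+        # For instance, an origin hashing to the midpoint bucket
+        # (bucket / num_buckets == 0.5) gets ten_y ** 0.5 ≈ 17759 seconds,
+        # i.e. roughly one commit every five hours.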
+        return ten_y ** (bucket / num_buckets)
+        # return 1 + (ten_y - 1) * (bucket / (num_buckets - 1))
+
+    def load_task_characteristics(
+        self, env: Environment, stats: Optional[OriginVisitStats]
+    ) -> Tuple[float, bool, str]:
+        """Returns the (run_time, eventfulness, end_status) of the next
+        origin visit."""
+        if stats and stats.last_eventful:
+            time_since_last_successful_run = env.time - stats.last_eventful
+        else:
+            time_since_last_successful_run = timedelta(days=365)
+
+        seconds_between_commits = self.seconds_between_commits()
+        logger.debug(
+            "Interval between commits: %s", timedelta(seconds=seconds_between_commits)
+        )
+
+        seconds_since_last_successful = time_since_last_successful_run.total_seconds()
+        if seconds_since_last_successful < seconds_between_commits:
+            # No commits since last visit => uneventful
+            return (self.MIN_RUN_TIME, False, "full")
+        else:
+            n_commits = seconds_since_last_successful / seconds_between_commits
+            run_time = self.MIN_RUN_TIME + self.PER_COMMIT_RUN_TIME * n_commits
+            if run_time > self.MAX_RUN_TIME:
+                return (self.MAX_RUN_TIME, False, "partial")
+            else:
+                return (run_time, True, "full")
+
+
+def load_task_process(
+    env: Environment, task: Task, status_queue: Queue
+) -> Iterator[Event]:
+    """A loading task. This pushes OriginVisitStatus objects to the
+    status_queue to simulate the visible outcomes of the task.
+
+    Uses the :meth:`OriginModel.load_task_characteristics` method to determine
+    its run time.
+    """
+    # This is cheating; actual tasks access the state from the storage, not the
+    # scheduler
+    stats = env.scheduler.origin_visit_stats_get(task.origin, task.visit_type)
+    last_snapshot = stats.last_snapshot if stats else None
+
+    status = OriginVisitStatus(
+        origin=task.origin,
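+        # Arbitrary visit id: every simulated visit reuses the same value,
+        # since the simulator does not track per-visit identity.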
+        visit=42,
+        type=task.visit_type,
+        status="created",
+        date=env.time,
+        snapshot=None,
+    )
+    origin_model = OriginModel(task.visit_type, task.origin)
+    (run_time, eventful, end_status) = origin_model.load_task_characteristics(
+        env, stats
+    )
+    logger.debug("%s task %s origin=%s: Start", env.time, task.visit_type, task.origin)
+    yield status_queue.put(TaskEvent(task=task, status=status))
+    yield env.timeout(run_time)
+    logger.debug("%s task %s origin=%s: End", env.time, task.visit_type, task.origin)
+
+    new_snapshot = os.urandom(20) if eventful else last_snapshot
+    yield status_queue.put(
+        TaskEvent(
+            task=task,
+            status=attr.evolve(
+                status, status=end_status, date=env.time, snapshot=new_snapshot
+            ),
+            eventful=eventful,
+        )
+    )
+
+    env.report.record_visit(run_time, eventful, end_status)
diff --git a/swh/scheduler/simulator/task_scheduler.py b/swh/scheduler/simulator/task_scheduler.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/simulator/task_scheduler.py
@@ -0,0 +1,76 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Agents using the "old" task-based scheduler."""
+
+import logging
+from typing import Dict, Generator, Iterator
+
+from simpy import Event
+
+from .common import Environment, Queue, Task, TaskEvent
+
+logger = logging.getLogger(__name__)
+
+
+def scheduler_runner_process(
+    env: Environment, task_queues: Dict[str, Queue], min_batch_size: int,
+) -> Iterator[Event]:
+    """Scheduler runner. Grabs ready tasks from the database and fills the
+    task_queues accordingly."""
+
+    while True:
+        for visit_type, queue in task_queues.items():
+            remaining = queue.slots_remaining()
+            if remaining < min_batch_size:
+                continue
+            next_tasks = env.scheduler.grab_ready_tasks(
+                f"load-{visit_type}", num_tasks=remaining, timestamp=env.time
+            )
+            logger.debug(
+                "%s runner: running %s %s tasks", env.time, visit_type, len(next_tasks),
+            )
+
+            sim_tasks = [
+                Task(visit_type=visit_type, origin=task["arguments"]["kwargs"]["url"])
+                for task in next_tasks
+            ]
+
+            env.scheduler.mass_schedule_task_runs(
+                [
+                    {
+                        "task": task["id"],
+                        "scheduled": env.time,
+                        "backend_id": str(sim_task.backend_id),
+                    }
+                    for task, sim_task in zip(next_tasks, sim_tasks)
+                ]
+            )
+
+            for sim_task in sim_tasks:
+                yield queue.put(sim_task)
+
+        yield env.timeout(10.0)
+
+
+def scheduler_listener_process(
+    env: Environment, status_queue: Queue
+) -> Generator[Event, TaskEvent, None]:
+    """Scheduler listener. In the real world this would listen to celery
+    events, but we listen to the status_queue and simulate celery events from
+    that."""
+    while True:
+        event = yield status_queue.get()
+        if event.status.status == "ongoing":
+            env.scheduler.start_task_run(event.task.backend_id, timestamp=env.time)
+        else:
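+            # Map the simulated visit outcome onto task-run statuses: a
+            # "full" visit ends (un)eventful, any other status is recorded
+            # as a failure.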
+            if event.status.status == "full":
+                status = "eventful" if event.eventful else "uneventful"
+            else:
+                status = "failed"
+
+            env.scheduler.end_task_run(
+                str(event.task.backend_id), status=status, timestamp=env.time
+            )
diff --git a/swh/scheduler/sql/30-schema.sql b/swh/scheduler/sql/30-schema.sql
--- a/swh/scheduler/sql/30-schema.sql
+++ b/swh/scheduler/sql/30-schema.sql
@@ -11,7 +11,7 @@
 comment on column dbversion.description is 'Version description';
 
 insert into dbversion (version, release, description)
-  values (23, now(), 'Work In Progress');
+  values (24, now(), 'Work In Progress');
 
 create table task_type (
   type text primary key,
@@ -184,3 +184,25 @@
 comment on column origin_visit_stats.last_failed is 'Date of the last failed event';
 comment on column origin_visit_stats.last_notfound is 'Date of the last notfound event';
 comment on column origin_visit_stats.last_snapshot is 'sha1_git of the last visit snapshot';
+
+
+create table scheduler_metrics (
+  lister_id uuid not null references listers(id),
+  visit_type text not null,
+  last_update timestamptz not null,
+  origins_known int not null default 0,
+  origins_enabled int not null default 0,
+  origins_never_visited int not null default 0,
+  origins_with_pending_changes int not null default 0,
+
+  primary key (lister_id, visit_type)
+);
+
+comment on table scheduler_metrics is 'Cache of per-lister metrics for the scheduler, collated between the listed_origins and origin_visit_stats tables.';
+comment on column scheduler_metrics.lister_id is 'Lister instance on which metrics have been aggregated';
+comment on column scheduler_metrics.visit_type is 'Visit type on which metrics have been aggregated';
+comment on column scheduler_metrics.last_update is 'Last update of these metrics';
+comment on column scheduler_metrics.origins_known is 'Number of known (enabled or disabled) origins';
+comment on column scheduler_metrics.origins_enabled is 'Number of origins that were present in the latest listing';
+comment on column scheduler_metrics.origins_never_visited is 'Number of origins that have never been visited';
+comment on column scheduler_metrics.origins_with_pending_changes is 'Number of enabled origins with known activity since our last visit';
diff --git a/swh/scheduler/sql/40-func.sql b/swh/scheduler/sql/40-func.sql
--- a/swh/scheduler/sql/40-func.sql
+++ b/swh/scheduler/sql/40-func.sql
@@ -406,3 +406,36 @@
   for each row
   when (new.status NOT IN ('scheduled', 'started'))
   execute procedure swh_scheduler_update_task_on_task_end ();
+
+
+create or replace function update_metrics(lister_id uuid default NULL, ts timestamptz default now())
+  returns setof scheduler_metrics
+  language sql
+as $$
+  insert into scheduler_metrics (
+    lister_id, visit_type, last_update,
+    origins_known, origins_enabled,
+    origins_never_visited, origins_with_pending_changes
+  )
+    select
+      lo.lister_id, lo.visit_type, coalesce(ts, now()) as last_update,
+      count(*) as origins_known,
+      count(*) filter (where enabled) as origins_enabled,
+      count(*) filter (where
+        enabled and last_snapshot is NULL
+      ) as origins_never_visited,
+      count(*) filter (where
+        enabled and lo.last_update > greatest(ovs.last_eventful, ovs.last_uneventful)
+      ) as origins_with_pending_changes
+    from listed_origins lo
+    left join origin_visit_stats ovs using (url, visit_type)
+    where
+      -- update only for the requested lister
+      update_metrics.lister_id = lo.lister_id
+      -- or for all listers if the function argument is null
+      or update_metrics.lister_id is null
+    group by (lister_id, visit_type)
+  -- refresh the cached row on repeated runs instead of violating the
+  -- (lister_id, visit_type) primary key
+  on conflict (lister_id, visit_type) do update
+    set
+      last_update = excluded.last_update,
+      origins_known = excluded.origins_known,
+      origins_enabled = excluded.origins_enabled,
+      origins_never_visited = excluded.origins_never_visited,
+      origins_with_pending_changes = excluded.origins_with_pending_changes
+  returning *
+$$;
+
+comment on function update_metrics(uuid, timestamptz) is 'Update metrics for the given lister_id';
diff --git a/swh/scheduler/tests/test_api_client.py b/swh/scheduler/tests/test_api_client.py
--- a/swh/scheduler/tests/test_api_client.py
+++ b/swh/scheduler/tests/test_api_client.py
@@ -48,6 +48,8 @@
         "origins/grab_next",
         "origins/record",
         "priority_ratios/get",
+        "scheduler_metrics/get",
+        "scheduler_metrics/update",
         "task/create",
         "task/delete_archived",
         "task/disable",
diff --git a/swh/scheduler/tests/test_scheduler.py b/swh/scheduler/tests/test_scheduler.py
--- a/swh/scheduler/tests/test_scheduler.py
+++ b/swh/scheduler/tests/test_scheduler.py
@@ -8,7 +8,7 @@
 import datetime
 import inspect
 import random
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Tuple
 import uuid
 
 import attr
@@ -17,7 +17,12 @@
 from swh.model.hashutil import hash_to_bytes
 from swh.scheduler.exc import SchedulerException, StaleData, UnknownPolicy
 from swh.scheduler.interface import SchedulerInterface
-from swh.scheduler.model import ListedOrigin, ListedOriginPageToken, OriginVisitStats
+from swh.scheduler.model import (
+    ListedOrigin,
+    ListedOriginPageToken,
+    OriginVisitStats,
+    SchedulerMetrics,
+)
 from swh.scheduler.utils import utcnow
 
 from .common import LISTERS, TASK_TYPES, TEMPLATES, tasks_from_template
@@ -31,6 +36,14 @@
     return {k: d[k] for k in keys if k not in excl}
 
 
+def metrics_sort_key(m: SchedulerMetrics) -> Tuple[uuid.UUID, str]:
+    return (m.lister_id, m.visit_type)
+
+
+def assert_metrics_equal(left, right):
+    assert sorted(left, key=metrics_sort_key) == sorted(right, key=metrics_sort_key)
+
+
 class TestScheduler:
     def test_interface(self, swh_scheduler):
         """Checks all methods of SchedulerInterface are implemented by this
@@ -977,3 +990,141 @@
             ),
         ]
     )
+
+    def test_metrics_origins_known(self, swh_scheduler, listed_origins):
+        swh_scheduler.record_listed_origins(listed_origins)
+
+        ret = swh_scheduler.update_metrics()
+
+        assert sum(metric.origins_known for metric in ret) == len(listed_origins)
+
+    def test_metrics_origins_enabled(self, swh_scheduler, listed_origins):
+        swh_scheduler.record_listed_origins(listed_origins)
+        disabled_origin = attr.evolve(listed_origins[0], enabled=False)
+        swh_scheduler.record_listed_origins([disabled_origin])
+
+        ret = swh_scheduler.update_metrics(lister_id=disabled_origin.lister_id)
+        for metric in ret:
+            if metric.visit_type == disabled_origin.visit_type:
+                # We disabled one of these origins
+                assert metric.origins_known - metric.origins_enabled == 1
+            else:
+                # But these are still all enabled
+                assert metric.origins_known == metric.origins_enabled
+
+    def test_metrics_origins_never_visited(self, swh_scheduler, listed_origins):
+        swh_scheduler.record_listed_origins(listed_origins)
+
+        # Pretend that we've recorded a visit on one origin
+        visited_origin = listed_origins[0]
+        swh_scheduler.origin_visit_stats_upsert(
+            [
+                OriginVisitStats(
+                    url=visited_origin.url,
+                    visit_type=visited_origin.visit_type,
+                    last_eventful=utcnow(),
+                    last_uneventful=None,
+                    last_failed=None,
+                    last_notfound=None,
+                    last_snapshot=hash_to_bytes(
+                        "d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"
+                    ),
+                ),
+            ]
+        )
+
+        ret = swh_scheduler.update_metrics(lister_id=visited_origin.lister_id)
+        for metric in ret:
+            if metric.visit_type == visited_origin.visit_type:
+                # We visited one of these origins
+                assert metric.origins_known - metric.origins_never_visited == 1
+            else:
+                # But none of these have been visited
+                assert metric.origins_known == metric.origins_never_visited
+
+    def test_metrics_origins_with_pending_changes(self, swh_scheduler, listed_origins):
+        swh_scheduler.record_listed_origins(listed_origins)
+
+        # Pretend that we've recorded a visit on one origin, in the past with
+        # respect to the "last update" time for the origin
+        visited_origin = listed_origins[0]
+        assert visited_origin.last_update is not None
+        swh_scheduler.origin_visit_stats_upsert(
+            [
+                OriginVisitStats(
+                    url=visited_origin.url,
+                    visit_type=visited_origin.visit_type,
+                    last_eventful=visited_origin.last_update
+                    - datetime.timedelta(days=1),
+                    last_uneventful=None,
+                    last_failed=None,
+                    last_notfound=None,
+                    last_snapshot=hash_to_bytes(
+                        "d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"
+                    ),
+                ),
+            ]
+        )
+
+        ret = swh_scheduler.update_metrics(lister_id=visited_origin.lister_id)
+        for metric in ret:
+            if metric.visit_type == visited_origin.visit_type:
+                # We visited one of these origins, in the past
+                assert metric.origins_with_pending_changes == 1
+            else:
+                # But none of these have been visited
+                assert metric.origins_with_pending_changes == 0
+
+    def test_update_metrics_explicit_lister(self, swh_scheduler, listed_origins):
+        swh_scheduler.record_listed_origins(listed_origins)
+
+        fake_uuid = uuid.uuid4()
+        assert all(fake_uuid != origin.lister_id for origin in listed_origins)
+
+        ret = swh_scheduler.update_metrics(lister_id=fake_uuid)
+
+        assert len(ret) == 0
+
+    def test_update_metrics_explicit_timestamp(self, swh_scheduler, listed_origins):
+        swh_scheduler.record_listed_origins(listed_origins)
+
+        ts = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc)
+
+        ret = swh_scheduler.update_metrics(timestamp=ts)
+
+        assert all(metric.last_update == ts for metric in ret)
+
+    def test_get_metrics(self, swh_scheduler, listed_origins):
+        swh_scheduler.record_listed_origins(listed_origins)
+        updated = swh_scheduler.update_metrics()
+
+        retrieved = swh_scheduler.get_metrics()
+        assert_metrics_equal(updated, retrieved)
+
+    def test_get_metrics_by_lister(self, swh_scheduler, listed_origins):
+        lister_id = listed_origins[0].lister_id
+        assert lister_id is not None
+
+        swh_scheduler.record_listed_origins(listed_origins)
+        updated = swh_scheduler.update_metrics()
+
+        retrieved = swh_scheduler.get_metrics(lister_id=lister_id)
+        assert len(retrieved) > 0
+
+        assert_metrics_equal(
+            [metric for metric in updated if metric.lister_id == lister_id], retrieved
+        )
+
+    def test_get_metrics_by_visit_type(self, swh_scheduler, listed_origins):
+        visit_type = listed_origins[0].visit_type
+        assert visit_type is not None
+
+        swh_scheduler.record_listed_origins(listed_origins)
+        updated = swh_scheduler.update_metrics()
+
+        retrieved = swh_scheduler.get_metrics(visit_type=visit_type)
+        assert len(retrieved) > 0
+
+        assert_metrics_equal(
+            [metric for metric in updated if metric.visit_type == visit_type], retrieved
+        )
diff --git a/swh/scheduler/tests/test_simulator.py b/swh/scheduler/tests/test_simulator.py
new file mode 100644
--- /dev/null
+++ b/swh/scheduler/tests/test_simulator.py
@@ -0,0 +1,45 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import pytest
+
+import swh.scheduler.simulator as simulator
+from swh.scheduler.tests.common import TASK_TYPES
+
+NUM_ORIGINS = 42
+TEST_RUNTIME = 1000
+
+
+@pytest.fixture
+def monkeypatch_get_scheduler(swh_scheduler, monkeypatch):
+    def get_scheduler(*args, **kwargs):
+        return swh_scheduler
+
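+    # The simulator resolves get_scheduler as a module-level attribute at
+    # call time, so patching that attribute is enough to redirect it to the
+    # test scheduler instance.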
+    monkeypatch.setattr(simulator, "get_scheduler", get_scheduler)
+
+    for task_type in TASK_TYPES.values():
+        swh_scheduler.create_task_type(task_type)
+
+
+def test_fill_test_data(swh_scheduler, monkeypatch_get_scheduler):
+    simulator.fill_test_data(num_origins=NUM_ORIGINS)
+
+    res = swh_scheduler.get_listed_origins()
+    assert len(res.origins) == NUM_ORIGINS
+    assert res.next_page_token is None
+
+    res = swh_scheduler.search_tasks()
+    assert len(res) == NUM_ORIGINS
+
+
+@pytest.mark.parametrize("policy", ("oldest_scheduled_first",))
+def test_run_origin_scheduler(swh_scheduler, monkeypatch_get_scheduler, policy):
+    simulator.fill_test_data(num_origins=NUM_ORIGINS)
+    simulator.run("origin_scheduler", policy=policy, runtime=TEST_RUNTIME)
+
+
+def test_run_task_scheduler(swh_scheduler, monkeypatch_get_scheduler):
+    simulator.fill_test_data(num_origins=NUM_ORIGINS)
+    simulator.run("task_scheduler", policy=None, runtime=TEST_RUNTIME)