diff --git a/swh/scheduler/celery_backend/recurrent_visits.py b/swh/scheduler/celery_backend/recurrent_visits.py index 6e542e6..2c903be 100644 --- a/swh/scheduler/celery_backend/recurrent_visits.py +++ b/swh/scheduler/celery_backend/recurrent_visits.py @@ -1,365 +1,366 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """This schedules the recurrent visits, for listed origins, in Celery. For "oneshot" (save code now, lister) tasks, check the :mod:`swh.scheduler.celery_backend.runner` and :mod:`swh.scheduler.celery_backend.pika_listener` modules. """ from __future__ import annotations from itertools import chain import logging from queue import Empty, Queue import random from threading import Thread import time from typing import TYPE_CHECKING, Any, Dict, List, Tuple from kombu.utils.uuid import uuid from swh.scheduler.celery_backend.config import get_available_slots +from swh.scheduler.utils import create_origin_task_dict if TYPE_CHECKING: from ..interface import SchedulerInterface from ..model import ListedOrigin logger = logging.getLogger(__name__) DEFAULT_POLICY = [ {"policy": "already_visited_order_by_lag", "weight": 50}, {"policy": "never_visited_oldest_update_first", "weight": 50}, ] DEFAULT_DVCS_POLICY = [ {"policy": "already_visited_order_by_lag", "weight": 49}, {"policy": "never_visited_oldest_update_first", "weight": 49}, {"policy": "origins_without_last_update", "weight": 2}, ] DEFAULT_GIT_POLICY = [ {"policy": "already_visited_order_by_lag", "weight": 49, "tablesample": 0.1}, {"policy": "never_visited_oldest_update_first", "weight": 49, "tablesample": 0.1}, {"policy": "origins_without_last_update", "weight": 2, "tablesample": 0.1}, ] DEFAULT_POLICY_CONFIG: Dict[str, List[Dict[str, Any]]] = { 
"default": DEFAULT_POLICY, "hg": DEFAULT_DVCS_POLICY, "svn": DEFAULT_DVCS_POLICY, "cvs": DEFAULT_DVCS_POLICY, "bzr": DEFAULT_DVCS_POLICY, "git": DEFAULT_GIT_POLICY, } """Scheduling policies to use to retrieve visits for the given visit types, with their relative weights""" MIN_SLOTS_RATIO = 0.05 """Quantity of slots that need to be available (with respect to max_queue_length) for :py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` to trigger""" QUEUE_FULL_BACKOFF = 60 """Backoff time (in seconds) if there's fewer than :py:data:`MIN_SLOTS_RATIO` slots available in the queue.""" NO_ORIGINS_SCHEDULED_BACKOFF = 20 * 60 """Backoff time (in seconds) if no origins have been scheduled in the current iteration""" BACKOFF_SPLAY = 5.0 """Amplitude of the fuzziness between backoffs""" TERMINATE = object() """Termination request received from command queue (singleton used for identity comparison)""" def grab_next_visits_policy_weights( scheduler: SchedulerInterface, visit_type: str, num_visits: int, policy_cfg: List[Dict[str, Any]], ) -> List[ListedOrigin]: """Get the next ``num_visits`` for the given ``visit_type`` using the corresponding set of scheduling policies. The :py:data:`POLICY_CFG` list sets, for the current visit type, the scheduling policies used to pull the next tasks. Each policy config entry in the list should at least have a 'policy' (policy name) and a 'weight' entry. For each policy in this policy_cfg list, the number of returned origins to visit will be weighted using this weight config option so that the total number of returned origins is around num_visits. Any other key/value entry in the policy configuration will be passed to the scheduler.grab_next_visit() method. This function emits a warning if the ratio of retrieved origins is off of the requested ratio by more than 5%. 
Returns: at most ``num_visits`` :py:class:`~swh.scheduler.model.ListedOrigin` objects """ policies = [cfg["policy"] for cfg in policy_cfg] if len(set(policies)) != len(policies): raise ValueError( "A policy weights can only appear once; check your policy " f"configuration for visit type {visit_type}" ) weights = [cfg["weight"] for cfg in policy_cfg] total_weight = sum(weights) if not total_weight: raise ValueError(f"No policy weights set for visit type {visit_type}") ratios = [weight / total_weight for weight in weights] extra_kws = [ {k: v for k, v in cfg.items() if k not in ("weight", "policy")} for cfg in policy_cfg ] fetched_origins: Dict[str, List[ListedOrigin]] = {} for policy, ratio, extra_kw in zip(policies, ratios, extra_kws): num_tasks_to_send = int(num_visits * ratio) fetched_origins[policy] = scheduler.grab_next_visits( visit_type, num_tasks_to_send, policy=policy, **extra_kw, ) all_origins: List[ListedOrigin] = list( chain.from_iterable(fetched_origins.values()) ) if not all_origins: return [] # Check whether the ratios of origins fetched are skewed with respect to the # ones we requested fetched_origin_ratios = { policy: len(origins) / len(all_origins) for policy, origins in fetched_origins.items() } for policy, expected_ratio in zip(policies, ratios): # 5% of skew with respect to request; the relative check is written as a # multiplication so that a policy configured with weight 0 # (expected_ratio == 0) cannot raise ZeroDivisionError if abs(fetched_origin_ratios[policy] - expected_ratio) > 0.05 * expected_ratio: logger.info( "Skewed fetch for visit type %s with policy %s: fetched %s, " "requested %s", visit_type, policy, fetched_origin_ratios[policy], expected_ratio, ) return all_origins def splay(): """Return a random short interval by which to vary the backoffs for the visit scheduling threads""" return random.uniform(0, BACKOFF_SPLAY) def send_visits_for_visit_type( scheduler: SchedulerInterface, app, visit_type: str, task_type: Dict, policy_cfg: List[Dict[str, Any]], ) -> float: """Schedule the next batch of visits for the given ``visit_type``. 
First, we determine the number of available slots by introspecting the RabbitMQ queue. If there are fewer than :py:data:`MIN_SLOTS_RATIO` slots available in the queue, we wait for :py:data:`QUEUE_FULL_BACKOFF` seconds. This avoids running the expensive :py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` queries when there are not many jobs to queue. Once there are more than :py:data:`MIN_SLOTS_RATIO` slots available, we run :py:func:`grab_next_visits_policy_weights` to retrieve the next set of origin visits to schedule, and we send them to celery. If the last scheduling attempt didn't return any origins, we sleep for :py:data:`NO_ORIGINS_SCHEDULED_BACKOFF` seconds. This avoids running the expensive :py:func:`~swh.scheduler.interface.SchedulerInterface.grab_next_visits` queries too often if there's nothing left to schedule. The ``policy_cfg`` argument is the policy configuration used to choose the next origins to visit. It is passed directly to the :py:func:`grab_next_visits_policy_weights` function. Returns: the earliest :py:func:`time.monotonic` value at which to run the next iteration of the loop. 
""" queue_name = task_type["backend_name"] max_queue_length = task_type.get("max_queue_length") or 0 min_available_slots = max_queue_length * MIN_SLOTS_RATIO current_iteration_start = time.monotonic() # Check queue level available_slots = get_available_slots(app, queue_name, max_queue_length) logger.debug( "%s available slots for visit type %s in queue %s", available_slots, visit_type, queue_name, ) if available_slots < min_available_slots: return current_iteration_start + QUEUE_FULL_BACKOFF origins = grab_next_visits_policy_weights( scheduler, visit_type, available_slots, policy_cfg ) if not origins: logger.debug("No origins to visit for type %s", visit_type) return current_iteration_start + NO_ORIGINS_SCHEDULED_BACKOFF # Try to smooth the ingestion load, origins pulled by different # scheduling policies have different resource usage patterns random.shuffle(origins) for origin in origins: - task_dict = origin.as_task_dict() + task_dict = create_origin_task_dict(origin) app.send_task( queue_name, task_id=uuid(), args=task_dict["arguments"]["args"], kwargs=task_dict["arguments"]["kwargs"], queue=queue_name, ) logger.info( "%s: %s visits scheduled in queue %s", visit_type, len(origins), queue_name, ) # When everything worked, we can try to schedule origins again ASAP. 
return time.monotonic() def visit_scheduler_thread( config: Dict, visit_type: str, command_queue: Queue[object], exc_queue: Queue[Tuple[str, BaseException]], ): """Target function for the visit sending thread, which initializes local connections and handles exceptions by sending them back to the main thread.""" from swh.scheduler import get_scheduler from swh.scheduler.celery_backend.config import build_app try: # We need to reinitialize these connections because they're not generally # thread-safe app = build_app(config.get("celery")) scheduler = get_scheduler(**config["scheduler"]) task_type = scheduler.get_task_type(f"load-{visit_type}") if task_type is None: raise ValueError(f"Unknown task type: load-{visit_type}") policy_cfg = config.get("scheduling_policy", DEFAULT_POLICY_CONFIG) for policies in policy_cfg.values(): for policy in policies: if "weight" not in policy or "policy" not in policy: raise ValueError( "Each policy configuration needs at least a 'policy' " "and a 'weight' entry" ) policy_cfg = {**DEFAULT_POLICY_CONFIG, **policy_cfg} next_iteration = time.monotonic() while True: # vary the next iteration time a little bit next_iteration = next_iteration + splay() while time.monotonic() < next_iteration: # Wait for next iteration to start. Listen for termination message. 
try: msg = command_queue.get(block=True, timeout=1) except Empty: continue if msg is TERMINATE: return else: logger.warning("Received unexpected message %s in command queue", msg) next_iteration = send_visits_for_visit_type( scheduler, app, visit_type, task_type, policy_cfg.get(visit_type, policy_cfg["default"]), ) except BaseException as e: exc_queue.put((visit_type, e)) VisitSchedulerThreads = Dict[str, Tuple[Thread, Queue]] """Dict storing the visit scheduler threads and their command queues""" def spawn_visit_scheduler_thread( threads: VisitSchedulerThreads, exc_queue: Queue[Tuple[str, BaseException]], config: Dict[str, Any], visit_type: str, ): """Spawn a new thread to schedule the visits of type ``visit_type``.""" command_queue: Queue[object] = Queue() thread = Thread( target=visit_scheduler_thread, kwargs={ "config": config, "visit_type": visit_type, "command_queue": command_queue, "exc_queue": exc_queue, }, ) threads[visit_type] = (thread, command_queue) thread.start() def terminate_visit_scheduler_threads(threads: VisitSchedulerThreads) -> List[str]: """Terminate all visit scheduler threads""" logger.info("Termination requested...") for _, command_queue in threads.values(): command_queue.put(TERMINATE) loops = 0 while threads and loops < 10: logger.info( "Terminating visit scheduling threads: %s", ", ".join(sorted(threads)) ) loops += 1 for visit_type, (thread, _) in list(threads.items()): thread.join(timeout=1) if not thread.is_alive(): logger.debug("Thread %s terminated", visit_type) del threads[visit_type] if threads: logger.warning( "Could not reap the following threads after 10 attempts: %s", ", ".join(sorted(threads)), ) return sorted(threads) diff --git a/swh/scheduler/cli/origin.py b/swh/scheduler/cli/origin.py index 987ecc3..3451edd 100644 --- a/swh/scheduler/cli/origin.py +++ b/swh/scheduler/cli/origin.py @@ -1,258 +1,259 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this 
distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from typing import TYPE_CHECKING, Iterable, List, Optional import click from . import cli +from ..utils import create_origin_task_dict if TYPE_CHECKING: from uuid import UUID from ..interface import SchedulerInterface from ..model import ListedOrigin @cli.group("origin") @click.pass_context def origin(ctx): """Manipulate listed origins.""" if not ctx.obj["scheduler"]: raise ValueError("Scheduler class (local/remote) must be instantiated") def format_origins( origins: List[ListedOrigin], fields: Optional[List[str]] = None, with_header: bool = True, ) -> Iterable[str]: """Format a list of origins as CSV. Arguments: origins: list of origins to output fields: optional list of fields to output (defaults to all fields) with_header: if True, output a CSV header. """ import csv from io import StringIO import attr from ..model import ListedOrigin expected_fields = [field.name for field in attr.fields(ListedOrigin)] if not fields: fields = expected_fields unknown_fields = set(fields) - set(expected_fields) if unknown_fields: raise ValueError( "Unknown ListedOrigin field(s): %s" % ", ".join(unknown_fields) ) output = StringIO() writer = csv.writer(output) def csv_row(data): """Return a single CSV-formatted row. 
We clear the output buffer after we're done to keep it reasonably sized.""" writer.writerow(data) output.seek(0) ret = output.read().rstrip() output.seek(0) output.truncate() return ret if with_header: yield csv_row(fields) for origin in origins: yield csv_row(str(getattr(origin, field)) for field in fields) @origin.command("grab-next") @click.option( "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy" ) @click.option( "--fields", "-f", default=None, help="Listed origin fields to print on output" ) @click.option( "--with-header/--without-header", is_flag=True, default=True, help="Print the CSV header?", ) @click.argument("type", type=str) @click.argument("count", type=int) @click.pass_context def grab_next( ctx, policy: str, fields: Optional[str], with_header: bool, type: str, count: int ): """Grab the next COUNT origins to visit using the TYPE loader from the listed origins table.""" if fields: parsed_fields: Optional[List[str]] = fields.split(",") else: parsed_fields = None scheduler = ctx.obj["scheduler"] origins = scheduler.grab_next_visits(type, count, policy=policy) for line in format_origins(origins, fields=parsed_fields, with_header=with_header): click.echo(line) @origin.command("schedule-next") @click.option( "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy" ) @click.argument("type", type=str) @click.argument("count", type=int) @click.pass_context def schedule_next(ctx, policy: str, type: str, count: int): """Send the next COUNT origin visits of the TYPE loader to the scheduler as one-shot tasks.""" from ..utils import utcnow from .task import pretty_print_task scheduler = ctx.obj["scheduler"] origins = scheduler.grab_next_visits(type, count, policy=policy) created = scheduler.create_tasks( [ { - **origin.as_task_dict(), + **create_origin_task_dict(origin), "policy": "oneshot", "next_run": utcnow(), "retries_left": 1, } for origin in origins ] ) output = ["Created %d tasks\n" % len(created)] for task in 
created: output.append(pretty_print_task(task)) click.echo_via_pager("\n".join(output)) @origin.command("send-to-celery") @click.option( "--policy", "-p", default="oldest_scheduled_first", help="Scheduling policy" ) @click.option( "--queue", "-q", help="Target celery queue", type=str, ) @click.option( "--tablesample", help="Table sampling percentage", type=float, ) @click.option( "--only-enabled/--only-disabled", "enabled", is_flag=True, default=True, help="""Determine whether we want to scheduled enabled or disabled origins. As default, we want to reasonably deal with enabled origins. For some edge case though, we might want the disabled ones.""", ) @click.option( "--lister-uuid", default=None, help="Limit origins to those listed from such lister", ) @click.argument("type", type=str) @click.pass_context def send_to_celery( ctx, policy: str, queue: Optional[str], tablesample: Optional[float], type: str, enabled: bool, lister_uuid: Optional[str] = None, ): """Send the next origin visits of the TYPE loader to celery, filling the queue.""" from kombu.utils.uuid import uuid from swh.scheduler.celery_backend.config import app, get_available_slots scheduler = ctx.obj["scheduler"] task_type = scheduler.get_task_type(f"load-{type}") task_name = task_type["backend_name"] queue_name = queue or task_name num_tasks = get_available_slots(app, queue_name, task_type["max_queue_length"]) click.echo(f"{num_tasks} slots available in celery queue") origins = scheduler.grab_next_visits( type, num_tasks, policy=policy, tablesample=tablesample, enabled=enabled, lister_uuid=lister_uuid, ) click.echo(f"{len(origins)} visits to send to celery") for origin in origins: - task_dict = origin.as_task_dict() + task_dict = create_origin_task_dict(origin) app.send_task( task_name, task_id=uuid(), args=task_dict["arguments"]["args"], kwargs=task_dict["arguments"]["kwargs"], queue=queue_name, ) @origin.command("update-metrics") @click.option("--lister", default=None, help="Only update metrics for 
this lister") @click.option( "--instance", default=None, help="Only update metrics for this lister instance" ) @click.pass_context def update_metrics(ctx, lister: Optional[str], instance: Optional[str]): """Update the scheduler metrics on listed origins. Examples: swh scheduler origin update-metrics swh scheduler origin update-metrics --lister github swh scheduler origin update-metrics --lister phabricator --instance llvm """ import json import attr scheduler: SchedulerInterface = ctx.obj["scheduler"] lister_id: Optional[UUID] = None if lister is not None: lister_instance = scheduler.get_lister(name=lister, instance_name=instance) if not lister_instance: click.echo(f"Lister not found: {lister} instance={instance}") ctx.exit(2) assert False # for mypy lister_id = lister_instance.id def dictify_metrics(d): return {k: str(v) for (k, v) in attr.asdict(d).items()} ret = scheduler.update_metrics(lister_id=lister_id) click.echo(json.dumps(list(map(dictify_metrics, ret)), indent=4, sort_keys=True)) diff --git a/swh/scheduler/model.py b/swh/scheduler/model.py index 4a64ffe..0aed578 100644 --- a/swh/scheduler/model.py +++ b/swh/scheduler/model.py @@ -1,274 +1,265 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from enum import Enum from typing import Any, Dict, List, Optional, Tuple, Union from uuid import UUID import attr import attr.converters from attrs_strict import type_validator def check_timestamptz(value) -> None: """Checks the date has a timezone.""" if value is not None and value.tzinfo is None: raise ValueError("date must be a timezone-aware datetime.") @attr.s class BaseSchedulerModel: """Base class for database-backed objects. These database-backed objects are defined through attrs-based attributes that match the columns of the database 1:1. 
This is a (very) lightweight ORM. These attrs-based attributes have metadata specific to the functionality expected from these fields in the database: - `primary_key`: the column is a primary key; it should be filtered out when doing an `update` of the object - `auto_primary_key`: the column is a primary key, which is automatically handled by the database. It will not be inserted to. This must be matched with a database-side default value. - `auto_now_add`: the column is a timestamp that is set to the current time when the object is inserted, and never updated afterwards. This must be matched with a database-side default value. - `auto_now`: the column is a timestamp that is set to the current time when the object is inserted or updated. """ _pk_cols: Optional[Tuple[str, ...]] = None _select_cols: Optional[Tuple[str, ...]] = None _insert_cols_and_metavars: Optional[Tuple[Tuple[str, ...], Tuple[str, ...]]] = None @classmethod def primary_key_columns(cls) -> Tuple[str, ...]: """Get the primary key columns for this object type""" if cls._pk_cols is None: columns: List[str] = [] for field in attr.fields(cls): if any( field.metadata.get(flag) for flag in ("auto_primary_key", "primary_key") ): columns.append(field.name) cls._pk_cols = tuple(sorted(columns)) return cls._pk_cols @classmethod def select_columns(cls) -> Tuple[str, ...]: """Get all the database columns needed for a `select` on this object type""" if cls._select_cols is None: columns: List[str] = [] for field in attr.fields(cls): columns.append(field.name) cls._select_cols = tuple(sorted(columns)) return cls._select_cols @classmethod def insert_columns_and_metavars(cls) -> Tuple[Tuple[str, ...], Tuple[str, ...]]: """Get the database columns and metavars needed for an `insert` or `update` on this object type. This implements support for the `auto_*` field metadata attributes. 
""" if cls._insert_cols_and_metavars is None: zipped_cols_and_metavars: List[Tuple[str, str]] = [] for field in attr.fields(cls): if any( field.metadata.get(flag) for flag in ("auto_now_add", "auto_primary_key") ): continue elif field.metadata.get("auto_now"): zipped_cols_and_metavars.append((field.name, "now()")) else: zipped_cols_and_metavars.append((field.name, f"%({field.name})s")) zipped_cols_and_metavars.sort() cols, metavars = zip(*zipped_cols_and_metavars) cls._insert_cols_and_metavars = cols, metavars return cls._insert_cols_and_metavars @attr.s class Lister(BaseSchedulerModel): name = attr.ib(type=str, validator=[type_validator()]) instance_name = attr.ib(type=str, validator=[type_validator()]) # Populated by database id = attr.ib( type=Optional[UUID], validator=type_validator(), default=None, metadata={"auto_primary_key": True}, ) current_state = attr.ib( type=Dict[str, Any], validator=[type_validator()], factory=dict ) created = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, metadata={"auto_now_add": True}, ) updated = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, metadata={"auto_now": True}, ) @attr.s class ListedOrigin(BaseSchedulerModel): """Basic information about a listed origin, output by a lister""" lister_id = attr.ib( type=UUID, validator=[type_validator()], metadata={"primary_key": True} ) url = attr.ib( type=str, validator=[type_validator()], metadata={"primary_key": True} ) visit_type = attr.ib( type=str, validator=[type_validator()], metadata={"primary_key": True} ) extra_loader_arguments = attr.ib( type=Dict[str, Any], validator=[type_validator()], factory=dict ) last_update = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, ) enabled = attr.ib(type=bool, validator=[type_validator()], default=True) first_seen = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, metadata={"auto_now_add": 
True}, ) last_seen = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, metadata={"auto_now": True}, ) - def as_task_dict(self): - return { - "type": f"load-{self.visit_type}", - "arguments": { - "args": [], - "kwargs": {"url": self.url, **self.extra_loader_arguments}, - }, - } - class LastVisitStatus(Enum): successful = "successful" failed = "failed" not_found = "not_found" def convert_last_visit_status( s: Union[None, str, LastVisitStatus] ) -> Optional[LastVisitStatus]: if not isinstance(s, str): return s return LastVisitStatus(s) @attr.s(frozen=True, slots=True) class OriginVisitStats(BaseSchedulerModel): """Represents an aggregated origin visits view.""" url = attr.ib( type=str, validator=[type_validator()], metadata={"primary_key": True} ) visit_type = attr.ib( type=str, validator=[type_validator()], metadata={"primary_key": True} ) last_successful = attr.ib( type=Optional[datetime.datetime], validator=type_validator(), default=None ) last_visit = attr.ib( type=Optional[datetime.datetime], validator=type_validator(), default=None ) last_visit_status = attr.ib( type=Optional[LastVisitStatus], validator=type_validator(), default=None, converter=convert_last_visit_status, ) last_scheduled = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, ) last_snapshot = attr.ib( type=Optional[bytes], validator=type_validator(), default=None ) next_visit_queue_position = attr.ib( type=Optional[int], validator=type_validator(), default=None ) next_position_offset = attr.ib(type=int, validator=type_validator(), default=4) successive_visits = attr.ib(type=int, validator=type_validator(), default=1) @last_successful.validator def check_last_successful(self, attribute, value): check_timestamptz(value) @last_visit.validator def check_last_visit(self, attribute, value): check_timestamptz(value) @attr.s(frozen=True, slots=True) class SchedulerMetrics(BaseSchedulerModel): """Metrics for the scheduler, aggregated 
by (lister_id, visit_type)""" lister_id = attr.ib( type=UUID, validator=[type_validator()], metadata={"primary_key": True} ) visit_type = attr.ib( type=str, validator=[type_validator()], metadata={"primary_key": True} ) last_update = attr.ib( type=Optional[datetime.datetime], validator=[type_validator()], default=None, ) origins_known = attr.ib(type=int, validator=[type_validator()], default=0) """Number of known (enabled or disabled) origins""" origins_enabled = attr.ib(type=int, validator=[type_validator()], default=0) """Number of origins that were present in the latest listings""" origins_never_visited = attr.ib(type=int, validator=[type_validator()], default=0) """Number of enabled origins that have never been visited (according to the visit cache)""" origins_with_pending_changes = attr.ib( type=int, validator=[type_validator()], default=0 ) """Number of enabled origins with known activity (recorded by a lister) since our last visit""" diff --git a/swh/scheduler/simulator/__init__.py b/swh/scheduler/simulator/__init__.py index 7de91c8..741a1e8 100644 --- a/swh/scheduler/simulator/__init__.py +++ b/swh/scheduler/simulator/__init__.py @@ -1,163 +1,164 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """This package runs the scheduler in a simulated environment, to evaluate various metrics. See :ref:`swh-scheduler-simulator`. This module orchestrates the simulator by initializing processes and connecting them together; these processes are defined in modules in the package and simulate/call specific components.""" from datetime import datetime, timedelta, timezone import logging from typing import Dict, Generator, Optional from simpy import Event from swh.scheduler.interface import SchedulerInterface +from swh.scheduler.utils import create_origin_task_dict from . 
import origin_scheduler, task_scheduler from .common import Environment, Queue, SimulationReport, Task from .origins import generate_listed_origin, lister_process, load_task_process logger = logging.getLogger(__name__) def update_metrics_process( env: Environment, update_interval: int ) -> Generator[Event, None, None]: """Update the scheduler metrics every `update_interval` (simulated) seconds, and add them to the SimulationReport """ t0 = env.time while True: metrics = env.scheduler.update_metrics(timestamp=env.time) env.report.record_metrics(env.time, metrics) dt = env.time - t0 logger.info("time:%s visits:%s", dt, env.report.total_visits) yield env.timeout(update_interval) def worker_process( env: Environment, name: str, task_queue: Queue, status_queue: Queue ) -> Generator[Event, Task, None]: """A worker which consumes tasks from the input task_queue. Tasks themselves send OriginVisitStatus objects to the status_queue.""" logger.debug("%s worker %s: Start", env.time, name) while True: task = yield task_queue.get() logger.debug( "%s worker %s: Run task %s origin=%s", env.time, name, task.visit_type, task.origin, ) yield env.process(load_task_process(env, task, status_queue=status_queue)) def setup( env: Environment, scheduler_type: str, policy: Optional[str], workers_per_type: Dict[str, int], task_queue_capacity: int, min_batch_size: int, metrics_update_interval: int, ): task_queues = { visit_type: Queue(env, capacity=task_queue_capacity) for visit_type in workers_per_type } status_queue = Queue(env) if scheduler_type == "origin_scheduler": if policy is None: raise ValueError("origin_scheduler needs a scheduling policy") env.process( origin_scheduler.scheduler_runner_process( env, task_queues, policy, min_batch_size=min_batch_size ) ) env.process( origin_scheduler.scheduler_journal_client_process(env, status_queue) ) elif scheduler_type == "task_scheduler": if policy is not None: raise ValueError("task_scheduler doesn't support a scheduling policy") env.process( 
task_scheduler.scheduler_runner_process( env, task_queues, min_batch_size=min_batch_size ) ) env.process(task_scheduler.scheduler_listener_process(env, status_queue)) else: raise ValueError(f"Unknown scheduler type to simulate: {scheduler_type}") env.process(update_metrics_process(env, metrics_update_interval)) for visit_type, num_workers in workers_per_type.items(): task_queue = task_queues[visit_type] for i in range(num_workers): worker_name = f"worker-{visit_type}-{i}" env.process(worker_process(env, worker_name, task_queue, status_queue)) lister = env.scheduler.get_or_create_lister(name="example") assert lister.id env.process(lister_process(env, lister.id)) def fill_test_data(scheduler: SchedulerInterface, num_origins: int = 100000): """Fills the database with mock data to test the simulator.""" stored_lister = scheduler.get_or_create_lister(name="example") assert stored_lister.id is not None # Generate 'num_origins' new origins origins = [generate_listed_origin(stored_lister.id) for _ in range(num_origins)] scheduler.record_listed_origins(origins) scheduler.create_tasks( [ { - **origin.as_task_dict(), + **create_origin_task_dict(origin), "policy": "recurring", "next_run": origin.last_update, "interval": timedelta(days=64), } for origin in origins ] ) def run( scheduler: SchedulerInterface, scheduler_type: str, policy: Optional[str], runtime: Optional[int], ): NUM_WORKERS = 48 start_time = datetime.now(tz=timezone.utc) env = Environment(start_time=start_time) env.scheduler = scheduler env.report = SimulationReport() setup( env, scheduler_type=scheduler_type, policy=policy, workers_per_type={"git": NUM_WORKERS}, task_queue_capacity=10000, min_batch_size=1000, metrics_update_interval=3600, ) try: env.run(until=runtime) except KeyboardInterrupt: pass finally: end_time = env.time print("total simulated time:", end_time - start_time) metrics = env.scheduler.update_metrics(timestamp=end_time) env.report.record_metrics(end_time, metrics) return env.report diff --git 
a/swh/scheduler/tests/test_model.py b/swh/scheduler/tests/test_model.py index 5780ebd..125a59f 100644 --- a/swh/scheduler/tests/test_model.py +++ b/swh/scheduler/tests/test_model.py @@ -1,127 +1,94 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime -import uuid import attr from swh.scheduler import model def test_select_columns(): @attr.s class TestModel(model.BaseSchedulerModel): id = attr.ib(type=str) test1 = attr.ib(type=str) a_first_attr = attr.ib(type=str) @property def test2(self): """This property should not show up in the extracted columns""" return self.test1 assert TestModel.select_columns() == ("a_first_attr", "id", "test1") def test_insert_columns(): @attr.s class TestModel(model.BaseSchedulerModel): id = attr.ib(type=str) test1 = attr.ib(type=str) @property def test2(self): """This property should not show up in the extracted columns""" return self.test1 assert TestModel.insert_columns_and_metavars() == ( ("id", "test1"), ("%(id)s", "%(test1)s"), ) def test_insert_columns_auto_now_add(): @attr.s class TestModel(model.BaseSchedulerModel): id = attr.ib(type=str) test1 = attr.ib(type=str) added = attr.ib(type=datetime.datetime, metadata={"auto_now_add": True}) assert TestModel.insert_columns_and_metavars() == ( ("id", "test1"), ("%(id)s", "%(test1)s"), ) def test_insert_columns_auto_now(): @attr.s class TestModel(model.BaseSchedulerModel): id = attr.ib(type=str) test1 = attr.ib(type=str) updated = attr.ib(type=datetime.datetime, metadata={"auto_now": True}) assert TestModel.insert_columns_and_metavars() == ( ("id", "test1", "updated"), ("%(id)s", "%(test1)s", "now()"), ) def test_insert_columns_primary_key(): @attr.s class TestModel(model.BaseSchedulerModel): id = attr.ib(type=str, metadata={"auto_primary_key": True}) test1 = 
attr.ib(type=str) assert TestModel.insert_columns_and_metavars() == (("test1",), ("%(test1)s",)) def test_insert_primary_key(): @attr.s class TestModel(model.BaseSchedulerModel): id = attr.ib(type=str, metadata={"auto_primary_key": True}) test1 = attr.ib(type=str) assert TestModel.primary_key_columns() == ("id",) @attr.s class TestModel2(model.BaseSchedulerModel): col1 = attr.ib(type=str, metadata={"primary_key": True}) col2 = attr.ib(type=str, metadata={"primary_key": True}) test1 = attr.ib(type=str) assert TestModel2.primary_key_columns() == ("col1", "col2") - - -def test_listed_origin_as_task_dict(): - origin = model.ListedOrigin( - lister_id=uuid.uuid4(), - url="http://example.com/", - visit_type="git", - ) - - task = origin.as_task_dict() - assert task == { - "type": "load-git", - "arguments": {"args": [], "kwargs": {"url": "http://example.com/"}}, - } - - loader_args = {"foo": "bar", "baz": {"foo": "bar"}} - - origin_w_args = model.ListedOrigin( - lister_id=uuid.uuid4(), - url="http://example.com/svn/", - visit_type="svn", - extra_loader_arguments=loader_args, - ) - - task_w_args = origin_w_args.as_task_dict() - assert task_w_args == { - "type": "load-svn", - "arguments": { - "args": [], - "kwargs": {"url": "http://example.com/svn/", **loader_args}, - }, - } diff --git a/swh/scheduler/tests/test_utils.py b/swh/scheduler/tests/test_utils.py index 4f3a8c9..7fb3d4b 100644 --- a/swh/scheduler/tests/test_utils.py +++ b/swh/scheduler/tests/test_utils.py @@ -1,82 +1,115 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import timezone from unittest.mock import patch +import uuid -from swh.scheduler import utils +from swh.scheduler import model, utils @patch("swh.scheduler.utils.datetime") def test_create_oneshot_task_dict_simple(mock_datetime): 
mock_datetime.now.return_value = "some-date" actual_task = utils.create_oneshot_task_dict("some-task-type") expected_task = { "policy": "oneshot", "type": "some-task-type", "next_run": "some-date", "arguments": { "args": [], "kwargs": {}, }, } assert actual_task == expected_task mock_datetime.now.assert_called_once_with(tz=timezone.utc) @patch("swh.scheduler.utils.datetime") def test_create_oneshot_task_dict_other_call(mock_datetime): mock_datetime.now.return_value = "some-other-date" actual_task = utils.create_oneshot_task_dict( "some-task-type", "arg0", "arg1", priority="high", other_stuff="normal" ) expected_task = { "policy": "oneshot", "type": "some-task-type", "next_run": "some-other-date", "arguments": { "args": ("arg0", "arg1"), "kwargs": {"other_stuff": "normal"}, }, "priority": "high", } assert actual_task == expected_task mock_datetime.now.assert_called_once_with(tz=timezone.utc) @patch("swh.scheduler.utils.datetime") def test_create_task_dict(mock_datetime): mock_datetime.now.return_value = "date" actual_task = utils.create_task_dict( "task-type", "recurring", "arg0", "arg1", priority="low", other_stuff="normal", retries_left=3, ) expected_task = { "policy": "recurring", "type": "task-type", "next_run": "date", "arguments": { "args": ("arg0", "arg1"), "kwargs": {"other_stuff": "normal"}, }, "priority": "low", "retries_left": 3, } assert actual_task == expected_task mock_datetime.now.assert_called_once_with(tz=timezone.utc) + + +def test_create_origin_task_dict(): + origin = model.ListedOrigin( + lister_id=uuid.uuid4(), + url="http://example.com/", + visit_type="git", + ) + + task = utils.create_origin_task_dict(origin) + assert task == { + "type": "load-git", + "arguments": {"args": [], "kwargs": {"url": "http://example.com/"}}, + } + + loader_args = {"foo": "bar", "baz": {"foo": "bar"}} + + origin_w_args = model.ListedOrigin( + lister_id=uuid.uuid4(), + url="http://example.com/svn/", + visit_type="svn", + extra_loader_arguments=loader_args, + ) + + 
task_w_args = utils.create_origin_task_dict(origin_w_args) + assert task_w_args == { + "type": "load-svn", + "arguments": { + "args": [], + "kwargs": {"url": "http://example.com/svn/", **loader_args}, + }, + } diff --git a/swh/scheduler/utils.py b/swh/scheduler/utils.py index f8e7935..1f8ed94 100644 --- a/swh/scheduler/utils.py +++ b/swh/scheduler/utils.py @@ -1,80 +1,93 @@ -# Copyright (C) 2017-2018 The Software Heritage developers +# Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone +from typing import Any, Dict + +from .model import ListedOrigin def utcnow(): return datetime.now(tz=timezone.utc) def get_task(task_name): """Retrieve task object in our application instance by its fully qualified python name. Args: task_name (str): task's name (e.g swh.loader.git.tasks.LoadDiskGitRepository) Returns: Instance of task """ from swh.scheduler.celery_backend.config import app for module in app.conf.CELERY_IMPORTS: __import__(module) return app.tasks[task_name] def create_task_dict(type, policy, *args, **kwargs): """Create a task with type and policy, scheduled for as soon as possible. 
Args: type (str): Type of oneshot task as per swh-scheduler's db table task_type's column (Ex: load-git, check-deposit) policy (str): oneshot or recurring policy Returns: Expected dictionary for the one-shot task scheduling api (swh.scheduler.backend.create_tasks) """ task_extra = {} for extra_key in ["priority", "retries_left"]: if extra_key in kwargs: extra_val = kwargs.pop(extra_key) task_extra[extra_key] = extra_val task = { "policy": policy, "type": type, "next_run": utcnow(), "arguments": { "args": args if args else [], "kwargs": kwargs if kwargs else {}, }, } task.update(task_extra) return task +def create_origin_task_dict(origin: ListedOrigin) -> Dict[str, Any]: + return { + "type": f"load-{origin.visit_type}", + "arguments": { + "args": [], + "kwargs": {"url": origin.url, **origin.extra_loader_arguments}, + }, + } + + def create_oneshot_task_dict(type, *args, **kwargs): """Create a oneshot task scheduled for as soon as possible. Args: type (str): Type of oneshot task as per swh-scheduler's db table task_type's column (Ex: load-git, check-deposit) Returns: Expected dictionary for the one-shot task scheduling api (swh.scheduler.backend.create_tasks) """ return create_task_dict(type, "oneshot", *args, **kwargs)