diff --git a/swh/scheduler/__init__.py b/swh/scheduler/__init__.py index b1f98ba..cfb15fa 100644 --- a/swh/scheduler/__init__.py +++ b/swh/scheduler/__init__.py @@ -1,76 +1,76 @@ # Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from importlib import import_module from typing import TYPE_CHECKING, Any, Dict import warnings DEFAULT_CONFIG = { "scheduler": ( "dict", { - "cls": "local", + "cls": "postgresql", "db": "dbname=softwareheritage-scheduler-dev", }, ) } # current configuration. To be set by the config loading mechanism CONFIG = {} # type: Dict[str, Any] if TYPE_CHECKING: from swh.scheduler.interface import SchedulerInterface BACKEND_TYPES: Dict[str, str] = { "postgresql": ".backend.SchedulerBackend", "remote": ".api.client.RemoteScheduler", # deprecated "local": ".backend.SchedulerBackend", } def get_scheduler(cls: str, **kwargs) -> SchedulerInterface: """ Get a scheduler object of class `cls` with arguments `**kwargs`. Args: cls: scheduler's class, either 'local' or 'remote' kwargs: arguments to pass to the class' constructor Returns: an instance of swh.scheduler, either local or remote: local: swh.scheduler.backend.SchedulerBackend remote: swh.scheduler.api.client.RemoteScheduler Raises: ValueError if passed an unknown storage class. """ if "args" in kwargs: warnings.warn( 'Explicit "args" key is deprecated, use keys directly instead.', DeprecationWarning, ) kwargs = kwargs["args"] class_path = BACKEND_TYPES.get(cls) if class_path is None: raise ValueError( f"Unknown Scheduler class `{cls}`. " f"Supported: {', '.join(BACKEND_TYPES)}" ) (module_path, class_name) = class_path.rsplit(".", 1) module = import_module(module_path, package=__package__) BackendClass = getattr(module, class_name) return BackendClass(**kwargs) get_datastore = get_scheduler diff --git a/swh/scheduler/api/server.py b/swh/scheduler/api/server.py index 5854095..abc3ab8 100644 --- a/swh/scheduler/api/server.py +++ b/swh/scheduler/api/server.py @@ -1,150 +1,150 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import os from swh.core import config from swh.core.api import JSONFormatter, MsgpackFormatter, RPCServerApp from swh.core.api import encode_data_server as encode_data from swh.core.api import error_handler, negotiate from swh.scheduler import get_scheduler from swh.scheduler.exc import SchedulerException from swh.scheduler.interface import SchedulerInterface from .serializers import DECODERS, ENCODERS scheduler = None def get_global_scheduler(): global scheduler if not scheduler: scheduler = get_scheduler(**app.config["scheduler"]) return scheduler class SchedulerServerApp(RPCServerApp): extra_type_decoders = DECODERS extra_type_encoders = ENCODERS app = SchedulerServerApp( __name__, backend_class=SchedulerInterface, backend_factory=get_global_scheduler ) @app.errorhandler(SchedulerException) def argument_error_handler(exception): return error_handler(exception, encode_data, status_code=400) @app.errorhandler(Exception) def my_error_handler(exception): return error_handler(exception, encode_data) def has_no_empty_params(rule): return len(rule.defaults or ()) >= len(rule.arguments or ()) @app.route("/") def index(): return """ Software Heritage scheduler RPC server

You have reached the Software Heritage scheduler RPC server.
See its documentation and API for more information

""" @app.route("/site-map") @negotiate(MsgpackFormatter) @negotiate(JSONFormatter) def site_map(): links = [] for rule in app.url_map.iter_rules(): if has_no_empty_params(rule) and hasattr(SchedulerInterface, rule.endpoint): links.append( dict( rule=rule.rule, description=getattr(SchedulerInterface, rule.endpoint).__doc__, ) ) # links is now a list of url, endpoint tuples return links def load_and_check_config(config_path, type="local"): """Check the minimal configuration is set to run the api or raise an error explanation. Args: config_path (str): Path to the configuration file to load type (str): configuration type. For 'local' type, more checks are done. Raises: Error if the setup is not as expected Returns: configuration as a dict """ if not config_path: raise EnvironmentError("Configuration file must be defined") if not os.path.exists(config_path): raise FileNotFoundError(f"Configuration file {config_path} does not exist") cfg = config.read(config_path) vcfg = cfg.get("scheduler") if not vcfg: raise KeyError("Missing '%scheduler' configuration") if type == "local": cls = vcfg.get("cls") - if cls != "local": + if cls not in ("local", "postgresql"): raise ValueError( - "The scheduler backend can only be started with a 'local' " + "The scheduler backend can only be started with a 'postgresql' " "configuration" ) db = vcfg.get("db") if not db: raise KeyError("Invalid configuration; missing 'db' config entry") return cfg api_cfg = None def make_app_from_configfile(): """Run the WSGI app from the webserver, loading the configuration from a configuration file. SWH_CONFIG_FILENAME environment variable defines the configuration path to load. """ global api_cfg if not api_cfg: config_path = os.environ.get("SWH_CONFIG_FILENAME") api_cfg = load_and_check_config(config_path) app.config.update(api_cfg) handler = logging.StreamHandler() app.logger.addHandler(handler) return app if __name__ == "__main__": print('Please use the "swh-scheduler api-server" command') diff --git a/swh/scheduler/celery_backend/pika_listener.py b/swh/scheduler/celery_backend/pika_listener.py index a5c48f2..426b271 100644 --- a/swh/scheduler/celery_backend/pika_listener.py +++ b/swh/scheduler/celery_backend/pika_listener.py @@ -1,110 +1,112 @@ # Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """This is the scheduler listener. It is in charge of listening to rabbitmq events (the task result) and flushes the "oneshot" tasks' status in the scheduler backend. It's the final step after a task is done. The scheduler runner :mod:`swh.scheduler.celery_backend.runner` is the module in charge of pushing tasks in the queue. """ import json import logging import sys import pika from swh.core.statsd import statsd from swh.scheduler import get_scheduler from swh.scheduler.utils import utcnow logger = logging.getLogger(__name__) def get_listener(broker_url, queue_name, scheduler_backend): connection = pika.BlockingConnection(pika.URLParameters(broker_url)) channel = connection.channel() channel.queue_declare(queue=queue_name, durable=True) exchange = "celeryev" routing_key = "#" channel.queue_bind(queue=queue_name, exchange=exchange, routing_key=routing_key) channel.basic_qos(prefetch_count=1000) channel.basic_consume( queue=queue_name, on_message_callback=get_on_message(scheduler_backend), ) return channel def get_on_message(scheduler_backend): def on_message(channel, method_frame, properties, body): try: events = json.loads(body) except Exception: logger.warning("Could not parse body %r", body) events = [] if not isinstance(events, list): events = [events] for event in events: logger.debug("Received event %r", event) process_event(event, scheduler_backend) channel.basic_ack(delivery_tag=method_frame.delivery_tag) return on_message def process_event(event, scheduler_backend): uuid = event.get("uuid") if not uuid: return event_type = event["type"] statsd.increment( "swh_scheduler_listener_handled_event_total", tags={"event_type": event_type} ) if event_type == "task-started": scheduler_backend.start_task_run( uuid, timestamp=utcnow(), metadata={"worker": event.get("hostname")}, ) elif event_type == "task-result": result = event["result"] status = None if isinstance(result, dict) and "status" in result: status = result["status"] if status == "success": status = "eventful" if result.get("eventful") else "uneventful" if status is None: status = "eventful" if result else "uneventful" scheduler_backend.end_task_run( uuid, timestamp=utcnow(), status=status, result=result ) elif event_type == "task-failed": scheduler_backend.end_task_run(uuid, timestamp=utcnow(), status="failed") if __name__ == "__main__": url = sys.argv[1] logging.basicConfig(level=logging.DEBUG) - scheduler_backend = get_scheduler("local", args={"db": "service=swh-scheduler"}) + scheduler_backend = get_scheduler( + "postgresql", args={"db": "service=swh-scheduler"} + ) channel = get_listener(url, "celeryev.test", scheduler_backend) logger.info("Start consuming") channel.start_consuming() diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py index be59acf..525df02 100644 --- a/swh/scheduler/celery_backend/runner.py +++ b/swh/scheduler/celery_backend/runner.py @@ -1,184 +1,184 @@ # Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """This is the first scheduler runner. It is in charge of scheduling "oneshot" tasks (e.g save code now, indexer, vault, deposit, ...). To do this, it reads tasks ouf of the scheduler backend and pushes those to their associated rabbitmq queues. The scheduler listener :mod:`swh.scheduler.celery_backend.pika_listener` is the module in charge of finalizing the task results. """ import logging from typing import Dict, List, Tuple from deprecated import deprecated from kombu.utils.uuid import uuid from swh.core.statsd import statsd from swh.scheduler import get_scheduler from swh.scheduler.celery_backend.config import get_available_slots from swh.scheduler.interface import SchedulerInterface from swh.scheduler.utils import utcnow logger = logging.getLogger(__name__) # Max batch size for tasks MAX_NUM_TASKS = 10000 def run_ready_tasks( backend: SchedulerInterface, app, task_types: List[Dict] = [], with_priority: bool = False, ) -> List[Dict]: """Schedule tasks ready to be scheduled. This lookups any tasks per task type and mass schedules those accordingly (send messages to rabbitmq and mark as scheduled equivalent tasks in the scheduler backend). If tasks (per task type) with priority exist, they will get redirected to dedicated high priority queue (standard queue name prefixed with `save_code_now:`). Args: backend: scheduler backend to interact with (read/update tasks) app (App): Celery application to send tasks to task_types: The list of task types dict to iterate over. By default, empty. When empty, the full list of task types referenced in the scheduler will be used. with_priority: If True, only tasks with priority set will be fetched and scheduled. By default, False. Returns: A list of dictionaries:: { 'task': the scheduler's task id, 'backend_id': Celery's task id, 'scheduler': utcnow() } The result can be used to block-wait for the tasks' results:: backend_tasks = run_ready_tasks(self.scheduler, app) for task in backend_tasks: AsyncResult(id=task['backend_id']).get() """ all_backend_tasks: List[Dict] = [] while True: if not task_types: task_types = backend.get_task_types() task_types_d = {} pending_tasks = [] for task_type in task_types: task_type_name = task_type["type"] task_types_d[task_type_name] = task_type max_queue_length = task_type["max_queue_length"] if max_queue_length is None: max_queue_length = 0 backend_name = task_type["backend_name"] if with_priority: # grab max_queue_length (or 10) potential tasks with any priority for # the same type (limit the result to avoid too long running queries) grabbed_priority_tasks = backend.grab_ready_priority_tasks( task_type_name, num_tasks=max_queue_length or 10 ) if grabbed_priority_tasks: pending_tasks.extend(grabbed_priority_tasks) logger.info( "Grabbed %s tasks %s (priority)", len(grabbed_priority_tasks), task_type_name, ) statsd.increment( "swh_scheduler_runner_scheduled_task_total", len(grabbed_priority_tasks), tags={"task_type": task_type_name}, ) else: num_tasks = get_available_slots(app, backend_name, max_queue_length) # only pull tasks if the buffer is at least 1/5th empty (= 80% # full), to help postgresql use properly indexed queries. if num_tasks > min(MAX_NUM_TASKS, max_queue_length) // 5: # Only grab num_tasks tasks with no priority grabbed_tasks = backend.grab_ready_tasks( task_type_name, num_tasks=num_tasks ) if grabbed_tasks: pending_tasks.extend(grabbed_tasks) logger.info( "Grabbed %s tasks %s", len(grabbed_tasks), task_type_name ) statsd.increment( "swh_scheduler_runner_scheduled_task_total", len(grabbed_tasks), tags={"task_type": task_type_name}, ) if not pending_tasks: return all_backend_tasks backend_tasks = [] celery_tasks: List[Tuple[bool, str, str, List, Dict]] = [] for task in pending_tasks: args = task["arguments"]["args"] kwargs = task["arguments"]["kwargs"] backend_name = task_types_d[task["type"]]["backend_name"] backend_id = uuid() celery_tasks.append( ( task.get("priority") is not None, backend_name, backend_id, args, kwargs, ) ) data = { "task": task["id"], "backend_id": backend_id, "scheduled": utcnow(), } backend_tasks.append(data) logger.debug("Sent %s celery tasks", len(backend_tasks)) backend.mass_schedule_task_runs(backend_tasks) for with_priority, backend_name, backend_id, args, kwargs in celery_tasks: kw = dict( task_id=backend_id, args=args, kwargs=kwargs, ) if with_priority: kw["queue"] = f"save_code_now:{backend_name}" app.send_task(backend_name, **kw) all_backend_tasks.extend(backend_tasks) @deprecated(version="0.18", reason="Use `swh scheduler start-runner` instead") def main(): from .config import app as main_app for module in main_app.conf.CELERY_IMPORTS: __import__(module) - main_backend = get_scheduler("local") + main_backend = get_scheduler("postgresql") try: run_ready_tasks(main_backend, main_app) except Exception: main_backend.rollback() raise if __name__ == "__main__": main() diff --git a/swh/scheduler/cli/__init__.py b/swh/scheduler/cli/__init__.py index cda7259..8eeadaa 100644 --- a/swh/scheduler/cli/__init__.py +++ b/swh/scheduler/cli/__init__.py @@ -1,102 +1,102 @@ # Copyright (C) 2016-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import logging import click from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup from swh.core.cli import swh as swh_cli_group # If you're looking for subcommand imports, they are further down this file to # avoid a circular import! @swh_cli_group.group( name="scheduler", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup ) @click.option( "--config-file", "-C", default=None, type=click.Path( exists=True, dir_okay=False, ), help="Configuration file.", ) @click.option( "--database", "-d", default=None, - help="Scheduling database DSN (imply cls is 'local')", + help="Scheduling database DSN (imply cls is 'postgresql')", ) @click.option( "--url", "-u", default=None, help="Scheduler's url access (imply cls is 'remote')" ) @click.option( "--no-stdout", is_flag=True, default=False, help="Do NOT output logs on the console" ) @click.pass_context def cli(ctx, config_file, database, url, no_stdout): """Software Heritage Scheduler tools. Use a local scheduler instance by default (plugged to the main scheduler db). """ try: from psycopg2 import OperationalError except ImportError: class OperationalError(Exception): pass from swh.core import config from swh.scheduler import DEFAULT_CONFIG, get_scheduler ctx.ensure_object(dict) logger = logging.getLogger(__name__) scheduler = None conf = config.read(config_file, DEFAULT_CONFIG) if "scheduler" not in conf: raise ValueError("missing 'scheduler' configuration") if database: - conf["scheduler"]["cls"] = "local" + conf["scheduler"]["cls"] = "postgresql" conf["scheduler"]["db"] = database elif url: conf["scheduler"]["cls"] = "remote" conf["scheduler"]["url"] = url sched_conf = conf["scheduler"] try: logger.debug("Instantiating scheduler with %s", sched_conf) scheduler = get_scheduler(**sched_conf) except (ValueError, OperationalError): # it's the subcommand to decide whether not having a proper # scheduler instance is a problem. pass ctx.obj["scheduler"] = scheduler ctx.obj["config"] = conf from . import admin, celery_monitor, journal, origin, simulator, task, task_type # noqa def main(): import click.core click.core.DEPRECATED_HELP_NOTICE = """ DEPRECATED! Please use the command 'swh scheduler'.""" cli.deprecated = True return cli(auto_envvar_prefix="SWH_SCHEDULER") if __name__ == "__main__": main() diff --git a/swh/scheduler/pytest_plugin.py b/swh/scheduler/pytest_plugin.py index dfa3165..f569bb4 100644 --- a/swh/scheduler/pytest_plugin.py +++ b/swh/scheduler/pytest_plugin.py @@ -1,111 +1,111 @@ # Copyright (C) 2020-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import timedelta from functools import partial from celery.contrib.testing import worker from celery.contrib.testing.app import TestApp, setup_default_app import pkg_resources import pytest from pytest_postgresql import factories from swh.core.db.pytest_plugin import initialize_database_for_module from swh.scheduler import get_scheduler from swh.scheduler.backend import SchedulerBackend # celery tasks for testing purpose; tasks themselves should be # in swh/scheduler/tests/tasks.py TASK_NAMES = ["ping", "multiping", "add", "error", "echo"] scheduler_postgresql_proc = factories.postgresql_proc( load=[ partial( initialize_database_for_module, modname="scheduler", version=SchedulerBackend.current_version, ) ], ) postgresql_scheduler = factories.postgresql("scheduler_postgresql_proc") @pytest.fixture def swh_scheduler_config(request, postgresql_scheduler): return { "db": postgresql_scheduler.dsn, } @pytest.fixture def swh_scheduler(swh_scheduler_config): - scheduler = get_scheduler("local", **swh_scheduler_config) + scheduler = get_scheduler("postgresql", **swh_scheduler_config) for taskname in TASK_NAMES: scheduler.create_task_type( { "type": "swh-test-{}".format(taskname), "description": "The {} testing task".format(taskname), "backend_name": "swh.scheduler.tests.tasks.{}".format(taskname), "default_interval": timedelta(days=1), "min_interval": timedelta(hours=6), "max_interval": timedelta(days=12), } ) return scheduler # this alias is used to be able to easily instantiate a db-backed Scheduler # eg. for the RPC client/server test suite. swh_db_scheduler = swh_scheduler @pytest.fixture(scope="session") def swh_scheduler_celery_app(): """Set up a Celery app as swh.scheduler and swh worker tests would expect it""" test_app = TestApp( set_as_current=True, enable_logging=True, task_cls="swh.scheduler.task:SWHTask", config={ "accept_content": ["application/x-msgpack", "application/json"], "broker_url": "memory://guest@localhost//", "task_serializer": "msgpack", "result_serializer": "json", }, ) with setup_default_app(test_app, use_trap=False): from swh.scheduler.celery_backend import config config.app = test_app test_app.set_default() test_app.set_current() yield test_app @pytest.fixture(scope="session") def swh_scheduler_celery_includes(): """List of task modules that should be loaded by the swh_scheduler_celery_worker on startup.""" task_modules = ["swh.scheduler.tests.tasks"] for entrypoint in pkg_resources.iter_entry_points("swh.workers"): task_modules.extend(entrypoint.load()().get("task_modules", [])) return task_modules @pytest.fixture(scope="session") def swh_scheduler_celery_worker( swh_scheduler_celery_app, swh_scheduler_celery_includes, ): """Spawn a worker""" for module in swh_scheduler_celery_includes: swh_scheduler_celery_app.loader.import_task_module(module) with worker.start_worker(swh_scheduler_celery_app, pool="solo") as w: yield w diff --git a/swh/scheduler/tests/test_cli_journal.py b/swh/scheduler/tests/test_cli_journal.py index 3c7e723..023385f 100644 --- a/swh/scheduler/tests/test_cli_journal.py +++ b/swh/scheduler/tests/test_cli_journal.py @@ -1,132 +1,132 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os from typing import Dict, List from click.testing import CliRunner, Result from confluent_kafka import Producer import pytest import yaml from swh.journal.serializers import value_to_kafka from swh.scheduler import get_scheduler from swh.scheduler.cli import cli from swh.scheduler.tests.test_journal_client import VISIT_STATUSES_1 @pytest.fixture def swh_scheduler_cfg(postgresql_scheduler, kafka_server): """Journal client configuration ready""" return { "scheduler": { - "cls": "local", + "cls": "postgresql", "db": postgresql_scheduler.dsn, }, "journal": { "brokers": [kafka_server], "group_id": "test-consume-visit-status", }, } def _write_configuration_path(config: Dict, tmp_path: str) -> str: config_path = os.path.join(str(tmp_path), "scheduler.yml") with open(config_path, "w") as f: f.write(yaml.dump(config)) return config_path @pytest.fixture def swh_scheduler_cfg_path(swh_scheduler_cfg, tmp_path): """Write scheduler configuration in temporary path and returns such path""" return _write_configuration_path(swh_scheduler_cfg, tmp_path) def invoke(args: List[str], config_path: str, catch_exceptions: bool = False) -> Result: """Invoke swh scheduler journal subcommands""" runner = CliRunner() result = runner.invoke(cli, ["-C" + config_path] + args) if not catch_exceptions and result.exception: print(result.output) raise result.exception return result def test_cli_journal_client_origin_visit_status_misconfiguration_no_scheduler( swh_scheduler_cfg, tmp_path ): config = swh_scheduler_cfg.copy() config["scheduler"] = {"cls": "foo"} config_path = _write_configuration_path(config, tmp_path) with pytest.raises(ValueError, match="must be instantiated"): invoke( [ "journal-client", "--stop-after-objects", "1", ], config_path, ) def test_cli_journal_client_origin_visit_status_misconfiguration_missing_journal_conf( swh_scheduler_cfg, tmp_path ): config = swh_scheduler_cfg.copy() config.pop("journal", None) config_path = _write_configuration_path(config, tmp_path) with pytest.raises(ValueError, match="Missing 'journal'"): invoke( [ "journal-client", "--stop-after-objects", "1", ], config_path, ) def test_cli_journal_client_origin_visit_status( swh_scheduler_cfg, swh_scheduler_cfg_path, ): kafka_server = swh_scheduler_cfg["journal"]["brokers"][0] swh_scheduler = get_scheduler(**swh_scheduler_cfg["scheduler"]) producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test visit-stats producer", "acks": "all", } ) visit_status = VISIT_STATUSES_1[0] value = value_to_kafka(visit_status) topic = "swh.journal.objects.origin_visit_status" producer.produce(topic=topic, key=b"bogus-origin", value=value) producer.flush() result = invoke( [ "journal-client", "--stop-after-objects", "1", ], swh_scheduler_cfg_path, ) # Check the output expected_output = "Processed 1 message(s).\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output actual_visit_stats = swh_scheduler.origin_visit_stats_get( [(visit_status["origin"], visit_status["type"])] ) assert actual_visit_stats assert len(actual_visit_stats) == 1 diff --git a/swh/scheduler/tests/test_init.py b/swh/scheduler/tests/test_init.py index 9a97548..7673d6d 100644 --- a/swh/scheduler/tests/test_init.py +++ b/swh/scheduler/tests/test_init.py @@ -1,77 +1,77 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import inspect import pytest from swh.scheduler import get_scheduler from swh.scheduler.api.client import RemoteScheduler from swh.scheduler.backend import SchedulerBackend from swh.scheduler.interface import SchedulerInterface SERVER_IMPLEMENTATIONS = [ ("remote", RemoteScheduler, {"url": "localhost"}), - ("local", SchedulerBackend, {"db": "something"}), + ("postgresql", SchedulerBackend, {"db": "something"}), ] @pytest.fixture def mock_psycopg2(mocker): mocker.patch("swh.scheduler.backend.psycopg2.pool") def test_init_get_scheduler_failure(): with pytest.raises(ValueError, match="Unknown Scheduler class"): get_scheduler("unknown-scheduler-storage") @pytest.mark.parametrize("class_name,expected_class,kwargs", SERVER_IMPLEMENTATIONS) def test_init_get_scheduler(class_name, expected_class, kwargs, mock_psycopg2): concrete_scheduler = get_scheduler(class_name, **kwargs) assert isinstance(concrete_scheduler, expected_class) assert isinstance(concrete_scheduler, SchedulerInterface) @pytest.mark.parametrize("class_name,expected_class,kwargs", SERVER_IMPLEMENTATIONS) def test_init_get_scheduler_deprecation_warning( class_name, expected_class, kwargs, mock_psycopg2 ): with pytest.warns(DeprecationWarning): concrete_scheduler = get_scheduler(class_name, args=kwargs) assert isinstance(concrete_scheduler, expected_class) def test_types(swh_scheduler) -> None: """Checks all methods of SchedulerInterface are implemented by this backend, and that they have the same signature.""" # Create an instance of the protocol (which cannot be instantiated # directly, so this creates a subclass, then instantiates it) interface = type("_", (SchedulerInterface,), {})() missing_methods = [] for meth_name in dir(interface): if meth_name.startswith("_"): continue interface_meth = getattr(interface, meth_name) try: concrete_meth = getattr(swh_scheduler, meth_name) except AttributeError: missing_methods.append(meth_name) continue expected_signature = inspect.signature(interface_meth) actual_signature = inspect.signature(concrete_meth) assert expected_signature == actual_signature, meth_name assert missing_methods == [] # If all the assertions above succeed, then this one should too. # But there's no harm in double-checking. # And we could replace the assertions above by this one, but unlike # the assertions above, it doesn't explain what is missing. assert isinstance(swh_scheduler, SchedulerInterface) diff --git a/swh/scheduler/tests/test_recurrent_visits.py b/swh/scheduler/tests/test_recurrent_visits.py index b5e9ebf..ad3cfd2 100644 --- a/swh/scheduler/tests/test_recurrent_visits.py +++ b/swh/scheduler/tests/test_recurrent_visits.py @@ -1,216 +1,216 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import timedelta import logging from queue import Queue from unittest.mock import MagicMock import pytest from swh.scheduler.celery_backend.recurrent_visits import ( DEFAULT_DVCS_POLICY, VisitSchedulerThreads, grab_next_visits_policy_weights, send_visits_for_visit_type, spawn_visit_scheduler_thread, terminate_visit_scheduler_threads, visit_scheduler_thread, ) from .test_cli import invoke TEST_MAX_QUEUE = 10000 MODULE_NAME = "swh.scheduler.celery_backend.recurrent_visits" def _compute_backend_name(visit_type: str) -> str: "Build a dummy reproducible backend name" return f"swh.loader.{visit_type}.tasks" @pytest.fixture def swh_scheduler(swh_scheduler): """Override default fixture of the scheduler to install some more task types.""" for visit_type in ["test-git", "test-hg", "test-svn"]: task_type = f"load-{visit_type}" swh_scheduler.create_task_type( { "type": task_type, "max_queue_length": TEST_MAX_QUEUE, "description": "The {} testing task".format(task_type), "backend_name": _compute_backend_name(visit_type), "default_interval": timedelta(days=1), "min_interval": timedelta(hours=6), "max_interval": timedelta(days=12), } ) return swh_scheduler def test_cli_schedule_recurrent_unknown_visit_type(swh_scheduler): """When passed an unknown visit type, the recurrent visit scheduler should refuse to start.""" with pytest.raises(ValueError, match="Unknown"): invoke( swh_scheduler, False, [ "schedule-recurrent", "--visit-type", "unknown", "--visit-type", "test-git", ], ) def test_cli_schedule_recurrent_noop(swh_scheduler, mocker): """When passing no visit types, the recurrent visit scheduler should start.""" spawn_visit_scheduler_thread = mocker.patch( f"{MODULE_NAME}.spawn_visit_scheduler_thread" ) spawn_visit_scheduler_thread.side_effect = SystemExit # The actual scheduling threads won't spawn, they'll immediately terminate. This # only exercises the logic to pull task types out of the database result = invoke(swh_scheduler, False, ["schedule-recurrent"]) assert result.exit_code == 0, result.output def test_recurrent_visit_scheduling( swh_scheduler, caplog, listed_origins_by_type, mocker, ): """Scheduling known tasks is ok.""" caplog.set_level(logging.DEBUG, MODULE_NAME) nb_origins = 1000 mock_celery_app = MagicMock() mock_available_slots = mocker.patch(f"{MODULE_NAME}.get_available_slots") mock_available_slots.return_value = nb_origins # Slots available in queue # Make sure the scheduler is properly configured in terms of visit/task types all_task_types = { task_type_d["type"]: task_type_d for task_type_d in swh_scheduler.get_task_types() } visit_types = list(listed_origins_by_type.keys()) assert len(visit_types) > 0 task_types = [] origins = [] for visit_type, _origins in listed_origins_by_type.items(): origins.extend(swh_scheduler.record_listed_origins(_origins)) task_type_name = f"load-{visit_type}" assert task_type_name in all_task_types.keys() task_type = all_task_types[task_type_name] task_type["visit_type"] = visit_type # we'll limit the orchestrator to the origins' type we know task_types.append(task_type) for visit_type in ["test-git", "test-svn"]: task_type = f"load-{visit_type}" send_visits_for_visit_type( swh_scheduler, mock_celery_app, visit_type, all_task_types[task_type], DEFAULT_DVCS_POLICY, ) assert mock_available_slots.called, "The available slots functions should be called" records = [record.message for record in caplog.records] # Mapping over the dict ratio/policies entries can change overall order so let's # check the set of records expected_records = set() for task_type in task_types: visit_type = task_type["visit_type"] queue_name = task_type["backend_name"] msg = ( f"{nb_origins} available slots for visit type {visit_type} " f"in queue {queue_name}" ) expected_records.add(msg) for expected_record in expected_records: assert expected_record in set(records) @pytest.mark.parametrize( "visit_type, extras", [("test-hg", {}), ("test-git", {"tablesample": 0.1})], ) def test_recurrent_visit_additional_parameters( swh_scheduler, mocker, visit_type, extras ): """Testing additional policy parameters""" mock_grab_next_visits = mocker.patch.object(swh_scheduler, "grab_next_visits") mock_grab_next_visits.return_value = [] policy_cfg = DEFAULT_DVCS_POLICY[:] for policy in policy_cfg: policy.update(extras) grab_next_visits_policy_weights(swh_scheduler, visit_type, 10, policy_cfg) for call in mock_grab_next_visits.call_args_list: assert call[1].get("tablesample") == extras.get("tablesample") @pytest.fixture def scheduler_config(swh_scheduler_config): - return {"scheduler": {"cls": "local", **swh_scheduler_config}, "celery": {}} + return {"scheduler": {"cls": "postgresql", **swh_scheduler_config}, "celery": {}} def test_visit_scheduler_thread_unknown_task( swh_scheduler, scheduler_config, ): """Starting a thread with unknown task type reports the error""" unknown_visit_type = "unknown" command_queue = Queue() exc_queue = Queue() visit_scheduler_thread( scheduler_config, unknown_visit_type, command_queue, exc_queue ) assert command_queue.empty() is True assert exc_queue.empty() is False assert len(exc_queue.queue) == 1 result = exc_queue.queue.pop() assert result[0] == unknown_visit_type assert isinstance(result[1], ValueError) def test_spawn_visit_scheduler_thread_noop(scheduler_config, visit_types, mocker): """Spawning and terminating threads runs smoothly""" threads: VisitSchedulerThreads = {} exc_queue = Queue() mock_build_app = mocker.patch("swh.scheduler.celery_backend.config.build_app") mock_build_app.return_value = MagicMock() assert len(threads) == 0 for visit_type in visit_types: spawn_visit_scheduler_thread(threads, exc_queue, scheduler_config, visit_type) # This actually only checks the spawning and terminating logic is sound assert len(threads) == len(visit_types) actual_threads = terminate_visit_scheduler_threads(threads) assert not len(actual_threads) assert mock_build_app.called diff --git a/swh/scheduler/tests/test_server.py b/swh/scheduler/tests/test_server.py index a678dd8..50c5b41 100644 --- a/swh/scheduler/tests/test_server.py +++ b/swh/scheduler/tests/test_server.py @@ -1,100 +1,100 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest import yaml from swh.scheduler.api.server import load_and_check_config def prepare_config_file(tmpdir, content, name="config.yml"): """Prepare configuration file in `$tmpdir/name` with content `content`. Args: tmpdir (LocalPath): root directory content (str/dict): Content of the file either as string or as a dict. If a dict, converts the dict into a yaml string. name (str): configuration filename Returns path (str) of the configuration file prepared. """ config_path = tmpdir / name if isinstance(content, dict): # convert if needed content = yaml.dump(content) config_path.write_text(content, encoding="utf-8") # pytest on python3.5 does not support LocalPath manipulation, so # convert path to string return str(config_path) @pytest.mark.parametrize("scheduler_class", [None, ""]) def test_load_and_check_config_no_configuration(scheduler_class): """Inexistent configuration files raises""" with pytest.raises(EnvironmentError, match="Configuration file must be defined"): load_and_check_config(scheduler_class) def test_load_and_check_config_inexistent_fil(): """Inexistent config filepath should raise""" config_path = "/some/inexistent/config.yml" expected_error = f"Configuration file {config_path} does not exist" with pytest.raises(FileNotFoundError, match=expected_error): load_and_check_config(config_path) def test_load_and_check_config_wrong_configuration(tmpdir): """Wrong configuration raises""" config_path = prepare_config_file(tmpdir, "something: useless") with pytest.raises(KeyError, match="Missing '%scheduler' configuration"): load_and_check_config(config_path) def test_load_and_check_config_remote_config_local_type_raise(tmpdir): """Configuration without 'local' storage is rejected""" config = {"scheduler": {"cls": "remote"}} config_path = prepare_config_file(tmpdir, config) expected_error = ( - "The scheduler backend can only be started with a 'local'" " configuration" + "The scheduler backend can only be started with a 'postgresql'" " configuration" ) with pytest.raises(ValueError, match=expected_error): load_and_check_config(config_path, type="local") def test_load_and_check_config_local_incomplete_configuration(tmpdir): """Incomplete 'local' configuration should raise""" config = { "scheduler": { - "cls": "local", + "cls": "postgresql", "something": "needed-for-test", } } config_path = prepare_config_file(tmpdir, config) expected_error = "Invalid configuration; missing 'db' config entry" with pytest.raises(KeyError, match=expected_error): load_and_check_config(config_path) def test_load_and_check_config_local_config_fine(tmpdir): """Local configuration is fine""" config = { "scheduler": { - "cls": "local", + "cls": "postgresql", "db": "db", } } config_path = prepare_config_file(tmpdir, config) cfg = load_and_check_config(config_path, type="local") assert cfg == config def test_load_and_check_config_remote_config_fine(tmpdir): """Remote configuration is fine""" config = {"scheduler": {"cls": "remote"}} config_path = prepare_config_file(tmpdir, config) cfg = load_and_check_config(config_path, type="any") assert cfg == config