diff --git a/swh/scheduler/__init__.py b/swh/scheduler/__init__.py
index b1f98ba..cfb15fa 100644
--- a/swh/scheduler/__init__.py
+++ b/swh/scheduler/__init__.py
@@ -1,76 +1,76 @@
# Copyright (C) 2018-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
from importlib import import_module
from typing import TYPE_CHECKING, Any, Dict
import warnings
DEFAULT_CONFIG = {
"scheduler": (
"dict",
{
- "cls": "local",
+ "cls": "postgresql",
"db": "dbname=softwareheritage-scheduler-dev",
},
)
}
# current configuration. To be set by the config loading mechanism
CONFIG = {} # type: Dict[str, Any]
if TYPE_CHECKING:
from swh.scheduler.interface import SchedulerInterface
BACKEND_TYPES: Dict[str, str] = {
"postgresql": ".backend.SchedulerBackend",
"remote": ".api.client.RemoteScheduler",
# deprecated
"local": ".backend.SchedulerBackend",
}
def get_scheduler(cls: str, **kwargs) -> SchedulerInterface:
"""
Get a scheduler object of class `cls` with arguments `**kwargs`.
Args:
cls: scheduler's class, either 'local' or 'remote'
kwargs: arguments to pass to the class' constructor
Returns:
an instance of swh.scheduler, either local or remote:
local: swh.scheduler.backend.SchedulerBackend
remote: swh.scheduler.api.client.RemoteScheduler
Raises:
ValueError if passed an unknown storage class.
"""
if "args" in kwargs:
warnings.warn(
'Explicit "args" key is deprecated, use keys directly instead.',
DeprecationWarning,
)
kwargs = kwargs["args"]
class_path = BACKEND_TYPES.get(cls)
if class_path is None:
raise ValueError(
f"Unknown Scheduler class `{cls}`. "
f"Supported: {', '.join(BACKEND_TYPES)}"
)
(module_path, class_name) = class_path.rsplit(".", 1)
module = import_module(module_path, package=__package__)
BackendClass = getattr(module, class_name)
return BackendClass(**kwargs)
get_datastore = get_scheduler
diff --git a/swh/scheduler/api/server.py b/swh/scheduler/api/server.py
index 5854095..abc3ab8 100644
--- a/swh/scheduler/api/server.py
+++ b/swh/scheduler/api/server.py
@@ -1,150 +1,150 @@
# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
import os
from swh.core import config
from swh.core.api import JSONFormatter, MsgpackFormatter, RPCServerApp
from swh.core.api import encode_data_server as encode_data
from swh.core.api import error_handler, negotiate
from swh.scheduler import get_scheduler
from swh.scheduler.exc import SchedulerException
from swh.scheduler.interface import SchedulerInterface
from .serializers import DECODERS, ENCODERS
scheduler = None
def get_global_scheduler():
global scheduler
if not scheduler:
scheduler = get_scheduler(**app.config["scheduler"])
return scheduler
class SchedulerServerApp(RPCServerApp):
extra_type_decoders = DECODERS
extra_type_encoders = ENCODERS
app = SchedulerServerApp(
__name__, backend_class=SchedulerInterface, backend_factory=get_global_scheduler
)
@app.errorhandler(SchedulerException)
def argument_error_handler(exception):
return error_handler(exception, encode_data, status_code=400)
@app.errorhandler(Exception)
def my_error_handler(exception):
return error_handler(exception, encode_data)
def has_no_empty_params(rule):
return len(rule.defaults or ()) >= len(rule.arguments or ())
@app.route("/")
def index():
return """
Software Heritage scheduler RPC server
You have reached the
Software Heritage
scheduler RPC server.
See its
documentation
and API for more information
"""
@app.route("/site-map")
@negotiate(MsgpackFormatter)
@negotiate(JSONFormatter)
def site_map():
links = []
for rule in app.url_map.iter_rules():
if has_no_empty_params(rule) and hasattr(SchedulerInterface, rule.endpoint):
links.append(
dict(
rule=rule.rule,
description=getattr(SchedulerInterface, rule.endpoint).__doc__,
)
)
# links is now a list of url, endpoint tuples
return links
def load_and_check_config(config_path, type="local"):
"""Check the minimal configuration is set to run the api or raise an
error explanation.
Args:
config_path (str): Path to the configuration file to load
type (str): configuration type. For 'local' type, more
checks are done.
Raises:
Error if the setup is not as expected
Returns:
configuration as a dict
"""
if not config_path:
raise EnvironmentError("Configuration file must be defined")
if not os.path.exists(config_path):
raise FileNotFoundError(f"Configuration file {config_path} does not exist")
cfg = config.read(config_path)
vcfg = cfg.get("scheduler")
if not vcfg:
raise KeyError("Missing '%scheduler' configuration")
if type == "local":
cls = vcfg.get("cls")
- if cls != "local":
+ if cls not in ("local", "postgresql"):
raise ValueError(
- "The scheduler backend can only be started with a 'local' "
+ "The scheduler backend can only be started with a 'postgresql' "
"configuration"
)
db = vcfg.get("db")
if not db:
raise KeyError("Invalid configuration; missing 'db' config entry")
return cfg
api_cfg = None
def make_app_from_configfile():
"""Run the WSGI app from the webserver, loading the configuration from
a configuration file.
SWH_CONFIG_FILENAME environment variable defines the
configuration path to load.
"""
global api_cfg
if not api_cfg:
config_path = os.environ.get("SWH_CONFIG_FILENAME")
api_cfg = load_and_check_config(config_path)
app.config.update(api_cfg)
handler = logging.StreamHandler()
app.logger.addHandler(handler)
return app
if __name__ == "__main__":
print('Please use the "swh-scheduler api-server" command')
diff --git a/swh/scheduler/celery_backend/pika_listener.py b/swh/scheduler/celery_backend/pika_listener.py
index a5c48f2..426b271 100644
--- a/swh/scheduler/celery_backend/pika_listener.py
+++ b/swh/scheduler/celery_backend/pika_listener.py
@@ -1,110 +1,112 @@
# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""This is the scheduler listener. It is in charge of listening to rabbitmq events (the
task result) and flushes the "oneshot" tasks' status in the scheduler backend. It's the
final step after a task is done.
The scheduler runner :mod:`swh.scheduler.celery_backend.runner` is the module in charge
of pushing tasks in the queue.
"""
import json
import logging
import sys
import pika
from swh.core.statsd import statsd
from swh.scheduler import get_scheduler
from swh.scheduler.utils import utcnow
logger = logging.getLogger(__name__)
def get_listener(broker_url, queue_name, scheduler_backend):
connection = pika.BlockingConnection(pika.URLParameters(broker_url))
channel = connection.channel()
channel.queue_declare(queue=queue_name, durable=True)
exchange = "celeryev"
routing_key = "#"
channel.queue_bind(queue=queue_name, exchange=exchange, routing_key=routing_key)
channel.basic_qos(prefetch_count=1000)
channel.basic_consume(
queue=queue_name,
on_message_callback=get_on_message(scheduler_backend),
)
return channel
def get_on_message(scheduler_backend):
def on_message(channel, method_frame, properties, body):
try:
events = json.loads(body)
except Exception:
logger.warning("Could not parse body %r", body)
events = []
if not isinstance(events, list):
events = [events]
for event in events:
logger.debug("Received event %r", event)
process_event(event, scheduler_backend)
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
return on_message
def process_event(event, scheduler_backend):
uuid = event.get("uuid")
if not uuid:
return
event_type = event["type"]
statsd.increment(
"swh_scheduler_listener_handled_event_total", tags={"event_type": event_type}
)
if event_type == "task-started":
scheduler_backend.start_task_run(
uuid,
timestamp=utcnow(),
metadata={"worker": event.get("hostname")},
)
elif event_type == "task-result":
result = event["result"]
status = None
if isinstance(result, dict) and "status" in result:
status = result["status"]
if status == "success":
status = "eventful" if result.get("eventful") else "uneventful"
if status is None:
status = "eventful" if result else "uneventful"
scheduler_backend.end_task_run(
uuid, timestamp=utcnow(), status=status, result=result
)
elif event_type == "task-failed":
scheduler_backend.end_task_run(uuid, timestamp=utcnow(), status="failed")
if __name__ == "__main__":
url = sys.argv[1]
logging.basicConfig(level=logging.DEBUG)
- scheduler_backend = get_scheduler("local", args={"db": "service=swh-scheduler"})
+ scheduler_backend = get_scheduler(
+ "postgresql", args={"db": "service=swh-scheduler"}
+ )
channel = get_listener(url, "celeryev.test", scheduler_backend)
logger.info("Start consuming")
channel.start_consuming()
diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py
index be59acf..525df02 100644
--- a/swh/scheduler/celery_backend/runner.py
+++ b/swh/scheduler/celery_backend/runner.py
@@ -1,184 +1,184 @@
# Copyright (C) 2015-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""This is the first scheduler runner. It is in charge of scheduling "oneshot" tasks
(e.g save code now, indexer, vault, deposit, ...). To do this, it reads tasks ouf of the
scheduler backend and pushes those to their associated rabbitmq queues.
The scheduler listener :mod:`swh.scheduler.celery_backend.pika_listener` is the module
in charge of finalizing the task results.
"""
import logging
from typing import Dict, List, Tuple
from deprecated import deprecated
from kombu.utils.uuid import uuid
from swh.core.statsd import statsd
from swh.scheduler import get_scheduler
from swh.scheduler.celery_backend.config import get_available_slots
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.utils import utcnow
logger = logging.getLogger(__name__)
# Max batch size for tasks
MAX_NUM_TASKS = 10000
def run_ready_tasks(
backend: SchedulerInterface,
app,
task_types: List[Dict] = [],
with_priority: bool = False,
) -> List[Dict]:
"""Schedule tasks ready to be scheduled.
This lookups any tasks per task type and mass schedules those accordingly (send
messages to rabbitmq and mark as scheduled equivalent tasks in the scheduler
backend).
If tasks (per task type) with priority exist, they will get redirected to dedicated
high priority queue (standard queue name prefixed with `save_code_now:`).
Args:
backend: scheduler backend to interact with (read/update tasks)
app (App): Celery application to send tasks to
task_types: The list of task types dict to iterate over. By default, empty.
When empty, the full list of task types referenced in the scheduler will be
used.
with_priority: If True, only tasks with priority set will be fetched and
scheduled. By default, False.
Returns:
A list of dictionaries::
{
'task': the scheduler's task id,
'backend_id': Celery's task id,
'scheduler': utcnow()
}
The result can be used to block-wait for the tasks' results::
backend_tasks = run_ready_tasks(self.scheduler, app)
for task in backend_tasks:
AsyncResult(id=task['backend_id']).get()
"""
all_backend_tasks: List[Dict] = []
while True:
if not task_types:
task_types = backend.get_task_types()
task_types_d = {}
pending_tasks = []
for task_type in task_types:
task_type_name = task_type["type"]
task_types_d[task_type_name] = task_type
max_queue_length = task_type["max_queue_length"]
if max_queue_length is None:
max_queue_length = 0
backend_name = task_type["backend_name"]
if with_priority:
# grab max_queue_length (or 10) potential tasks with any priority for
# the same type (limit the result to avoid too long running queries)
grabbed_priority_tasks = backend.grab_ready_priority_tasks(
task_type_name, num_tasks=max_queue_length or 10
)
if grabbed_priority_tasks:
pending_tasks.extend(grabbed_priority_tasks)
logger.info(
"Grabbed %s tasks %s (priority)",
len(grabbed_priority_tasks),
task_type_name,
)
statsd.increment(
"swh_scheduler_runner_scheduled_task_total",
len(grabbed_priority_tasks),
tags={"task_type": task_type_name},
)
else:
num_tasks = get_available_slots(app, backend_name, max_queue_length)
# only pull tasks if the buffer is at least 1/5th empty (= 80%
# full), to help postgresql use properly indexed queries.
if num_tasks > min(MAX_NUM_TASKS, max_queue_length) // 5:
# Only grab num_tasks tasks with no priority
grabbed_tasks = backend.grab_ready_tasks(
task_type_name, num_tasks=num_tasks
)
if grabbed_tasks:
pending_tasks.extend(grabbed_tasks)
logger.info(
"Grabbed %s tasks %s", len(grabbed_tasks), task_type_name
)
statsd.increment(
"swh_scheduler_runner_scheduled_task_total",
len(grabbed_tasks),
tags={"task_type": task_type_name},
)
if not pending_tasks:
return all_backend_tasks
backend_tasks = []
celery_tasks: List[Tuple[bool, str, str, List, Dict]] = []
for task in pending_tasks:
args = task["arguments"]["args"]
kwargs = task["arguments"]["kwargs"]
backend_name = task_types_d[task["type"]]["backend_name"]
backend_id = uuid()
celery_tasks.append(
(
task.get("priority") is not None,
backend_name,
backend_id,
args,
kwargs,
)
)
data = {
"task": task["id"],
"backend_id": backend_id,
"scheduled": utcnow(),
}
backend_tasks.append(data)
logger.debug("Sent %s celery tasks", len(backend_tasks))
backend.mass_schedule_task_runs(backend_tasks)
for with_priority, backend_name, backend_id, args, kwargs in celery_tasks:
kw = dict(
task_id=backend_id,
args=args,
kwargs=kwargs,
)
if with_priority:
kw["queue"] = f"save_code_now:{backend_name}"
app.send_task(backend_name, **kw)
all_backend_tasks.extend(backend_tasks)
@deprecated(version="0.18", reason="Use `swh scheduler start-runner` instead")
def main():
from .config import app as main_app
for module in main_app.conf.CELERY_IMPORTS:
__import__(module)
- main_backend = get_scheduler("local")
+ main_backend = get_scheduler("postgresql")
try:
run_ready_tasks(main_backend, main_app)
except Exception:
main_backend.rollback()
raise
if __name__ == "__main__":
main()
diff --git a/swh/scheduler/cli/__init__.py b/swh/scheduler/cli/__init__.py
index cda7259..8eeadaa 100644
--- a/swh/scheduler/cli/__init__.py
+++ b/swh/scheduler/cli/__init__.py
@@ -1,102 +1,102 @@
# Copyright (C) 2016-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import logging
import click
from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup
from swh.core.cli import swh as swh_cli_group
# If you're looking for subcommand imports, they are further down this file to
# avoid a circular import!
@swh_cli_group.group(
name="scheduler", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup
)
@click.option(
"--config-file",
"-C",
default=None,
type=click.Path(
exists=True,
dir_okay=False,
),
help="Configuration file.",
)
@click.option(
"--database",
"-d",
default=None,
- help="Scheduling database DSN (imply cls is 'local')",
+ help="Scheduling database DSN (imply cls is 'postgresql')",
)
@click.option(
"--url", "-u", default=None, help="Scheduler's url access (imply cls is 'remote')"
)
@click.option(
"--no-stdout", is_flag=True, default=False, help="Do NOT output logs on the console"
)
@click.pass_context
def cli(ctx, config_file, database, url, no_stdout):
"""Software Heritage Scheduler tools.
Use a local scheduler instance by default (plugged to the
main scheduler db).
"""
try:
from psycopg2 import OperationalError
except ImportError:
class OperationalError(Exception):
pass
from swh.core import config
from swh.scheduler import DEFAULT_CONFIG, get_scheduler
ctx.ensure_object(dict)
logger = logging.getLogger(__name__)
scheduler = None
conf = config.read(config_file, DEFAULT_CONFIG)
if "scheduler" not in conf:
raise ValueError("missing 'scheduler' configuration")
if database:
- conf["scheduler"]["cls"] = "local"
+ conf["scheduler"]["cls"] = "postgresql"
conf["scheduler"]["db"] = database
elif url:
conf["scheduler"]["cls"] = "remote"
conf["scheduler"]["url"] = url
sched_conf = conf["scheduler"]
try:
logger.debug("Instantiating scheduler with %s", sched_conf)
scheduler = get_scheduler(**sched_conf)
except (ValueError, OperationalError):
# it's the subcommand to decide whether not having a proper
# scheduler instance is a problem.
pass
ctx.obj["scheduler"] = scheduler
ctx.obj["config"] = conf
from . import admin, celery_monitor, journal, origin, simulator, task, task_type # noqa
def main():
import click.core
click.core.DEPRECATED_HELP_NOTICE = """
DEPRECATED! Please use the command 'swh scheduler'."""
cli.deprecated = True
return cli(auto_envvar_prefix="SWH_SCHEDULER")
if __name__ == "__main__":
main()
diff --git a/swh/scheduler/pytest_plugin.py b/swh/scheduler/pytest_plugin.py
index dfa3165..f569bb4 100644
--- a/swh/scheduler/pytest_plugin.py
+++ b/swh/scheduler/pytest_plugin.py
@@ -1,111 +1,111 @@
# Copyright (C) 2020-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import timedelta
from functools import partial
from celery.contrib.testing import worker
from celery.contrib.testing.app import TestApp, setup_default_app
import pkg_resources
import pytest
from pytest_postgresql import factories
from swh.core.db.pytest_plugin import initialize_database_for_module
from swh.scheduler import get_scheduler
from swh.scheduler.backend import SchedulerBackend
# celery tasks for testing purpose; tasks themselves should be
# in swh/scheduler/tests/tasks.py
TASK_NAMES = ["ping", "multiping", "add", "error", "echo"]
scheduler_postgresql_proc = factories.postgresql_proc(
load=[
partial(
initialize_database_for_module,
modname="scheduler",
version=SchedulerBackend.current_version,
)
],
)
postgresql_scheduler = factories.postgresql("scheduler_postgresql_proc")
@pytest.fixture
def swh_scheduler_config(request, postgresql_scheduler):
return {
"db": postgresql_scheduler.dsn,
}
@pytest.fixture
def swh_scheduler(swh_scheduler_config):
- scheduler = get_scheduler("local", **swh_scheduler_config)
+ scheduler = get_scheduler("postgresql", **swh_scheduler_config)
for taskname in TASK_NAMES:
scheduler.create_task_type(
{
"type": "swh-test-{}".format(taskname),
"description": "The {} testing task".format(taskname),
"backend_name": "swh.scheduler.tests.tasks.{}".format(taskname),
"default_interval": timedelta(days=1),
"min_interval": timedelta(hours=6),
"max_interval": timedelta(days=12),
}
)
return scheduler
# this alias is used to be able to easily instantiate a db-backed Scheduler
# eg. for the RPC client/server test suite.
swh_db_scheduler = swh_scheduler
@pytest.fixture(scope="session")
def swh_scheduler_celery_app():
"""Set up a Celery app as swh.scheduler and swh worker tests would expect it"""
test_app = TestApp(
set_as_current=True,
enable_logging=True,
task_cls="swh.scheduler.task:SWHTask",
config={
"accept_content": ["application/x-msgpack", "application/json"],
"broker_url": "memory://guest@localhost//",
"task_serializer": "msgpack",
"result_serializer": "json",
},
)
with setup_default_app(test_app, use_trap=False):
from swh.scheduler.celery_backend import config
config.app = test_app
test_app.set_default()
test_app.set_current()
yield test_app
@pytest.fixture(scope="session")
def swh_scheduler_celery_includes():
"""List of task modules that should be loaded by the swh_scheduler_celery_worker on
startup."""
task_modules = ["swh.scheduler.tests.tasks"]
for entrypoint in pkg_resources.iter_entry_points("swh.workers"):
task_modules.extend(entrypoint.load()().get("task_modules", []))
return task_modules
@pytest.fixture(scope="session")
def swh_scheduler_celery_worker(
swh_scheduler_celery_app,
swh_scheduler_celery_includes,
):
"""Spawn a worker"""
for module in swh_scheduler_celery_includes:
swh_scheduler_celery_app.loader.import_task_module(module)
with worker.start_worker(swh_scheduler_celery_app, pool="solo") as w:
yield w
diff --git a/swh/scheduler/tests/test_cli_journal.py b/swh/scheduler/tests/test_cli_journal.py
index 3c7e723..023385f 100644
--- a/swh/scheduler/tests/test_cli_journal.py
+++ b/swh/scheduler/tests/test_cli_journal.py
@@ -1,132 +1,132 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
from typing import Dict, List
from click.testing import CliRunner, Result
from confluent_kafka import Producer
import pytest
import yaml
from swh.journal.serializers import value_to_kafka
from swh.scheduler import get_scheduler
from swh.scheduler.cli import cli
from swh.scheduler.tests.test_journal_client import VISIT_STATUSES_1
@pytest.fixture
def swh_scheduler_cfg(postgresql_scheduler, kafka_server):
"""Journal client configuration ready"""
return {
"scheduler": {
- "cls": "local",
+ "cls": "postgresql",
"db": postgresql_scheduler.dsn,
},
"journal": {
"brokers": [kafka_server],
"group_id": "test-consume-visit-status",
},
}
def _write_configuration_path(config: Dict, tmp_path: str) -> str:
config_path = os.path.join(str(tmp_path), "scheduler.yml")
with open(config_path, "w") as f:
f.write(yaml.dump(config))
return config_path
@pytest.fixture
def swh_scheduler_cfg_path(swh_scheduler_cfg, tmp_path):
"""Write scheduler configuration in temporary path and returns such path"""
return _write_configuration_path(swh_scheduler_cfg, tmp_path)
def invoke(args: List[str], config_path: str, catch_exceptions: bool = False) -> Result:
"""Invoke swh scheduler journal subcommands"""
runner = CliRunner()
result = runner.invoke(cli, ["-C" + config_path] + args)
if not catch_exceptions and result.exception:
print(result.output)
raise result.exception
return result
def test_cli_journal_client_origin_visit_status_misconfiguration_no_scheduler(
swh_scheduler_cfg, tmp_path
):
config = swh_scheduler_cfg.copy()
config["scheduler"] = {"cls": "foo"}
config_path = _write_configuration_path(config, tmp_path)
with pytest.raises(ValueError, match="must be instantiated"):
invoke(
[
"journal-client",
"--stop-after-objects",
"1",
],
config_path,
)
def test_cli_journal_client_origin_visit_status_misconfiguration_missing_journal_conf(
swh_scheduler_cfg, tmp_path
):
config = swh_scheduler_cfg.copy()
config.pop("journal", None)
config_path = _write_configuration_path(config, tmp_path)
with pytest.raises(ValueError, match="Missing 'journal'"):
invoke(
[
"journal-client",
"--stop-after-objects",
"1",
],
config_path,
)
def test_cli_journal_client_origin_visit_status(
swh_scheduler_cfg,
swh_scheduler_cfg_path,
):
kafka_server = swh_scheduler_cfg["journal"]["brokers"][0]
swh_scheduler = get_scheduler(**swh_scheduler_cfg["scheduler"])
producer = Producer(
{
"bootstrap.servers": kafka_server,
"client.id": "test visit-stats producer",
"acks": "all",
}
)
visit_status = VISIT_STATUSES_1[0]
value = value_to_kafka(visit_status)
topic = "swh.journal.objects.origin_visit_status"
producer.produce(topic=topic, key=b"bogus-origin", value=value)
producer.flush()
result = invoke(
[
"journal-client",
"--stop-after-objects",
"1",
],
swh_scheduler_cfg_path,
)
# Check the output
expected_output = "Processed 1 message(s).\nDone.\n"
assert result.exit_code == 0, result.output
assert result.output == expected_output
actual_visit_stats = swh_scheduler.origin_visit_stats_get(
[(visit_status["origin"], visit_status["type"])]
)
assert actual_visit_stats
assert len(actual_visit_stats) == 1
diff --git a/swh/scheduler/tests/test_init.py b/swh/scheduler/tests/test_init.py
index 9a97548..7673d6d 100644
--- a/swh/scheduler/tests/test_init.py
+++ b/swh/scheduler/tests/test_init.py
@@ -1,77 +1,77 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import inspect
import pytest
from swh.scheduler import get_scheduler
from swh.scheduler.api.client import RemoteScheduler
from swh.scheduler.backend import SchedulerBackend
from swh.scheduler.interface import SchedulerInterface
SERVER_IMPLEMENTATIONS = [
("remote", RemoteScheduler, {"url": "localhost"}),
- ("local", SchedulerBackend, {"db": "something"}),
+ ("postgresql", SchedulerBackend, {"db": "something"}),
]
@pytest.fixture
def mock_psycopg2(mocker):
mocker.patch("swh.scheduler.backend.psycopg2.pool")
def test_init_get_scheduler_failure():
with pytest.raises(ValueError, match="Unknown Scheduler class"):
get_scheduler("unknown-scheduler-storage")
@pytest.mark.parametrize("class_name,expected_class,kwargs", SERVER_IMPLEMENTATIONS)
def test_init_get_scheduler(class_name, expected_class, kwargs, mock_psycopg2):
concrete_scheduler = get_scheduler(class_name, **kwargs)
assert isinstance(concrete_scheduler, expected_class)
assert isinstance(concrete_scheduler, SchedulerInterface)
@pytest.mark.parametrize("class_name,expected_class,kwargs", SERVER_IMPLEMENTATIONS)
def test_init_get_scheduler_deprecation_warning(
class_name, expected_class, kwargs, mock_psycopg2
):
with pytest.warns(DeprecationWarning):
concrete_scheduler = get_scheduler(class_name, args=kwargs)
assert isinstance(concrete_scheduler, expected_class)
def test_types(swh_scheduler) -> None:
"""Checks all methods of SchedulerInterface are implemented by this
backend, and that they have the same signature."""
# Create an instance of the protocol (which cannot be instantiated
# directly, so this creates a subclass, then instantiates it)
interface = type("_", (SchedulerInterface,), {})()
missing_methods = []
for meth_name in dir(interface):
if meth_name.startswith("_"):
continue
interface_meth = getattr(interface, meth_name)
try:
concrete_meth = getattr(swh_scheduler, meth_name)
except AttributeError:
missing_methods.append(meth_name)
continue
expected_signature = inspect.signature(interface_meth)
actual_signature = inspect.signature(concrete_meth)
assert expected_signature == actual_signature, meth_name
assert missing_methods == []
# If all the assertions above succeed, then this one should too.
# But there's no harm in double-checking.
# And we could replace the assertions above by this one, but unlike
# the assertions above, it doesn't explain what is missing.
assert isinstance(swh_scheduler, SchedulerInterface)
diff --git a/swh/scheduler/tests/test_recurrent_visits.py b/swh/scheduler/tests/test_recurrent_visits.py
index b5e9ebf..ad3cfd2 100644
--- a/swh/scheduler/tests/test_recurrent_visits.py
+++ b/swh/scheduler/tests/test_recurrent_visits.py
@@ -1,216 +1,216 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import timedelta
import logging
from queue import Queue
from unittest.mock import MagicMock
import pytest
from swh.scheduler.celery_backend.recurrent_visits import (
DEFAULT_DVCS_POLICY,
VisitSchedulerThreads,
grab_next_visits_policy_weights,
send_visits_for_visit_type,
spawn_visit_scheduler_thread,
terminate_visit_scheduler_threads,
visit_scheduler_thread,
)
from .test_cli import invoke
TEST_MAX_QUEUE = 10000
MODULE_NAME = "swh.scheduler.celery_backend.recurrent_visits"
def _compute_backend_name(visit_type: str) -> str:
"Build a dummy reproducible backend name"
return f"swh.loader.{visit_type}.tasks"
@pytest.fixture
def swh_scheduler(swh_scheduler):
"""Override default fixture of the scheduler to install some more task types."""
for visit_type in ["test-git", "test-hg", "test-svn"]:
task_type = f"load-{visit_type}"
swh_scheduler.create_task_type(
{
"type": task_type,
"max_queue_length": TEST_MAX_QUEUE,
"description": "The {} testing task".format(task_type),
"backend_name": _compute_backend_name(visit_type),
"default_interval": timedelta(days=1),
"min_interval": timedelta(hours=6),
"max_interval": timedelta(days=12),
}
)
return swh_scheduler
def test_cli_schedule_recurrent_unknown_visit_type(swh_scheduler):
"""When passed an unknown visit type, the recurrent visit scheduler should refuse
to start."""
with pytest.raises(ValueError, match="Unknown"):
invoke(
swh_scheduler,
False,
[
"schedule-recurrent",
"--visit-type",
"unknown",
"--visit-type",
"test-git",
],
)
def test_cli_schedule_recurrent_noop(swh_scheduler, mocker):
"""When passing no visit types, the recurrent visit scheduler should start."""
spawn_visit_scheduler_thread = mocker.patch(
f"{MODULE_NAME}.spawn_visit_scheduler_thread"
)
spawn_visit_scheduler_thread.side_effect = SystemExit
# The actual scheduling threads won't spawn, they'll immediately terminate. This
# only exercises the logic to pull task types out of the database
result = invoke(swh_scheduler, False, ["schedule-recurrent"])
assert result.exit_code == 0, result.output
def test_recurrent_visit_scheduling(
swh_scheduler,
caplog,
listed_origins_by_type,
mocker,
):
"""Scheduling known tasks is ok."""
caplog.set_level(logging.DEBUG, MODULE_NAME)
nb_origins = 1000
mock_celery_app = MagicMock()
mock_available_slots = mocker.patch(f"{MODULE_NAME}.get_available_slots")
mock_available_slots.return_value = nb_origins # Slots available in queue
# Make sure the scheduler is properly configured in terms of visit/task types
all_task_types = {
task_type_d["type"]: task_type_d
for task_type_d in swh_scheduler.get_task_types()
}
visit_types = list(listed_origins_by_type.keys())
assert len(visit_types) > 0
task_types = []
origins = []
for visit_type, _origins in listed_origins_by_type.items():
origins.extend(swh_scheduler.record_listed_origins(_origins))
task_type_name = f"load-{visit_type}"
assert task_type_name in all_task_types.keys()
task_type = all_task_types[task_type_name]
task_type["visit_type"] = visit_type
# we'll limit the orchestrator to the origins' type we know
task_types.append(task_type)
for visit_type in ["test-git", "test-svn"]:
task_type = f"load-{visit_type}"
send_visits_for_visit_type(
swh_scheduler,
mock_celery_app,
visit_type,
all_task_types[task_type],
DEFAULT_DVCS_POLICY,
)
assert mock_available_slots.called, "The available slots functions should be called"
records = [record.message for record in caplog.records]
# Mapping over the dict ratio/policies entries can change overall order so let's
# check the set of records
expected_records = set()
for task_type in task_types:
visit_type = task_type["visit_type"]
queue_name = task_type["backend_name"]
msg = (
f"{nb_origins} available slots for visit type {visit_type} "
f"in queue {queue_name}"
)
expected_records.add(msg)
for expected_record in expected_records:
assert expected_record in set(records)
@pytest.mark.parametrize(
"visit_type, extras",
[("test-hg", {}), ("test-git", {"tablesample": 0.1})],
)
def test_recurrent_visit_additional_parameters(
swh_scheduler, mocker, visit_type, extras
):
"""Testing additional policy parameters"""
mock_grab_next_visits = mocker.patch.object(swh_scheduler, "grab_next_visits")
mock_grab_next_visits.return_value = []
policy_cfg = DEFAULT_DVCS_POLICY[:]
for policy in policy_cfg:
policy.update(extras)
grab_next_visits_policy_weights(swh_scheduler, visit_type, 10, policy_cfg)
for call in mock_grab_next_visits.call_args_list:
assert call[1].get("tablesample") == extras.get("tablesample")
@pytest.fixture
def scheduler_config(swh_scheduler_config):
- return {"scheduler": {"cls": "local", **swh_scheduler_config}, "celery": {}}
+ return {"scheduler": {"cls": "postgresql", **swh_scheduler_config}, "celery": {}}
def test_visit_scheduler_thread_unknown_task(
    swh_scheduler,
    scheduler_config,
):
    """Starting a thread with unknown task type reports the error"""
    visit_type = "unknown"
    commands: Queue = Queue()
    exceptions: Queue = Queue()

    visit_scheduler_thread(scheduler_config, visit_type, commands, exceptions)

    # No command was consumed and exactly one error was reported
    assert commands.empty()
    assert not exceptions.empty()
    assert len(exceptions.queue) == 1
    reported_type, error = exceptions.queue.pop()
    assert reported_type == visit_type
    assert isinstance(error, ValueError)
def test_spawn_visit_scheduler_thread_noop(scheduler_config, visit_types, mocker):
    """Spawning and terminating threads runs smoothly"""
    threads: VisitSchedulerThreads = {}
    exc_queue = Queue()
    mock_build_app = mocker.patch("swh.scheduler.celery_backend.config.build_app")
    mock_build_app.return_value = MagicMock()

    assert not threads
    for visit_type in visit_types:
        spawn_visit_scheduler_thread(threads, exc_queue, scheduler_config, visit_type)

    # This only exercises the spawning/termination bookkeeping
    assert len(threads) == len(visit_types)
    remaining = terminate_visit_scheduler_threads(threads)
    assert len(remaining) == 0
    assert mock_build_app.called
diff --git a/swh/scheduler/tests/test_server.py b/swh/scheduler/tests/test_server.py
index a678dd8..50c5b41 100644
--- a/swh/scheduler/tests/test_server.py
+++ b/swh/scheduler/tests/test_server.py
@@ -1,100 +1,100 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import pytest
import yaml
from swh.scheduler.api.server import load_and_check_config
def prepare_config_file(tmpdir, content, name="config.yml"):
    """Write a configuration file under ``$tmpdir/name``.

    Args:
        tmpdir (LocalPath): root directory
        content (str/dict): file content; a dict is serialized to a yaml
            string first
        name (str): configuration filename

    Returns
        path (str) of the configuration file prepared.

    """
    if isinstance(content, dict):
        content = yaml.dump(content)
    config_path = tmpdir / name
    config_path.write_text(content, encoding="utf-8")
    # Return a plain str: path-object manipulation was not reliable on
    # older pytest/python combinations
    return str(config_path)
@pytest.mark.parametrize("scheduler_class", [None, ""])
def test_load_and_check_config_no_configuration(scheduler_class):
    """Inexistent configuration files raises"""
    expected = "Configuration file must be defined"
    with pytest.raises(EnvironmentError, match=expected):
        load_and_check_config(scheduler_class)
def test_load_and_check_config_inexistent_file():
    """Inexistent config filepath should raise"""
    # Fixed the test name typo ("fil" -> "file"); pytest discovers tests by
    # the "test_" prefix, and nothing references this function by name.
    config_path = "/some/inexistent/config.yml"
    expected_error = f"Configuration file {config_path} does not exist"
    with pytest.raises(FileNotFoundError, match=expected_error):
        load_and_check_config(config_path)
def test_load_and_check_config_wrong_configuration(tmpdir):
    """A configuration lacking the scheduler entry raises"""
    path = prepare_config_file(tmpdir, "something: useless")
    with pytest.raises(KeyError, match="Missing '%scheduler' configuration"):
        load_and_check_config(path)
def test_load_and_check_config_remote_config_local_type_raise(tmpdir):
    """A 'remote' scheduler configuration is rejected when a local
    (postgresql-backed) instance is required."""
    # NOTE(review): the original span carried unresolved -/+ diff residue;
    # this is the resolved post-image, with the adjacent string literals
    # merged (the resulting value is byte-identical).
    config = {"scheduler": {"cls": "remote"}}
    config_path = prepare_config_file(tmpdir, config)
    expected_error = (
        "The scheduler backend can only be started with a 'postgresql' configuration"
    )
    with pytest.raises(ValueError, match=expected_error):
        load_and_check_config(config_path, type="local")
def test_load_and_check_config_local_incomplete_configuration(tmpdir):
    """Incomplete 'local' configuration should raise"""
    # NOTE(review): the original span carried unresolved -/+ diff residue;
    # this is the resolved post-image ("cls": "postgresql").
    config = {
        "scheduler": {
            "cls": "postgresql",
            "something": "needed-for-test",
        }
    }
    config_path = prepare_config_file(tmpdir, config)
    expected_error = "Invalid configuration; missing 'db' config entry"
    with pytest.raises(KeyError, match=expected_error):
        load_and_check_config(config_path)
def test_load_and_check_config_local_config_fine(tmpdir):
    """Local configuration is fine"""
    # NOTE(review): the original span carried unresolved -/+ diff residue;
    # this is the resolved post-image ("cls": "postgresql").
    config = {
        "scheduler": {
            "cls": "postgresql",
            "db": "db",
        }
    }
    config_path = prepare_config_file(tmpdir, config)
    cfg = load_and_check_config(config_path, type="local")
    assert cfg == config
def test_load_and_check_config_remote_config_fine(tmpdir):
    """Remote configuration is fine"""
    config = {"scheduler": {"cls": "remote"}}
    path = prepare_config_file(tmpdir, config)
    assert load_and_check_config(path, type="any") == config