diff --git a/PKG-INFO b/PKG-INFO
index 9f072c2..49c38a0 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,32 +1,32 @@
 Metadata-Version: 2.1
 Name: swh.scheduler
-Version: 0.11.0
+Version: 0.12.0
 Summary: Software Heritage Scheduler
 Home-page: https://forge.softwareheritage.org/diffusion/DSCH/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
 Project-URL: Funding, https://www.softwareheritage.org/donate
 Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler
 Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-scheduler/
 Description: swh-scheduler
        =============

        Job scheduler for the Software Heritage project.

        Task manager for asynchronous/delayed tasks, used for both recurrent
        (e.g., listing a forge, loading new stuff from a Git repository) and
        one-off activities (e.g., loading a specific version of a source
        package).
 Platform: UNKNOWN
 Classifier: Programming Language :: Python :: 3
 Classifier: Intended Audience :: Developers
 Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
 Classifier: Operating System :: OS Independent
 Classifier: Development Status :: 5 - Production/Stable
 Requires-Python: >=3.7
 Description-Content-Type: text/markdown
 Provides-Extra: testing
 Provides-Extra: journal
 Provides-Extra: simulator
diff --git a/docs/simulator.rst b/docs/simulator.rst
index 979a389..35708b6 100644
--- a/docs/simulator.rst
+++ b/docs/simulator.rst
@@ -1,80 +1,80 @@
 .. _swh-scheduler-simulator:

 Software Heritage Scheduler Simulator
 =====================================

 This component simulates the interaction between the scheduling and loading
 infrastructure of Software Heritage. This allows quick(er) development of new
 task scheduling policies without having to wait for the actual infrastructure
 to perform (heavy) loading tasks.

 Simulator components
 --------------------

 - real instance of the scheduler database
 - simulated task queues: replaces RabbitMQ with simple in-memory structures
 - simulated workers: replaces Celery with simple while loops
 - simulated load tasks: replaces loaders with noops that take a certain time,
   and generate synthetic OriginVisitStatus objects
 - simulated archive -> scheduler feedback loop: OriginVisitStatus objects are
   pushed to a simple queue which gets processed by the scheduler journal
   client's process function directly (instead of going through swh.storage
   and swh.journal (kafka))

 In short, only the scheduler database and scheduler logic are kept; every
 other component (RabbitMQ, Celery, Kafka, SWH loaders, SWH storage) is either
 replaced with a barebones in-process utility, or removed entirely.

 Installing the simulator
 ------------------------

 The simulator depends on SimPy and other specific libraries. To install them,
 please use:

 .. code-block:: bash

     pip install 'swh.scheduler[simulator]'

 Running the simulator
 ---------------------

 The simulator uses a real instance of the scheduler database, which is (at
 least for now) persistent across runs of the simulator. You need to set that
 up beforehand:

 .. code-block:: bash

     # if you want to use a temporary instance of postgresql
     eval `pifpaf run postgresql`
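     # Optional sanity check (not part of the original walkthrough): make sure
     # the temporary postgresql instance started by pifpaf is reachable.
     psql -c 'select 1' postgres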
     # Set this variable for the simulator to know which db to connect to. pifpaf
     # sets other variables like PGPORT, PGHOST, ...
     export PGDATABASE=swh-scheduler

     # Create/initialize the scheduler database
     swh db create scheduler -d $PGDATABASE
     swh db init scheduler -d $PGDATABASE

     # This generates some data in the scheduler database. You can also feed the
     # database with more realistic data, e.g. from a lister or from a dump of
     # the production database.
     swh scheduler -d "dbname=$PGDATABASE" simulator fill-test-data

     # Run the simulator itself, interacting with the scheduler database you've
     # just seeded.
     swh scheduler -d "dbname=$PGDATABASE" simulator run --scheduler origin_scheduler

 Origin model
 ------------

 The origin model is how we represent the behaviors of origins: when they are
 created/discovered, how many commits they get and when, and when they fail to
 load.

 For now it is only a simple approximation designed to exercise simple cases:
 origin creation/discovery, a continuous stream of commits, and failure if
 they have too many commits to load at once.

-For details, see :py:`swh.scheduler.simulator.origins`.
+For details, see :py:mod:`swh.scheduler.simulator.origins`.

 To keep the simulation fast enough, each origin's state is kept in memory, so
 the simulator process will linearly increase in memory usage as it runs.
diff --git a/swh.scheduler.egg-info/PKG-INFO b/swh.scheduler.egg-info/PKG-INFO
index 9f072c2..49c38a0 100644
--- a/swh.scheduler.egg-info/PKG-INFO
+++ b/swh.scheduler.egg-info/PKG-INFO
@@ -1,32 +1,32 @@
 Metadata-Version: 2.1
 Name: swh.scheduler
-Version: 0.11.0
+Version: 0.12.0
 Summary: Software Heritage Scheduler
 Home-page: https://forge.softwareheritage.org/diffusion/DSCH/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
 Project-URL: Funding, https://www.softwareheritage.org/donate
 Project-URL: Source, https://forge.softwareheritage.org/source/swh-scheduler
 Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-scheduler/
 Description: swh-scheduler
        =============

        Job scheduler for the Software Heritage project.

        Task manager for asynchronous/delayed tasks, used for both recurrent
        (e.g., listing a forge, loading new stuff from a Git repository) and
        one-off activities (e.g., loading a specific version of a source
        package).
Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing Provides-Extra: journal Provides-Extra: simulator diff --git a/swh.scheduler.egg-info/SOURCES.txt b/swh.scheduler.egg-info/SOURCES.txt index 4f36b88..f9d7578 100644 --- a/swh.scheduler.egg-info/SOURCES.txt +++ b/swh.scheduler.egg-info/SOURCES.txt @@ -1,130 +1,131 @@ .gitignore .pre-commit-config.yaml AUTHORS CODE_OF_CONDUCT.md CONTRIBUTORS LICENSE LICENSE.Celery MANIFEST.in Makefile README.md conftest.py mypy.ini pyproject.toml pytest.ini requirements-journal.txt requirements-simulator.txt requirements-swh.txt requirements-test.txt requirements.txt setup.cfg setup.py tox.ini data/README.md data/elastic-template.json data/update-index-settings.json docs/.gitignore docs/Makefile docs/cli.rst docs/conf.py docs/index.rst docs/simulator.rst docs/_static/.placeholder docs/_templates/.placeholder sql/.gitignore sql/Makefile sql/updates/02.sql sql/updates/03.sql sql/updates/04.sql sql/updates/05.sql sql/updates/06.sql sql/updates/07.sql sql/updates/08.sql sql/updates/09.sql sql/updates/10.sql sql/updates/11.sql sql/updates/12.sql sql/updates/13.sql sql/updates/14.sql sql/updates/15.sql sql/updates/16.sql sql/updates/17.sql sql/updates/18.sql sql/updates/19.sql sql/updates/20.sql sql/updates/23.sql sql/updates/24.sql sql/updates/25.sql sql/updates/26.sql swh/__init__.py swh.scheduler.egg-info/PKG-INFO swh.scheduler.egg-info/SOURCES.txt swh.scheduler.egg-info/dependency_links.txt swh.scheduler.egg-info/entry_points.txt swh.scheduler.egg-info/requires.txt swh.scheduler.egg-info/top_level.txt swh/scheduler/__init__.py swh/scheduler/backend.py swh/scheduler/backend_es.py swh/scheduler/cli_utils.py swh/scheduler/elasticsearch_memory.py swh/scheduler/exc.py swh/scheduler/interface.py swh/scheduler/journal_client.py swh/scheduler/model.py swh/scheduler/py.typed swh/scheduler/pytest_plugin.py swh/scheduler/task.py swh/scheduler/utils.py swh/scheduler/api/__init__.py swh/scheduler/api/client.py swh/scheduler/api/serializers.py swh/scheduler/api/server.py swh/scheduler/celery_backend/__init__.py swh/scheduler/celery_backend/config.py swh/scheduler/celery_backend/listener.py swh/scheduler/celery_backend/pika_listener.py swh/scheduler/celery_backend/runner.py swh/scheduler/cli/__init__.py swh/scheduler/cli/admin.py swh/scheduler/cli/celery_monitor.py swh/scheduler/cli/journal.py swh/scheduler/cli/origin.py swh/scheduler/cli/simulator.py swh/scheduler/cli/task.py swh/scheduler/cli/task_type.py swh/scheduler/cli/utils.py swh/scheduler/simulator/__init__.py swh/scheduler/simulator/common.py swh/scheduler/simulator/origin_scheduler.py swh/scheduler/simulator/origins.py swh/scheduler/simulator/task_scheduler.py swh/scheduler/sql/10-superuser-init.sql swh/scheduler/sql/30-schema.sql swh/scheduler/sql/40-func.sql swh/scheduler/sql/50-data.sql swh/scheduler/sql/60-indexes.sql swh/scheduler/tests/__init__.py swh/scheduler/tests/common.py swh/scheduler/tests/conftest.py swh/scheduler/tests/tasks.py swh/scheduler/tests/test_api_client.py swh/scheduler/tests/test_celery_tasks.py swh/scheduler/tests/test_cli.py swh/scheduler/tests/test_cli_celery_monitor.py swh/scheduler/tests/test_cli_journal.py swh/scheduler/tests/test_cli_origin.py 
swh/scheduler/tests/test_cli_task_type.py swh/scheduler/tests/test_common.py +swh/scheduler/tests/test_config.py swh/scheduler/tests/test_init.py swh/scheduler/tests/test_journal_client.py swh/scheduler/tests/test_model.py swh/scheduler/tests/test_scheduler.py swh/scheduler/tests/test_server.py swh/scheduler/tests/test_simulator.py swh/scheduler/tests/test_utils.py swh/scheduler/tests/es/__init__.py swh/scheduler/tests/es/conftest.py swh/scheduler/tests/es/test_backend_es.py swh/scheduler/tests/es/test_cli_task.py swh/scheduler/tests/es/test_elasticsearch_memory.py \ No newline at end of file diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py index ea469ad..463a40d 100644 --- a/swh/scheduler/celery_backend/runner.py +++ b/swh/scheduler/celery_backend/runner.py @@ -1,132 +1,165 @@ -# Copyright (C) 2015-2018 The Software Heritage developers +# Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging +from typing import Dict, List, Tuple from kombu.utils.uuid import uuid from swh.core.statsd import statsd -from swh.scheduler import compute_nb_tasks_from, get_scheduler +from swh.scheduler import get_scheduler +from swh.scheduler.interface import SchedulerInterface from swh.scheduler.utils import utcnow logger = logging.getLogger(__name__) # Max batch size for tasks MAX_NUM_TASKS = 10000 -def run_ready_tasks(backend, app): - """Run tasks that are ready +def run_ready_tasks(backend: SchedulerInterface, app) -> List[Dict]: + """Schedule tasks ready to be scheduled. + + This lookups any tasks per task type and mass schedules those accordingly (send + messages to rabbitmq and mark as scheduled equivalent tasks in the scheduler + backend). + + If tasks (per task type) with priority exist, they will get redirected to dedicated + high priority queue (standard queue name prefixed with `save_code_now:`). Args: - backend (Scheduler): backend to read tasks to schedule + backend: scheduler backend to interact with (read/update tasks) app (App): Celery application to send tasks to Returns: A list of dictionaries:: { 'task': the scheduler's task id, 'backend_id': Celery's task id, 'scheduler': utcnow() } The result can be used to block-wait for the tasks' results:: backend_tasks = run_ready_tasks(self.scheduler, app) for task in backend_tasks: AsyncResult(id=task['backend_id']).get() """ - all_backend_tasks = [] + all_backend_tasks: List[Dict] = [] while True: task_types = {} pending_tasks = [] for task_type in backend.get_task_types(): task_type_name = task_type["type"] task_types[task_type_name] = task_type max_queue_length = task_type["max_queue_length"] if max_queue_length is None: max_queue_length = 0 backend_name = task_type["backend_name"] if max_queue_length: try: queue_length = app.get_queue_length(backend_name) except ValueError: queue_length = None if queue_length is None: # Running without RabbitMQ (probably a test env). num_tasks = MAX_NUM_TASKS else: num_tasks = min(max_queue_length - queue_length, MAX_NUM_TASKS) else: num_tasks = MAX_NUM_TASKS # only pull tasks if the buffer is at least 1/5th empty (= 80% # full), to help postgresql use properly indexed queries. 
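            # Worked example with illustrative numbers (not taken from the code):
            # with max_queue_length=1000 and queue_length=900, num_tasks is 100
            # while the threshold is min(10000, 1000) // 5 == 200, so this task
            # type is skipped; once the queue drains to 700 entries, num_tasks
            # reaches 300 and tasks are pulled again.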
if num_tasks > min(MAX_NUM_TASKS, max_queue_length) // 5: - num_tasks, num_tasks_priority = compute_nb_tasks_from(num_tasks) - + # Only grab num_tasks tasks with no priority grabbed_tasks = backend.grab_ready_tasks( - task_type_name, - num_tasks=num_tasks, - num_tasks_priority=num_tasks_priority, + task_type_name, num_tasks=num_tasks, num_tasks_priority=0 ) if grabbed_tasks: pending_tasks.extend(grabbed_tasks) logger.info( "Grabbed %s tasks %s", len(grabbed_tasks), task_type_name ) statsd.increment( "swh_scheduler_runner_scheduled_task_total", len(grabbed_tasks), tags={"task_type": task_type_name}, ) + # grab max_queue_length (or 10) potential tasks with any priority for the + # same type (limit the result to avoid too long running queries) + grabbed_priority_tasks = backend.grab_ready_priority_tasks( + task_type_name, num_tasks=max_queue_length or 10 + ) + if grabbed_priority_tasks: + pending_tasks.extend(grabbed_priority_tasks) + logger.info( + "Grabbed %s tasks %s (priority)", + len(grabbed_priority_tasks), + task_type_name, + ) + statsd.increment( + "swh_scheduler_runner_scheduled_task_total", + len(grabbed_priority_tasks), + tags={"task_type": task_type_name}, + ) + if not pending_tasks: return all_backend_tasks backend_tasks = [] - celery_tasks = [] + celery_tasks: List[Tuple[bool, str, str, List, Dict]] = [] for task in pending_tasks: args = task["arguments"]["args"] kwargs = task["arguments"]["kwargs"] backend_name = task_types[task["type"]]["backend_name"] backend_id = uuid() - celery_tasks.append((backend_name, backend_id, args, kwargs)) + celery_tasks.append( + ( + task.get("priority") is not None, + backend_name, + backend_id, + args, + kwargs, + ) + ) data = { "task": task["id"], "backend_id": backend_id, "scheduled": utcnow(), } backend_tasks.append(data) logger.debug("Sent %s celery tasks", len(backend_tasks)) backend.mass_schedule_task_runs(backend_tasks) - for backend_name, backend_id, args, kwargs in celery_tasks: - app.send_task( - backend_name, task_id=backend_id, args=args, kwargs=kwargs, - ) + for with_priority, backend_name, backend_id, args, kwargs in celery_tasks: + kw = dict(task_id=backend_id, args=args, kwargs=kwargs,) + if with_priority: + kw["queue"] = f"save_code_now:{backend_name}" + app.send_task(backend_name, **kw) all_backend_tasks.extend(backend_tasks) def main(): from .config import app as main_app for module in main_app.conf.CELERY_IMPORTS: __import__(module) main_backend = get_scheduler("local") try: run_ready_tasks(main_backend, main_app) except Exception: main_backend.rollback() raise if __name__ == "__main__": main() diff --git a/swh/scheduler/simulator/origin_scheduler.py b/swh/scheduler/simulator/origin_scheduler.py index ca16912..682e12f 100644 --- a/swh/scheduler/simulator/origin_scheduler.py +++ b/swh/scheduler/simulator/origin_scheduler.py @@ -1,68 +1,68 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Agents using the new origin-aware scheduler.""" import logging from typing import Any, Dict, Generator, Iterator, List from simpy import Event from swh.scheduler.journal_client import process_journal_objects from .common import Environment, Queue, Task, TaskEvent logger = logging.getLogger(__name__) def scheduler_runner_process( env: Environment, task_queues: Dict[str, Queue], policy: str, min_batch_size: int ) -> Iterator[Event]: """Scheduler 
runner. Grabs next visits from the database according to the scheduling policy, and fills the task_queues accordingly.""" while True: for visit_type, queue in task_queues.items(): remaining = queue.slots_remaining() if remaining < min_batch_size: continue next_origins = env.scheduler.grab_next_visits( visit_type, remaining, policy=policy, timestamp=env.time ) logger.debug( "%s runner: running %s %s tasks", env.time, visit_type, len(next_origins), ) for origin in next_origins: yield queue.put(Task(visit_type=origin.visit_type, origin=origin.url)) yield env.timeout(10.0) def scheduler_journal_client_process( env: Environment, status_queue: Queue ) -> Generator[Event, TaskEvent, None]: """Scheduler journal client. Every once in a while, pulls - `OriginVisitStatus`es from the status_queue to update the scheduler - origin_visit_stats table.""" + :class:`OriginVisitStatuses ` + from the status_queue to update the scheduler origin_visit_stats table.""" BATCH_SIZE = 100 statuses: List[Dict[str, Any]] = [] while True: task_event = yield status_queue.get() statuses.append(task_event.status.to_dict()) if len(statuses) < BATCH_SIZE: continue logger.debug( "%s journal client: processing %s statuses", env.time, len(statuses) ) process_journal_objects( {"origin_visit_status": statuses}, scheduler=env.scheduler ) statuses = [] diff --git a/swh/scheduler/task.py b/swh/scheduler/task.py index 46bbe81..0db6204 100644 --- a/swh/scheduler/task.py +++ b/swh/scheduler/task.py @@ -1,81 +1,85 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime from celery import current_app import celery.app.task from celery.utils.log import get_task_logger from swh.core.statsd import Statsd def ts(): return int(datetime.utcnow().timestamp()) class SWHTask(celery.app.task.Task): """a schedulable task (abstract class) Current implementation is based on Celery. See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for how to use tasks once instantiated """ _statsd = None _log = None + reject_on_worker_lost = None + """Inherited from :class:`celery.app.task.Task`, but we need to override + its docstring because it uses a custom ReST role""" + @property def statsd(self): if self._statsd: return self._statsd worker_name = current_app.conf.get("worker_name") if worker_name: self._statsd = Statsd( constant_tags={"task": self.name, "worker": worker_name,} ) return self._statsd else: statsd = Statsd( constant_tags={"task": self.name, "worker": "unknown worker",} ) return statsd def __call__(self, *args, **kwargs): self.statsd.increment("swh_task_called_count") self.statsd.gauge("swh_task_start_ts", ts()) with self.statsd.timed("swh_task_duration_seconds"): result = super().__call__(*args, **kwargs) try: status = result["status"] if status == "success": status = "eventful" if result.get("eventful") else "uneventful" except Exception: status = "eventful" if result else "uneventful" self.statsd.gauge("swh_task_end_ts", ts(), tags={"status": status}) return result def on_failure(self, exc, task_id, args, kwargs, einfo): self.statsd.increment("swh_task_failure_count") def on_success(self, retval, task_id, args, kwargs): self.statsd.increment("swh_task_success_count") # this is a swh specific event. 
Used to attach the retval to the # task_run self.send_event("task-result", result=retval) @property def log(self): if self._log is None: self._log = get_task_logger(self.name) return self._log def run(self, *args, **kwargs): self.log.debug("%s: args=%s, kwargs=%s", self.name, args, kwargs) ret = super().run(*args, **kwargs) self.log.debug("%s: OK => %s", self.name, ret) return ret diff --git a/swh/scheduler/tests/test_celery_tasks.py b/swh/scheduler/tests/test_celery_tasks.py index ab8be43..1a20f31 100644 --- a/swh/scheduler/tests/test_celery_tasks.py +++ b/swh/scheduler/tests/test_celery_tasks.py @@ -1,174 +1,254 @@ +# Copyright (C) 2019-2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +"""Module in charge of testing the scheduler runner module""" + from itertools import count from time import sleep from celery.result import AsyncResult, GroupResult import pytest from swh.scheduler.celery_backend.runner import run_ready_tasks from swh.scheduler.utils import create_task_dict def test_ping(swh_scheduler_celery_app, swh_scheduler_celery_worker): res = swh_scheduler_celery_app.send_task("swh.scheduler.tests.tasks.ping") assert res res.wait() assert res.successful() assert res.result == "OK" def test_ping_with_kw(swh_scheduler_celery_app, swh_scheduler_celery_worker): res = swh_scheduler_celery_app.send_task( "swh.scheduler.tests.tasks.ping", kwargs={"a": 1} ) assert res res.wait() assert res.successful() assert res.result == "OK (kw={'a': 1})" def test_multiping(swh_scheduler_celery_app, swh_scheduler_celery_worker): "Test that a task that spawns subtasks (group) works" res = swh_scheduler_celery_app.send_task( "swh.scheduler.tests.tasks.multiping", kwargs={"n": 5} ) assert res res.wait() assert res.successful() # retrieve the GroupResult for this task and wait for all the subtasks # to complete promise_id = res.result assert promise_id promise = GroupResult.restore(promise_id, app=swh_scheduler_celery_app) for i in range(5): if promise.ready(): break sleep(1) results = [x.get() for x in promise.results] assert len(results) == 5 for i in range(5): assert ("OK (kw={'i': %s})" % i) in results def test_scheduler_fixture( swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler ): "Test that the scheduler fixture works properly" task_type = swh_scheduler.get_task_type("swh-test-ping") assert task_type assert task_type["backend_name"] == "swh.scheduler.tests.tasks.ping" swh_scheduler.create_tasks([create_task_dict("swh-test-ping", "oneshot")]) backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app) assert backend_tasks for task in backend_tasks: # Make sure the task completed AsyncResult(id=task["backend_id"]).get() -def test_task_return_value( +def test_run_ready_task_standard( swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler ): - task_type = swh_scheduler.get_task_type("swh-test-add") + """Ensure scheduler runner schedules tasks ready for scheduling""" + task_type_name, backend_name = "swh-test-add", "swh.scheduler.tests.tasks.add" + task_type = swh_scheduler.get_task_type(task_type_name) assert task_type - assert task_type["backend_name"] == "swh.scheduler.tests.tasks.add" + assert task_type["backend_name"] == backend_name + + task_inputs = [ + ("oneshot", (12, 30)), + ("oneshot", (20, 10)), + ("recurring", (30, 10)), + ] + + tasks = swh_scheduler.create_tasks( + 
create_task_dict(task_type_name, policy, *args) + for (policy, args) in task_inputs + ) + + assert len(tasks) == len(task_inputs) - swh_scheduler.create_tasks([create_task_dict("swh-test-add", "oneshot", 12, 30)]) + task_ids = set() + for task in tasks: + assert task["status"] == "next_run_not_scheduled" + assert task["priority"] is None + task_ids.add(task["id"]) backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app) - assert len(backend_tasks) == 1 - task = backend_tasks[0] - value = AsyncResult(id=task["backend_id"]).get() - assert value == 42 + assert len(backend_tasks) == len(tasks) + + scheduled_tasks = swh_scheduler.search_tasks(task_type=task_type_name) + assert len(scheduled_tasks) == len(tasks) + for task in scheduled_tasks: + assert task["status"] == "next_run_scheduled" + assert task["id"] in task_ids + + for i, (_, args) in enumerate(task_inputs): + # take one of the task and read it from the queue backend + task = backend_tasks[i] + value = AsyncResult(id=task["backend_id"]).get() + assert value == sum(args) + + +def test_run_ready_task_with_priority( + swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler +): + """Ensure scheduler runner schedules priority tasks ready for scheduling""" + task_type_name, backend_name = "swh-test-add", "swh.scheduler.tests.tasks.add" + task_type = swh_scheduler.get_task_type(task_type_name) + assert task_type + assert task_type["backend_name"] == backend_name + + task_inputs = [ + ("oneshot", (10, 22), "low"), + ("oneshot", (20, 10), "normal"), + ("recurring", (30, 10), "high"), + ] + + tasks = swh_scheduler.create_tasks( + create_task_dict(task_type_name, policy, *args, priority=priority) + for (policy, args, priority) in task_inputs + ) + + assert len(tasks) == len(task_inputs) + + task_ids = set() + for task in tasks: + assert task["status"] == "next_run_not_scheduled" + assert task["priority"] is not None + task_ids.add(task["id"]) + + backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app) + assert len(backend_tasks) == len(tasks) + + scheduled_tasks = swh_scheduler.search_tasks(task_type=task_type_name) + assert len(scheduled_tasks) == len(tasks) + for task in scheduled_tasks: + assert task["status"] == "next_run_scheduled" + assert task["id"] in task_ids + + # TODO: Make the worker consume those messages so this can go green + # for i, (_, args, _) in enumerate(task_inputs): + # # take one of the task and read it from the queue backend + # task = backend_tasks[i] + # value = AsyncResult(id=task["backend_id"]).get() + # assert value == sum(args) def test_task_exception( swh_scheduler_celery_app, swh_scheduler_celery_worker, swh_scheduler ): task_type = swh_scheduler.get_task_type("swh-test-error") assert task_type assert task_type["backend_name"] == "swh.scheduler.tests.tasks.error" swh_scheduler.create_tasks([create_task_dict("swh-test-error", "oneshot")]) backend_tasks = run_ready_tasks(swh_scheduler, swh_scheduler_celery_app) assert len(backend_tasks) == 1 task = backend_tasks[0] result = AsyncResult(id=task["backend_id"]) with pytest.raises(NotImplementedError): result.get() def test_statsd(swh_scheduler_celery_app, swh_scheduler_celery_worker, mocker): m = mocker.patch("swh.scheduler.task.Statsd._send_to_server") mocker.patch("swh.scheduler.task.ts", side_effect=count()) mocker.patch("swh.core.statsd.monotonic", side_effect=count()) res = swh_scheduler_celery_app.send_task("swh.scheduler.tests.tasks.echo") assert res res.wait() assert res.successful() assert res.result == {} 
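    # The assertions below match raw statsd datagrams, formatted as
    # "<metric>:<value>|<type>|#<tag>:<value>,...", with "c" for counters, "g"
    # for gauges and "ms" for timers; the deterministic 0/1/1000 values come
    # from the ts() and monotonic() counters patched above.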
    m.assert_any_call(
        "swh_task_called_count:1|c|"
        "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
    )
    m.assert_any_call(
        "swh_task_start_ts:0|g|"
        "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
    )
    m.assert_any_call(
        "swh_task_end_ts:1|g|"
        "#status:uneventful,task:swh.scheduler.tests.tasks.echo,"
        "worker:unknown worker"
    )
    m.assert_any_call(
        "swh_task_duration_seconds:1000|ms|"
        "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
    )
    m.assert_any_call(
        "swh_task_success_count:1|c|"
        "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
    )


def test_statsd_with_status(
    swh_scheduler_celery_app, swh_scheduler_celery_worker, mocker
):
    m = mocker.patch("swh.scheduler.task.Statsd._send_to_server")
    mocker.patch("swh.scheduler.task.ts", side_effect=count())
    mocker.patch("swh.core.statsd.monotonic", side_effect=count())

    res = swh_scheduler_celery_app.send_task(
        "swh.scheduler.tests.tasks.echo", kwargs={"status": "eventful"}
    )
    assert res
    res.wait()
    assert res.successful()
    assert res.result == {"status": "eventful"}

    m.assert_any_call(
        "swh_task_called_count:1|c|"
        "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
    )
    m.assert_any_call(
        "swh_task_start_ts:0|g|"
        "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
    )
    m.assert_any_call(
        "swh_task_end_ts:1|g|"
        "#status:eventful,task:swh.scheduler.tests.tasks.echo,"
        "worker:unknown worker"
    )
    m.assert_any_call(
        "swh_task_duration_seconds:1000|ms|"
        "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
    )
    m.assert_any_call(
        "swh_task_success_count:1|c|"
        "#task:swh.scheduler.tests.tasks.echo,worker:unknown worker"
    )
diff --git a/swh/scheduler/tests/test_config.py b/swh/scheduler/tests/test_config.py
new file mode 100644
index 0000000..c166f62
--- /dev/null
+++ b/swh/scheduler/tests/test_config.py
@@ -0,0 +1,18 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import pytest
+
+from swh.scheduler.celery_backend.config import route_for_task
+
+
+@pytest.mark.parametrize("name", ["swh.something", "swh.anything"])
+def test_route_for_task_routing(name):
+    assert route_for_task(name, [], {}, {}) == {"queue": name}
+
+
+@pytest.mark.parametrize("name", [None, "foobar"])
+def test_route_for_task_no_routing(name):
+    assert route_for_task(name, [], {}, {}) is None
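As a quick reference for readers of the new test file: the routing rule it pins down can be exercised directly from a Python shell. This is a minimal sketch assuming, as the parametrized names suggest, that route_for_task keys on the "swh." name prefix; the loader task name used below is only an illustrative example.

    from swh.scheduler.celery_backend.config import route_for_task

    # "swh."-prefixed task names are routed to a queue named after the task...
    assert route_for_task("swh.loader.git.tasks.UpdateGitRepository", [], {}, {}) == {
        "queue": "swh.loader.git.tasks.UpdateGitRepository"
    }

    # ...while any other name falls through to Celery's default routing.
    assert route_for_task("foobar", [], {}, {}) is None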
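Similarly, the "save_code_now:" queue prefix introduced in run_ready_tasks can be checked without a live RabbitMQ by stubbing the Celery application. The sketch below is not part of the patch: it assumes the swh_scheduler pytest fixture and the swh-test-add task type already used in this test suite, and relies on the runner treating a ValueError from get_queue_length as "no queue backend available".

    from unittest.mock import MagicMock

    from swh.scheduler.celery_backend.runner import run_ready_tasks
    from swh.scheduler.utils import create_task_dict


    def test_priority_tasks_use_save_code_now_queue(swh_scheduler):
        # One high-priority task, ready to be scheduled immediately.
        swh_scheduler.create_tasks(
            [create_task_dict("swh-test-add", "oneshot", 1, 2, priority="high")]
        )

        # Stub Celery app: raising ValueError makes the runner behave as if no
        # RabbitMQ queue length information were available.
        app = MagicMock()
        app.get_queue_length.side_effect = ValueError

        run_ready_tasks(swh_scheduler, app)

        # Every message sent for a priority task must target the prefixed queue.
        assert app.send_task.call_args_list
        for call in app.send_task.call_args_list:
            args, kwargs = call
            backend_name = args[0]
            assert kwargs["queue"] == f"save_code_now:{backend_name}"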