diff --git a/docs/index.rst b/docs/index.rst --- a/docs/index.rst +++ b/docs/index.rst @@ -144,6 +144,21 @@ Keyword args: + +Writing a new worker for a new swh-task-type +-------------------------------------------- + +When you want to add a new swh-task-type, you need a celery worker backend +capable of executing instances of this new task type. + +Celery workers for swh-scheduler based tasks should be started using the Celery +app in `swh.scheduler.celery_backend.config`. The latter, among other things, provides +a loading mechanism for task types based on pkg_resources declared plugins under +the `[swh.workers]` entry point. + +TODO: add a fully working example of a dumb task. + + Reference Documentation ----------------------- diff --git a/swh/scheduler/celery_backend/config.py b/swh/scheduler/celery_backend/config.py --- a/swh/scheduler/celery_backend/config.py +++ b/swh/scheduler/celery_backend/config.py @@ -5,6 +5,7 @@ import logging import os +import pkg_resources import urllib.parse from celery import Celery @@ -197,6 +198,16 @@ # Load the Celery config CONFIG = load_named_config(CONFIG_NAME, DEFAULT_CONFIG) +CONFIG.setdefault('task_modules', []) +# load task modules declared as plugin entry points +for entrypoint in pkg_resources.iter_entry_points('swh.workers'): + worker_registrer_fn = entrypoint.load() + # The registry function is expected to return a dict whose 'task_modules' + # key is a list of strings naming the python modules in which celery tasks + # are defined (a list, not a bare string: the value is passed to extend()).
+ task_modules = worker_registrer_fn().get('task_modules', []) + CONFIG['task_modules'].extend(task_modules) + # Celery Queues CELERY_QUEUES = [Queue('celery', Exchange('celery'), routing_key='celery')] diff --git a/swh/scheduler/tests/conftest.py b/swh/scheduler/tests/conftest.py --- a/swh/scheduler/tests/conftest.py +++ b/swh/scheduler/tests/conftest.py @@ -2,6 +2,7 @@ import pytest import glob from datetime import timedelta +import pkg_resources from swh.core.utils import numfile_sortkey as sortkey from swh.scheduler import get_scheduler @@ -34,9 +35,12 @@ @pytest.fixture(scope='session') def celery_includes(): - return [ + task_modules = [ 'swh.scheduler.tests.tasks', ] + for entrypoint in pkg_resources.iter_entry_points('swh.workers'): + task_modules.extend(entrypoint.load()().get('task_modules', [])) + return task_modules @pytest.fixture(scope='session')