diff --git a/swh/scheduler/celery_backend/runner.py b/swh/scheduler/celery_backend/runner.py index 25a0705..bae11d8 100644 --- a/swh/scheduler/celery_backend/runner.py +++ b/swh/scheduler/celery_backend/runner.py @@ -1,94 +1,105 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import time - import arrow from celery import group from swh.scheduler import get_scheduler, compute_nb_tasks_from from .config import app as main_app # Max batch size for tasks MAX_NUM_TASKS = 10000 def run_ready_tasks(backend, app): """Run tasks that are ready Args backend (Scheduler): backend to read tasks to schedule app (App): Celery application to send tasks to + Returns: + A list of dictionaries: + { + 'task': the scheduler's task id, + 'backend_id': Celery's task id, + 'scheduler': arrow.utcnow() + } + + The result can be used to block-wait for the tasks' results: + + backend_tasks = run_ready_tasks(self.scheduler, app) + for task in backend_tasks: + AsyncResult(id=task['backend_id']).get() + """ + all_backend_tasks = [] while True: - throttled = False cursor = backend.cursor() task_types = {} pending_tasks = [] for task_type in backend.get_task_types(cursor=cursor): task_type_name = task_type['type'] task_types[task_type_name] = task_type max_queue_length = task_type['max_queue_length'] - if max_queue_length: - backend_name = task_type['backend_name'] + backend_name = task_type['backend_name'] + if max_queue_length and backend_name in app.tasks: queue_name = app.tasks[backend_name].task_queue queue_length = app.get_queue_length(queue_name) num_tasks = min(max_queue_length - queue_length, MAX_NUM_TASKS) else: num_tasks = MAX_NUM_TASKS if num_tasks > 0: num_tasks, num_tasks_priority = compute_nb_tasks_from( num_tasks) pending_tasks.extend( backend.grab_ready_tasks( task_type_name, num_tasks=num_tasks, num_tasks_priority=num_tasks_priority, cursor=cursor)) if not pending_tasks: - break + return all_backend_tasks celery_tasks = [] for task in pending_tasks: args = task['arguments']['args'] kwargs = task['arguments']['kwargs'] celery_task = app.tasks[ task_types[task['type']]['backend_name'] ].s(*args, **kwargs) celery_tasks.append(celery_task) group_result = group(celery_tasks).delay() backend_tasks = [{ 'task': task['id'], 'backend_id': group_result.results[i].id, 'scheduled': arrow.utcnow(), } for i, task in enumerate(pending_tasks)] backend.mass_schedule_task_runs(backend_tasks, cursor=cursor) backend.commit() - if throttled: - time.sleep(10) + all_backend_tasks.extend(backend_tasks) if __name__ == '__main__': for module in main_app.conf.CELERY_IMPORTS: __import__(module) main_backend = get_scheduler('local') try: run_ready_tasks(main_backend, main_app) except Exception: main_backend.rollback() raise diff --git a/swh/scheduler/tests/scheduler_testing.py b/swh/scheduler/tests/scheduler_testing.py new file mode 100644 index 0000000..ba11ec3 --- /dev/null +++ b/swh/scheduler/tests/scheduler_testing.py @@ -0,0 +1,71 @@ +import glob +import pytest +import os.path +import datetime + +from celery.result import AsyncResult +from celery.contrib.testing.worker import start_worker +import celery.contrib.testing.tasks # noqa + +from swh.core.tests.db_testing import DbTestFixture, DB_DUMP_TYPES +from swh.core.utils import numfile_sortkey as sortkey + +from swh.scheduler import get_scheduler +from swh.scheduler.celery_backend.runner import run_ready_tasks +from swh.scheduler.celery_backend.config import app +from swh.scheduler.tests.celery_testing import CeleryTestFixture + +from . import SQL_DIR + +DUMP_FILES = os.path.join(SQL_DIR, '*.sql') + + +@pytest.mark.db +class SchedulerTestFixture(CeleryTestFixture, DbTestFixture): + """Base class for test case classes, providing an SWH scheduler as + the `scheduler` attribute.""" + SCHEDULER_DB_NAME = 'softwareheritage-scheduler-test-fixture' + + def add_scheduler_task_type(self, task_type, backend_name): + task_type = { + 'type': task_type, + 'description': 'Update a git repository', + 'backend_name': backend_name, + 'default_interval': datetime.timedelta(days=64), + 'min_interval': datetime.timedelta(hours=12), + 'max_interval': datetime.timedelta(days=64), + 'backoff_factor': 2, + 'max_queue_length': None, + 'num_retries': 7, + 'retry_delay': datetime.timedelta(hours=2), + } + self.scheduler.create_task_type(task_type) + + def run_ready_tasks(self): + """Runs the scheduler and a Celery worker, then blocks until + all tasks are completed.""" + with start_worker(app): + backend_tasks = run_ready_tasks(self.scheduler, app) + for task in backend_tasks: + AsyncResult(id=task['backend_id']).get() + + @classmethod + def setUpClass(cls): + all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey) + + all_dump_files = [(x, DB_DUMP_TYPES[os.path.splitext(x)[1]]) + for x in all_dump_files] + + cls.add_db(name=cls.SCHEDULER_DB_NAME, + dumps=all_dump_files) + super().setUpClass() + + def setUp(self): + super().setUp() + self.scheduler_config = { + 'scheduling_db': 'dbname=' + self.SCHEDULER_DB_NAME} + self.scheduler = get_scheduler('local', self.scheduler_config) + + def tearDown(self): + self.scheduler.close_connection() + super().tearDown() diff --git a/swh/scheduler/tests/test_fixtures.py b/swh/scheduler/tests/test_fixtures.py new file mode 100644 index 0000000..9e9146e --- /dev/null +++ b/swh/scheduler/tests/test_fixtures.py @@ -0,0 +1,38 @@ +# Copyright (C) 2018 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import unittest + +from swh.scheduler.tests.scheduler_testing import SchedulerTestFixture +from swh.scheduler.task import Task +from swh.scheduler.utils import create_task_dict + +task_has_run = False + + +class SomeTestTask(Task): + def run(self, *, foo): + global task_has_run + assert foo == 'bar' + task_has_run = True + + +class FixtureTest(SchedulerTestFixture, unittest.TestCase): + def setUp(self): + super().setUp() + self.add_scheduler_task_type( + 'some_test_task_type', + 'swh.scheduler.tests.test_fixtures.SomeTestTask') + + def test_task_run(self): + self.scheduler.create_tasks([create_task_dict( + 'some_test_task_type', + 'oneshot', + foo='bar', + )]) + + self.assertEqual(task_has_run, False) + self.run_ready_tasks() + self.assertEqual(task_has_run, True)