diff --git a/swh/indexer/orchestrator.py b/swh/indexer/orchestrator.py --- a/swh/indexer/orchestrator.py +++ b/swh/indexer/orchestrator.py @@ -4,12 +4,12 @@ # See top-level LICENSE file for more information import random - -from celery import group +import datetime from swh.core.config import SWHConfig from swh.core.utils import grouper from swh.scheduler import utils +from swh.scheduler import get_scheduler def get_class(clazz): @@ -60,6 +60,7 @@ from . import TASK_NAMES, INDEXER_CLASSES DEFAULT_CONFIG = { + 'scheduler_url': 'http://localhost:9999', # TODO: better default 'indexers': ('dict', { 'mimetype': { 'batch_size': 10, @@ -71,6 +72,11 @@ def prepare(self): super().prepare() self.prepare_tasks() + self.prepare_scheduler() + + def prepare_scheduler(self): + self.scheduler = get_scheduler('remote', { + 'url': self.config['scheduler_url']}) def prepare_tasks(self): indexer_names = list(self.config['indexers']) @@ -93,8 +99,8 @@ self.tasks = tasks def run(self, ids): - all_results = [] - for name, (idx_class, filtering, batch_size) in self.indexers.items(): + for task_name, task_attrs in self.indexers.items(): + (idx_class, filtering, batch_size) = task_attrs if filtering: policy_update = 'ignore-dups' indexer_class = get_class(idx_class) @@ -105,19 +111,22 @@ policy_update = 'update-dups' ids_filtered = ids - celery_tasks = [] + tasks = [] for ids_to_send in grouper(ids_filtered, batch_size): - celery_task = self.tasks[name].s( - ids=list(ids_to_send), - policy_update=policy_update) - celery_tasks.append(celery_task) - - all_results.append(self._run_tasks(celery_tasks)) - - return all_results - - def _run_tasks(self, celery_tasks): - return group(celery_tasks).delay() + tasks.append({ + 'type': task_name, + 'arguments': { + 'args': [], + 'kwargs': { + 'ids': list(ids_to_send), + 'policy_update': policy_update}}, + 'next_run': datetime.datetime.now(), + 'retries_left': 5, + }) + self._create_tasks(tasks) + + def _create_tasks(self, tasks): + self.scheduler.create_tasks(tasks) class OrchestratorAllContentsIndexer(BaseOrchestratorIndexer): diff --git a/swh/indexer/tests/__init__.py b/swh/indexer/tests/__init__.py --- a/swh/indexer/tests/__init__.py +++ b/swh/indexer/tests/__init__.py @@ -1,15 +1,4 @@ import swh.indexer from os import path -from celery.contrib.testing.worker import start_worker -import celery.contrib.testing.tasks # noqa - -from swh.scheduler.celery_backend.config import app - -__all__ = ['start_worker_thread'] - SQL_DIR = path.join(path.dirname(swh.indexer.__file__), 'sql') - - -def start_worker_thread(): - return start_worker(app) diff --git a/swh/indexer/tests/test_orchestrator.py b/swh/indexer/tests/test_orchestrator.py --- a/swh/indexer/tests/test_orchestrator.py +++ b/swh/indexer/tests/test_orchestrator.py @@ -11,7 +11,7 @@ from swh.indexer.indexer import BaseIndexer from swh.indexer.tests.test_utils import MockIndexerStorage, MockStorage from swh.scheduler.tests.celery_testing import CeleryTestFixture -from swh.indexer.tests import start_worker_thread +from swh.scheduler.tests.scheduler_testing import SchedulerTestFixture class BaseTestIndexer(BaseIndexer): @@ -98,12 +98,9 @@ 'indexer3': 'swh.indexer.tests.test_orchestrator.Indexer3', } - def __init__(self): - super().__init__() - self.running_tasks = [] - def prepare(self): self.config = { + 'scheduler_url': 'http://localhost:9999', 'indexers': { 'indexer1': { 'batch_size': 2, @@ -116,29 +113,42 @@ } } self.prepare_tasks() + self.prepare_scheduler() class MockedTestOrchestrator12(TestOrchestrator12): - def _run_tasks(self, celery_tasks): - self.running_tasks.extend(celery_tasks) + def __init__(self): + super().__init__() + self.created_tasks = [] + def _create_tasks(self, celery_tasks): + self.created_tasks.extend(celery_tasks) + + def prepare_scheduler(self): + pass + + +class OrchestratorTest(CeleryTestFixture, SchedulerTestFixture, + unittest.TestCase): + def setUp(self): + super().setUp() + self.add_scheduler_task_type( + 'indexer1', + 'swh.indexer.tests.test_orchestrator.indexer1_task') + self.add_scheduler_task_type( + 'indexer2', + 'swh.indexer.tests.test_orchestrator.indexer2_task') -class OrchestratorTest(CeleryTestFixture, unittest.TestCase): def test_orchestrator_filter(self): - with start_worker_thread(): - o = TestOrchestrator12() - o.prepare() - promises = o.run(['id12', 'id2']) - results = [] - for promise in reversed(promises): - results.append(promise.get(timeout=10)) - self.assertCountEqual( - results, - [[['id12_indexed_by_Indexer1']], - [['id12_indexed_by_Indexer2', - 'id2_indexed_by_Indexer2']]]) - self.assertEqual(Indexer2.indexed, [['id12', 'id2']]) - self.assertEqual(Indexer1.indexed, [['id12']]) + o = TestOrchestrator12() + o.prepare() + o.scheduler = self.scheduler + o.run(['id12', 'id2']) + self.assertEqual(Indexer2.indexed, []) + self.assertEqual(Indexer1.indexed, []) + self.run_ready_tasks() + self.assertEqual(Indexer2.indexed, [['id12', 'id2']]) + self.assertEqual(Indexer1.indexed, [['id12']]) class MockedOrchestratorTest(unittest.TestCase): @@ -148,52 +158,51 @@ o = MockedTestOrchestrator12() o.prepare() o.run(['id12', 'id2']) - self.assertCountEqual(o.running_tasks, [ - {'args': (), - 'chord_size': None, - 'immutable': False, - 'kwargs': {'ids': ['id12'], - 'policy_update': 'ignore-dups'}, - 'options': {}, - 'subtask_type': None, - 'task': 'swh.indexer.tests.test_orchestrator.indexer1_task'}, - {'args': (), - 'chord_size': None, - 'immutable': False, - 'kwargs': {'ids': ['id12', 'id2'], - 'policy_update': 'ignore-dups'}, - 'options': {}, - 'subtask_type': None, - 'task': 'swh.indexer.tests.test_orchestrator.indexer2_task'}, + for task in o.created_tasks: + del task['next_run'] # not worth the trouble testing it properly + self.assertCountEqual(o.created_tasks, [ + {'type': 'indexer1', + 'arguments': { + 'args': [], + 'kwargs': { + 'ids': ['id12'], + 'policy_update': 'ignore-dups'}}, + 'retries_left': 5}, + {'type': 'indexer2', + 'arguments': { + 'args': [], + 'kwargs': { + 'ids': ['id12', 'id2'], + 'policy_update': 'ignore-dups'}}, + 'retries_left': 5}, ]) def test_mocked_orchestrator_batch(self): o = MockedTestOrchestrator12() o.prepare() o.run(['id12', 'id2a', 'id2b', 'id2c']) - self.assertCountEqual(o.running_tasks, [ - {'args': (), - 'chord_size': None, - 'immutable': False, - 'kwargs': {'ids': ['id12'], - 'policy_update': 'ignore-dups'}, - 'options': {}, - 'subtask_type': None, - 'task': 'swh.indexer.tests.test_orchestrator.indexer1_task'}, - {'args': (), - 'chord_size': None, - 'immutable': False, - 'kwargs': {'ids': ['id12', 'id2a'], - 'policy_update': 'ignore-dups'}, - 'options': {}, - 'subtask_type': None, - 'task': 'swh.indexer.tests.test_orchestrator.indexer2_task'}, - {'args': (), - 'chord_size': None, - 'immutable': False, - 'kwargs': {'ids': ['id2b', 'id2c'], - 'policy_update': 'ignore-dups'}, - 'options': {}, - 'subtask_type': None, - 'task': 'swh.indexer.tests.test_orchestrator.indexer2_task'}, + for task in o.created_tasks: + del task['next_run'] # not worth the trouble testing it properly + self.assertCountEqual(o.created_tasks, [ + {'type': 'indexer1', + 'arguments': { + 'args': [], + 'kwargs': { + 'ids': ['id12'], + 'policy_update': 'ignore-dups'}}, + 'retries_left': 5}, + {'type': 'indexer2', + 'arguments': { + 'args': [], + 'kwargs': { + 'ids': ['id12', 'id2a'], + 'policy_update': 'ignore-dups'}}, + 'retries_left': 5}, + {'type': 'indexer2', + 'arguments': { + 'args': [], + 'kwargs': { + 'ids': ['id2b', 'id2c'], + 'policy_update': 'ignore-dups'}}, + 'retries_left': 5}, ])