diff --git a/swh/indexer/orchestrator.py b/swh/indexer/orchestrator.py index 212b3e8..d63c696 100644 --- a/swh/indexer/orchestrator.py +++ b/swh/indexer/orchestrator.py @@ -1,134 +1,143 @@ # Copyright (C) 2016-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import random -from celery import group - from swh.core.config import SWHConfig from swh.core.utils import grouper from swh.scheduler import utils +from swh.scheduler import get_scheduler +from swh.scheduler.utils import create_task_dict def get_class(clazz): """Get a symbol class dynamically by its fully qualified name string representation. """ parts = clazz.split('.') module = '.'.join(parts[:-1]) m = __import__(module) for comp in parts[1:]: m = getattr(m, comp) return m class BaseOrchestratorIndexer(SWHConfig): """The indexer orchestrator is in charge of dispatching batch of contents (filtered or not based on presence) to indexers. That dispatch is indexer specific, so the configuration reflects it: - when `check_presence` flag is true, filter out the contents already present for that indexer, otherwise send everything - broadcast those (filtered or not) contents to indexers in a `batch_size` fashioned For example:: indexers: mimetype: batch_size: 10 check_presence: false language: batch_size: 2 check_presence: true means: - send all contents received as batch of size 10 to the 'mimetype' indexer - send only unknown contents as batch of size 2 to the 'language' indexer. """ CONFIG_BASE_FILENAME = 'indexer/orchestrator' # Overridable in child classes. from . import TASK_NAMES, INDEXER_CLASSES DEFAULT_CONFIG = { + 'scheduler': { + 'cls': 'remote', + 'args': { + 'url': 'http://localhost:5008', + }, + }, 'indexers': ('dict', { 'mimetype': { 'batch_size': 10, 'check_presence': True, }, }), } def __init__(self): super().__init__() self.config = self.parse_config_file() self.prepare_tasks() + self.prepare_scheduler() + + def prepare_scheduler(self): + self.scheduler = get_scheduler(**self.config['scheduler']) def prepare_tasks(self): indexer_names = list(self.config['indexers']) random.shuffle(indexer_names) indexers = {} tasks = {} for name in indexer_names: if name not in self.TASK_NAMES: raise ValueError('%s must be one of %s' % ( name, ', '.join(self.TASK_NAMES))) opts = self.config['indexers'][name] indexers[name] = ( self.INDEXER_CLASSES[name], opts['check_presence'], opts['batch_size']) tasks[name] = utils.get_task(self.TASK_NAMES[name]) self.indexers = indexers self.tasks = tasks def run(self, ids): - all_results = [] - for name, (idx_class, filtering, batch_size) in self.indexers.items(): + for task_name, task_attrs in self.indexers.items(): + (idx_class, filtering, batch_size) = task_attrs if filtering: policy_update = 'ignore-dups' indexer_class = get_class(idx_class) ids_filtered = list(indexer_class().filter(ids)) if not ids_filtered: continue else: policy_update = 'update-dups' ids_filtered = ids - celery_tasks = [] + tasks = [] for ids_to_send in grouper(ids_filtered, batch_size): - celery_task = self.tasks[name].s( + tasks.append(create_task_dict( + task_name, + 'oneshot', ids=list(ids_to_send), - policy_update=policy_update) - celery_tasks.append(celery_task) - - all_results.append(self._run_tasks(celery_tasks)) - - return all_results + policy_update=policy_update, + )) + self._create_tasks(tasks) - def _run_tasks(self, celery_tasks): - return group(celery_tasks).delay() + def _create_tasks(self, tasks): + self.scheduler.create_tasks(tasks) class OrchestratorAllContentsIndexer(BaseOrchestratorIndexer): """Orchestrator which deals with batch of any types of contents. """ class OrchestratorTextContentsIndexer(BaseOrchestratorIndexer): """Orchestrator which deals with batch of text contents. """ CONFIG_BASE_FILENAME = 'indexer/orchestrator_text' diff --git a/swh/indexer/tests/test_orchestrator.py b/swh/indexer/tests/test_orchestrator.py index 8c9201b..6f6a013 100644 --- a/swh/indexer/tests/test_orchestrator.py +++ b/swh/indexer/tests/test_orchestrator.py @@ -1,195 +1,215 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest import celery from swh.indexer.orchestrator import BaseOrchestratorIndexer from swh.indexer.indexer import BaseIndexer from swh.indexer.tests.test_utils import MockIndexerStorage, MockStorage -from swh.scheduler.tests.celery_testing import CeleryTestFixture -from swh.indexer.tests import start_worker_thread +from swh.scheduler.tests.scheduler_testing import SchedulerTestFixture class BaseTestIndexer(BaseIndexer): ADDITIONAL_CONFIG = { 'tools': ('dict', { 'name': 'foo', 'version': 'bar', 'configuration': {} }), } def prepare(self): self.idx_storage = MockIndexerStorage() self.storage = MockStorage() def check(self): pass def filter(self, ids): self.filtered.append(ids) return ids def run(self, ids, policy_update): return self.index(ids) def index(self, ids): self.indexed.append(ids) return [id_ + '_indexed_by_' + self.__class__.__name__ for id_ in ids] def persist_index_computations(self, result, policy_update): self.persisted = result class Indexer1(BaseTestIndexer): filtered = [] indexed = [] def filter(self, ids): return super().filter([id_ for id_ in ids if '1' in id_]) class Indexer2(BaseTestIndexer): filtered = [] indexed = [] def filter(self, ids): return super().filter([id_ for id_ in ids if '2' in id_]) class Indexer3(BaseTestIndexer): filtered = [] indexed = [] def filter(self, ids): return super().filter([id_ for id_ in ids if '3' in id_]) @celery.task def indexer1_task(*args, **kwargs): return Indexer1().run(*args, **kwargs) @celery.task def indexer2_task(*args, **kwargs): return Indexer2().run(*args, **kwargs) @celery.task def indexer3_task(self, *args, **kwargs): return Indexer3().run(*args, **kwargs) class TestOrchestrator12(BaseOrchestratorIndexer): TASK_NAMES = { 'indexer1': 'swh.indexer.tests.test_orchestrator.indexer1_task', 'indexer2': 'swh.indexer.tests.test_orchestrator.indexer2_task', 'indexer3': 'swh.indexer.tests.test_orchestrator.indexer3_task', } INDEXER_CLASSES = { 'indexer1': 'swh.indexer.tests.test_orchestrator.Indexer1', 'indexer2': 'swh.indexer.tests.test_orchestrator.Indexer2', 'indexer3': 'swh.indexer.tests.test_orchestrator.Indexer3', } def __init__(self): super().__init__() self.running_tasks = [] def parse_config_file(self): return { + 'scheduler': { + 'cls': 'remote', + 'args': { + 'url': 'http://localhost:9999', + }, + }, 'indexers': { 'indexer1': { 'batch_size': 2, 'check_presence': True, }, 'indexer2': { 'batch_size': 2, 'check_presence': True, }, } } class MockedTestOrchestrator12(TestOrchestrator12): - def _run_tasks(self, celery_tasks): - self.running_tasks.extend(celery_tasks) + def __init__(self): + super().__init__() + self.created_tasks = [] + + def _create_tasks(self, celery_tasks): + self.created_tasks.extend(celery_tasks) + + def prepare_scheduler(self): + pass -class OrchestratorTest(CeleryTestFixture, unittest.TestCase): +class OrchestratorTest(SchedulerTestFixture, unittest.TestCase): + def setUp(self): + super().setUp() + self.add_scheduler_task_type( + 'indexer1', + 'swh.indexer.tests.test_orchestrator.indexer1_task') + self.add_scheduler_task_type( + 'indexer2', + 'swh.indexer.tests.test_orchestrator.indexer2_task') + def test_orchestrator_filter(self): - with start_worker_thread(): - o = TestOrchestrator12() - promises = o.run(['id12', 'id2']) - results = [] - for promise in reversed(promises): - results.append(promise.get(timeout=10)) - self.assertCountEqual( - results, - [[['id12_indexed_by_Indexer1']], - [['id12_indexed_by_Indexer2', - 'id2_indexed_by_Indexer2']]]) - self.assertEqual(Indexer2.indexed, [['id12', 'id2']]) - self.assertEqual(Indexer1.indexed, [['id12']]) + o = TestOrchestrator12() + o.scheduler = self.scheduler + o.run(['id12', 'id2']) + self.assertEqual(Indexer2.indexed, []) + self.assertEqual(Indexer1.indexed, []) + self.run_ready_tasks() + self.assertEqual(Indexer2.indexed, [['id12', 'id2']]) + self.assertEqual(Indexer1.indexed, [['id12']]) class MockedOrchestratorTest(unittest.TestCase): maxDiff = None def test_mocked_orchestrator_filter(self): o = MockedTestOrchestrator12() o.run(['id12', 'id2']) - self.assertCountEqual(o.running_tasks, [ - {'args': (), - 'chord_size': None, - 'immutable': False, - 'kwargs': {'ids': ['id12'], - 'policy_update': 'ignore-dups'}, - 'options': {}, - 'subtask_type': None, - 'task': 'swh.indexer.tests.test_orchestrator.indexer1_task'}, - {'args': (), - 'chord_size': None, - 'immutable': False, - 'kwargs': {'ids': ['id12', 'id2'], - 'policy_update': 'ignore-dups'}, - 'options': {}, - 'subtask_type': None, - 'task': 'swh.indexer.tests.test_orchestrator.indexer2_task'}, + for task in o.created_tasks: + del task['next_run'] # not worth the trouble testing it properly + self.assertCountEqual(o.created_tasks, [ + {'type': 'indexer1', + 'arguments': { + 'args': [], + 'kwargs': { + 'ids': ['id12'], + 'policy_update': 'ignore-dups'}}, + 'priority': None, + 'policy': 'oneshot'}, + {'type': 'indexer2', + 'arguments': { + 'args': [], + 'kwargs': { + 'ids': ['id12', 'id2'], + 'policy_update': 'ignore-dups'}}, + 'priority': None, + 'policy': 'oneshot'}, ]) def test_mocked_orchestrator_batch(self): o = MockedTestOrchestrator12() o.run(['id12', 'id2a', 'id2b', 'id2c']) - self.assertCountEqual(o.running_tasks, [ - {'args': (), - 'chord_size': None, - 'immutable': False, - 'kwargs': {'ids': ['id12'], - 'policy_update': 'ignore-dups'}, - 'options': {}, - 'subtask_type': None, - 'task': 'swh.indexer.tests.test_orchestrator.indexer1_task'}, - {'args': (), - 'chord_size': None, - 'immutable': False, - 'kwargs': {'ids': ['id12', 'id2a'], - 'policy_update': 'ignore-dups'}, - 'options': {}, - 'subtask_type': None, - 'task': 'swh.indexer.tests.test_orchestrator.indexer2_task'}, - {'args': (), - 'chord_size': None, - 'immutable': False, - 'kwargs': {'ids': ['id2b', 'id2c'], - 'policy_update': 'ignore-dups'}, - 'options': {}, - 'subtask_type': None, - 'task': 'swh.indexer.tests.test_orchestrator.indexer2_task'}, + for task in o.created_tasks: + del task['next_run'] # not worth the trouble testing it properly + self.assertCountEqual(o.created_tasks, [ + {'type': 'indexer1', + 'arguments': { + 'args': [], + 'kwargs': { + 'ids': ['id12'], + 'policy_update': 'ignore-dups'}}, + 'priority': None, + 'policy': 'oneshot'}, + {'type': 'indexer2', + 'arguments': { + 'args': [], + 'kwargs': { + 'ids': ['id12', 'id2a'], + 'policy_update': 'ignore-dups'}}, + 'priority': None, + 'policy': 'oneshot'}, + {'type': 'indexer2', + 'arguments': { + 'args': [], + 'kwargs': { + 'ids': ['id2b', 'id2c'], + 'policy_update': 'ignore-dups'}}, + 'priority': None, + 'policy': 'oneshot'}, ])