diff --git a/swh/indexer/__init__.py b/swh/indexer/__init__.py --- a/swh/indexer/__init__.py +++ b/swh/indexer/__init__.py @@ -2,30 +2,3 @@ # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information - - -INDEXER_CLASSES = { - 'indexer_mimetype': 'swh.indexer.mimetype.ContentMimetypeIndexer', - 'indexer_language': 'swh.indexer.language.ContentLanguageIndexer', - 'indexer_ctags': 'swh.indexer.ctags.CtagsIndexer', - 'indexer_fossology_license': - 'swh.indexer.fossology_license.ContentFossologyLicenseIndexer', -} - - -TASK_NAMES = { - 'indexer_orchestrator_all': 'swh.indexer.tasks.OrchestratorAllContents', - 'indexer_orchestrator_text': 'swh.indexer.tasks.OrchestratorTextContents', - 'indexer_mimetype': 'swh.indexer.tasks.ContentMimetype', - 'indexer_language': 'swh.indexer.tasks.ContentLanguage', - 'indexer_ctags': 'swh.indexer.tasks.Ctags', - 'indexer_fossology_license': 'swh.indexer.tasks.ContentFossologyLicense', - 'indexer_rehash': 'swh.indexer.tasks.RecomputeChecksums', - 'indexer_revision_metadata': 'swh.indexer.tasks.RevisionMetadata', - 'indexer_origin_intrinsic_metadata': 'swh.indexer.tasks.OriginMetadata', -} - - -__all__ = [ - 'INDEXER_CLASSES', 'TASK_NAMES', -] diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py --- a/swh/indexer/indexer.py +++ b/swh/indexer/indexer.py @@ -93,9 +93,7 @@ Then you need to implement the following functions: :func:`filter`: - filter out data already indexed (in storage). This function is used by - the orchestrator and not directly by the indexer - (cf. swh.indexer.orchestrator.BaseOrchestratorIndexer). + filter out data already indexed (in storage). :func:`index_object`: compute index on id with data (retrieved from the storage or the diff --git a/swh/indexer/orchestrator.py b/swh/indexer/orchestrator.py deleted file mode 100644 --- a/swh/indexer/orchestrator.py +++ /dev/null @@ -1,143 +0,0 @@ -# Copyright (C) 2016-2018 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import random - -from swh.core.config import SWHConfig -from swh.core.utils import grouper -from swh.scheduler import utils -from swh.scheduler import get_scheduler -from swh.scheduler.utils import create_task_dict - - -def get_class(clazz): - """Get a symbol class dynamically by its fully qualified name string - representation. - - """ - parts = clazz.split('.') - module = '.'.join(parts[:-1]) - m = __import__(module) - for comp in parts[1:]: - m = getattr(m, comp) - return m - - -class BaseOrchestratorIndexer(SWHConfig): - """The indexer orchestrator is in charge of dispatching batch of - contents (filtered or not based on presence) to indexers. - - That dispatch is indexer specific, so the configuration reflects it: - - - when `check_presence` flag is true, filter out the - contents already present for that indexer, otherwise send - everything - - - broadcast those (filtered or not) contents to indexers in a - `batch_size` fashioned - - For example:: - - indexers: - indexer_mimetype: - batch_size: 10 - check_presence: false - indexer_language: - batch_size: 2 - check_presence: true - - means: - - - send all contents received as batch of size 10 to the 'mimetype' indexer - - send only unknown contents as batch of size 2 to the 'language' indexer. - - """ - CONFIG_BASE_FILENAME = 'indexer/orchestrator' - - # Overridable in child classes. - from . import TASK_NAMES, INDEXER_CLASSES - - DEFAULT_CONFIG = { - 'scheduler': { - 'cls': 'remote', - 'args': { - 'url': 'http://localhost:5008', - }, - }, - 'indexers': ('dict', { - 'indexer_mimetype': { - 'batch_size': 10, - 'check_presence': True, - }, - }), - } - - def __init__(self): - super().__init__() - self.config = self.parse_config_file() - self.prepare_tasks() - self.prepare_scheduler() - - def prepare_scheduler(self): - self.scheduler = get_scheduler(**self.config['scheduler']) - - def prepare_tasks(self): - indexer_names = list(self.config['indexers']) - random.shuffle(indexer_names) - indexers = {} - tasks = {} - for name in indexer_names: - if name not in self.TASK_NAMES: - raise ValueError('%s must be one of %s' % ( - name, ', '.join(self.TASK_NAMES))) - - opts = self.config['indexers'][name] - indexers[name] = ( - self.INDEXER_CLASSES[name], - opts['check_presence'], - opts['batch_size']) - tasks[name] = utils.get_task(self.TASK_NAMES[name]) - - self.indexers = indexers - self.tasks = tasks - - def run(self, ids): - for task_name, task_attrs in self.indexers.items(): - (idx_class, filtering, batch_size) = task_attrs - if filtering: - policy_update = 'ignore-dups' - indexer_class = get_class(idx_class) - ids_filtered = list(indexer_class().filter(ids)) - if not ids_filtered: - continue - else: - policy_update = 'update-dups' - ids_filtered = ids - - tasks = [] - for ids_to_send in grouper(ids_filtered, batch_size): - tasks.append(create_task_dict( - task_name, - 'oneshot', - ids=list(ids_to_send), - policy_update=policy_update, - )) - self._create_tasks(tasks) - - def _create_tasks(self, tasks): - self.scheduler.create_tasks(tasks) - - -class OrchestratorAllContentsIndexer(BaseOrchestratorIndexer): - """Orchestrator which deals with batch of any types of contents. - - """ - - -class OrchestratorTextContentsIndexer(BaseOrchestratorIndexer): - """Orchestrator which deals with batch of text contents. - - """ - CONFIG_BASE_FILENAME = 'indexer/orchestrator_text' diff --git a/swh/indexer/origin_head.py b/swh/indexer/origin_head.py --- a/swh/indexer/origin_head.py +++ b/swh/indexer/origin_head.py @@ -38,7 +38,7 @@ def persist_index_computations(self, results, policy_update): """Do nothing. The indexer's results are not persistent, they - should only be piped to another indexer via the orchestrator.""" + should only be piped to another indexer.""" pass def next_step(self, results, task): diff --git a/swh/indexer/tasks.py b/swh/indexer/tasks.py --- a/swh/indexer/tasks.py +++ b/swh/indexer/tasks.py @@ -7,8 +7,6 @@ from swh.scheduler.task import Task as SchedulerTask -from .orchestrator import OrchestratorAllContentsIndexer -from .orchestrator import OrchestratorTextContentsIndexer from .mimetype import ContentMimetypeIndexer from .language import ContentLanguageIndexer from .ctags import CtagsIndexer @@ -27,26 +25,6 @@ return indexer -class OrchestratorAllContents(Task): - """Main task in charge of reading batch contents (of any type) and - broadcasting them back to other tasks. - - """ - task_queue = 'swh_indexer_orchestrator_content_all' - - Indexer = OrchestratorAllContentsIndexer - - -class OrchestratorTextContents(Task): - """Main task in charge of reading batch contents (of type text) and - broadcasting them back to other tasks. - - """ - task_queue = 'swh_indexer_orchestrator_content_text' - - Indexer = OrchestratorTextContentsIndexer - - class RevisionMetadata(Task): task_queue = 'swh_indexer_revision_metadata' diff --git a/swh/indexer/tests/test_orchestrator.py b/swh/indexer/tests/test_orchestrator.py deleted file mode 100644 --- a/swh/indexer/tests/test_orchestrator.py +++ /dev/null @@ -1,210 +0,0 @@ -# Copyright (C) 2018 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import unittest - -import celery - -from swh.indexer.orchestrator import BaseOrchestratorIndexer -from swh.indexer.indexer import BaseIndexer -from swh.indexer.tests.test_utils import MockIndexerStorage, MockStorage -from swh.scheduler.tests.scheduler_testing import SchedulerTestFixture - - -class BaseTestIndexer(BaseIndexer): - ADDITIONAL_CONFIG = { - 'tools': ('dict', { - 'name': 'foo', - 'version': 'bar', - 'configuration': {} - }), - } - - def prepare(self): - self.idx_storage = MockIndexerStorage() - self.storage = MockStorage() - - def check(self): - pass - - def filter(self, ids): - self.filtered.append(ids) - return ids - - def run(self, ids, policy_update): - return self.index(ids) - - def index(self, ids): - self.indexed.append(ids) - return [id_ + '_indexed_by_' + self.__class__.__name__ - for id_ in ids] - - def persist_index_computations(self, result, policy_update): - self.persisted = result - - -class Indexer1(BaseTestIndexer): - filtered = [] - indexed = [] - - def filter(self, ids): - return super().filter([id_ for id_ in ids if '1' in id_]) - - -class Indexer2(BaseTestIndexer): - filtered = [] - indexed = [] - - def filter(self, ids): - return super().filter([id_ for id_ in ids if '2' in id_]) - - -class Indexer3(BaseTestIndexer): - filtered = [] - indexed = [] - - def filter(self, ids): - return super().filter([id_ for id_ in ids if '3' in id_]) - - -@celery.task -def indexer1_task(*args, **kwargs): - return Indexer1().run(*args, **kwargs) - - -@celery.task -def indexer2_task(*args, **kwargs): - return Indexer2().run(*args, **kwargs) - - -@celery.task -def indexer3_task(self, *args, **kwargs): - return Indexer3().run(*args, **kwargs) - - -class BaseTestOrchestrator12(BaseOrchestratorIndexer): - TASK_NAMES = { - 'indexer1': 'swh.indexer.tests.test_orchestrator.indexer1_task', - 'indexer2': 'swh.indexer.tests.test_orchestrator.indexer2_task', - 'indexer3': 'swh.indexer.tests.test_orchestrator.indexer3_task', - } - - INDEXER_CLASSES = { - 'indexer1': 'swh.indexer.tests.test_orchestrator.Indexer1', - 'indexer2': 'swh.indexer.tests.test_orchestrator.Indexer2', - 'indexer3': 'swh.indexer.tests.test_orchestrator.Indexer3', - } - - def __init__(self): - super().__init__() - self.running_tasks = [] - - def parse_config_file(self): - return { - 'scheduler': { - 'cls': 'remote', - 'args': { - 'url': 'http://localhost:9999', - }, - }, - 'indexers': { - 'indexer1': { - 'batch_size': 2, - 'check_presence': True, - }, - 'indexer2': { - 'batch_size': 2, - 'check_presence': True, - }, - } - } - - -class MockedTestOrchestrator12(BaseTestOrchestrator12): - def __init__(self): - super().__init__() - self.created_tasks = [] - - def _create_tasks(self, celery_tasks): - self.created_tasks.extend(celery_tasks) - - def prepare_scheduler(self): - pass - - -class OrchestratorTest(SchedulerTestFixture, unittest.TestCase): - def setUp(self): - super().setUp() - self.add_scheduler_task_type( - 'indexer1', - 'swh.indexer.tests.test_orchestrator.indexer1_task') - self.add_scheduler_task_type( - 'indexer2', - 'swh.indexer.tests.test_orchestrator.indexer2_task') - - def test_orchestrator_filter(self): - o = BaseTestOrchestrator12() - o.scheduler = self.scheduler - o.run(['id12', 'id2']) - self.assertEqual(Indexer2.indexed, []) - self.assertEqual(Indexer1.indexed, []) - self.run_ready_tasks() - self.assertEqual(Indexer2.indexed, [['id12', 'id2']]) - self.assertEqual(Indexer1.indexed, [['id12']]) - - -class MockedOrchestratorTest(unittest.TestCase): - maxDiff = None - - def test_mocked_orchestrator_filter(self): - o = MockedTestOrchestrator12() - o.run(['id12', 'id2']) - for task in o.created_tasks: - del task['next_run'] # not worth the trouble testing it properly - self.assertCountEqual(o.created_tasks, [ - {'type': 'indexer1', - 'arguments': { - 'args': [], - 'kwargs': { - 'ids': ['id12'], - 'policy_update': 'ignore-dups'}}, - 'policy': 'oneshot'}, - {'type': 'indexer2', - 'arguments': { - 'args': [], - 'kwargs': { - 'ids': ['id12', 'id2'], - 'policy_update': 'ignore-dups'}}, - 'policy': 'oneshot'}, - ]) - - def test_mocked_orchestrator_batch(self): - o = MockedTestOrchestrator12() - o.run(['id12', 'id2a', 'id2b', 'id2c']) - for task in o.created_tasks: - del task['next_run'] # not worth the trouble testing it properly - self.assertCountEqual(o.created_tasks, [ - {'type': 'indexer1', - 'arguments': { - 'args': [], - 'kwargs': { - 'ids': ['id12'], - 'policy_update': 'ignore-dups'}}, - 'policy': 'oneshot'}, - {'type': 'indexer2', - 'arguments': { - 'args': [], - 'kwargs': { - 'ids': ['id12', 'id2a'], - 'policy_update': 'ignore-dups'}}, - 'policy': 'oneshot'}, - {'type': 'indexer2', - 'arguments': { - 'args': [], - 'kwargs': { - 'ids': ['id2b', 'id2c'], - 'policy_update': 'ignore-dups'}}, - 'policy': 'oneshot'}, - ])