diff --git a/README.md b/README.md --- a/README.md +++ b/README.md @@ -11,81 +11,36 @@ - revision: - metadata -## Context +An indexer is in charge of: +- looking up objects +- extracting information from those objects +- storing that information in the swh-indexer db -SWH has currently stored around 5B contents. The table `content` -holds their checksums. - -Those contents are physically stored in an object storage (using -disks) and replicated in another. Those object storages are not -destined for reading yet. - -We are in the process to copy those contents over to azure's blob -storages. As such, we will use that opportunity to trigger the -computations on these contents once those have been copied over. - - -## Workers - -There are two types of workers: -- orchestrators (orchestrator, orchestrator-text) -- indexer (mimetype, language, ctags, fossology-license) - -### Orchestrator - - -The orchestrator is in charge of dispatching a batch of sha1 hashes to -different indexers. - -Orchestration procedure: -- receive batch of sha1s -- split those batches into groups (according to setup) -- broadcast those group to indexers - -There are two types of orchestrators: - -- orchestrator (swh_indexer_orchestrator_content_all): Receives and - broadcast sha1 ids (of contents) to indexers (currently only the - mimetype indexer) - -- orchestrator-text (swh_indexer_orchestrator_content_text): Receives - batch of sha1 ids (of textual contents) and broadcast those to - indexers (currently language, ctags, and fossology-license - indexers). - - -### Indexers - - -An indexer is in charge of the content retrieval and indexation of the -extracted information in the swh-indexer db. 
- -There are two types of indexers: +There are multiple indexers working on different object types: - content indexer: works with content sha1 hashes - revision indexer: works with revision sha1 hashes + - origin indexer: works with origin identifiers Indexation procedure: - receive batch of ids - retrieve the associated data depending on object type - compute for that object some index - store the result to swh's storage -- (and possibly do some broadcast itself) Current content indexers: -- mimetype (queue swh_indexer_content_mimetype): compute the mimetype, - filter out the textual contents and broadcast the list to the - orchestrator-text +- mimetype (queue swh_indexer_content_mimetype): detect the encoding + and mimetype -- language (queue swh_indexer_content_language): detect the programming language +- language (queue swh_indexer_content_language): detect the + programming language -- ctags (queue swh_indexer_content_ctags): try and compute tags - information +- ctags (queue swh_indexer_content_ctags): compute tags information -- fossology-license (queue swh_indexer_fossology_license): try and - compute the license +- fossology-license (queue swh_indexer_fossology_license): compute the + license -- metadata : translate file into translated_metadata dict +- metadata: translate file into translated_metadata dict Current revision indexers: diff --git a/swh/indexer/__init__.py b/swh/indexer/__init__.py --- a/swh/indexer/__init__.py +++ b/swh/indexer/__init__.py @@ -2,30 +2,3 @@ # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information - - -INDEXER_CLASSES = { - 'indexer_mimetype': 'swh.indexer.mimetype.ContentMimetypeIndexer', - 'indexer_language': 'swh.indexer.language.ContentLanguageIndexer', - 'indexer_ctags': 'swh.indexer.ctags.CtagsIndexer', - 'indexer_fossology_license': - 
'swh.indexer.fossology_license.ContentFossologyLicenseIndexer', -} - - -TASK_NAMES = { - 'indexer_orchestrator_all': 'swh.indexer.tasks.OrchestratorAllContents', - 'indexer_orchestrator_text': 'swh.indexer.tasks.OrchestratorTextContents', - 'indexer_mimetype': 'swh.indexer.tasks.ContentMimetype', - 'indexer_language': 'swh.indexer.tasks.ContentLanguage', - 'indexer_ctags': 'swh.indexer.tasks.Ctags', - 'indexer_fossology_license': 'swh.indexer.tasks.ContentFossologyLicense', - 'indexer_rehash': 'swh.indexer.tasks.RecomputeChecksums', - 'indexer_revision_metadata': 'swh.indexer.tasks.RevisionMetadata', - 'indexer_origin_intrinsic_metadata': 'swh.indexer.tasks.OriginMetadata', -} - - -__all__ = [ - 'INDEXER_CLASSES', 'TASK_NAMES', -] diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py --- a/swh/indexer/indexer.py +++ b/swh/indexer/indexer.py @@ -93,9 +93,7 @@ Then you need to implement the following functions: :func:`filter`: - filter out data already indexed (in storage). This function is used by - the orchestrator and not directly by the indexer - (cf. swh.indexer.orchestrator.BaseOrchestratorIndexer). + filter out data already indexed (in storage). 
:func:`index_object`: compute index on id with data (retrieved from the storage or the diff --git a/swh/indexer/orchestrator.py b/swh/indexer/orchestrator.py deleted file mode 100644 --- a/swh/indexer/orchestrator.py +++ /dev/null @@ -1,143 +0,0 @@ -# Copyright (C) 2016-2018 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import random - -from swh.core.config import SWHConfig -from swh.core.utils import grouper -from swh.scheduler import utils -from swh.scheduler import get_scheduler -from swh.scheduler.utils import create_task_dict - - -def get_class(clazz): - """Get a symbol class dynamically by its fully qualified name string - representation. - - """ - parts = clazz.split('.') - module = '.'.join(parts[:-1]) - m = __import__(module) - for comp in parts[1:]: - m = getattr(m, comp) - return m - - -class BaseOrchestratorIndexer(SWHConfig): - """The indexer orchestrator is in charge of dispatching batch of - contents (filtered or not based on presence) to indexers. - - That dispatch is indexer specific, so the configuration reflects it: - - - when `check_presence` flag is true, filter out the - contents already present for that indexer, otherwise send - everything - - - broadcast those (filtered or not) contents to indexers in a - `batch_size` fashioned - - For example:: - - indexers: - indexer_mimetype: - batch_size: 10 - check_presence: false - indexer_language: - batch_size: 2 - check_presence: true - - means: - - - send all contents received as batch of size 10 to the 'mimetype' indexer - - send only unknown contents as batch of size 2 to the 'language' indexer. - - """ - CONFIG_BASE_FILENAME = 'indexer/orchestrator' - - # Overridable in child classes. - from . 
import TASK_NAMES, INDEXER_CLASSES - - DEFAULT_CONFIG = { - 'scheduler': { - 'cls': 'remote', - 'args': { - 'url': 'http://localhost:5008', - }, - }, - 'indexers': ('dict', { - 'indexer_mimetype': { - 'batch_size': 10, - 'check_presence': True, - }, - }), - } - - def __init__(self): - super().__init__() - self.config = self.parse_config_file() - self.prepare_tasks() - self.prepare_scheduler() - - def prepare_scheduler(self): - self.scheduler = get_scheduler(**self.config['scheduler']) - - def prepare_tasks(self): - indexer_names = list(self.config['indexers']) - random.shuffle(indexer_names) - indexers = {} - tasks = {} - for name in indexer_names: - if name not in self.TASK_NAMES: - raise ValueError('%s must be one of %s' % ( - name, ', '.join(self.TASK_NAMES))) - - opts = self.config['indexers'][name] - indexers[name] = ( - self.INDEXER_CLASSES[name], - opts['check_presence'], - opts['batch_size']) - tasks[name] = utils.get_task(self.TASK_NAMES[name]) - - self.indexers = indexers - self.tasks = tasks - - def run(self, ids): - for task_name, task_attrs in self.indexers.items(): - (idx_class, filtering, batch_size) = task_attrs - if filtering: - policy_update = 'ignore-dups' - indexer_class = get_class(idx_class) - ids_filtered = list(indexer_class().filter(ids)) - if not ids_filtered: - continue - else: - policy_update = 'update-dups' - ids_filtered = ids - - tasks = [] - for ids_to_send in grouper(ids_filtered, batch_size): - tasks.append(create_task_dict( - task_name, - 'oneshot', - ids=list(ids_to_send), - policy_update=policy_update, - )) - self._create_tasks(tasks) - - def _create_tasks(self, tasks): - self.scheduler.create_tasks(tasks) - - -class OrchestratorAllContentsIndexer(BaseOrchestratorIndexer): - """Orchestrator which deals with batch of any types of contents. - - """ - - -class OrchestratorTextContentsIndexer(BaseOrchestratorIndexer): - """Orchestrator which deals with batch of text contents. 
- - """ - CONFIG_BASE_FILENAME = 'indexer/orchestrator_text' diff --git a/swh/indexer/origin_head.py b/swh/indexer/origin_head.py --- a/swh/indexer/origin_head.py +++ b/swh/indexer/origin_head.py @@ -38,7 +38,7 @@ def persist_index_computations(self, results, policy_update): """Do nothing. The indexer's results are not persistent, they - should only be piped to another indexer via the orchestrator.""" + should only be piped to another indexer.""" pass def next_step(self, results, task): diff --git a/swh/indexer/tasks.py b/swh/indexer/tasks.py --- a/swh/indexer/tasks.py +++ b/swh/indexer/tasks.py @@ -7,8 +7,6 @@ from swh.scheduler.task import Task as SchedulerTask -from .orchestrator import OrchestratorAllContentsIndexer -from .orchestrator import OrchestratorTextContentsIndexer from .mimetype import ContentMimetypeIndexer from .language import ContentLanguageIndexer from .ctags import CtagsIndexer @@ -27,26 +25,6 @@ return indexer -class OrchestratorAllContents(Task): - """Main task in charge of reading batch contents (of any type) and - broadcasting them back to other tasks. - - """ - task_queue = 'swh_indexer_orchestrator_content_all' - - Indexer = OrchestratorAllContentsIndexer - - -class OrchestratorTextContents(Task): - """Main task in charge of reading batch contents (of type text) and - broadcasting them back to other tasks. 
- - """ - task_queue = 'swh_indexer_orchestrator_content_text' - - Indexer = OrchestratorTextContentsIndexer - - class RevisionMetadata(Task): task_queue = 'swh_indexer_revision_metadata' diff --git a/swh/indexer/tests/test_orchestrator.py b/swh/indexer/tests/test_orchestrator.py deleted file mode 100644 --- a/swh/indexer/tests/test_orchestrator.py +++ /dev/null @@ -1,210 +0,0 @@ -# Copyright (C) 2018 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import unittest - -import celery - -from swh.indexer.orchestrator import BaseOrchestratorIndexer -from swh.indexer.indexer import BaseIndexer -from swh.indexer.tests.test_utils import MockIndexerStorage, MockStorage -from swh.scheduler.tests.scheduler_testing import SchedulerTestFixture - - -class BaseTestIndexer(BaseIndexer): - ADDITIONAL_CONFIG = { - 'tools': ('dict', { - 'name': 'foo', - 'version': 'bar', - 'configuration': {} - }), - } - - def prepare(self): - self.idx_storage = MockIndexerStorage() - self.storage = MockStorage() - - def check(self): - pass - - def filter(self, ids): - self.filtered.append(ids) - return ids - - def run(self, ids, policy_update): - return self.index(ids) - - def index(self, ids): - self.indexed.append(ids) - return [id_ + '_indexed_by_' + self.__class__.__name__ - for id_ in ids] - - def persist_index_computations(self, result, policy_update): - self.persisted = result - - -class Indexer1(BaseTestIndexer): - filtered = [] - indexed = [] - - def filter(self, ids): - return super().filter([id_ for id_ in ids if '1' in id_]) - - -class Indexer2(BaseTestIndexer): - filtered = [] - indexed = [] - - def filter(self, ids): - return super().filter([id_ for id_ in ids if '2' in id_]) - - -class Indexer3(BaseTestIndexer): - filtered = [] - indexed = [] - - def filter(self, ids): - return super().filter([id_ for id_ 
in ids if '3' in id_]) - - -@celery.task -def indexer1_task(*args, **kwargs): - return Indexer1().run(*args, **kwargs) - - -@celery.task -def indexer2_task(*args, **kwargs): - return Indexer2().run(*args, **kwargs) - - -@celery.task -def indexer3_task(self, *args, **kwargs): - return Indexer3().run(*args, **kwargs) - - -class BaseTestOrchestrator12(BaseOrchestratorIndexer): - TASK_NAMES = { - 'indexer1': 'swh.indexer.tests.test_orchestrator.indexer1_task', - 'indexer2': 'swh.indexer.tests.test_orchestrator.indexer2_task', - 'indexer3': 'swh.indexer.tests.test_orchestrator.indexer3_task', - } - - INDEXER_CLASSES = { - 'indexer1': 'swh.indexer.tests.test_orchestrator.Indexer1', - 'indexer2': 'swh.indexer.tests.test_orchestrator.Indexer2', - 'indexer3': 'swh.indexer.tests.test_orchestrator.Indexer3', - } - - def __init__(self): - super().__init__() - self.running_tasks = [] - - def parse_config_file(self): - return { - 'scheduler': { - 'cls': 'remote', - 'args': { - 'url': 'http://localhost:9999', - }, - }, - 'indexers': { - 'indexer1': { - 'batch_size': 2, - 'check_presence': True, - }, - 'indexer2': { - 'batch_size': 2, - 'check_presence': True, - }, - } - } - - -class MockedTestOrchestrator12(BaseTestOrchestrator12): - def __init__(self): - super().__init__() - self.created_tasks = [] - - def _create_tasks(self, celery_tasks): - self.created_tasks.extend(celery_tasks) - - def prepare_scheduler(self): - pass - - -class OrchestratorTest(SchedulerTestFixture, unittest.TestCase): - def setUp(self): - super().setUp() - self.add_scheduler_task_type( - 'indexer1', - 'swh.indexer.tests.test_orchestrator.indexer1_task') - self.add_scheduler_task_type( - 'indexer2', - 'swh.indexer.tests.test_orchestrator.indexer2_task') - - def test_orchestrator_filter(self): - o = BaseTestOrchestrator12() - o.scheduler = self.scheduler - o.run(['id12', 'id2']) - self.assertEqual(Indexer2.indexed, []) - self.assertEqual(Indexer1.indexed, []) - self.run_ready_tasks() - 
self.assertEqual(Indexer2.indexed, [['id12', 'id2']]) - self.assertEqual(Indexer1.indexed, [['id12']]) - - -class MockedOrchestratorTest(unittest.TestCase): - maxDiff = None - - def test_mocked_orchestrator_filter(self): - o = MockedTestOrchestrator12() - o.run(['id12', 'id2']) - for task in o.created_tasks: - del task['next_run'] # not worth the trouble testing it properly - self.assertCountEqual(o.created_tasks, [ - {'type': 'indexer1', - 'arguments': { - 'args': [], - 'kwargs': { - 'ids': ['id12'], - 'policy_update': 'ignore-dups'}}, - 'policy': 'oneshot'}, - {'type': 'indexer2', - 'arguments': { - 'args': [], - 'kwargs': { - 'ids': ['id12', 'id2'], - 'policy_update': 'ignore-dups'}}, - 'policy': 'oneshot'}, - ]) - - def test_mocked_orchestrator_batch(self): - o = MockedTestOrchestrator12() - o.run(['id12', 'id2a', 'id2b', 'id2c']) - for task in o.created_tasks: - del task['next_run'] # not worth the trouble testing it properly - self.assertCountEqual(o.created_tasks, [ - {'type': 'indexer1', - 'arguments': { - 'args': [], - 'kwargs': { - 'ids': ['id12'], - 'policy_update': 'ignore-dups'}}, - 'policy': 'oneshot'}, - {'type': 'indexer2', - 'arguments': { - 'args': [], - 'kwargs': { - 'ids': ['id12', 'id2a'], - 'policy_update': 'ignore-dups'}}, - 'policy': 'oneshot'}, - {'type': 'indexer2', - 'arguments': { - 'args': [], - 'kwargs': { - 'ids': ['id2b', 'id2c'], - 'policy_update': 'ignore-dups'}}, - 'policy': 'oneshot'}, - ])