D633.id1980.diff
diff --git a/swh/indexer/__init__.py b/swh/indexer/__init__.py
--- a/swh/indexer/__init__.py
+++ b/swh/indexer/__init__.py
@@ -2,30 +2,3 @@
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-
-
-INDEXER_CLASSES = {
- 'indexer_mimetype': 'swh.indexer.mimetype.ContentMimetypeIndexer',
- 'indexer_language': 'swh.indexer.language.ContentLanguageIndexer',
- 'indexer_ctags': 'swh.indexer.ctags.CtagsIndexer',
- 'indexer_fossology_license':
- 'swh.indexer.fossology_license.ContentFossologyLicenseIndexer',
-}
-
-
-TASK_NAMES = {
- 'indexer_orchestrator_all': 'swh.indexer.tasks.OrchestratorAllContents',
- 'indexer_orchestrator_text': 'swh.indexer.tasks.OrchestratorTextContents',
- 'indexer_mimetype': 'swh.indexer.tasks.ContentMimetype',
- 'indexer_language': 'swh.indexer.tasks.ContentLanguage',
- 'indexer_ctags': 'swh.indexer.tasks.Ctags',
- 'indexer_fossology_license': 'swh.indexer.tasks.ContentFossologyLicense',
- 'indexer_rehash': 'swh.indexer.tasks.RecomputeChecksums',
- 'indexer_revision_metadata': 'swh.indexer.tasks.RevisionMetadata',
- 'indexer_origin_intrinsic_metadata': 'swh.indexer.tasks.OriginMetadata',
-}
-
-
-__all__ = [
- 'INDEXER_CLASSES', 'TASK_NAMES',
-]
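The INDEXER_CLASSES mapping removed above associated indexer names with fully qualified class paths, which the (also removed) orchestrator resolved dynamically. A minimal, self-contained sketch of that dotted-path lookup, using importlib rather than the deleted get_class() helper (load_class and the usage below are illustrative, not part of this diff):

import importlib


def load_class(dotted_path):
    """Resolve a fully qualified name such as
    'swh.indexer.mimetype.ContentMimetypeIndexer' to the class object."""
    module_name, _, class_name = dotted_path.rpartition('.')
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


# Hypothetical usage with one of the removed INDEXER_CLASSES entries:
# indexer_cls = load_class('swh.indexer.mimetype.ContentMimetypeIndexer')
# indexer = indexer_cls()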
diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py
--- a/swh/indexer/indexer.py
+++ b/swh/indexer/indexer.py
@@ -93,9 +93,7 @@
Then you need to implement the following functions:
:func:`filter`:
- filter out data already indexed (in storage). This function is used by
- the orchestrator and not directly by the indexer
- (cf. swh.indexer.orchestrator.BaseOrchestratorIndexer).
+ filter out data already indexed (in storage).
:func:`index_object`:
compute index on id with data (retrieved from the storage or the
diff --git a/swh/indexer/orchestrator.py b/swh/indexer/orchestrator.py
deleted file mode 100644
--- a/swh/indexer/orchestrator.py
+++ /dev/null
@@ -1,143 +0,0 @@
-# Copyright (C) 2016-2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import random
-
-from swh.core.config import SWHConfig
-from swh.core.utils import grouper
-from swh.scheduler import utils
-from swh.scheduler import get_scheduler
-from swh.scheduler.utils import create_task_dict
-
-
-def get_class(clazz):
- """Get a symbol class dynamically by its fully qualified name string
- representation.
-
- """
- parts = clazz.split('.')
- module = '.'.join(parts[:-1])
- m = __import__(module)
- for comp in parts[1:]:
- m = getattr(m, comp)
- return m
-
-
-class BaseOrchestratorIndexer(SWHConfig):
- """The indexer orchestrator is in charge of dispatching batch of
- contents (filtered or not based on presence) to indexers.
-
- That dispatch is indexer specific, so the configuration reflects it:
-
- - when `check_presence` flag is true, filter out the
- contents already present for that indexer, otherwise send
- everything
-
- - broadcast those (filtered or not) contents to indexers in a
- `batch_size` fashioned
-
- For example::
-
- indexers:
- indexer_mimetype:
- batch_size: 10
- check_presence: false
- indexer_language:
- batch_size: 2
- check_presence: true
-
- means:
-
- - send all contents received as batch of size 10 to the 'mimetype' indexer
- - send only unknown contents as batch of size 2 to the 'language' indexer.
-
- """
- CONFIG_BASE_FILENAME = 'indexer/orchestrator'
-
- # Overridable in child classes.
- from . import TASK_NAMES, INDEXER_CLASSES
-
- DEFAULT_CONFIG = {
- 'scheduler': {
- 'cls': 'remote',
- 'args': {
- 'url': 'http://localhost:5008',
- },
- },
- 'indexers': ('dict', {
- 'indexer_mimetype': {
- 'batch_size': 10,
- 'check_presence': True,
- },
- }),
- }
-
- def __init__(self):
- super().__init__()
- self.config = self.parse_config_file()
- self.prepare_tasks()
- self.prepare_scheduler()
-
- def prepare_scheduler(self):
- self.scheduler = get_scheduler(**self.config['scheduler'])
-
- def prepare_tasks(self):
- indexer_names = list(self.config['indexers'])
- random.shuffle(indexer_names)
- indexers = {}
- tasks = {}
- for name in indexer_names:
- if name not in self.TASK_NAMES:
- raise ValueError('%s must be one of %s' % (
- name, ', '.join(self.TASK_NAMES)))
-
- opts = self.config['indexers'][name]
- indexers[name] = (
- self.INDEXER_CLASSES[name],
- opts['check_presence'],
- opts['batch_size'])
- tasks[name] = utils.get_task(self.TASK_NAMES[name])
-
- self.indexers = indexers
- self.tasks = tasks
-
- def run(self, ids):
- for task_name, task_attrs in self.indexers.items():
- (idx_class, filtering, batch_size) = task_attrs
- if filtering:
- policy_update = 'ignore-dups'
- indexer_class = get_class(idx_class)
- ids_filtered = list(indexer_class().filter(ids))
- if not ids_filtered:
- continue
- else:
- policy_update = 'update-dups'
- ids_filtered = ids
-
- tasks = []
- for ids_to_send in grouper(ids_filtered, batch_size):
- tasks.append(create_task_dict(
- task_name,
- 'oneshot',
- ids=list(ids_to_send),
- policy_update=policy_update,
- ))
- self._create_tasks(tasks)
-
- def _create_tasks(self, tasks):
- self.scheduler.create_tasks(tasks)
-
-
-class OrchestratorAllContentsIndexer(BaseOrchestratorIndexer):
- """Orchestrator which deals with batch of any types of contents.
-
- """
-
-
-class OrchestratorTextContentsIndexer(BaseOrchestratorIndexer):
- """Orchestrator which deals with batch of text contents.
-
- """
- CONFIG_BASE_FILENAME = 'indexer/orchestrator_text'
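The run() method deleted above is the whole orchestration step: for each configured indexer it optionally filters out already-indexed contents, splits the remaining ids into batch_size groups, and schedules one oneshot task per group. The following standalone sketch reproduces that dispatch logic in plain Python; grouper_sketch stands in for swh.core.utils.grouper, and the simplified indexers argument is hypothetical, not the real configuration structure:

from itertools import islice


def grouper_sketch(iterable, n):
    """Yield successive lists of at most n items
    (stand-in for swh.core.utils.grouper)."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, n))
        if not batch:
            return
        yield batch


def dispatch_sketch(ids, indexers):
    """indexers: {task_name: (filter_fn, check_presence, batch_size)}.

    Returns the oneshot task dicts the orchestrator would have created
    (the real create_task_dict also fills scheduling fields such as
    'next_run', omitted here).
    """
    tasks = []
    for task_name, (filter_fn, check_presence, batch_size) in indexers.items():
        if check_presence:
            # Only index contents not already known to this indexer.
            policy_update = 'ignore-dups'
            selected = list(filter_fn(ids))
            if not selected:
                continue
        else:
            # Re-send everything and overwrite existing results.
            policy_update = 'update-dups'
            selected = ids
        for batch in grouper_sketch(selected, batch_size):
            tasks.append({
                'type': task_name,
                'policy': 'oneshot',
                'arguments': {'args': [],
                              'kwargs': {'ids': batch,
                                         'policy_update': policy_update}},
            })
    return tasks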
diff --git a/swh/indexer/origin_head.py b/swh/indexer/origin_head.py
--- a/swh/indexer/origin_head.py
+++ b/swh/indexer/origin_head.py
@@ -38,7 +38,7 @@
def persist_index_computations(self, results, policy_update):
"""Do nothing. The indexer's results are not persistent, they
- should only be piped to another indexer via the orchestrator."""
+ should only be piped to another indexer."""
pass
def next_step(self, results, task):
diff --git a/swh/indexer/tasks.py b/swh/indexer/tasks.py
--- a/swh/indexer/tasks.py
+++ b/swh/indexer/tasks.py
@@ -7,8 +7,6 @@
from swh.scheduler.task import Task as SchedulerTask
-from .orchestrator import OrchestratorAllContentsIndexer
-from .orchestrator import OrchestratorTextContentsIndexer
from .mimetype import ContentMimetypeIndexer
from .language import ContentLanguageIndexer
from .ctags import CtagsIndexer
@@ -27,26 +25,6 @@
return indexer
-class OrchestratorAllContents(Task):
- """Main task in charge of reading batch contents (of any type) and
- broadcasting them back to other tasks.
-
- """
- task_queue = 'swh_indexer_orchestrator_content_all'
-
- Indexer = OrchestratorAllContentsIndexer
-
-
-class OrchestratorTextContents(Task):
- """Main task in charge of reading batch contents (of type text) and
- broadcasting them back to other tasks.
-
- """
- task_queue = 'swh_indexer_orchestrator_content_text'
-
- Indexer = OrchestratorTextContentsIndexer
-
-
class RevisionMetadata(Task):
task_queue = 'swh_indexer_revision_metadata'
diff --git a/swh/indexer/tests/test_orchestrator.py b/swh/indexer/tests/test_orchestrator.py
deleted file mode 100644
--- a/swh/indexer/tests/test_orchestrator.py
+++ /dev/null
@@ -1,210 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import unittest
-
-import celery
-
-from swh.indexer.orchestrator import BaseOrchestratorIndexer
-from swh.indexer.indexer import BaseIndexer
-from swh.indexer.tests.test_utils import MockIndexerStorage, MockStorage
-from swh.scheduler.tests.scheduler_testing import SchedulerTestFixture
-
-
-class BaseTestIndexer(BaseIndexer):
- ADDITIONAL_CONFIG = {
- 'tools': ('dict', {
- 'name': 'foo',
- 'version': 'bar',
- 'configuration': {}
- }),
- }
-
- def prepare(self):
- self.idx_storage = MockIndexerStorage()
- self.storage = MockStorage()
-
- def check(self):
- pass
-
- def filter(self, ids):
- self.filtered.append(ids)
- return ids
-
- def run(self, ids, policy_update):
- return self.index(ids)
-
- def index(self, ids):
- self.indexed.append(ids)
- return [id_ + '_indexed_by_' + self.__class__.__name__
- for id_ in ids]
-
- def persist_index_computations(self, result, policy_update):
- self.persisted = result
-
-
-class Indexer1(BaseTestIndexer):
- filtered = []
- indexed = []
-
- def filter(self, ids):
- return super().filter([id_ for id_ in ids if '1' in id_])
-
-
-class Indexer2(BaseTestIndexer):
- filtered = []
- indexed = []
-
- def filter(self, ids):
- return super().filter([id_ for id_ in ids if '2' in id_])
-
-
-class Indexer3(BaseTestIndexer):
- filtered = []
- indexed = []
-
- def filter(self, ids):
- return super().filter([id_ for id_ in ids if '3' in id_])
-
-
-@celery.task
-def indexer1_task(*args, **kwargs):
- return Indexer1().run(*args, **kwargs)
-
-
-@celery.task
-def indexer2_task(*args, **kwargs):
- return Indexer2().run(*args, **kwargs)
-
-
-@celery.task
-def indexer3_task(self, *args, **kwargs):
- return Indexer3().run(*args, **kwargs)
-
-
-class BaseTestOrchestrator12(BaseOrchestratorIndexer):
- TASK_NAMES = {
- 'indexer1': 'swh.indexer.tests.test_orchestrator.indexer1_task',
- 'indexer2': 'swh.indexer.tests.test_orchestrator.indexer2_task',
- 'indexer3': 'swh.indexer.tests.test_orchestrator.indexer3_task',
- }
-
- INDEXER_CLASSES = {
- 'indexer1': 'swh.indexer.tests.test_orchestrator.Indexer1',
- 'indexer2': 'swh.indexer.tests.test_orchestrator.Indexer2',
- 'indexer3': 'swh.indexer.tests.test_orchestrator.Indexer3',
- }
-
- def __init__(self):
- super().__init__()
- self.running_tasks = []
-
- def parse_config_file(self):
- return {
- 'scheduler': {
- 'cls': 'remote',
- 'args': {
- 'url': 'http://localhost:9999',
- },
- },
- 'indexers': {
- 'indexer1': {
- 'batch_size': 2,
- 'check_presence': True,
- },
- 'indexer2': {
- 'batch_size': 2,
- 'check_presence': True,
- },
- }
- }
-
-
-class MockedTestOrchestrator12(BaseTestOrchestrator12):
- def __init__(self):
- super().__init__()
- self.created_tasks = []
-
- def _create_tasks(self, celery_tasks):
- self.created_tasks.extend(celery_tasks)
-
- def prepare_scheduler(self):
- pass
-
-
-class OrchestratorTest(SchedulerTestFixture, unittest.TestCase):
- def setUp(self):
- super().setUp()
- self.add_scheduler_task_type(
- 'indexer1',
- 'swh.indexer.tests.test_orchestrator.indexer1_task')
- self.add_scheduler_task_type(
- 'indexer2',
- 'swh.indexer.tests.test_orchestrator.indexer2_task')
-
- def test_orchestrator_filter(self):
- o = BaseTestOrchestrator12()
- o.scheduler = self.scheduler
- o.run(['id12', 'id2'])
- self.assertEqual(Indexer2.indexed, [])
- self.assertEqual(Indexer1.indexed, [])
- self.run_ready_tasks()
- self.assertEqual(Indexer2.indexed, [['id12', 'id2']])
- self.assertEqual(Indexer1.indexed, [['id12']])
-
-
-class MockedOrchestratorTest(unittest.TestCase):
- maxDiff = None
-
- def test_mocked_orchestrator_filter(self):
- o = MockedTestOrchestrator12()
- o.run(['id12', 'id2'])
- for task in o.created_tasks:
- del task['next_run'] # not worth the trouble testing it properly
- self.assertCountEqual(o.created_tasks, [
- {'type': 'indexer1',
- 'arguments': {
- 'args': [],
- 'kwargs': {
- 'ids': ['id12'],
- 'policy_update': 'ignore-dups'}},
- 'policy': 'oneshot'},
- {'type': 'indexer2',
- 'arguments': {
- 'args': [],
- 'kwargs': {
- 'ids': ['id12', 'id2'],
- 'policy_update': 'ignore-dups'}},
- 'policy': 'oneshot'},
- ])
-
- def test_mocked_orchestrator_batch(self):
- o = MockedTestOrchestrator12()
- o.run(['id12', 'id2a', 'id2b', 'id2c'])
- for task in o.created_tasks:
- del task['next_run'] # not worth the trouble testing it properly
- self.assertCountEqual(o.created_tasks, [
- {'type': 'indexer1',
- 'arguments': {
- 'args': [],
- 'kwargs': {
- 'ids': ['id12'],
- 'policy_update': 'ignore-dups'}},
- 'policy': 'oneshot'},
- {'type': 'indexer2',
- 'arguments': {
- 'args': [],
- 'kwargs': {
- 'ids': ['id12', 'id2a'],
- 'policy_update': 'ignore-dups'}},
- 'policy': 'oneshot'},
- {'type': 'indexer2',
- 'arguments': {
- 'args': [],
- 'kwargs': {
- 'ids': ['id2b', 'id2c'],
- 'policy_update': 'ignore-dups'}},
- 'policy': 'oneshot'},
- ])
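For reference, the batching asserted in test_mocked_orchestrator_batch above can be reproduced with dispatch_sketch from the orchestrator section; the lambdas mirror the filters of Indexer1 and Indexer2, and the snippet is illustrative only:

indexers = {
    'indexer1': (lambda ids: [i for i in ids if '1' in i], True, 2),
    'indexer2': (lambda ids: [i for i in ids if '2' in i], True, 2),
}
tasks = dispatch_sketch(['id12', 'id2a', 'id2b', 'id2c'], indexers)
# Expected ids per task: ['id12'] for indexer1;
# ['id12', 'id2a'] and ['id2b', 'id2c'] for indexer2.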
Attached To
D633: indexer: Remove orchestrator layer