D633.id1961.diff
diff --git a/README.md b/README.md
--- a/README.md
+++ b/README.md
@@ -11,81 +11,36 @@
- revision:
- metadata
-## Context
+An indexer is in charge of:
+- looking up objects
+- extracting information from those objects
+- storing that information in the swh-indexer db
-SWH has currently stored around 5B contents. The table `content`
-holds their checksums.
-
-Those contents are physically stored in an object storage (using
-disks) and replicated in another. Those object storages are not
-destined for reading yet.
-
-We are in the process to copy those contents over to azure's blob
-storages. As such, we will use that opportunity to trigger the
-computations on these contents once those have been copied over.
-
-
-## Workers
-
-There are two types of workers:
-- orchestrators (orchestrator, orchestrator-text)
-- indexer (mimetype, language, ctags, fossology-license)
-
-### Orchestrator
-
-
-The orchestrator is in charge of dispatching a batch of sha1 hashes to
-different indexers.
-
-Orchestration procedure:
-- receive batch of sha1s
-- split those batches into groups (according to setup)
-- broadcast those group to indexers
-
-There are two types of orchestrators:
-
-- orchestrator (swh_indexer_orchestrator_content_all): Receives and
- broadcast sha1 ids (of contents) to indexers (currently only the
- mimetype indexer)
-
-- orchestrator-text (swh_indexer_orchestrator_content_text): Receives
- batch of sha1 ids (of textual contents) and broadcast those to
- indexers (currently language, ctags, and fossology-license
- indexers).
-
-
-### Indexers
-
-
-An indexer is in charge of the content retrieval and indexation of the
-extracted information in the swh-indexer db.
-
-There are two types of indexers:
+There are multiple indexers working on different object types:
- content indexer: works with content sha1 hashes
- revision indexer: works with revision sha1 hashes
+ - origin indexer: works with origin identifiers
Indexation procedure:
- receive batch of ids
- retrieve the associated data depending on object type
- compute for that object some index
- store the result to swh's storage
-- (and possibly do some broadcast itself)
Current content indexers:
-- mimetype (queue swh_indexer_content_mimetype): compute the mimetype,
- filter out the textual contents and broadcast the list to the
- orchestrator-text
+- mimetype (queue swh_indexer_content_mimetype): detect the encoding
+ and mimetype
-- language (queue swh_indexer_content_language): detect the programming language
+- language (queue swh_indexer_content_language): detect the
+ programming language
-- ctags (queue swh_indexer_content_ctags): try and compute tags
- information
+- ctags (queue swh_indexer_content_ctags): compute tags information
-- fossology-license (queue swh_indexer_fossology_license): try and
- compute the license
+- fossology-license (queue swh_indexer_fossology_license): compute the
+ license
-- metadata : translate file into translated_metadata dict
+- metadata: translate file into translated_metadata dict
Current revision indexers:
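The indexation procedure described in the README hunk above (receive a batch of ids, retrieve the associated data, compute an index, store the result) can be sketched as follows. This is a minimal, hypothetical illustration: `object_store` and `index_db` are plain dicts standing in for swh-storage and the swh-indexer db, and `index_batch` is an invented helper, not part of the real API.

```python
# Minimal sketch of the generic indexation loop, assuming in-memory
# stand-ins for swh-storage (object_store) and the indexer db (index_db).

def index_batch(ids, object_store, index_db):
    """Run the indexation procedure over a batch of ids."""
    results = []
    for id_ in ids:
        data = object_store.get(id_)  # retrieve the associated data
        if data is None:
            continue                  # skip objects we cannot look up
        # compute some index for that object (here: just its length)
        results.append({"id": id_, "length": len(data)})
    for result in results:            # store the results
        index_db[result["id"]] = result
    return results

# Example: index two known contents, skip an unknown one.
store = {"sha1:a": b"hello", "sha1:b": b"world!"}
db = {}
indexed = index_batch(["sha1:a", "sha1:b", "sha1:missing"], store, db)
```

The real indexers differ in what they compute per object (mimetype, language, ctags, license, metadata), but follow this same batch shape.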
diff --git a/swh/indexer/__init__.py b/swh/indexer/__init__.py
--- a/swh/indexer/__init__.py
+++ b/swh/indexer/__init__.py
@@ -2,30 +2,3 @@
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-
-
-INDEXER_CLASSES = {
- 'indexer_mimetype': 'swh.indexer.mimetype.ContentMimetypeIndexer',
- 'indexer_language': 'swh.indexer.language.ContentLanguageIndexer',
- 'indexer_ctags': 'swh.indexer.ctags.CtagsIndexer',
- 'indexer_fossology_license':
- 'swh.indexer.fossology_license.ContentFossologyLicenseIndexer',
-}
-
-
-TASK_NAMES = {
- 'indexer_orchestrator_all': 'swh.indexer.tasks.OrchestratorAllContents',
- 'indexer_orchestrator_text': 'swh.indexer.tasks.OrchestratorTextContents',
- 'indexer_mimetype': 'swh.indexer.tasks.ContentMimetype',
- 'indexer_language': 'swh.indexer.tasks.ContentLanguage',
- 'indexer_ctags': 'swh.indexer.tasks.Ctags',
- 'indexer_fossology_license': 'swh.indexer.tasks.ContentFossologyLicense',
- 'indexer_rehash': 'swh.indexer.tasks.RecomputeChecksums',
- 'indexer_revision_metadata': 'swh.indexer.tasks.RevisionMetadata',
- 'indexer_origin_intrinsic_metadata': 'swh.indexer.tasks.OriginMetadata',
-}
-
-
-__all__ = [
- 'INDEXER_CLASSES', 'TASK_NAMES',
-]
diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py
--- a/swh/indexer/indexer.py
+++ b/swh/indexer/indexer.py
@@ -93,9 +93,7 @@
Then you need to implement the following functions:
:func:`filter`:
- filter out data already indexed (in storage). This function is used by
- the orchestrator and not directly by the indexer
- (cf. swh.indexer.orchestrator.BaseOrchestratorIndexer).
+ filter out data already indexed (in storage).
:func:`index_object`:
compute index on id with data (retrieved from the storage or the
diff --git a/swh/indexer/orchestrator.py b/swh/indexer/orchestrator.py
deleted file mode 100644
--- a/swh/indexer/orchestrator.py
+++ /dev/null
@@ -1,143 +0,0 @@
-# Copyright (C) 2016-2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import random
-
-from swh.core.config import SWHConfig
-from swh.core.utils import grouper
-from swh.scheduler import utils
-from swh.scheduler import get_scheduler
-from swh.scheduler.utils import create_task_dict
-
-
-def get_class(clazz):
- """Get a symbol class dynamically by its fully qualified name string
- representation.
-
- """
- parts = clazz.split('.')
- module = '.'.join(parts[:-1])
- m = __import__(module)
- for comp in parts[1:]:
- m = getattr(m, comp)
- return m
-
-
-class BaseOrchestratorIndexer(SWHConfig):
- """The indexer orchestrator is in charge of dispatching batch of
- contents (filtered or not based on presence) to indexers.
-
- That dispatch is indexer specific, so the configuration reflects it:
-
- - when `check_presence` flag is true, filter out the
- contents already present for that indexer, otherwise send
- everything
-
- - broadcast those (filtered or not) contents to indexers in a
- `batch_size` fashioned
-
- For example::
-
- indexers:
- indexer_mimetype:
- batch_size: 10
- check_presence: false
- indexer_language:
- batch_size: 2
- check_presence: true
-
- means:
-
- - send all contents received as batch of size 10 to the 'mimetype' indexer
- - send only unknown contents as batch of size 2 to the 'language' indexer.
-
- """
- CONFIG_BASE_FILENAME = 'indexer/orchestrator'
-
- # Overridable in child classes.
- from . import TASK_NAMES, INDEXER_CLASSES
-
- DEFAULT_CONFIG = {
- 'scheduler': {
- 'cls': 'remote',
- 'args': {
- 'url': 'http://localhost:5008',
- },
- },
- 'indexers': ('dict', {
- 'indexer_mimetype': {
- 'batch_size': 10,
- 'check_presence': True,
- },
- }),
- }
-
- def __init__(self):
- super().__init__()
- self.config = self.parse_config_file()
- self.prepare_tasks()
- self.prepare_scheduler()
-
- def prepare_scheduler(self):
- self.scheduler = get_scheduler(**self.config['scheduler'])
-
- def prepare_tasks(self):
- indexer_names = list(self.config['indexers'])
- random.shuffle(indexer_names)
- indexers = {}
- tasks = {}
- for name in indexer_names:
- if name not in self.TASK_NAMES:
- raise ValueError('%s must be one of %s' % (
- name, ', '.join(self.TASK_NAMES)))
-
- opts = self.config['indexers'][name]
- indexers[name] = (
- self.INDEXER_CLASSES[name],
- opts['check_presence'],
- opts['batch_size'])
- tasks[name] = utils.get_task(self.TASK_NAMES[name])
-
- self.indexers = indexers
- self.tasks = tasks
-
- def run(self, ids):
- for task_name, task_attrs in self.indexers.items():
- (idx_class, filtering, batch_size) = task_attrs
- if filtering:
- policy_update = 'ignore-dups'
- indexer_class = get_class(idx_class)
- ids_filtered = list(indexer_class().filter(ids))
- if not ids_filtered:
- continue
- else:
- policy_update = 'update-dups'
- ids_filtered = ids
-
- tasks = []
- for ids_to_send in grouper(ids_filtered, batch_size):
- tasks.append(create_task_dict(
- task_name,
- 'oneshot',
- ids=list(ids_to_send),
- policy_update=policy_update,
- ))
- self._create_tasks(tasks)
-
- def _create_tasks(self, tasks):
- self.scheduler.create_tasks(tasks)
-
-
-class OrchestratorAllContentsIndexer(BaseOrchestratorIndexer):
- """Orchestrator which deals with batch of any types of contents.
-
- """
-
-
-class OrchestratorTextContentsIndexer(BaseOrchestratorIndexer):
- """Orchestrator which deals with batch of text contents.
-
- """
- CONFIG_BASE_FILENAME = 'indexer/orchestrator_text'
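The batching behaviour the deleted orchestrator's docstring describes (broadcast contents to indexers in `batch_size` groups) boils down to splitting a list of ids into fixed-size chunks before dispatch. A self-contained sketch of that grouping, reimplemented locally for illustration rather than imported from `swh.core.utils`:

```python
from itertools import islice

def grouper(iterable, n):
    """Yield successive lists of at most n items from iterable."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, n))
        if not chunk:
            return
        yield chunk

# With batch_size=2, five ids become three dispatchable batches.
batches = list(grouper(['id1', 'id2', 'id3', 'id4', 'id5'], 2))
# batches == [['id1', 'id2'], ['id3', 'id4'], ['id5']]
```

In the removed `run` method, each such batch became one oneshot task handed to the scheduler.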
diff --git a/swh/indexer/origin_head.py b/swh/indexer/origin_head.py
--- a/swh/indexer/origin_head.py
+++ b/swh/indexer/origin_head.py
@@ -38,7 +38,7 @@
def persist_index_computations(self, results, policy_update):
"""Do nothing. The indexer's results are not persistent, they
- should only be piped to another indexer via the orchestrator."""
+ should only be piped to another indexer."""
pass
def next_step(self, results, task):
diff --git a/swh/indexer/tasks.py b/swh/indexer/tasks.py
--- a/swh/indexer/tasks.py
+++ b/swh/indexer/tasks.py
@@ -7,8 +7,6 @@
from swh.scheduler.task import Task as SchedulerTask
-from .orchestrator import OrchestratorAllContentsIndexer
-from .orchestrator import OrchestratorTextContentsIndexer
from .mimetype import ContentMimetypeIndexer
from .language import ContentLanguageIndexer
from .ctags import CtagsIndexer
@@ -27,26 +25,6 @@
return indexer
-class OrchestratorAllContents(Task):
- """Main task in charge of reading batch contents (of any type) and
- broadcasting them back to other tasks.
-
- """
- task_queue = 'swh_indexer_orchestrator_content_all'
-
- Indexer = OrchestratorAllContentsIndexer
-
-
-class OrchestratorTextContents(Task):
- """Main task in charge of reading batch contents (of type text) and
- broadcasting them back to other tasks.
-
- """
- task_queue = 'swh_indexer_orchestrator_content_text'
-
- Indexer = OrchestratorTextContentsIndexer
-
-
class RevisionMetadata(Task):
task_queue = 'swh_indexer_revision_metadata'
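The tasks.py hunk above shows the pattern that survives the orchestrator removal: each task class binds a queue name to an indexer class. A hypothetical sketch of that wiring, with a stand-in `Task` base (the real one is `swh.scheduler.task.Task`) and an invented `MyIndexer`:

```python
# Sketch of the task/indexer binding pattern; Task and MyIndexer are
# simplified stand-ins, not the real swh.scheduler / swh.indexer classes.

class Task:
    """Minimal stand-in for swh.scheduler.task.Task."""
    task_queue = None
    Indexer = None

    def run_task(self, *args, **kwargs):
        # instantiate the bound indexer and delegate to it
        return self.Indexer().run(*args, **kwargs)

class MyIndexer:
    def run(self, ids, policy_update):
        return {"status": "eventful", "ids": ids}

class MyIndexerTask(Task):
    task_queue = 'swh_indexer_content_my_indexer'  # invented queue name
    Indexer = MyIndexer

result = MyIndexerTask().run_task(['sha1:a'], policy_update='update-dups')
```

With the orchestrator classes gone, these per-indexer tasks are scheduled directly instead of being broadcast to by an orchestrator task.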
diff --git a/swh/indexer/tests/test_orchestrator.py b/swh/indexer/tests/test_orchestrator.py
deleted file mode 100644
--- a/swh/indexer/tests/test_orchestrator.py
+++ /dev/null
@@ -1,210 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import unittest
-
-import celery
-
-from swh.indexer.orchestrator import BaseOrchestratorIndexer
-from swh.indexer.indexer import BaseIndexer
-from swh.indexer.tests.test_utils import MockIndexerStorage, MockStorage
-from swh.scheduler.tests.scheduler_testing import SchedulerTestFixture
-
-
-class BaseTestIndexer(BaseIndexer):
- ADDITIONAL_CONFIG = {
- 'tools': ('dict', {
- 'name': 'foo',
- 'version': 'bar',
- 'configuration': {}
- }),
- }
-
- def prepare(self):
- self.idx_storage = MockIndexerStorage()
- self.storage = MockStorage()
-
- def check(self):
- pass
-
- def filter(self, ids):
- self.filtered.append(ids)
- return ids
-
- def run(self, ids, policy_update):
- return self.index(ids)
-
- def index(self, ids):
- self.indexed.append(ids)
- return [id_ + '_indexed_by_' + self.__class__.__name__
- for id_ in ids]
-
- def persist_index_computations(self, result, policy_update):
- self.persisted = result
-
-
-class Indexer1(BaseTestIndexer):
- filtered = []
- indexed = []
-
- def filter(self, ids):
- return super().filter([id_ for id_ in ids if '1' in id_])
-
-
-class Indexer2(BaseTestIndexer):
- filtered = []
- indexed = []
-
- def filter(self, ids):
- return super().filter([id_ for id_ in ids if '2' in id_])
-
-
-class Indexer3(BaseTestIndexer):
- filtered = []
- indexed = []
-
- def filter(self, ids):
- return super().filter([id_ for id_ in ids if '3' in id_])
-
-
-@celery.task
-def indexer1_task(*args, **kwargs):
- return Indexer1().run(*args, **kwargs)
-
-
-@celery.task
-def indexer2_task(*args, **kwargs):
- return Indexer2().run(*args, **kwargs)
-
-
-@celery.task
-def indexer3_task(self, *args, **kwargs):
- return Indexer3().run(*args, **kwargs)
-
-
-class BaseTestOrchestrator12(BaseOrchestratorIndexer):
- TASK_NAMES = {
- 'indexer1': 'swh.indexer.tests.test_orchestrator.indexer1_task',
- 'indexer2': 'swh.indexer.tests.test_orchestrator.indexer2_task',
- 'indexer3': 'swh.indexer.tests.test_orchestrator.indexer3_task',
- }
-
- INDEXER_CLASSES = {
- 'indexer1': 'swh.indexer.tests.test_orchestrator.Indexer1',
- 'indexer2': 'swh.indexer.tests.test_orchestrator.Indexer2',
- 'indexer3': 'swh.indexer.tests.test_orchestrator.Indexer3',
- }
-
- def __init__(self):
- super().__init__()
- self.running_tasks = []
-
- def parse_config_file(self):
- return {
- 'scheduler': {
- 'cls': 'remote',
- 'args': {
- 'url': 'http://localhost:9999',
- },
- },
- 'indexers': {
- 'indexer1': {
- 'batch_size': 2,
- 'check_presence': True,
- },
- 'indexer2': {
- 'batch_size': 2,
- 'check_presence': True,
- },
- }
- }
-
-
-class MockedTestOrchestrator12(BaseTestOrchestrator12):
- def __init__(self):
- super().__init__()
- self.created_tasks = []
-
- def _create_tasks(self, celery_tasks):
- self.created_tasks.extend(celery_tasks)
-
- def prepare_scheduler(self):
- pass
-
-
-class OrchestratorTest(SchedulerTestFixture, unittest.TestCase):
- def setUp(self):
- super().setUp()
- self.add_scheduler_task_type(
- 'indexer1',
- 'swh.indexer.tests.test_orchestrator.indexer1_task')
- self.add_scheduler_task_type(
- 'indexer2',
- 'swh.indexer.tests.test_orchestrator.indexer2_task')
-
- def test_orchestrator_filter(self):
- o = BaseTestOrchestrator12()
- o.scheduler = self.scheduler
- o.run(['id12', 'id2'])
- self.assertEqual(Indexer2.indexed, [])
- self.assertEqual(Indexer1.indexed, [])
- self.run_ready_tasks()
- self.assertEqual(Indexer2.indexed, [['id12', 'id2']])
- self.assertEqual(Indexer1.indexed, [['id12']])
-
-
-class MockedOrchestratorTest(unittest.TestCase):
- maxDiff = None
-
- def test_mocked_orchestrator_filter(self):
- o = MockedTestOrchestrator12()
- o.run(['id12', 'id2'])
- for task in o.created_tasks:
- del task['next_run'] # not worth the trouble testing it properly
- self.assertCountEqual(o.created_tasks, [
- {'type': 'indexer1',
- 'arguments': {
- 'args': [],
- 'kwargs': {
- 'ids': ['id12'],
- 'policy_update': 'ignore-dups'}},
- 'policy': 'oneshot'},
- {'type': 'indexer2',
- 'arguments': {
- 'args': [],
- 'kwargs': {
- 'ids': ['id12', 'id2'],
- 'policy_update': 'ignore-dups'}},
- 'policy': 'oneshot'},
- ])
-
- def test_mocked_orchestrator_batch(self):
- o = MockedTestOrchestrator12()
- o.run(['id12', 'id2a', 'id2b', 'id2c'])
- for task in o.created_tasks:
- del task['next_run'] # not worth the trouble testing it properly
- self.assertCountEqual(o.created_tasks, [
- {'type': 'indexer1',
- 'arguments': {
- 'args': [],
- 'kwargs': {
- 'ids': ['id12'],
- 'policy_update': 'ignore-dups'}},
- 'policy': 'oneshot'},
- {'type': 'indexer2',
- 'arguments': {
- 'args': [],
- 'kwargs': {
- 'ids': ['id12', 'id2a'],
- 'policy_update': 'ignore-dups'}},
- 'policy': 'oneshot'},
- {'type': 'indexer2',
- 'arguments': {
- 'args': [],
- 'kwargs': {
- 'ids': ['id2b', 'id2c'],
- 'policy_update': 'ignore-dups'}},
- 'policy': 'oneshot'},
- ])
D633: indexer: Remove orchestrator layer