D633.id1961.diff

diff --git a/README.md b/README.md
--- a/README.md
+++ b/README.md
@@ -11,81 +11,36 @@
- revision:
- metadata
-## Context
+An indexer is in charge of:
+- looking up objects
+- extracting information from those objects
+- storing that information in the swh-indexer db
-SWH has currently stored around 5B contents. The table `content`
-holds their checksums.
-
-Those contents are physically stored in an object storage (using
-disks) and replicated in another. Those object storages are not
-destined for reading yet.
-
-We are in the process to copy those contents over to azure's blob
-storages. As such, we will use that opportunity to trigger the
-computations on these contents once those have been copied over.
-
-
-## Workers
-
-There are two types of workers:
-- orchestrators (orchestrator, orchestrator-text)
-- indexer (mimetype, language, ctags, fossology-license)
-
-### Orchestrator
-
-
-The orchestrator is in charge of dispatching a batch of sha1 hashes to
-different indexers.
-
-Orchestration procedure:
-- receive batch of sha1s
-- split those batches into groups (according to setup)
-- broadcast those group to indexers
-
-There are two types of orchestrators:
-
-- orchestrator (swh_indexer_orchestrator_content_all): Receives and
- broadcast sha1 ids (of contents) to indexers (currently only the
- mimetype indexer)
-
-- orchestrator-text (swh_indexer_orchestrator_content_text): Receives
- batch of sha1 ids (of textual contents) and broadcast those to
- indexers (currently language, ctags, and fossology-license
- indexers).
-
-
-### Indexers
-
-
-An indexer is in charge of the content retrieval and indexation of the
-extracted information in the swh-indexer db.
-
-There are two types of indexers:
+There are multiple indexers working on different object types:
- content indexer: works with content sha1 hashes
- revision indexer: works with revision sha1 hashes
+ - origin indexer: works with origin identifiers
Indexation procedure:
- receive batch of ids
- retrieve the associated data depending on object type
- compute for that object some index
- store the result to swh's storage
-- (and possibly do some broadcast itself)
Current content indexers:
-- mimetype (queue swh_indexer_content_mimetype): compute the mimetype,
- filter out the textual contents and broadcast the list to the
- orchestrator-text
+- mimetype (queue swh_indexer_content_mimetype): detect the encoding
+ and mimetype
-- language (queue swh_indexer_content_language): detect the programming language
+- language (queue swh_indexer_content_language): detect the
+ programming language
-- ctags (queue swh_indexer_content_ctags): try and compute tags
- information
+- ctags (queue swh_indexer_content_ctags): compute tags information
-- fossology-license (queue swh_indexer_fossology_license): try and
- compute the license
+- fossology-license (queue swh_indexer_fossology_license): compute the
+ license
-- metadata : translate file into translated_metadata dict
+- metadata: translate file into translated_metadata dict
Current revision indexers:
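The indexation procedure listed above (receive ids, retrieve data, compute, store) can be sketched as follows. The helper names `retrieve`, `compute_index`, and `store` are illustrative placeholders, not the actual swh-indexer API:

```python
# Minimal sketch of the indexation loop described in the README hunk above.
# `retrieve`, `compute_index`, and `store` stand in for the real storage
# and indexer-storage calls; they are assumptions, not the swh API.

def run_indexer(ids, retrieve, compute_index, store):
    """Process a batch of object ids: fetch each object's data,
    compute its index, and persist the results in one call."""
    results = []
    for object_id in ids:
        data = retrieve(object_id)          # e.g. content bytes by sha1
        results.append(compute_index(object_id, data))
    store(results)                          # persist to swh's storage
    return results
```

This is the shape shared by content, revision, and origin indexers; only the retrieval and computation steps differ per object type.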
diff --git a/swh/indexer/__init__.py b/swh/indexer/__init__.py
--- a/swh/indexer/__init__.py
+++ b/swh/indexer/__init__.py
@@ -2,30 +2,3 @@
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-
-
-INDEXER_CLASSES = {
- 'indexer_mimetype': 'swh.indexer.mimetype.ContentMimetypeIndexer',
- 'indexer_language': 'swh.indexer.language.ContentLanguageIndexer',
- 'indexer_ctags': 'swh.indexer.ctags.CtagsIndexer',
- 'indexer_fossology_license':
- 'swh.indexer.fossology_license.ContentFossologyLicenseIndexer',
-}
-
-
-TASK_NAMES = {
- 'indexer_orchestrator_all': 'swh.indexer.tasks.OrchestratorAllContents',
- 'indexer_orchestrator_text': 'swh.indexer.tasks.OrchestratorTextContents',
- 'indexer_mimetype': 'swh.indexer.tasks.ContentMimetype',
- 'indexer_language': 'swh.indexer.tasks.ContentLanguage',
- 'indexer_ctags': 'swh.indexer.tasks.Ctags',
- 'indexer_fossology_license': 'swh.indexer.tasks.ContentFossologyLicense',
- 'indexer_rehash': 'swh.indexer.tasks.RecomputeChecksums',
- 'indexer_revision_metadata': 'swh.indexer.tasks.RevisionMetadata',
- 'indexer_origin_intrinsic_metadata': 'swh.indexer.tasks.OriginMetadata',
-}
-
-
-__all__ = [
- 'INDEXER_CLASSES', 'TASK_NAMES',
-]
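The removed `INDEXER_CLASSES` and `TASK_NAMES` registries mapped logical names to fully qualified dotted paths, resolved at runtime. Resolving such a path can be done with the standard library alone; the registry content below is a toy assumption, not the original mapping:

```python
import importlib

def resolve(dotted_path):
    """Resolve a string like 'pkg.module.Name' to the object it names."""
    module_path, _, attr = dotted_path.rpartition('.')
    module = importlib.import_module(module_path)
    return getattr(module, attr)

# Toy registry in the style of the removed INDEXER_CLASSES dict
# (the value is a stdlib class, used here only for illustration).
REGISTRY = {
    'indexer_ordered': 'collections.OrderedDict',
}

cls = resolve(REGISTRY['indexer_ordered'])
```

Keeping the mapping as strings defers the import until the class is actually needed, which is why the orchestrator could be configured by name alone.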
diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py
--- a/swh/indexer/indexer.py
+++ b/swh/indexer/indexer.py
@@ -93,9 +93,7 @@
Then you need to implement the following functions:
:func:`filter`:
- filter out data already indexed (in storage). This function is used by
- the orchestrator and not directly by the indexer
- (cf. swh.indexer.orchestrator.BaseOrchestratorIndexer).
+ filter out data already indexed (in storage).
:func:`index_object`:
compute index on id with data (retrieved from the storage or the
diff --git a/swh/indexer/orchestrator.py b/swh/indexer/orchestrator.py
deleted file mode 100644
--- a/swh/indexer/orchestrator.py
+++ /dev/null
@@ -1,143 +0,0 @@
-# Copyright (C) 2016-2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import random
-
-from swh.core.config import SWHConfig
-from swh.core.utils import grouper
-from swh.scheduler import utils
-from swh.scheduler import get_scheduler
-from swh.scheduler.utils import create_task_dict
-
-
-def get_class(clazz):
- """Get a symbol class dynamically by its fully qualified name string
- representation.
-
- """
- parts = clazz.split('.')
- module = '.'.join(parts[:-1])
- m = __import__(module)
- for comp in parts[1:]:
- m = getattr(m, comp)
- return m
-
-
-class BaseOrchestratorIndexer(SWHConfig):
- """The indexer orchestrator is in charge of dispatching batch of
- contents (filtered or not based on presence) to indexers.
-
- That dispatch is indexer specific, so the configuration reflects it:
-
- - when `check_presence` flag is true, filter out the
- contents already present for that indexer, otherwise send
- everything
-
- - broadcast those (filtered or not) contents to indexers in a
- `batch_size` fashioned
-
- For example::
-
- indexers:
- indexer_mimetype:
- batch_size: 10
- check_presence: false
- indexer_language:
- batch_size: 2
- check_presence: true
-
- means:
-
- - send all contents received as batch of size 10 to the 'mimetype' indexer
- - send only unknown contents as batch of size 2 to the 'language' indexer.
-
- """
- CONFIG_BASE_FILENAME = 'indexer/orchestrator'
-
- # Overridable in child classes.
- from . import TASK_NAMES, INDEXER_CLASSES
-
- DEFAULT_CONFIG = {
- 'scheduler': {
- 'cls': 'remote',
- 'args': {
- 'url': 'http://localhost:5008',
- },
- },
- 'indexers': ('dict', {
- 'indexer_mimetype': {
- 'batch_size': 10,
- 'check_presence': True,
- },
- }),
- }
-
- def __init__(self):
- super().__init__()
- self.config = self.parse_config_file()
- self.prepare_tasks()
- self.prepare_scheduler()
-
- def prepare_scheduler(self):
- self.scheduler = get_scheduler(**self.config['scheduler'])
-
- def prepare_tasks(self):
- indexer_names = list(self.config['indexers'])
- random.shuffle(indexer_names)
- indexers = {}
- tasks = {}
- for name in indexer_names:
- if name not in self.TASK_NAMES:
- raise ValueError('%s must be one of %s' % (
- name, ', '.join(self.TASK_NAMES)))
-
- opts = self.config['indexers'][name]
- indexers[name] = (
- self.INDEXER_CLASSES[name],
- opts['check_presence'],
- opts['batch_size'])
- tasks[name] = utils.get_task(self.TASK_NAMES[name])
-
- self.indexers = indexers
- self.tasks = tasks
-
- def run(self, ids):
- for task_name, task_attrs in self.indexers.items():
- (idx_class, filtering, batch_size) = task_attrs
- if filtering:
- policy_update = 'ignore-dups'
- indexer_class = get_class(idx_class)
- ids_filtered = list(indexer_class().filter(ids))
- if not ids_filtered:
- continue
- else:
- policy_update = 'update-dups'
- ids_filtered = ids
-
- tasks = []
- for ids_to_send in grouper(ids_filtered, batch_size):
- tasks.append(create_task_dict(
- task_name,
- 'oneshot',
- ids=list(ids_to_send),
- policy_update=policy_update,
- ))
- self._create_tasks(tasks)
-
- def _create_tasks(self, tasks):
- self.scheduler.create_tasks(tasks)
-
-
-class OrchestratorAllContentsIndexer(BaseOrchestratorIndexer):
- """Orchestrator which deals with batch of any types of contents.
-
- """
-
-
-class OrchestratorTextContentsIndexer(BaseOrchestratorIndexer):
- """Orchestrator which deals with batch of text contents.
-
- """
- CONFIG_BASE_FILENAME = 'indexer/orchestrator_text'
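For reference, the dispatch logic this deleted file implemented, optional presence filtering followed by fixed-size batching, boils down to the sketch below. `grouper` is reimplemented here with itertools, and the task dict shape only loosely mirrors `create_task_dict`; both are assumptions for illustration:

```python
from itertools import islice

def grouper(iterable, n):
    """Yield successive lists of at most n items
    (same idea as swh.core.utils.grouper)."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, n))
        if not chunk:
            return
        yield chunk

def dispatch(ids, indexers):
    """Sketch of the removed orchestrator's run(): for each indexer,
    optionally filter out already-indexed ids, then emit one oneshot
    task per batch of at most batch_size ids."""
    tasks = []
    for name, (filter_fn, check_presence, batch_size) in indexers.items():
        if check_presence:
            policy_update = 'ignore-dups'
            kept = list(filter_fn(ids))
            if not kept:
                continue
        else:
            policy_update = 'update-dups'
            kept = ids
        for batch in grouper(kept, batch_size):
            tasks.append({'type': name, 'policy': 'oneshot',
                          'ids': batch, 'policy_update': policy_update})
    return tasks
```

With `check_presence: true` and `batch_size: 2`, four matching ids yield two tasks of two ids each, which is exactly the behavior exercised by `test_mocked_orchestrator_batch` below.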
diff --git a/swh/indexer/origin_head.py b/swh/indexer/origin_head.py
--- a/swh/indexer/origin_head.py
+++ b/swh/indexer/origin_head.py
@@ -38,7 +38,7 @@
def persist_index_computations(self, results, policy_update):
"""Do nothing. The indexer's results are not persistent, they
- should only be piped to another indexer via the orchestrator."""
+ should only be piped to another indexer."""
pass
def next_step(self, results, task):
diff --git a/swh/indexer/tasks.py b/swh/indexer/tasks.py
--- a/swh/indexer/tasks.py
+++ b/swh/indexer/tasks.py
@@ -7,8 +7,6 @@
from swh.scheduler.task import Task as SchedulerTask
-from .orchestrator import OrchestratorAllContentsIndexer
-from .orchestrator import OrchestratorTextContentsIndexer
from .mimetype import ContentMimetypeIndexer
from .language import ContentLanguageIndexer
from .ctags import CtagsIndexer
@@ -27,26 +25,6 @@
return indexer
-class OrchestratorAllContents(Task):
- """Main task in charge of reading batch contents (of any type) and
- broadcasting them back to other tasks.
-
- """
- task_queue = 'swh_indexer_orchestrator_content_all'
-
- Indexer = OrchestratorAllContentsIndexer
-
-
-class OrchestratorTextContents(Task):
- """Main task in charge of reading batch contents (of type text) and
- broadcasting them back to other tasks.
-
- """
- task_queue = 'swh_indexer_orchestrator_content_text'
-
- Indexer = OrchestratorTextContentsIndexer
-
-
class RevisionMetadata(Task):
task_queue = 'swh_indexer_revision_metadata'
diff --git a/swh/indexer/tests/test_orchestrator.py b/swh/indexer/tests/test_orchestrator.py
deleted file mode 100644
--- a/swh/indexer/tests/test_orchestrator.py
+++ /dev/null
@@ -1,210 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import unittest
-
-import celery
-
-from swh.indexer.orchestrator import BaseOrchestratorIndexer
-from swh.indexer.indexer import BaseIndexer
-from swh.indexer.tests.test_utils import MockIndexerStorage, MockStorage
-from swh.scheduler.tests.scheduler_testing import SchedulerTestFixture
-
-
-class BaseTestIndexer(BaseIndexer):
- ADDITIONAL_CONFIG = {
- 'tools': ('dict', {
- 'name': 'foo',
- 'version': 'bar',
- 'configuration': {}
- }),
- }
-
- def prepare(self):
- self.idx_storage = MockIndexerStorage()
- self.storage = MockStorage()
-
- def check(self):
- pass
-
- def filter(self, ids):
- self.filtered.append(ids)
- return ids
-
- def run(self, ids, policy_update):
- return self.index(ids)
-
- def index(self, ids):
- self.indexed.append(ids)
- return [id_ + '_indexed_by_' + self.__class__.__name__
- for id_ in ids]
-
- def persist_index_computations(self, result, policy_update):
- self.persisted = result
-
-
-class Indexer1(BaseTestIndexer):
- filtered = []
- indexed = []
-
- def filter(self, ids):
- return super().filter([id_ for id_ in ids if '1' in id_])
-
-
-class Indexer2(BaseTestIndexer):
- filtered = []
- indexed = []
-
- def filter(self, ids):
- return super().filter([id_ for id_ in ids if '2' in id_])
-
-
-class Indexer3(BaseTestIndexer):
- filtered = []
- indexed = []
-
- def filter(self, ids):
- return super().filter([id_ for id_ in ids if '3' in id_])
-
-
-@celery.task
-def indexer1_task(*args, **kwargs):
- return Indexer1().run(*args, **kwargs)
-
-
-@celery.task
-def indexer2_task(*args, **kwargs):
- return Indexer2().run(*args, **kwargs)
-
-
-@celery.task
-def indexer3_task(self, *args, **kwargs):
- return Indexer3().run(*args, **kwargs)
-
-
-class BaseTestOrchestrator12(BaseOrchestratorIndexer):
- TASK_NAMES = {
- 'indexer1': 'swh.indexer.tests.test_orchestrator.indexer1_task',
- 'indexer2': 'swh.indexer.tests.test_orchestrator.indexer2_task',
- 'indexer3': 'swh.indexer.tests.test_orchestrator.indexer3_task',
- }
-
- INDEXER_CLASSES = {
- 'indexer1': 'swh.indexer.tests.test_orchestrator.Indexer1',
- 'indexer2': 'swh.indexer.tests.test_orchestrator.Indexer2',
- 'indexer3': 'swh.indexer.tests.test_orchestrator.Indexer3',
- }
-
- def __init__(self):
- super().__init__()
- self.running_tasks = []
-
- def parse_config_file(self):
- return {
- 'scheduler': {
- 'cls': 'remote',
- 'args': {
- 'url': 'http://localhost:9999',
- },
- },
- 'indexers': {
- 'indexer1': {
- 'batch_size': 2,
- 'check_presence': True,
- },
- 'indexer2': {
- 'batch_size': 2,
- 'check_presence': True,
- },
- }
- }
-
-
-class MockedTestOrchestrator12(BaseTestOrchestrator12):
- def __init__(self):
- super().__init__()
- self.created_tasks = []
-
- def _create_tasks(self, celery_tasks):
- self.created_tasks.extend(celery_tasks)
-
- def prepare_scheduler(self):
- pass
-
-
-class OrchestratorTest(SchedulerTestFixture, unittest.TestCase):
- def setUp(self):
- super().setUp()
- self.add_scheduler_task_type(
- 'indexer1',
- 'swh.indexer.tests.test_orchestrator.indexer1_task')
- self.add_scheduler_task_type(
- 'indexer2',
- 'swh.indexer.tests.test_orchestrator.indexer2_task')
-
- def test_orchestrator_filter(self):
- o = BaseTestOrchestrator12()
- o.scheduler = self.scheduler
- o.run(['id12', 'id2'])
- self.assertEqual(Indexer2.indexed, [])
- self.assertEqual(Indexer1.indexed, [])
- self.run_ready_tasks()
- self.assertEqual(Indexer2.indexed, [['id12', 'id2']])
- self.assertEqual(Indexer1.indexed, [['id12']])
-
-
-class MockedOrchestratorTest(unittest.TestCase):
- maxDiff = None
-
- def test_mocked_orchestrator_filter(self):
- o = MockedTestOrchestrator12()
- o.run(['id12', 'id2'])
- for task in o.created_tasks:
- del task['next_run'] # not worth the trouble testing it properly
- self.assertCountEqual(o.created_tasks, [
- {'type': 'indexer1',
- 'arguments': {
- 'args': [],
- 'kwargs': {
- 'ids': ['id12'],
- 'policy_update': 'ignore-dups'}},
- 'policy': 'oneshot'},
- {'type': 'indexer2',
- 'arguments': {
- 'args': [],
- 'kwargs': {
- 'ids': ['id12', 'id2'],
- 'policy_update': 'ignore-dups'}},
- 'policy': 'oneshot'},
- ])
-
- def test_mocked_orchestrator_batch(self):
- o = MockedTestOrchestrator12()
- o.run(['id12', 'id2a', 'id2b', 'id2c'])
- for task in o.created_tasks:
- del task['next_run'] # not worth the trouble testing it properly
- self.assertCountEqual(o.created_tasks, [
- {'type': 'indexer1',
- 'arguments': {
- 'args': [],
- 'kwargs': {
- 'ids': ['id12'],
- 'policy_update': 'ignore-dups'}},
- 'policy': 'oneshot'},
- {'type': 'indexer2',
- 'arguments': {
- 'args': [],
- 'kwargs': {
- 'ids': ['id12', 'id2a'],
- 'policy_update': 'ignore-dups'}},
- 'policy': 'oneshot'},
- {'type': 'indexer2',
- 'arguments': {
- 'args': [],
- 'kwargs': {
- 'ids': ['id2b', 'id2c'],
- 'policy_update': 'ignore-dups'}},
- 'policy': 'oneshot'},
- ])
