# Copyright (C) 2016-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import importlib
import random

from celery import group

from swh.core.config import SWHConfig
from swh.core.utils import grouper
from swh.scheduler import utils

from . import TASK_NAMES, INDEXER_CLASSES


def get_class(clazz):
    """Get a symbol (typically a class) dynamically from its fully
    qualified name string representation.

    Args:
        clazz (str): dotted name whose last component is the symbol and
            whose leading components name the module holding it,
            e.g. 'package.module.ClassName'

    Returns:
        The attribute named by the last component, looked up on the
        imported module.

    Raises:
        ValueError: when clazz has no module part (no dot in it)
        ImportError: when the module part cannot be imported
        AttributeError: when the module does not define the symbol

    """
    module_name, _, symbol = clazz.rpartition('.')
    if not module_name:
        # An empty module name is rejected by the import machinery with
        # a ValueError as well; fail early with a message naming the input.
        raise ValueError(
            '%r is not a fully qualified name (expected "module.symbol")'
            % (clazz, ))
    module = importlib.import_module(module_name)
    return getattr(module, symbol)


class BaseOrchestratorIndexer(SWHConfig):
    """The indexer orchestrator is in charge of dispatching batches of
    contents (filtered or not based on presence) to indexers.

    That dispatch is indexer specific, so the configuration reflects it:

    - when the check_presence flag is true, filter out the contents
      already present for that indexer, otherwise send everything

    - broadcast those (filtered or not) contents to the indexer in
      batches of batch_size

    For example:

    ```yaml
    indexers:
      mimetype:
        batch_size: 10
        check_presence: false
      language:
        batch_size: 2
        check_presence: true
    ```

    means:

    - send all contents received, as batches of size 10, to the
      'mimetype' indexer

    - send only unknown contents, as batches of size 2, to the
      'language' indexer.

    """
    CONFIG_BASE_FILENAME = 'indexer/orchestrator'

    DEFAULT_CONFIG = {
        'indexers': ('dict', {
            'mimetype': {
                'batch_size': 10,
                'check_presence': True,
            },
        }),
    }

    def __init__(self):
        """Read the configuration and prepare, for each configured
        indexer, its dispatch options and its task.

        Sets:
            self.indexers: dict mapping indexer name to a tuple
                (INDEXER_CLASSES entry, check_presence, batch_size)
            self.tasks: dict mapping indexer name to the task returned
                by utils.get_task for that indexer's TASK_NAMES entry

        Raises:
            ValueError: when a configured indexer name is not a key of
                TASK_NAMES
            KeyError: when an indexer entry lacks 'check_presence' or
                'batch_size', or the name is missing from INDEXER_CLASSES

        """
        super().__init__()
        self.config = self.parse_config_file()
        indexer_names = list(self.config['indexers'])
        random.shuffle(indexer_names)
        indexers = {}
        tasks = {}
        for name in indexer_names:
            if name not in TASK_NAMES:
                raise ValueError('%s must be one of %s' % (
                    name, TASK_NAMES.keys()))

            opts = self.config['indexers'][name]
            indexers[name] = (
                INDEXER_CLASSES[name],
                opts['check_presence'],
                opts['batch_size'])
            tasks[name] = utils.get_task(TASK_NAMES[name])

        self.indexers = indexers
        self.tasks = tasks

    def run(self, sha1s):
        """Dispatch sha1s to every configured indexer.

        For each indexer, one celery group is sent, holding one task
        signature per batch of at most batch_size contents. Nothing is
        sent for an indexer that ends up with no content to process.

        Args:
            sha1s: the contents' identifiers to dispatch (the class
                this module replaces documented them as a list of sha1s
                as bytes)

        """
        for name, (idx_class, filtering, batch_size) in self.indexers.items():
            if filtering:
                policy_update = 'ignore-dups'
                # INDEXER_CLASSES entries are resolved through get_class,
                # i.e. they are fully qualified class names.
                indexer_class = get_class(idx_class)
                sha1s_filtered = list(indexer_class().filter_contents(sha1s))
                if not sha1s_filtered:
                    continue
            else:
                policy_update = 'update-dups'
                sha1s_filtered = sha1s

            celery_tasks = [
                self.tasks[name].s(
                    sha1s=list(sha1s_to_send),
                    policy_update=policy_update)
                for sha1s_to_send in grouper(sha1s_filtered, batch_size)
            ]

            # Without filtering, an empty input yields no task at all:
            # do not dispatch an empty group in that case.
            if celery_tasks:
                group(celery_tasks).delay()


class OrchestratorAllContentsIndexer(BaseOrchestratorIndexer):
    """Orchestrator which deals with batch of any types of contents.

    """


class OrchestratorTextContentsIndexer(BaseOrchestratorIndexer):
    """Orchestrator which deals with batch of text contents.

    """
    CONFIG_BASE_FILENAME = 'indexer/orchestrator_text'