diff --git a/swh/indexer/__init__.py b/swh/indexer/__init__.py index 5b71c6d..2350643 100644 --- a/swh/indexer/__init__.py +++ b/swh/indexer/__init__.py @@ -1,26 +1,27 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from .mimetype import ContentMimetypeIndexer from .language import ContentLanguageIndexer INDEXER_CLASSES = { 'mimetype': ContentMimetypeIndexer, 'language': ContentLanguageIndexer, } TASK_NAMES = { - 'orchestrator': 'swh.indexer.tasks.SWHOrchestratorTask', + 'orchestrator_all': 'swh.indexer.tasks.SWHOrchestratorAllContentsTask', + 'orchestrator_text': 'swh.indexer.tasks.SWHOrchestratorTextContentsTask', 'mimetype': 'swh.indexer.tasks.SWHContentMimetypeTask', 'language': 'swh.indexer.tasks.SWHContentLanguageTask', } __all__ = [ 'INDEXER_CLASSES', 'TASK_NAMES', 'ContentMimetypeIndexer', 'ContentLanguageIndexer' ] diff --git a/swh/indexer/orchestrator.py b/swh/indexer/orchestrator.py index 604e2c7..bf96fca 100644 --- a/swh/indexer/orchestrator.py +++ b/swh/indexer/orchestrator.py @@ -1,48 +1,61 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import random from swh.core.config import SWHConfig from swh.scheduler.celery_backend.config import app from . import TASK_NAMES, INDEXER_CLASSES -class OrchestratorIndexer(SWHConfig): +class BaseOrchestratorIndexer(SWHConfig): """The indexer orchestrator is in charge of: - reading batch of contents (list of sha1s as bytes) - according to its configuration, filter or not the contents - and then broadcast those contents to indexers """ CONFIG_BASE_FILENAME = 'indexer/orchestrator' DEFAULT_CONFIG = { 'indexers': ('[str]', ['mimetype']), } def __init__(self): super().__init__() self.config = self.parse_config_file() indexer_names = self.config['indexers'] random.shuffle(indexer_names) self.indexers = { TASK_NAMES[name]: INDEXER_CLASSES[name] for name in indexer_names } def run(self, sha1s): for task_name, indexer_class in self.indexers.items(): indexer = indexer_class() # first filter the contents per indexers sha1s_filtered = list(indexer.filter_contents(sha1s)) if not sha1s_filtered: continue # now send the message for the indexer to compute and store results app.tasks[task_name].delay(sha1s_filtered) + + +class OrchestratorAllContentsIndexer(BaseOrchestratorIndexer): + """Orchestrator which deals with batch of any types of contents. + + """ + + +class OrchestratorTextContentsIndexer(BaseOrchestratorIndexer): + """Orchestrator which deals with batch of text contents. + + """ + CONFIG_BASE_FILENAME = 'indexer/orchestrator_text' diff --git a/swh/indexer/producer.py b/swh/indexer/producer.py index 9ea8528..e8b7085 100755 --- a/swh/indexer/producer.py +++ b/swh/indexer/producer.py @@ -1,63 +1,63 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import random import sys from swh.core import utils, hashutil from swh.scheduler.celery_backend.config import app from . import tasks, TASK_NAMES # noqa -task_name = TASK_NAMES['orchestrator'] +task_name = TASK_NAMES['orchestrator_all'] orchestrator_task = app.tasks[task_name] def read_from_stdin(): for sha1 in sys.stdin: yield hashutil.hex_to_hash(sha1.strip()) def gen_sha1(batch): """Generate batch of grouped sha1s from the objstorage. """ for sha1s in utils.grouper(read_from_stdin(), batch): sha1s = list(sha1s) random.shuffle(sha1s) yield sha1s def run_with_limit(limit, batch): count = 0 for sha1s in gen_sha1(batch): count += len(sha1s) print('%s sent - [%s, ...]' % (len(sha1s), sha1s[0])) orchestrator_task.delay(sha1s) if count >= limit: return def run_no_limit(batch): for sha1s in gen_sha1(batch): print('%s sent - [%s, ...]' % (len(sha1s), sha1s[0])) orchestrator_task.delay(sha1s) @click.command(help='Read sha1 from stdin and send them for indexing') @click.option('--limit', default=None, help='Limit the number of data to read') @click.option('--batch', default='10', help='Group data by batch') def main(limit, batch): batch = int(batch) if limit: run_with_limit(int(limit), batch) else: run_no_limit(batch) if __name__ == '__main__': main() diff --git a/swh/indexer/tasks.py b/swh/indexer/tasks.py index 914e886..65d3f33 100644 --- a/swh/indexer/tasks.py +++ b/swh/indexer/tasks.py @@ -1,40 +1,52 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.scheduler.task import Task -from .orchestrator import OrchestratorIndexer +from .orchestrator import OrchestratorAllContentsIndexer +from .orchestrator import OrchestratorTextContentsIndexer from . import ContentMimetypeIndexer, ContentLanguageIndexer -class SWHOrchestratorTask(Task): - """Main task in charge of reading messages and broadcasting them back - to other tasks. +class SWHOrchestratorAllTask(Task): + """Main task in charge of reading batch contents (of any type) and + broadcasting them back to other tasks. """ - task_queue = 'swh_indexer_orchestrator' + task_queue = 'swh_indexer_all_content' def run(self, *args, **kwargs): - OrchestratorIndexer().run(*args, **kwargs) + OrchestratorAllContentsIndexer().run(*args, **kwargs) + + +class SWHOrchestratorTextContentTask(Task): + """Main task in charge of reading batch contents (of type text) and + broadcasting them back to other tasks. + + """ + task_queue = 'swh_indexer_text_content' + + def run(self, *args, **kwargs): + OrchestratorTextContentsIndexer().run(*args, **kwargs) class SWHContentMimetypeTask(Task): """Task which computes the mimetype, encoding from the sha1's content. """ task_queue = 'swh_indexer_content_mimetype' def run(self, *args, **kwargs): ContentMimetypeIndexer().run(*args, **kwargs) class SWHContentLanguageTask(Task): """Task which computes the language from the sha1's content. """ task_queue = 'swh_indexer_content_language' def run(self, *args, **kwargs): ContentLanguageIndexer().run(*args, **kwargs)