diff --git a/swh/indexer/orchestrator.py b/swh/indexer/orchestrator.py
index a3dbae6..9f6e5de 100644
--- a/swh/indexer/orchestrator.py
+++ b/swh/indexer/orchestrator.py
@@ -1,46 +1,45 @@
 # Copyright (C) 2016 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import random
 
 from swh.core.config import SWHConfig
 from swh.scheduler.celery_backend.config import app
 
 from . import TASK_NAMES, INDEXER_CLASSES
 
 
 class OrchestratorIndexer(SWHConfig):
     """The indexer orchestrator is in charge of:
     - reading messages (list of sha1s as bytes)
     - according to its configuration, broadcast those messages to indexers
     - by eventually filtering by indexers
 
     """
     CONFIG_BASE_FILENAME = 'indexer/orchestrator'
 
     DEFAULT_CONFIG = {
         'indexers': ('[str]', ['mimetype']),
     }
 
     def __init__(self):
         super().__init__()
         self.config = self.parse_config_file()
         indexer_names = self.config['indexers']
         random.shuffle(indexer_names)
         self.indexers = {
             TASK_NAMES[name]: INDEXER_CLASSES[name]
             for name in indexer_names
-            if name in app.tasks
         }
 
     def run(self, sha1s):
         for task_name, indexer_class in self.indexers.items():
             indexer = indexer_class()
 
             # first filter the contents per indexers
-            sha1s_filtered = indexer.filter_contents(sha1s)
+            sha1s_filtered = list(indexer.filter_contents(sha1s))
 
             # now send the message for the indexer to compute and store results
             app.tasks[task_name].delay(sha1s_filtered)
diff --git a/swh/indexer/producer.py b/swh/indexer/producer.py
index 8822997..9ea8528 100755
--- a/swh/indexer/producer.py
+++ b/swh/indexer/producer.py
@@ -1,84 +1,82 @@
 # Copyright (C) 2016 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import click
 import random
+import sys
 
-from swh.core.config import SWHConfig
-from swh.core import utils
-from swh.objstorage import get_objstorage
+from swh.core import utils, hashutil
 from swh.scheduler.celery_backend.config import app
+
 from . import tasks, TASK_NAMES  # noqa
 
 
 task_name = TASK_NAMES['orchestrator']
 orchestrator_task = app.tasks[task_name]
 
 
-class ContentIndexerProducer(SWHConfig):
-    DEFAULT_CONFIG = {
-        'objstorage': ('dict', {
-            'cls': 'pathslicing',
-            'args': {
-                'slicing': '0:2/2:4/4:6',
-                'root': '/srv/softwareheritage/objects/'
-            },
-        }),
-        'batch': ('int', 10),
-        'limit': ('str', 'none'),
-    }
-
-    CONFIG_BASE_FILENAME = 'indexer/producer'
-
-    def __init__(self):
-        super().__init__()
-        self.config = self.parse_config_file()
-        objstorage = self.config['objstorage']
-        self.objstorage = get_objstorage(objstorage['cls'], objstorage['args'])
-
-        self.limit = self.config['limit']
-        if self.limit == 'none':
-            self.limit = None
-        else:
-            self.limit = int(self.limit)
-        self.batch = self.config['batch']
-
-    def get_contents(self):
-        """Read contents and retrieve randomly one possible path.
-
-        """
-        yield from self.objstorage
-
-    def gen_sha1(self):
-        """Generate batch of grouped sha1s from the objstorage.
-
-        """
-        for sha1s in utils.grouper(self.get_contents(), self.batch):
-            sha1s = list(sha1s)
-            random.shuffle(sha1s)
-            yield sha1s
-
-    def run_with_limit(self):
-        count = 0
-        for sha1s in self.gen_sha1():
-            count += len(sha1s)
-            print('%s sent - [%s, ...]' % (len(sha1s), sha1s[0]))
-            orchestrator_task.delay(sha1s)
-            if count >= self.limit:
-                return
-
-    def run_no_limit(self):
-        for sha1s in self.gen_sha1():
-            print('%s sent - [%s, ...]' % (len(sha1s), sha1s[0]))
-            orchestrator_task.delay(sha1s)
-
-    def run(self, *args, **kwargs):
-        if self.limit:
-            self.run_with_limit()
-        else:
-            self.run_no_limit()
+def read_from_stdin():
+    """Yield the sha1s read from stdin, one hexadecimal digest per line.
+
+    Blank lines are skipped so that a stray or trailing empty line does not
+    get sent for indexing as an empty hash.
+
+    """
+    for line in sys.stdin:
+        sha1 = line.strip()
+        if sha1:
+            yield hashutil.hex_to_hash(sha1)
+
+
+def gen_sha1(batch):
+    """Generate shuffled batches of sha1s read from stdin.
+
+    The sha1s are grouped by `batch` with utils.grouper and each group is
+    shuffled before being yielded as a list.
+
+    """
+    for sha1s in utils.grouper(read_from_stdin(), batch):
+        sha1s = list(sha1s)
+        random.shuffle(sha1s)
+        yield sha1s
+
+
+def run_with_limit(limit, batch):
+    """Send batches to the orchestrator until `limit` sha1s have been sent.
+
+    The limit is only checked after a whole batch is sent, so the last batch
+    may overshoot it.
+
+    """
+    count = 0
+    for sha1s in gen_sha1(batch):
+        count += len(sha1s)
+        print('%s sent - [%s, ...]' % (len(sha1s), sha1s[0]))
+        orchestrator_task.delay(sha1s)
+        if count >= limit:
+            return
+
+
+def run_no_limit(batch):
+    """Send every batch of sha1s read from stdin to the orchestrator."""
+    for sha1s in gen_sha1(batch):
+        print('%s sent - [%s, ...]' % (len(sha1s), sha1s[0]))
+        orchestrator_task.delay(sha1s)
+
+
+@click.command(help='Read sha1 from stdin and send them for indexing')
+@click.option('--limit', default=None, type=int,
+              help='Limit the number of data to read')
+@click.option('--batch', default=10, type=int, help='Group data by batch')
+def main(limit, batch):
+    # click has already converted both options to int (None when no --limit)
+    if limit:
+        run_with_limit(limit, batch)
+    else:
+        run_no_limit(batch)
 
 
 if __name__ == '__main__':
-    ContentIndexerProducer().run()
+    main()