diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -3,4 +3,4 @@
 swh.objstorage >= 0.0.28
 swh.scheduler >= 0.0.47
 swh.storage >= 0.0.123
-swh.journal >= 0.0.6
+swh.journal >= 0.0.11
diff --git a/swh/indexer/cli.py b/swh/indexer/cli.py
--- a/swh/indexer/cli.py
+++ b/swh/indexer/cli.py
@@ -3,15 +3,19 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import functools
+
 import click
 
 from swh.core import config
 from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup
+from swh.journal.cli import get_journal_client
 from swh.scheduler import get_scheduler
 from swh.scheduler.cli_utils import schedule_origin_batches
 from swh.storage import get_storage
 
 from swh.indexer import metadata_dictionary
+from swh.indexer.journal_client import process_journal_objects
 from swh.indexer.storage import get_indexer_storage
 from swh.indexer.storage.api.server import load_and_check_config, app
 
@@ -157,11 +161,60 @@
     origins = list_origins_by_producer(idx_storage, mappings, tool_ids)
 
-    kwargs = {"policy_update": "update-dups", "parse_ids": False}
+    kwargs = {"policy_update": "update-dups"}
     schedule_origin_batches(
         scheduler, task_type, origins, origin_batch_size, kwargs)
 
 
+@cli.command('journal-client')
+@click.option('--scheduler-url', '-s', default=None,
+              help="URL of the scheduler API")
+@click.option('--origin-metadata-task-type',
+              default='index-origin-metadata',
+              help='Name of the task running the origin metadata indexer.')
+@click.option('--broker', 'brokers', type=str, multiple=True,
+              help='Kafka broker to connect to.')
+@click.option('--prefix', type=str, default=None,
+              help='Prefix of Kafka topic names to read from.')
+@click.option('--group-id', '--consumer-id', type=str,
+              help='Name of the consumer/group id for reading from Kafka.')
+@click.option('--max-messages', '-m', default=None, type=int,
+              help='Maximum number of objects to replay. Default is to '
+                   'run forever.')
+@click.pass_context
+def journal_client(ctx, scheduler_url, origin_metadata_task_type,
+                   brokers, prefix, group_id, max_messages):
+    """Listens for new objects from the SWH Journal, and schedules tasks
+    to run relevant indexers (currently, only origin-intrinsic-metadata)
+    on these new objects."""
+    scheduler = _get_api(
+        get_scheduler,
+        ctx.obj['config'],
+        'scheduler',
+        scheduler_url
+    )
+
+    client = get_journal_client(
+        ctx, brokers, prefix, group_id, object_types=['origin_visit'])
+
+    worker_fn = functools.partial(
+        process_journal_objects,
+        scheduler=scheduler,
+        task_names={
+            'origin_metadata': origin_metadata_task_type,
+        }
+    )
+    nb_messages = 0
+    try:
+        while not max_messages or nb_messages < max_messages:
+            nb_messages += client.process(worker_fn)
+            print('Processed %d messages.' % nb_messages)
+    except KeyboardInterrupt:
+        ctx.exit(0)
+    else:
+        print('Done.')
+
+
 @cli.command('rpc-serve')
 @click.argument('config-path', required=1)
 @click.option('--host', default='0.0.0.0', help="Host to run the server")
diff --git a/swh/indexer/journal_client.py b/swh/indexer/journal_client.py
--- a/swh/indexer/journal_client.py
+++ b/swh/indexer/journal_client.py
@@ -5,84 +5,30 @@
 
 import logging
 
-from swh.journal.client import JournalClient
-from swh.scheduler import get_scheduler
 from swh.scheduler.utils import create_task_dict
 
 
-class IndexerJournalClient(JournalClient):
-    """Client in charge of listing new received origins and origin_visits
-    in the swh journal.
- - """ - CONFIG_BASE_FILENAME = 'indexer/journal_client' - - ADDITIONAL_CONFIG = { - 'scheduler': ('dict', { - 'cls': 'remote', - 'args': { - 'url': 'http://localhost:5008/', - } - }), - 'origin_visit_tasks': ('List[dict]', [ - { - 'type': 'index-origin-metadata', - 'kwargs': { - 'policy_update': 'update-dups', - 'parse_ids': False, - } - } - ]), - } - - def __init__(self): - super().__init__(extra_configuration={ - 'object_types': ['origin_visit'], - }) - self.scheduler = get_scheduler(**self.config['scheduler']) - logging.info( - 'Starting indexer journal client with config %r', - self.config) - - def process_objects(self, messages): - assert set(messages) == {'origin_visit'}, set(messages) - for origin_visit in messages['origin_visit']: - self.process_origin_visit(origin_visit) - - def process_origin_visit(self, origin_visit): - task_dicts = [] - logging.debug('processing origin visit %r', origin_visit) - if origin_visit[b'status'] == b'full': - for task_config in self.config['origin_visit_tasks']: - logging.info( - 'Scheduling %s for visit of origin %d', - task_config['type'], origin_visit[b'origin']) - task_dicts.append(create_task_dict( - task_config['type'], - 'oneshot', - [origin_visit[b'origin']], - **task_config['kwargs'], - )) - else: - logging.debug('status is not "full", ignoring.') - - if task_dicts: - self.scheduler.create_tasks(task_dicts) - - -if __name__ == '__main__': - logging.basicConfig( - level=logging.INFO, - format='%(asctime)s %(process)d %(levelname)s %(message)s' - ) - - import click - - @click.command() - def main(): - """Log the new received origin and origin_visits. - - """ - IndexerJournalClient().process() - - main() +def process_journal_objects(messages, *, scheduler, task_names): + """Worker function for `JournalClient.process(worker_fn)`, after + currification of `scheduler` and `task_names`.""" + assert set(messages) == {'origin_visit'}, set(messages) + for origin_visit in messages['origin_visit']: + process_origin_visit(origin_visit, scheduler, task_names) + + +def process_origin_visit(origin_visit, scheduler, task_names): + task_dicts = [] + logging.debug('processing origin visit %r', origin_visit) + if origin_visit[b'status'] == b'full': + if task_names.get('origin_metadata'): + task_dicts.append(create_task_dict( + task_names['origin_metadata'], + 'oneshot', + [origin_visit[b'origin'][b'url']], + policy_update='update-dups', + )) + else: + logging.debug('status is not "full", ignoring.') + + if task_dicts: + scheduler.create_tasks(task_dicts) diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py --- a/swh/indexer/tests/test_cli.py +++ b/swh/indexer/tests/test_cli.py @@ -3,10 +3,11 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from collections import namedtuple from functools import reduce import re import tempfile -from unittest.mock import patch +from unittest.mock import patch, MagicMock from click.testing import CliRunner @@ -76,11 +77,11 @@ def _assert_tasks_for_origins(tasks, origins): - expected_kwargs = {"policy_update": "update-dups", "parse_ids": False} + expected_kwargs = {"policy_update": "update-dups"} assert {task['type'] for task in tasks} == {'index-origin-metadata'} assert all(len(task['arguments']['args']) == 1 for task in tasks) - assert all(task['arguments']['kwargs'] == expected_kwargs - for task in tasks) + for task in tasks: + assert task['arguments']['kwargs'] == expected_kwargs, task assert _origins_in_task_args(tasks) == 
@@ -320,3 +321,51 @@
     _assert_tasks_for_origins(
         tasks, [x*2 for x in range(55)])
+
+
+def test_journal_client(storage, indexer_scheduler):
+    """Test that the 'journal-client' command schedules an
+    origin-metadata indexing task for a full origin visit."""
+    mock_consumer = MagicMock()
+
+    partition = namedtuple('_partition', ['topic'])(
+        topic='swh.journal.objects.origin_visit')
+    message = namedtuple('_message', ['value'])(
+        value={
+            b'status': b'full',
+            b'origin': {
+                b'url': 'file:///dev/zero',
+            }
+        }
+    )
+    mock_consumer.poll.return_value = {partition: [message]}
+
+    with patch('swh.journal.client.KafkaConsumer',
+               return_value=mock_consumer):
+        result = invoke(indexer_scheduler, False, [
+            'journal-client',
+            '--max-messages', '1',
+            '--broker', '192.0.2.1',
+            '--prefix', 'swh.journal.objects',
+            '--group-id', 'test-consumer',
+        ])
+
+    mock_consumer.subscribe.assert_called_once_with(
+        topics=['swh.journal.objects.origin_visit'])
+    mock_consumer.poll.assert_called_once_with()
+    mock_consumer.commit.assert_called_once_with()
+
+    # Check the output
+    expected_output = (
+        'Processed 1 messages.\n'
+        'Done.\n'
+    )
+    assert result.exit_code == 0, result.output
+    assert result.output == expected_output
+
+    # Check scheduled tasks
+    tasks = indexer_scheduler.search_tasks()
+    assert len(tasks) == 1
+    _assert_tasks_for_origins(
+        tasks,
+        ['file:///dev/zero'])
diff --git a/swh/indexer/tests/test_journal_client.py b/swh/indexer/tests/test_journal_client.py
new file mode 100644
--- /dev/null
+++ b/swh/indexer/tests/test_journal_client.py
@@ -0,0 +1,42 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import unittest
+from unittest.mock import Mock
+
+from swh.indexer.journal_client import process_journal_objects
+
+
+class JournalClientTest(unittest.TestCase):
+    def testOriginVisit(self):
+        mock_scheduler = Mock()
+        messages = {
+            'origin_visit': [
+                {
+                    b'status': b'full',
+                    b'origin': {
+                        b'url': 'file:///dev/zero',
+                    }
+                }
+            ]
+        }
+        process_journal_objects(
+            messages, scheduler=mock_scheduler,
+            task_names={'origin_metadata': 'task-name'})
+        self.assertTrue(mock_scheduler.create_tasks.called)
+        call_args = mock_scheduler.create_tasks.call_args
+        (args, kwargs) = call_args
+        self.assertEqual(kwargs, {})
+        del args[0][0]['next_run']
+        self.assertEqual(args, ([
+            {
+                'arguments': {
+                    'kwargs': {'policy_update': 'update-dups'},
+                    'args': (['file:///dev/zero'],)
+                },
+                'policy': 'oneshot',
+                'type': 'task-name'
+            }
+        ],))
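
For reference, a minimal usage sketch of the new process_journal_objects worker outside the CLI, mirroring how the journal-client command binds scheduler and task_names with functools.partial. The scheduler here is a stand-in Mock rather than a real swh.scheduler instance, and the origin URL is an arbitrary example; the message shape follows the tests above.

import functools
from unittest.mock import Mock

from swh.indexer.journal_client import process_journal_objects

# Stand-in for a real scheduler obtained via swh.scheduler.get_scheduler().
scheduler = Mock()

# Bind the keyword-only arguments, as the CLI command does, so the result
# has the worker_fn(messages) shape expected by JournalClient.process().
worker_fn = functools.partial(
    process_journal_objects,
    scheduler=scheduler,
    task_names={'origin_metadata': 'index-origin-metadata'},
)

# One batch of messages keyed by object type; a 'full' visit schedules one
# 'index-origin-metadata' task for the visited origin's URL.
worker_fn({
    'origin_visit': [
        {
            b'status': b'full',
            b'origin': {b'url': 'https://example.org/repo.git'},
        }
    ]
})

assert scheduler.create_tasks.called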