diff --git a/requirements-swh.txt b/requirements-swh.txt
index 675ffa5..de4709b 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,4 +1,4 @@
 # Add here internal Software Heritage dependencies, one per line.
 swh.core[http]
-swh.journal
+swh.journal >= 0.0.27
 swh.model
diff --git a/swh/search/cli.py b/swh/search/cli.py
index b843ba0..b43d113 100644
--- a/swh/search/cli.py
+++ b/swh/search/cli.py
@@ -1,88 +1,89 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import functools

 import click

 from swh.core import config
 from swh.core.cli import CONTEXT_SETTINGS
 from swh.journal.cli import get_journal_client

 from . import get_search
 from .journal_client import process_journal_objects
 from .api.server import load_and_check_config, app


 @click.group(name='search', context_settings=CONTEXT_SETTINGS)
 @click.option('--config-file', '-C', default=None,
               type=click.Path(exists=True, dir_okay=False,),
               help="Configuration file.")
 @click.pass_context
 def cli(ctx, config_file):
     '''Software Heritage Search tools.'''
     ctx.ensure_object(dict)

     conf = config.read(config_file)
     ctx.obj['config'] = conf


 @cli.command('initialize')
 @click.pass_context
 def initialize(ctx):
     """Creates Elasticsearch indices."""
     search = get_search(**ctx.obj['config']['search'])
     search.initialize()
     print('Done.')


 @cli.group('journal-client')
 @click.pass_context
 def journal_client(ctx):
     """"""
     pass


 @journal_client.command('objects')
-@click.option('--max-messages', '-m', default=None, type=int,
+@click.option('--stop-after-objects', '-s', default=None, type=int,
               help='Maximum number of objects to replay. Default is to '
                    'run forever.')
 @click.pass_context
-def journal_client_objects(ctx, max_messages):
+def journal_client_objects(ctx, stop_after_objects):
     """Listens for new objects from the SWH Journal, and schedules tasks
     to run relevant indexers (currently, only origin)
     on these new objects."""
     client = get_journal_client(
         ctx, object_types=['origin', 'origin_visit'],
-        max_messages=max_messages)
+        stop_after_objects=stop_after_objects)
     search = get_search(**ctx.obj['config']['search'])

     worker_fn = functools.partial(
         process_journal_objects,
         search=search,
     )
     nb_messages = 0
     try:
-        while not max_messages or nb_messages < max_messages:
-            nb_messages += client.process(worker_fn)
-            print('Processed %d messages.' % nb_messages)
+        nb_messages = client.process(worker_fn)
+        print('Processed %d messages.' % nb_messages)
     except KeyboardInterrupt:
         ctx.exit(0)
     else:
         print('Done.')
+    finally:
+        client.close()


 @cli.command('rpc-serve')
 @click.argument('config-path', required=True)
 @click.option('--host', default='0.0.0.0', help="Host to run the server")
 @click.option('--port', default=5010, type=click.INT,
               help="Binding port of the server")
 @click.option('--debug/--nodebug', default=True,
               help="Indicates if the server should run in debug mode")
 def rpc_server(config_path, host, port, debug):
     """Starts a Software Heritage Indexer RPC HTTP server."""
     api_cfg = load_and_check_config(config_path, type='any')
     app.config.update(api_cfg)
     app.run(host, port=int(port), debug=bool(debug))
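The cli.py hunk above replaces the manual replay loop: with swh.journal >= 0.0.27 (hence the requirements bump), `JournalClient` counts processed objects itself and returns from `process()` once `stop_after_objects` is reached, and the new `finally:` block releases the Kafka consumer. Below is a minimal sketch of the resulting control flow; the broker address, group id, and object cap are illustrative placeholders, not taken from this patch:

```python
# Sketch only: JournalClient arguments mirror the 'journal' config section;
# the broker/group_id/prefix values here are placeholders.
from swh.journal.client import JournalClient

client = JournalClient(
    brokers=['192.0.2.1'],
    group_id='test-consumer',
    prefix='swh.journal.objects',
    object_types=['origin', 'origin_visit'],
    stop_after_objects=1000,  # replaces the old max_messages knob
)

def worker_fn(messages):
    # `messages` maps an object type to a list of decoded values,
    # e.g. {'origin': [{'url': 'http://foobar.baz'}]}
    for object_type, values in messages.items():
        print('%s: %d objects' % (object_type, len(values)))

try:
    nb_messages = client.process(worker_fn)  # returns once the cap is hit
    print('Processed %d messages.' % nb_messages)
finally:
    client.close()
```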
diff --git a/swh/search/tests/test_cli.py b/swh/search/tests/test_cli.py
index b6599cd..677b8af 100644
--- a/swh/search/tests/test_cli.py
+++ b/swh/search/tests/test_cli.py
@@ -1,133 +1,133 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import tempfile
 from unittest.mock import patch, MagicMock

 from click.testing import CliRunner

 from swh.journal.serializers import value_to_kafka
 from swh.journal.tests.utils import MockedKafkaConsumer

 from swh.search.cli import cli
 from .test_elasticsearch import BaseElasticsearchTest

 CLI_CONFIG = '''
 search:
     cls: elasticsearch
     args:
         hosts:
         - '{elasticsearch_host}'
 '''

 JOURNAL_OBJECTS_CONFIG = '''
 journal:
     brokers:
         - 192.0.2.1
     prefix: swh.journal.objects
     group_id: test-consumer
 '''


 class MockedKafkaConsumerWithTopics(MockedKafkaConsumer):
     def list_topics(self, timeout=None):
         return {
             'swh.journal.objects.origin',
             'swh.journal.objects.origin_visit',
         }


 def invoke(catch_exceptions, args, config='', *, elasticsearch_host):
     runner = CliRunner()
     with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd:
         config_fd.write((CLI_CONFIG + config).format(
             elasticsearch_host=elasticsearch_host
         ))
         config_fd.seek(0)
         result = runner.invoke(cli, ['-C' + config_fd.name] + args)
     if not catch_exceptions and result.exception:
         print(result.output)
         raise result.exception
     return result


 class CliTestCase(BaseElasticsearchTest):
     def test__journal_client__origin(self):
         """Tests the re-indexing when origin_batch_size*task_batch_size is a
         divisor of nb_origins."""
         topic = 'swh.journal.objects.origin'
         value = value_to_kafka({
             'url': 'http://foobar.baz',
         })
         message = MagicMock()
         message.error.return_value = None
         message.topic.return_value = topic
         message.value.return_value = value

         mock_consumer = MockedKafkaConsumerWithTopics([message])

         with patch('swh.journal.client.Consumer',
                    return_value=mock_consumer):
             result = invoke(False, [
                 'journal-client', 'objects',
-                '--max-messages', '1',
+                '--stop-after-objects', '1',
             ], JOURNAL_OBJECTS_CONFIG,
                 elasticsearch_host=self._elasticsearch_host)

         # Check the output
         expected_output = (
             'Processed 1 messages.\n'
             'Done.\n'
         )
         assert result.exit_code == 0, result.output
         assert result.output == expected_output

         self.search.flush()

         results = self.search.origin_search(url_pattern='foobar')
         assert results == {'next_page_token': None, 'results': [
             {'url': 'http://foobar.baz'}]}

         results = self.search.origin_search(url_pattern='foobar',
                                             with_visit=True)
         assert results == {'next_page_token': None, 'results': []}
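This test exercises the renamed flag without a real Kafka cluster: `swh.journal.client.Consumer` is patched with `MockedKafkaConsumerWithTopics`, whose `list_topics()` override matters because the journal client inspects existing topics before subscribing. Payloads are built with `value_to_kafka`; as a rough sketch (an assumption about its internals, not the actual swh.journal code), that amounts to msgpack encoding:

```python
# Rough stand-in for swh.journal.serializers.value_to_kafka (assumption:
# the real function also handles extra types such as datetimes).
import msgpack

def value_to_kafka_sketch(value):
    return msgpack.packb(value, use_bin_type=True)

encoded = value_to_kafka_sketch({'url': 'http://foobar.baz'})
assert msgpack.unpackb(encoded, raw=False) == {'url': 'http://foobar.baz'}
```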

     def test__journal_client__origin_visit(self):
         """Tests the re-indexing when origin_batch_size*task_batch_size is a
         divisor of nb_origins."""
         topic = 'swh.journal.objects.origin_visit'
         value = value_to_kafka({
             'origin': 'http://foobar.baz',
         })
         message = MagicMock()
         message.error.return_value = None
         message.topic.return_value = topic
         message.value.return_value = value

         mock_consumer = MockedKafkaConsumerWithTopics([message])

         with patch('swh.journal.client.Consumer',
                    return_value=mock_consumer):
             result = invoke(False, [
                 'journal-client', 'objects',
-                '--max-messages', '1',
+                '--stop-after-objects', '1',
             ], JOURNAL_OBJECTS_CONFIG,
                 elasticsearch_host=self._elasticsearch_host)

         # Check the output
         expected_output = (
             'Processed 1 messages.\n'
             'Done.\n'
         )
         assert result.exit_code == 0, result.output
         assert result.output == expected_output

         self.search.flush()

         results = self.search.origin_search(url_pattern='foobar',
                                             with_visit=True)
         assert results == {'next_page_token': None, 'results': [
             {'url': 'http://foobar.baz'}]}
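For completeness, a hypothetical end-to-end invocation of the renamed flag through click's test runner; `search.yml` is an assumed configuration file combining `search:` and `journal:` sections like the ones inlined in the tests above:

```python
# Hypothetical usage of the renamed option; 'search.yml' does not ship
# with this patch and must provide the 'search' and 'journal' sections.
from click.testing import CliRunner

from swh.search.cli import cli

runner = CliRunner()
result = runner.invoke(cli, [
    '--config-file', 'search.yml',
    'journal-client', 'objects',
    '--stop-after-objects', '1000',  # was: --max-messages
])
print(result.output)  # expected: 'Processed N messages.' then 'Done.'
```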