diff --git a/swh/search/cli.py b/swh/search/cli.py --- a/swh/search/cli.py +++ b/swh/search/cli.py @@ -54,7 +54,8 @@ """Listens for new objects from the SWH Journal, and schedules tasks to run relevant indexers (currently, only origin) on these new objects.""" - client = get_journal_client(ctx, object_types=['origin']) + client = get_journal_client( + ctx, object_types=['origin'], max_messages=max_messages) search = get_search(**ctx.obj['config']['search']) worker_fn = functools.partial( diff --git a/swh/search/tests/test_cli.py b/swh/search/tests/test_cli.py --- a/swh/search/tests/test_cli.py +++ b/swh/search/tests/test_cli.py @@ -9,6 +9,7 @@ from click.testing import CliRunner from swh.journal.serializers import value_to_kafka +from swh.journal.tests.utils import MockedKafkaConsumer from swh.search.cli import cli from .test_elasticsearch import BaseElasticsearchTest @@ -49,8 +50,6 @@ def test__journal_client__origin(self): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" - mock_consumer = MagicMock() - topic = 'swh.journal.objects.origin' value = value_to_kafka({ 'url': 'http://foobar.baz', @@ -59,7 +58,8 @@ message.error.return_value = None message.topic.return_value = topic message.value.return_value = value - mock_consumer.poll.return_value = message + + mock_consumer = MockedKafkaConsumer([message]) with patch('swh.journal.client.Consumer', return_value=mock_consumer): @@ -69,10 +69,6 @@ ], JOURNAL_OBJECTS_CONFIG, elasticsearch_host=self._elasticsearch_host) - mock_consumer.subscribe.assert_called_once_with(topics=[topic]) - mock_consumer.poll.assert_called_once_with(timeout=1.0) - mock_consumer.commit.assert_called_once_with() - # Check the output expected_output = ( 'Processed 1 messages.\n'