diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -70,9 +70,10 @@
     throughput across the nodes sharing the same group_id.
 
     Messages are processed by the `worker_fn` callback passed to the `process`
-    method, in batches of maximum 20 messages (currently hardcoded). If set,
-    the processing stops after processing `stop_after_objects` messages in
-    total.
+    method, in batches of maximum `batch_size` messages (defaults to 200).
+
+    If set, the processing stops after processing `stop_after_objects` messages
+    in total.
 
     `stop_on_eof` stops the processing when the client has reached the end of
     each partition in turn.
@@ -91,6 +92,7 @@
             prefix: Optional[str] = None,
             object_types: Optional[List[str]] = None,
             stop_after_objects: Optional[int] = None,
+            batch_size: int = 200,
             process_timeout: Optional[float] = None,
             auto_offset_reset: str = 'earliest',
             stop_on_eof: bool = False,
@@ -111,6 +113,9 @@
                         'Option \'object_types\' only accepts %s, not %s.' % (
                            ACCEPTED_OBJECT_TYPES, object_type))
 
+        if batch_size <= 0:
+            raise ValueError("Option 'batch_size' needs to be positive")
+
         self.value_deserializer = kafka_to_value
 
         if isinstance(brokers, str):
@@ -180,6 +185,7 @@
         self.stop_after_objects = stop_after_objects
         self.process_timeout = process_timeout
         self.eof_reached: Set[Tuple[str, str]] = set()
+        self.batch_size = batch_size
 
         self._object_types = object_types
 
@@ -212,8 +218,7 @@
 
                 timeout = self.process_timeout - elapsed
 
-            batch_size = 20
-
+            batch_size = self.batch_size
             if self.stop_after_objects:
                 if total_objects_processed >= self.stop_after_objects:
                     break
diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py
--- a/swh/journal/tests/test_client.py
+++ b/swh/journal/tests/test_client.py
@@ -4,12 +4,14 @@
 # See top-level LICENSE file for more information
 
 from subprocess import Popen
-from typing import Tuple
+from typing import Dict, List, Tuple
 from unittest.mock import MagicMock
 
 from confluent_kafka import Producer
+import pytest
 
 from swh.model.hypothesis_strategies import revisions
+from swh.model.model import Content
 
 from swh.journal.client import JournalClient
 from swh.journal.serializers import key_to_kafka, value_to_kafka
@@ -83,3 +85,54 @@
     client.process(worker_fn)
 
     worker_fn.assert_called_once_with({'revision': [rev.to_dict()]})
+
+
+@pytest.mark.parametrize("batch_size", [1, 5, 100])
+def test_client_batch_size(
+    kafka_prefix: str,
+    kafka_consumer_group: str,
+    kafka_server: Tuple[Popen, int],
+    batch_size: int,
+):
+    (_, port) = kafka_server
+    kafka_prefix += '.swh.journal.objects'
+
+    num_objects = 2 * batch_size + 1
+    assert num_objects < 256, "Too many objects, generation will fail"
+
+    producer = Producer({
+        'bootstrap.servers': 'localhost:{}'.format(port),
+        'client.id': 'test producer',
+        'enable.idempotence': 'true',
+    })
+
+    contents = [Content.from_data(bytes([i])) for i in range(num_objects)]
+
+    # Fill Kafka
+    for content in contents:
+        producer.produce(
+            topic=kafka_prefix + '.content',
+            key=key_to_kafka(content.sha1),
+            value=value_to_kafka(content.to_dict()),
+        )
+
+    producer.flush()
+
+    client = JournalClient(
+        brokers=['localhost:%d' % kafka_server[1]],
+        group_id=kafka_consumer_group,
+        prefix=kafka_prefix,
+        stop_after_objects=num_objects,
+        batch_size=batch_size,
+    )
+
+    collected_output: List[Dict] = []
+
+    def worker_fn(objects):
+        received = objects['content']
+        assert len(received) <= batch_size
+        collected_output.extend(received)
+
+    client.process(worker_fn)
+
+    assert collected_output == [content.to_dict() for content in contents]
diff --git a/swh/journal/tests/utils.py b/swh/journal/tests/utils.py
--- a/swh/journal/tests/utils.py
+++ b/swh/journal/tests/utils.py
@@ -73,3 +73,4 @@
         self.stop_after_objects = None
         self.value_deserializer = kafka_to_value
         self.stop_on_eof = False
+        self.batch_size = 200
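
For reference, here is a minimal usage sketch of the new `batch_size` option outside the test suite. It is not part of the patch; the broker address, group id, prefix, object types, and numeric values below are placeholders chosen for illustration.

    from swh.journal.client import JournalClient

    def worker_fn(objects):
        # `objects` maps an object type to a list of dicts; with this patch
        # applied, each call receives at most `batch_size` messages in total.
        for object_type, values in objects.items():
            print(object_type, len(values))

    client = JournalClient(
        brokers=['localhost:9092'],          # placeholder broker address
        group_id='example-consumer-group',   # placeholder consumer group
        prefix='swh.journal.objects',
        object_types=['content'],
        stop_after_objects=1000,
        batch_size=50,   # must be positive, otherwise ValueError is raised
    )
    client.process(worker_fn)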