diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -7,6 +7,7 @@
 import logging
 import os
 import time
+from typing import Any, Dict, List, Optional, Set, Tuple, Union
 
 from confluent_kafka import Consumer, KafkaException, KafkaError
 
@@ -75,9 +76,17 @@
 
     """
     def __init__(
-            self, brokers, group_id, prefix=None, object_types=None,
-            max_messages=0, process_timeout=0, auto_offset_reset='earliest',
-            stop_on_eof=False, **kwargs):
+            self,
+            brokers: Union[str, List[str]],
+            group_id: str,
+            prefix: Optional[str] = None,
+            object_types: Optional[List[str]] = None,
+            max_messages: Optional[int] = None,
+            process_timeout: Optional[float] = None,
+            auto_offset_reset: str = 'earliest',
+            stop_on_eof: bool = False,
+            **kwargs
+    ):
         if prefix is None:
             prefix = DEFAULT_PREFIX
         if object_types is None:
@@ -161,7 +170,7 @@
         self.max_messages = max_messages
         self.process_timeout = process_timeout
 
-        self.eof_reached = set()
+        self.eof_reached: Set[Tuple[str, str]] = set()
 
         self._object_types = object_types
 
@@ -214,7 +223,7 @@
         return nb_messages
 
     def handle_messages(self, messages, worker_fn):
-        objects = defaultdict(list)
+        objects: Dict[str, List[Any]] = defaultdict(list)
         nb_processed = 0
 
         for message in messages:
diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py
--- a/swh/journal/tests/test_client.py
+++ b/swh/journal/tests/test_client.py
@@ -37,14 +37,12 @@
     )
     producer.flush()
 
-    config = {
-        'brokers': 'localhost:%d' % kafka_server[1],
-        'group_id': kafka_consumer_group,
-        'prefix': kafka_prefix,
-        'max_messages': 1,
-    }
-    client = JournalClient(**config)
-
+    client = JournalClient(
+        brokers='localhost:%d' % kafka_server[1],
+        group_id=kafka_consumer_group,
+        prefix=kafka_prefix,
+        max_messages=1,
+    )
     worker_fn = MagicMock()
     client.process(worker_fn)
 
@@ -73,13 +71,13 @@
     )
     producer.flush()
 
-    config = {
-        'brokers': 'localhost:%d' % kafka_server[1],
-        'group_id': kafka_consumer_group,
-        'prefix': kafka_prefix,
-        'stop_on_eof': True,
-    }
-    client = JournalClient(**config)
+    client = JournalClient(
+        brokers='localhost:%d' % kafka_server[1],
+        group_id=kafka_consumer_group,
+        prefix=kafka_prefix,
+        max_messages=1,
+        stop_on_eof=True,
+    )
 
     worker_fn = MagicMock()
     client.process(worker_fn)
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -72,13 +72,12 @@
     producer.flush()
 
     # Fill the storage from Kafka
-    config = {
-        'brokers': 'localhost:%d' % kafka_server[1],
-        'group_id': kafka_consumer_group,
-        'prefix': kafka_prefix,
-        'max_messages': nb_sent,
-    }
-    replayer = JournalClient(**config)
+    replayer = JournalClient(
+        brokers='localhost:%d' % kafka_server[1],
+        group_id=kafka_consumer_group,
+        prefix=kafka_prefix,
+        max_messages=nb_sent,
+    )
     worker_fn = functools.partial(process_replay_objects, storage=storage)
     nb_inserted = 0
     while nb_inserted < nb_sent:
@@ -147,7 +146,7 @@
         writer.send('origin_visit', 'foo', visit)
 
     queue_size = len(queue)
-    assert replayer.max_messages == 0
+    assert replayer.max_messages is None
     replayer.max_messages = queue_size
 
     storage = get_storage(**storage_config)
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -79,7 +79,7 @@
         pass
 
     queue_size = len(queue)
-    assert replayer.max_messages == 0
+    assert replayer.max_messages is None
     replayer.max_messages = queue_size
 
     storage2 = get_storage(**storage_config)
@@ -135,7 +135,7 @@
         contents.append(obj)
 
     queue_size = len(queue)
-    assert replayer.max_messages == 0
+    assert replayer.max_messages is None
     replayer.max_messages = queue_size
 
     storage2 = get_storage(**storage_config)
diff --git a/swh/journal/tests/utils.py b/swh/journal/tests/utils.py
--- a/swh/journal/tests/utils.py
+++ b/swh/journal/tests/utils.py
@@ -69,7 +69,7 @@
     def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES):
         self._object_types = object_types
         self.consumer = MockedKafkaConsumer(queue)
-        self.process_timeout = 0
-        self.max_messages = 0
+        self.process_timeout = None
+        self.max_messages = None
        self.value_deserializer = kafka_to_value
        self.stop_on_eof = False
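
For reference, a minimal usage sketch of the client after this change, assuming a locally reachable Kafka broker. It is an illustration only: the broker address and group id are hypothetical placeholders, and the printing worker stands in for a real callback such as the tests' process_replay_objects.

    from swh.journal.client import JournalClient

    # max_messages and process_timeout now default to None rather than 0,
    # so an unconfigured client consumes without a message cap or timeout.
    client = JournalClient(
        brokers='localhost:9092',     # hypothetical broker address
        group_id='example-consumer',  # hypothetical consumer group
        max_messages=10,              # stop after ten messages
    )

    def worker_fn(objects):
        # handle_messages() hands the worker a dict mapping each object
        # type to the list of deserialized messages of that type
        for object_type, values in objects.items():
            print(object_type, len(values))

    client.process(worker_fn)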