D2795.id.diff

diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -7,6 +7,7 @@
import logging
import os
import time
+from typing import Any, Dict, List, Optional, Set, Tuple, Union
from confluent_kafka import Consumer, KafkaException, KafkaError
@@ -75,9 +76,17 @@
"""
def __init__(
- self, brokers, group_id, prefix=None, object_types=None,
- max_messages=0, process_timeout=0, auto_offset_reset='earliest',
- stop_on_eof=False, **kwargs):
+ self,
+ brokers: Union[str, List[str]],
+ group_id: str,
+ prefix: Optional[str] = None,
+ object_types: Optional[List[str]] = None,
+ max_messages: Optional[int] = None,
+ process_timeout: Optional[float] = None,
+ auto_offset_reset: str = 'earliest',
+ stop_on_eof: bool = False,
+ **kwargs
+ ):
if prefix is None:
prefix = DEFAULT_PREFIX
if object_types is None:
@@ -161,7 +170,7 @@
self.max_messages = max_messages
self.process_timeout = process_timeout
- self.eof_reached = set()
+ self.eof_reached: Set[Tuple[str, str]] = set()
self._object_types = object_types
@@ -214,7 +223,7 @@
return nb_messages
def handle_messages(self, messages, worker_fn):
- objects = defaultdict(list)
+ objects: Dict[str, List[Any]] = defaultdict(list)
nb_processed = 0
for message in messages:
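
The annotated signature above makes the constructor keyword-friendly; the test updates below simply spell the same arguments out instead of building a config dict first. A minimal usage sketch (broker address, group id and prefix values are illustrative placeholders, not taken from this diff):

from swh.journal.client import JournalClient

# Illustrative values only; any reachable broker / consumer group / topic
# prefix would do. prefix falls back to DEFAULT_PREFIX when left as None.
client = JournalClient(
    brokers='localhost:9092',           # Union[str, List[str]]
    group_id='example-consumer-group',  # str
    prefix='swh.journal.objects',       # Optional[str]
    max_messages=1,                     # Optional[int]; the default is now None
)
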
diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py
--- a/swh/journal/tests/test_client.py
+++ b/swh/journal/tests/test_client.py
@@ -37,14 +37,12 @@
)
producer.flush()
- config = {
- 'brokers': 'localhost:%d' % kafka_server[1],
- 'group_id': kafka_consumer_group,
- 'prefix': kafka_prefix,
- 'max_messages': 1,
- }
- client = JournalClient(**config)
-
+ client = JournalClient(
+ brokers='localhost:%d' % kafka_server[1],
+ group_id=kafka_consumer_group,
+ prefix=kafka_prefix,
+ max_messages=1,
+ )
worker_fn = MagicMock()
client.process(worker_fn)
@@ -73,13 +71,13 @@
)
producer.flush()
- config = {
- 'brokers': 'localhost:%d' % kafka_server[1],
- 'group_id': kafka_consumer_group,
- 'prefix': kafka_prefix,
- 'stop_on_eof': True,
- }
- client = JournalClient(**config)
+ client = JournalClient(
+ brokers='localhost:%d' % kafka_server[1],
+ group_id=kafka_consumer_group,
+ prefix=kafka_prefix,
+ max_messages=1,
+ stop_on_eof=True,
+ )
worker_fn = MagicMock()
client.process(worker_fn)
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -72,13 +72,12 @@
producer.flush()
# Fill the storage from Kafka
- config = {
- 'brokers': 'localhost:%d' % kafka_server[1],
- 'group_id': kafka_consumer_group,
- 'prefix': kafka_prefix,
- 'max_messages': nb_sent,
- }
- replayer = JournalClient(**config)
+ replayer = JournalClient(
+ brokers='localhost:%d' % kafka_server[1],
+ group_id=kafka_consumer_group,
+ prefix=kafka_prefix,
+ max_messages=nb_sent,
+ )
worker_fn = functools.partial(process_replay_objects, storage=storage)
nb_inserted = 0
while nb_inserted < nb_sent:
@@ -147,7 +146,7 @@
writer.send('origin_visit', 'foo', visit)
queue_size = len(queue)
- assert replayer.max_messages == 0
+ assert replayer.max_messages is None
replayer.max_messages = queue_size
storage = get_storage(**storage_config)
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -79,7 +79,7 @@
pass
queue_size = len(queue)
- assert replayer.max_messages == 0
+ assert replayer.max_messages is None
replayer.max_messages = queue_size
storage2 = get_storage(**storage_config)
@@ -135,7 +135,7 @@
contents.append(obj)
queue_size = len(queue)
- assert replayer.max_messages == 0
+ assert replayer.max_messages is None
replayer.max_messages = queue_size
storage2 = get_storage(**storage_config)
diff --git a/swh/journal/tests/utils.py b/swh/journal/tests/utils.py
--- a/swh/journal/tests/utils.py
+++ b/swh/journal/tests/utils.py
@@ -69,7 +69,7 @@
def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES):
self._object_types = object_types
self.consumer = MockedKafkaConsumer(queue)
- self.process_timeout = 0
- self.max_messages = 0
+ self.process_timeout = None
+ self.max_messages = None
self.value_deserializer = kafka_to_value
self.stop_on_eof = False
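
Throughout the diff the "not set" sentinel for max_messages and process_timeout moves from 0 to None, and the mocked client in tests/utils.py follows suit. A hedged sketch of what the updated replay tests now assert (values and broker settings are placeholders; the real tests rely on the Kafka fixtures shown above):

from swh.journal.client import JournalClient

replayer = JournalClient(
    brokers='localhost:9092',
    group_id='example-consumer-group',
    prefix='swh.journal.objects',
)
assert replayer.max_messages is None      # previously compared against 0
assert replayer.process_timeout is None   # same convention for the timeout
replayer.max_messages = 10                # tests then set an explicit cap before replaying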
