Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7343066
D2795.id.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
4 KB
Subscribers
None
D2795.id.diff
View Options
diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -7,6 +7,7 @@
import logging
import os
import time
+from typing import Any, Dict, List, Optional, Set, Tuple, Union
from confluent_kafka import Consumer, KafkaException, KafkaError
@@ -75,9 +76,17 @@
"""
def __init__(
- self, brokers, group_id, prefix=None, object_types=None,
- max_messages=0, process_timeout=0, auto_offset_reset='earliest',
- stop_on_eof=False, **kwargs):
+ self,
+ brokers: Union[str, List[str]],
+ group_id: str,
+ prefix: Optional[str] = None,
+ object_types: Optional[List[str]] = None,
+ max_messages: Optional[int] = None,
+ process_timeout: Optional[float] = None,
+ auto_offset_reset: str = 'earliest',
+ stop_on_eof: bool = False,
+ **kwargs
+ ):
if prefix is None:
prefix = DEFAULT_PREFIX
if object_types is None:
@@ -161,7 +170,7 @@
self.max_messages = max_messages
self.process_timeout = process_timeout
- self.eof_reached = set()
+ self.eof_reached: Set[Tuple[str, str]] = set()
self._object_types = object_types
@@ -214,7 +223,7 @@
return nb_messages
def handle_messages(self, messages, worker_fn):
- objects = defaultdict(list)
+ objects: Dict[str, List[Any]] = defaultdict(list)
nb_processed = 0
for message in messages:
diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py
--- a/swh/journal/tests/test_client.py
+++ b/swh/journal/tests/test_client.py
@@ -37,14 +37,12 @@
)
producer.flush()
- config = {
- 'brokers': 'localhost:%d' % kafka_server[1],
- 'group_id': kafka_consumer_group,
- 'prefix': kafka_prefix,
- 'max_messages': 1,
- }
- client = JournalClient(**config)
-
+ client = JournalClient(
+ brokers='localhost:%d' % kafka_server[1],
+ group_id=kafka_consumer_group,
+ prefix=kafka_prefix,
+ max_messages=1,
+ )
worker_fn = MagicMock()
client.process(worker_fn)
@@ -73,13 +71,13 @@
)
producer.flush()
- config = {
- 'brokers': 'localhost:%d' % kafka_server[1],
- 'group_id': kafka_consumer_group,
- 'prefix': kafka_prefix,
- 'stop_on_eof': True,
- }
- client = JournalClient(**config)
+ client = JournalClient(
+ brokers='localhost:%d' % kafka_server[1],
+ group_id=kafka_consumer_group,
+ prefix=kafka_prefix,
+ max_messages=1,
+ stop_on_eof=True,
+ )
worker_fn = MagicMock()
client.process(worker_fn)
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -72,13 +72,12 @@
producer.flush()
# Fill the storage from Kafka
- config = {
- 'brokers': 'localhost:%d' % kafka_server[1],
- 'group_id': kafka_consumer_group,
- 'prefix': kafka_prefix,
- 'max_messages': nb_sent,
- }
- replayer = JournalClient(**config)
+ replayer = JournalClient(
+ brokers='localhost:%d' % kafka_server[1],
+ group_id=kafka_consumer_group,
+ prefix=kafka_prefix,
+ max_messages=nb_sent,
+ )
worker_fn = functools.partial(process_replay_objects, storage=storage)
nb_inserted = 0
while nb_inserted < nb_sent:
@@ -147,7 +146,7 @@
writer.send('origin_visit', 'foo', visit)
queue_size = len(queue)
- assert replayer.max_messages == 0
+ assert replayer.max_messages is None
replayer.max_messages = queue_size
storage = get_storage(**storage_config)
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -79,7 +79,7 @@
pass
queue_size = len(queue)
- assert replayer.max_messages == 0
+ assert replayer.max_messages is None
replayer.max_messages = queue_size
storage2 = get_storage(**storage_config)
@@ -135,7 +135,7 @@
contents.append(obj)
queue_size = len(queue)
- assert replayer.max_messages == 0
+ assert replayer.max_messages is None
replayer.max_messages = queue_size
storage2 = get_storage(**storage_config)
diff --git a/swh/journal/tests/utils.py b/swh/journal/tests/utils.py
--- a/swh/journal/tests/utils.py
+++ b/swh/journal/tests/utils.py
@@ -69,7 +69,7 @@
def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES):
self._object_types = object_types
self.consumer = MockedKafkaConsumer(queue)
- self.process_timeout = 0
- self.max_messages = 0
+ self.process_timeout = None
+ self.max_messages = None
self.value_deserializer = kafka_to_value
self.stop_on_eof = False
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Mar 17 2025, 7:05 PM (7 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3217134
Attached To
D2795: Add type annotations to swh.journal.client arguments
Event Timeline
Log In to Comment