Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/pytest_plugin.py
Show All 21 Lines | def consume_messages(consumer, kafka_prefix, expected_messages): | ||||
Sort them all into a consumed_objects dict""" | Sort them all into a consumed_objects dict""" | ||||
consumed_messages = defaultdict(list) | consumed_messages = defaultdict(list) | ||||
fetched_messages = 0 | fetched_messages = 0 | ||||
retries_left = 1000 | retries_left = 1000 | ||||
while fetched_messages < expected_messages: | while fetched_messages < expected_messages: | ||||
if retries_left == 0: | if retries_left == 0: | ||||
raise ValueError("Timed out fetching messages from kafka") | raise ValueError( | ||||
"Timed out fetching messages from kafka. " | |||||
f"Only {fetched_messages}/{expected_messages} fetched" | |||||
) | |||||
msg = consumer.poll(timeout=0.01) | msg = consumer.poll(timeout=0.01) | ||||
if not msg: | if not msg: | ||||
retries_left -= 1 | retries_left -= 1 | ||||
continue | continue | ||||
error = msg.error() | error = msg.error() | ||||
▲ Show 20 Lines • Show All 115 Lines • Show Last 20 Lines |