Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/pytest_plugin.py
Show First 20 Lines • Show All 45 Lines • ▼ Show 20 Lines | def consume_messages(consumer, kafka_prefix, expected_messages): | ||||
while fetched_messages < expected_messages: | while fetched_messages < expected_messages: | ||||
if retries_left == 0: | if retries_left == 0: | ||||
raise ValueError( | raise ValueError( | ||||
"Timed out fetching messages from kafka. " | "Timed out fetching messages from kafka. " | ||||
f"Only {fetched_messages}/{expected_messages} fetched" | f"Only {fetched_messages}/{expected_messages} fetched" | ||||
) | ) | ||||
msg = consumer.poll(timeout=0.01) | msg = consumer.poll(timeout=0.1) | ||||
if not msg: | if not msg: | ||||
retries_left -= 1 | retries_left -= 1 | ||||
continue | continue | ||||
error = msg.error() | error = msg.error() | ||||
if error is not None: | if error is not None: | ||||
if error.fatal(): | if error.fatal(): | ||||
▲ Show 20 Lines • Show All 199 Lines • Show Last 20 Lines |