diff --git a/swh/journal/client.py b/swh/journal/client.py --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -5,6 +5,7 @@ from collections import defaultdict from importlib import import_module +from itertools import cycle import logging import os from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union @@ -293,22 +294,35 @@ batch_size = min( self.stop_after_objects - total_objects_processed, batch_size, ) - set_status("waiting") - while True: + for i in cycle(reversed(range(10))): messages = self.consumer.consume( timeout=timeout, num_messages=batch_size ) if messages: break - set_status("processing") - batch_processed, at_eof = self.handle_messages(messages, worker_fn) + # do check for an EOF condition iff we already consumed + # messages, otherwise we could detect an EOF condition + # before messages had a chance to reach us (e.g. in tests) + if total_objects_processed > 0 and self.stop_on_eof and i == 0: + at_eof = all( + (tp.topic, tp.partition) in self.eof_reached + for tp in self.consumer.assignment() + ) + if at_eof: + break + if messages: + set_status("processing") + batch_processed, at_eof = self.handle_messages(messages, worker_fn) + + set_status("idle") + # report the number of handled messages + statsd.increment( + JOURNAL_MESSAGE_NUMBER_METRIC, value=batch_processed + ) + total_objects_processed += batch_processed - set_status("idle") - # report the number of handled messages - statsd.increment(JOURNAL_MESSAGE_NUMBER_METRIC, value=batch_processed) - total_objects_processed += batch_processed if at_eof: break