diff --git a/swh/journal/client.py b/swh/journal/client.py --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -6,7 +6,6 @@ from collections import defaultdict import logging import os -import time from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union from confluent_kafka import Consumer, KafkaError, KafkaException @@ -220,10 +219,16 @@ self.subscribe() self.stop_after_objects = stop_after_objects - self.process_timeout = process_timeout + self.eof_reached: Set[Tuple[str, str]] = set() self.batch_size = batch_size + if process_timeout is not None: + raise DeprecationWarning( + "'process_timeout' argument is not supported anymore by " + "JournalClient; please remove it from your configuration.", + ) + def subscribe(self): """Subscribe to topics listed in self.subscription @@ -241,26 +246,11 @@ the messages as argument. """ - start_time = time.monotonic() total_objects_processed = 0 + # timeout for message poll + timeout = 1.0 while True: - # timeout for message poll - timeout = 1.0 - - elapsed = time.monotonic() - start_time - if self.process_timeout: - # +0.01 to prevent busy-waiting on / spamming consumer.poll. - # consumer.consume() returns shortly before X expired - # (a matter of milliseconds), so after it returns a first - # time, it would then be called with a timeout in the order - # of milliseconds, therefore returning immediately, then be - # called again, etc. - if elapsed + 0.01 >= self.process_timeout: - break - - timeout = self.process_timeout - elapsed - batch_size = self.batch_size if self.stop_after_objects: if total_objects_processed >= self.stop_after_objects: