diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2017 The Software Heritage developers
+# Copyright (C) 2017-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -11,6 +11,7 @@
 
 from confluent_kafka import Consumer, KafkaError, KafkaException
 
+from swh.core.statsd import statsd
 from swh.journal import DEFAULT_PREFIX
 
 from .serializers import kafka_to_value
@@ -28,6 +29,9 @@
     KafkaError._NO_OFFSET,
 ]
 
+JOURNAL_MESSAGE_NUMBER_METRIC = "swh_journal_client_handle_message_total"
+JOURNAL_STATUS_METRIC = "swh_journal_client_status"
+
 
 def get_journal_client(cls: str, **kwargs: Any):
     """Factory function to instantiate a journal client object.
@@ -275,25 +279,42 @@
         # timeout for message poll
         timeout = 1.0
 
-        while True:
-            batch_size = self.batch_size
-            if self.stop_after_objects:
-                if total_objects_processed >= self.stop_after_objects:
-                    break
-
-                # clamp batch size to avoid overrunning stop_after_objects
-                batch_size = min(
-                    self.stop_after_objects - total_objects_processed, batch_size,
-                )
-
-            messages = self.consumer.consume(timeout=timeout, num_messages=batch_size)
-            if not messages:
-                continue
+        statsd.gauge(JOURNAL_STATUS_METRIC, 0, tags={"status": "waiting"})
+        statsd.gauge(JOURNAL_STATUS_METRIC, 0, tags={"status": "processing"})
+        try:
+            while True:
+                batch_size = self.batch_size
+                if self.stop_after_objects:
+                    if total_objects_processed >= self.stop_after_objects:
+                        break
+
+                    # clamp batch size to avoid overrunning stop_after_objects
+                    batch_size = min(
+                        self.stop_after_objects - total_objects_processed, batch_size,
+                    )
 
-            batch_processed, at_eof = self.handle_messages(messages, worker_fn)
-            total_objects_processed += batch_processed
-            if at_eof:
-                break
+                statsd.gauge(JOURNAL_STATUS_METRIC, 1, tags={"status": "waiting"})
+                while True:
+                    messages = self.consumer.consume(
+                        timeout=timeout, num_messages=batch_size
+                    )
+                    if messages:
+                        break
+
+                statsd.gauge(JOURNAL_STATUS_METRIC, 0, tags={"status": "waiting"})
+                statsd.gauge(JOURNAL_STATUS_METRIC, 1, tags={"status": "processing"})
+                batch_processed, at_eof = self.handle_messages(messages, worker_fn)
+                statsd.gauge(JOURNAL_STATUS_METRIC, 0, tags={"status": "processing"})
+
+                # report the number of handled messages
+                statsd.increment(JOURNAL_MESSAGE_NUMBER_METRIC, value=batch_processed)
+                total_objects_processed += batch_processed
+                if at_eof:
+                    break
+        finally:
+            # reset the gauges on exit so they do not drift from the actual state
+            statsd.gauge(JOURNAL_STATUS_METRIC, 0, tags={"status": "waiting"})
+            statsd.gauge(JOURNAL_STATUS_METRIC, 0, tags={"status": "processing"})
 
         return total_objects_processed
@@ -335,4 +356,5 @@
         return self.value_deserializer(object_type, message.value())
 
     def close(self):
+        logger.warning("Closing Kafka consumer connection")
        self.consumer.close()
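
For context beyond the diff, here is a minimal usage sketch showing how the new metrics surface when the client runs; the broker address, group id, object type, and object count are placeholder assumptions, not part of this change:

    from swh.journal.client import get_journal_client

    # Placeholder connection settings; point them at a reachable Kafka deployment.
    client = get_journal_client(
        "kafka",
        brokers=["broker1.example.org:9092"],  # hypothetical broker
        group_id="example-metrics-demo",       # hypothetical consumer group
        object_types=["origin"],
        stop_after_objects=100,
    )

    def worker_fn(all_objects):
        # The client hands worker_fn a mapping of object type to the
        # deserialized messages of the current batch.
        for object_type, objects in all_objects.items():
            print(f"handled {len(objects)} {object_type} objects")

    client.process(worker_fn)

While process() runs, the JOURNAL_STATUS_METRIC gauge tagged status:waiting is set to 1 whenever the client blocks in consume(), the gauge tagged status:processing is set to 1 while worker_fn handles a batch, and JOURNAL_MESSAGE_NUMBER_METRIC grows by the number of messages handled in each batch; the finally block resets both gauges to 0 when process() exits.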