diff --git a/requirements-swh.txt b/requirements-swh.txt --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1 +1,2 @@ +swh.core >= 1.1 swh.model >= 0.12.0 diff --git a/requirements-test.txt b/requirements-test.txt --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,2 +1,3 @@ pytest hypothesis +swh.core[testing] diff --git a/swh/journal/client.py b/swh/journal/client.py --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -1,4 +1,4 @@ -# Copyright (C) 2017 The Software Heritage developers +# Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -11,6 +11,7 @@ from confluent_kafka import Consumer, KafkaError, KafkaException +from swh.core.statsd import statsd from swh.journal import DEFAULT_PREFIX from .serializers import kafka_to_value @@ -28,6 +29,9 @@ KafkaError._NO_OFFSET, ] +JOURNAL_MESSAGE_NUMBER_METRIC = "swh_journal_client_handle_message_total" +JOURNAL_STATUS_METRIC = "swh_journal_client_status" + def get_journal_client(cls: str, **kwargs: Any): """Factory function to instantiate a journal client object. @@ -275,25 +279,38 @@ # timeout for message poll timeout = 1.0 - while True: - batch_size = self.batch_size - if self.stop_after_objects: - if total_objects_processed >= self.stop_after_objects: - break + with statsd.status_gauge( + JOURNAL_STATUS_METRIC, statuses=["idle", "processing", "waiting"] + ) as set_status: + set_status("idle") + while True: + batch_size = self.batch_size + if self.stop_after_objects: + if total_objects_processed >= self.stop_after_objects: + break + + # clamp batch size to avoid overrunning stop_after_objects + batch_size = min( + self.stop_after_objects - total_objects_processed, batch_size, + ) - # clamp batch size to avoid overrunning stop_after_objects - batch_size = min( - self.stop_after_objects - total_objects_processed, batch_size, - ) + set_status("waiting") + while True: + messages = self.consumer.consume( + timeout=timeout, num_messages=batch_size + ) + if messages: + break - messages = self.consumer.consume(timeout=timeout, num_messages=batch_size) - if not messages: - continue + set_status("processing") + batch_processed, at_eof = self.handle_messages(messages, worker_fn) - batch_processed, at_eof = self.handle_messages(messages, worker_fn) - total_objects_processed += batch_processed - if at_eof: - break + set_status("idle") + # report the number of handled messages + statsd.increment(JOURNAL_MESSAGE_NUMBER_METRIC, value=batch_processed) + total_objects_processed += batch_processed + if at_eof: + break return total_objects_processed