diff --git a/requirements-swh.txt b/requirements-swh.txt --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1 +1,2 @@ +swh.core swh.model >= 0.12.0 diff --git a/requirements-test.txt b/requirements-test.txt --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,2 +1,3 @@ pytest hypothesis +swh.core[testing] diff --git a/swh/journal/client.py b/swh/journal/client.py --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -1,4 +1,4 @@ -# Copyright (C) 2017 The Software Heritage developers +# Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -11,6 +11,7 @@ from confluent_kafka import Consumer, KafkaError, KafkaException +from swh.core.statsd import statsd from swh.journal import DEFAULT_PREFIX from .serializers import kafka_to_value @@ -28,6 +29,11 @@ KafkaError._NO_OFFSET, ] +JOURNAL_BLOCKED_DURATION_METRIC = "swh_journal_client_blocked_duration_seconds" +JOURNAL_CONSUME_DURATION_METRIC = "swh_journal_client_consume_duration_seconds" +JOURNAL_HANDLE_DURATION_METRIC = "swh_journal_client_handle_duration_seconds" +JOURNAL_MESSAGE_NUMBER_METRIC = "swh_journal_client_handle_message_number" + def get_journal_client(cls: str, **kwargs: Any): """Factory function to instantiate a journal client object. @@ -242,6 +248,7 @@ argument. 
""" start_time = time.monotonic() + last_object_time = 0 total_objects_processed = 0 while True: @@ -274,12 +281,31 @@ messages = self.consumer.consume(timeout=timeout, num_messages=batch_size) if not messages: continue + # report the time we stayed blocked waiting for kafka; + # more or less the time it took to have a successful call to consume() + # statsd timers are sent with the "ms" unit: convert from seconds + statsd.timing( + JOURNAL_BLOCKED_DURATION_METRIC, (elapsed - last_object_time) * 1000 + ) + # report the time it took to retrieve the bunch of journal messages; + # more or less the time the (successful) call to consume() took + statsd.timing( + JOURNAL_CONSUME_DURATION_METRIC, + ((time.monotonic() - start_time) - elapsed) * 1000, + ) - batch_processed, at_eof = self.handle_messages(messages, worker_fn) + # report the time it takes to handle messages + with statsd.timed(JOURNAL_HANDLE_DURATION_METRIC): + batch_processed, at_eof = self.handle_messages(messages, worker_fn) + + # report the number of handled messages + statsd.increment(JOURNAL_MESSAGE_NUMBER_METRIC, value=batch_processed) total_objects_processed += batch_processed if at_eof: break + last_object_time = time.monotonic() - start_time + return total_objects_processed def handle_messages(self, messages, worker_fn): diff --git a/swh/journal/tests/test_client_statsd.py b/swh/journal/tests/test_client_statsd.py new file mode 100644 --- /dev/null +++ b/swh/journal/tests/test_client_statsd.py @@ -0,0 +1,69 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import re +from typing import List, cast +from unittest.mock import MagicMock + +from confluent_kafka import Producer + +import swh.journal.client as jclient +from swh.journal.serializers import value_to_kafka +from swh.model.model import Revision +from swh.model.tests.swh_model_data import TEST_OBJECTS + + +def 
test_client_statsd( + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: str, + statsd, + monkeypatch, +): + """Check the metrics sent to statsd by JournalClient.process().""" + producer = Producer( + { + "bootstrap.servers": kafka_server, + "client.id": "test producer", + "acks": "all", + } + ) + + # Fill Kafka + revisions = cast(List[Revision], TEST_OBJECTS["revision"]) + for rev in revisions: + producer.produce( + topic=kafka_prefix + ".revision", + key=rev.id, + value=value_to_kafka(rev.to_dict()), + ) + producer.flush() + + monkeypatch.setattr(jclient, "statsd", statsd) + + client = jclient.JournalClient( + brokers=[kafka_server], + group_id=kafka_consumer_group, + prefix=kafka_prefix, + stop_on_eof=True, + ) + + worker_fn = MagicMock() + client.process(worker_fn) + assert re.match( + r"^swh_journal_client_blocked_duration_seconds:\d+\.\d+\|ms$", + statsd.socket.recv(), + ) + assert re.match( + r"^swh_journal_client_consume_duration_seconds:\d+\.\d+\|ms$", + statsd.socket.recv(), + ) + assert re.match( + r"^swh_journal_client_handle_duration_seconds:\d+\.\d+\|ms$", + statsd.socket.recv(), + ) + assert re.match( + r"^swh_journal_client_handle_message_number:2\|c$", statsd.socket.recv() + )