Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/client.py
# Copyright (C) 2017 The Software Heritage developers | # Copyright (C) 2017-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import defaultdict | from collections import defaultdict | ||||
import logging | import logging | ||||
import os | import os | ||||
import time | import time | ||||
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union | from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union | ||||
from confluent_kafka import Consumer, KafkaError, KafkaException | from confluent_kafka import Consumer, KafkaError, KafkaException | ||||
from swh.core.statsd import statsd | |||||
from swh.journal import DEFAULT_PREFIX | from swh.journal import DEFAULT_PREFIX | ||||
from .serializers import kafka_to_value | from .serializers import kafka_to_value | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
rdkafka_logger = logging.getLogger(__name__ + ".rdkafka") | rdkafka_logger = logging.getLogger(__name__ + ".rdkafka") | ||||
# The only accepted offset reset policies | # The only accepted offset reset policies | ||||
ACCEPTED_OFFSET_RESET = ["earliest", "latest"] | ACCEPTED_OFFSET_RESET = ["earliest", "latest"] | ||||
# Errors that Kafka raises too often and are not useful; therefore | # Errors that Kafka raises too often and are not useful; therefore | ||||
# we lower their log level to DEBUG instead of INFO. | # we lower their log level to DEBUG instead of INFO. | ||||
_SPAMMY_ERRORS = [ | _SPAMMY_ERRORS = [ | ||||
KafkaError._NO_OFFSET, | KafkaError._NO_OFFSET, | ||||
] | ] | ||||
JOURNAL_BLOCKED_DURATION_METRIC = "swh_journal_client_blocked_duration_seconds" | |||||
JOURNAL_CONSUME_DURATION_METRIC = "swh_journal_client_consume_duration_seconds" | |||||
JOURNAL_HANDLE_DURATION_METRIC = "swh_journal_client_handle_duration_seconds" | |||||
JOURNAL_MESSAGE_NUMBER_METRIC = "swh_journal_client_handle_message_number" | |||||
olasd: Should be plural and end with `_total` rather than "number" (ref. https://prometheus.io/docs/practices/naming/)
douarddaAuthorUnsubmitted Done Inline Actionsah yes, i wanted to go & check this but then I forgot! douardda: ah yes, i wanted to go & check this but then I forgot! | |||||
def get_journal_client(cls: str, **kwargs: Any): | def get_journal_client(cls: str, **kwargs: Any): | ||||
"""Factory function to instantiate a journal client object. | """Factory function to instantiate a journal client object. | ||||
Currently, only the "kafka" journal client is supported. | Currently, only the "kafka" journal client is supported. | ||||
""" | """ | ||||
if cls == "kafka": | if cls == "kafka": | ||||
return JournalClient(**kwargs) | return JournalClient(**kwargs) | ||||
▲ Show 20 Lines • Show All 198 Lines • ▼ Show 20 Lines | def process(self, worker_fn): | ||||
with these messages. | with these messages. | ||||
Args: | Args: | ||||
worker_fn Callable[Dict[str, List[dict]]]: Function called with | worker_fn Callable[Dict[str, List[dict]]]: Function called with | ||||
the messages as | the messages as | ||||
argument. | argument. | ||||
""" | """ | ||||
start_time = time.monotonic() | start_time = time.monotonic() | ||||
last_object_time = 0 | |||||
total_objects_processed = 0 | total_objects_processed = 0 | ||||
while True: | while True: | ||||
# timeout for message poll | # timeout for message poll | ||||
timeout = 1.0 | timeout = 1.0 | ||||
elapsed = time.monotonic() - start_time | elapsed = time.monotonic() - start_time | ||||
if self.process_timeout: | if self.process_timeout: | ||||
Show All 16 Lines | def process(self, worker_fn): | ||||
# clamp batch size to avoid overrunning stop_after_objects | # clamp batch size to avoid overrunning stop_after_objects | ||||
batch_size = min( | batch_size = min( | ||||
self.stop_after_objects - total_objects_processed, batch_size, | self.stop_after_objects - total_objects_processed, batch_size, | ||||
) | ) | ||||
messages = self.consumer.consume(timeout=timeout, num_messages=batch_size) | messages = self.consumer.consume(timeout=timeout, num_messages=batch_size) | ||||
if not messages: | if not messages: | ||||
continue | continue | ||||
# report the time we stayed blocked waiting for kafka; | |||||
# more or less the time it took to have a successful call to consume() | |||||
statsd.timing(JOURNAL_BLOCKED_DURATION_METRIC, elapsed - last_object_time) | |||||
vlorentzUnsubmitted Not Done Inline ActionsI don't understand how this one works. On the first loop run, last_object_time is 0 and elapsed is the time it took to get from line 250 to line 258, which is essentially this code: start_time = time.monotonic() last_object_time = 0 total_objects_processed = 0 # while True: # timeout for message poll timeout = 1.0 elapsed = time.monotonic() - start_time On other loop runs, elapsed - last_object_time is the time it takes from line 304 to line 258, which would be this code: last_object_time = time.monotonic() - start_time # while True: # timeout for message poll timeout = 1.0 elapsed = time.monotonic() - start_time vlorentz: I don't understand how this one works.
If I'm not mistaken, this only times negligible pure… | |||||
douarddaAuthorUnsubmitted Done Inline ActionsI made a better version of this part, but in the end, it won't be merged, this is not really the metrics we want and the produced histogram are pretty much useless. douardda: I made a better version of this part, but in the end, it won't be merged, this is not really… | |||||
# report the time it took to retrieve the bunch of journal messages; | |||||
# more or less the time the (successful) call to consume() took | |||||
statsd.timing( | |||||
JOURNAL_CONSUME_DURATION_METRIC, | |||||
(time.monotonic() - start_time) - elapsed, | |||||
) | |||||
# report the time it takes to handle messages | |||||
with statsd.timed(JOURNAL_HANDLE_DURATION_METRIC): | |||||
batch_processed, at_eof = self.handle_messages(messages, worker_fn) | batch_processed, at_eof = self.handle_messages(messages, worker_fn) | ||||
# report the number of handled messages | |||||
statsd.increment(JOURNAL_MESSAGE_NUMBER_METRIC, value=batch_processed) | |||||
total_objects_processed += batch_processed | total_objects_processed += batch_processed | ||||
if at_eof: | if at_eof: | ||||
break | break | ||||
last_object_time = time.monotonic() - start_time | |||||
olasdUnsubmitted Not Done Inline Actionsshouldn't this just be time.monotonic()? olasd: shouldn't this just be `time.monotonic()`? | |||||
douarddaAuthorUnsubmitted Done Inline Actionswould probably make things a bit easier to follow yes... will try douardda: would probably make things a bit easier to follow yes... will try | |||||
return total_objects_processed | return total_objects_processed | ||||
def handle_messages(self, messages, worker_fn): | def handle_messages(self, messages, worker_fn): | ||||
objects: Dict[str, List[Any]] = defaultdict(list) | objects: Dict[str, List[Any]] = defaultdict(list) | ||||
nb_processed = 0 | nb_processed = 0 | ||||
for message in messages: | for message in messages: | ||||
error = message.error() | error = message.error() | ||||
Show All 33 Lines |
Should be plural and end with _total rather than "number" (ref. https://prometheus.io/docs/practices/naming/)