Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/client.py
# Copyright (C) 2017 The Software Heritage developers | # Copyright (C) 2017-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import defaultdict | from collections import defaultdict | ||||
from importlib import import_module | from importlib import import_module | ||||
import logging | import logging | ||||
import os | import os | ||||
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union | from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union | ||||
from confluent_kafka import Consumer, KafkaError, KafkaException | from confluent_kafka import Consumer, KafkaError, KafkaException | ||||
from swh.core.statsd import statsd | |||||
from swh.journal import DEFAULT_PREFIX | from swh.journal import DEFAULT_PREFIX | ||||
from .serializers import kafka_to_value | from .serializers import kafka_to_value | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
rdkafka_logger = logging.getLogger(__name__ + ".rdkafka") | rdkafka_logger = logging.getLogger(__name__ + ".rdkafka") | ||||
# Only the following offset reset policies are accepted | # Only the following offset reset policies are accepted | ||||
ACCEPTED_OFFSET_RESET = ["earliest", "latest"] | ACCEPTED_OFFSET_RESET = ["earliest", "latest"] | ||||
# Errors that Kafka raises too often and are not useful; therefore | # Errors that Kafka raises too often and are not useful; therefore | ||||
# we lower their log level to DEBUG instead of INFO. | # we lower their log level to DEBUG instead of INFO. | ||||
_SPAMMY_ERRORS = [ | _SPAMMY_ERRORS = [ | ||||
KafkaError._NO_OFFSET, | KafkaError._NO_OFFSET, | ||||
] | ] | ||||
JOURNAL_MESSAGE_NUMBER_METRIC = "swh_journal_client_handle_message_total" | |||||
JOURNAL_STATUS_METRIC = "swh_journal_client_status" | |||||
def get_journal_client(cls: str, **kwargs: Any): | def get_journal_client(cls: str, **kwargs: Any): | ||||
"""Factory function to instantiate a journal client object. | """Factory function to instantiate a journal client object. | ||||
Currently, only the "kafka" journal client is supported. | Currently, only the "kafka" journal client is supported. | ||||
""" | """ | ||||
if cls == "kafka": | if cls == "kafka": | ||||
if "stats_cb" in kwargs: | if "stats_cb" in kwargs: | ||||
▲ Show 20 Lines • Show All 231 Lines • ▼ Show 20 Lines | def process(self, worker_fn): | ||||
worker_fn Callable[Dict[str, List[dict]]]: Function called with | worker_fn Callable[Dict[str, List[dict]]]: Function called with | ||||
the messages as | the messages as | ||||
argument. | argument. | ||||
""" | """ | ||||
total_objects_processed = 0 | total_objects_processed = 0 | ||||
# timeout for message poll | # timeout for message poll | ||||
timeout = 1.0 | timeout = 1.0 | ||||
with statsd.status_gauge( | |||||
JOURNAL_STATUS_METRIC, statuses=["idle", "processing", "waiting"] | |||||
) as set_status: | |||||
set_status("idle") | |||||
while True: | while True: | ||||
batch_size = self.batch_size | batch_size = self.batch_size | ||||
if self.stop_after_objects: | if self.stop_after_objects: | ||||
if total_objects_processed >= self.stop_after_objects: | if total_objects_processed >= self.stop_after_objects: | ||||
break | break | ||||
# clamp batch size to avoid overrunning stop_after_objects | # clamp batch size to avoid overrunning stop_after_objects | ||||
batch_size = min( | batch_size = min( | ||||
self.stop_after_objects - total_objects_processed, batch_size, | self.stop_after_objects - total_objects_processed, batch_size, | ||||
) | ) | ||||
messages = self.consumer.consume(timeout=timeout, num_messages=batch_size) | set_status("waiting") | ||||
if not messages: | while True: | ||||
continue | messages = self.consumer.consume( | ||||
timeout=timeout, num_messages=batch_size | |||||
) | |||||
if messages: | |||||
break | |||||
set_status("processing") | |||||
batch_processed, at_eof = self.handle_messages(messages, worker_fn) | batch_processed, at_eof = self.handle_messages(messages, worker_fn) | ||||
set_status("idle") | |||||
# report the number of handled messages | |||||
statsd.increment(JOURNAL_MESSAGE_NUMBER_METRIC, value=batch_processed) | |||||
total_objects_processed += batch_processed | total_objects_processed += batch_processed | ||||
if at_eof: | if at_eof: | ||||
break | break | ||||
return total_objects_processed | return total_objects_processed | ||||
def handle_messages(self, messages, worker_fn): | def handle_messages(self, messages, worker_fn): | ||||
objects: Dict[str, List[Any]] = defaultdict(list) | objects: Dict[str, List[Any]] = defaultdict(list) | ||||
nb_processed = 0 | nb_processed = 0 | ||||
for message in messages: | for message in messages: | ||||
Show All 24 Lines | def handle_messages(self, messages, worker_fn): | ||||
for tp in self.consumer.assignment() | for tp in self.consumer.assignment() | ||||
) | ) | ||||
return nb_processed, at_eof | return nb_processed, at_eof | ||||
def deserialize_message(self, message, object_type=None): | def deserialize_message(self, message, object_type=None): | ||||
return self.value_deserializer(object_type, message.value()) | return self.value_deserializer(object_type, message.value()) | ||||
def close(self): | def close(self): | ||||
olasd: looks like a spurious change | |||||
self.consumer.close() | self.consumer.close() |
looks like a spurious change