swh/journal/client.py
# Copyright (C) 2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from collections import defaultdict
import logging
import os
import time
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union

from confluent_kafka import Consumer, KafkaError, KafkaException

from swh.journal import DEFAULT_PREFIX

from .serializers import kafka_to_value

logger = logging.getLogger(__name__)
[... 48 lines folded; context: class JournalClient: ...]
    the prefix).

    Clients can be sharded by setting the `group_id` to a common
    value across instances. The journal will share the message
    throughput across the nodes sharing the same group_id.

    Messages are processed by the `worker_fn` callback passed to the `process`
    method, in batches of maximum `batch_size` messages (defaults to 200).
    The objects passed to the `worker_fn` callback are the result of the kafka
    message converted by the `value_deserializer` function. By default (if this
    argument is not given), it will produce dicts (using the `kafka_to_value`
    function). Its signature is:

        `value_deserializer(object_type: str, kafka_msg: bytes) -> Any`

    If the value returned by `value_deserializer` is None, it is ignored and
    not passed to the `worker_fn` function.

vlorentz: You should document the arguments of the function (since its signature is not the same as kafka_to_value)
douardda: Ah yes, I had this in mind then forgot. thx
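For illustration, a deserializer matching this signature might look as follows. This is a minimal sketch, not part of the changeset: `kafka_to_value` and the None-to-skip behavior come from the code above, while the git-origin filter and the function name are made-up examples:

    from swh.journal.serializers import kafka_to_value

    def git_origins_only(object_type: str, kafka_msg: bytes):
        """Deserialize a message, keeping only origins whose URL mentions git
        (a hypothetical filter)."""
        value = kafka_to_value(kafka_msg)
        if object_type == "origin" and "git" not in value.get("url", ""):
            return None  # None values are dropped and never reach worker_fn
        return value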
    If set, the processing stops after processing `stop_after_objects` messages
    in total.

    `stop_on_eof` stops the processing when the client has reached the end of
    each partition in turn.

    `auto_offset_reset` sets the behavior of the client when the consumer group
    initializes: `'earliest'` (the default) processes all objects since the
[... 10 lines folded; context: def __init__( ...]
        prefix: Optional[str] = None,
        object_types: Optional[List[str]] = None,
        privileged: bool = False,
        stop_after_objects: Optional[int] = None,
        batch_size: int = 200,
        process_timeout: Optional[float] = None,
        auto_offset_reset: str = "earliest",
        stop_on_eof: bool = False,
        value_deserializer: Optional[Callable[[str, bytes], Any]] = None,
        **kwargs,
    ):
        if prefix is None:
            prefix = DEFAULT_PREFIX
        if auto_offset_reset not in ACCEPTED_OFFSET_RESET:
            raise ValueError(
                "Option 'auto_offset_reset' only accepts %s, not %s"
                % (ACCEPTED_OFFSET_RESET, auto_offset_reset)
            )
        if batch_size <= 0:
            raise ValueError("Option 'batch_size' needs to be positive")
        if value_deserializer:
            self.value_deserializer = value_deserializer
        else:
            # adapt the default deserializer to the (object_type, msg) signature
            self.value_deserializer = lambda _, value: kafka_to_value(value)

        if isinstance(brokers, str):
            brokers = [brokers]

        debug_logging = rdkafka_logger.isEnabledFor(logging.DEBUG)
        if debug_logging and "debug" not in kwargs:
            kwargs["debug"] = "consumer"
[... 157 lines folded; context: def handle_messages(self, messages, worker_fn): ...]
                else:
                    _error_cb(error)
                continue
            if message.value() is None:
                # ignore messages with no payload, these can be generated in tests
                continue
            nb_processed += 1
            object_type = message.topic().split(".")[-1]
            deserialized_object = self.deserialize_message(
                message, object_type=object_type
            )
            if deserialized_object is not None:
                objects[object_type].append(deserialized_object)
        if objects:
            worker_fn(dict(objects))
            self.consumer.commit()

        at_eof = self.stop_on_eof and all(
            (tp.topic, tp.partition) in self.eof_reached
            for tp in self.consumer.assignment()
        )

        return nb_processed, at_eof
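As a usage note (not part of the changeset): `worker_fn` receives one plain dict per batch, mapping each object type to the list of objects that survived deserialization; anything the deserializer mapped to None is silently dropped. A minimal sketch of such a callback:

    import logging

    logger = logging.getLogger(__name__)

    def worker_fn(batch):
        # batch looks like {"origin": [{...}, ...], "revision": [{...}, ...]}
        for object_type, objs in batch.items():
            logger.info("processing %d %s objects", len(objs), object_type)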
    def deserialize_message(self, message, object_type=None):
        return self.value_deserializer(object_type, message.value())
    def close(self):
        self.consumer.close()
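Finally, a hedged end-to-end sketch of wiring a custom deserializer into the client. The broker address and group id are placeholders, and `git_origins_only` and `worker_fn` are the hypothetical callables sketched above; instances sharing the same `group_id` split the message throughput, as the docstring describes:

    client = JournalClient(
        brokers="kafka.example.org:9092",  # a single broker may be given as a plain string
        group_id="my-consumer-group",      # placeholder consumer group
        object_types=["origin"],
        value_deserializer=git_origins_only,
    )
    client.process(worker_fn)  # runs until stop_after_objects / stop_on_eof, if set
    client.close()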