swh/journal/client.py
… (15 lines not shown)
 logger = logging.getLogger(__name__)
 rdkafka_logger = logging.getLogger(__name__ + ".rdkafka")
 # Only accepted offset reset policies
 ACCEPTED_OFFSET_RESET = ["earliest", "latest"]
-# Only accepted object types
-ACCEPTED_OBJECT_TYPES = [
-    "content",
-    "directory",
-    "revision",
-    "release",
-    "snapshot",
-    "origin",
-    "origin_visit",
-]
 # Errors that Kafka raises too often and are not useful; therefore
 # we lower their log level to DEBUG instead of INFO.
 _SPAMMY_ERRORS = [
     KafkaError._NO_OFFSET,
 ]
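
For illustration, a callback using such a list might look like the sketch below. The real `_error_cb` lives outside the visible hunk, so the name and body here are assumptions, not the module's actual code.

import logging

from confluent_kafka import KafkaError

logger = logging.getLogger(__name__)
SPAMMY_ERRORS = [KafkaError._NO_OFFSET]  # mirrors _SPAMMY_ERRORS above

def demo_error_cb(error: KafkaError) -> None:
    # Demote the known-noisy error codes to DEBUG; keep the rest at INFO.
    level = logging.DEBUG if error.code() in SPAMMY_ERRORS else logging.INFO
    logger.log(level, "Kafka error: %s", error)
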
 def get_journal_client(cls: str, **kwargs: Any):
… (25 lines not shown; in class JournalClient)
     The current implementation of the journal uses Apache Kafka
     brokers to publish messages under a given topic prefix, with each
     object type using a specific topic under that prefix. If the `prefix`
     argument is None (default value), it will take the default value
     `'swh.journal.objects'`.
     Clients subscribe to events specific to each object type as listed in the
-    `object_types` argument (if unset, defaults to all accepted object types).
+    `object_types` argument (if unset, defaults to all existing Kafka topics
+    under the prefix).
     Clients can be sharded by setting the `group_id` to a common
     value across instances. The journal will share the message
     throughput across the nodes sharing the same group_id.
     Messages are processed by the `worker_fn` callback passed to the `process`
     method, in batches of at most `batch_size` messages (defaults to 200).
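
A minimal usage sketch matching this docstring (broker address, group id, and the worker body are assumptions):

from swh.journal.client import JournalClient

def worker_fn(objects):
    # Called with a dict mapping each object type to a batch of
    # deserialized messages, at most `batch_size` per call.
    for object_type, batch in objects.items():
        print(object_type, len(batch))

client = JournalClient(
    brokers=["localhost:9092"],   # assumption: a local Kafka broker
    group_id="swh.journal.demo",  # instances sharing this id split the load
    prefix=None,                  # falls back to "swh.journal.objects"
    object_types=["revision"],    # must name existing topics under the prefix
)
client.process(worker_fn)
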
… (21 lines not shown; in def __init__(...))
         batch_size: int = 200,
         process_timeout: Optional[float] = None,
         auto_offset_reset: str = "earliest",
         stop_on_eof: bool = False,
         **kwargs,
     ):
         if prefix is None:
             prefix = DEFAULT_PREFIX
-        if object_types is None:
-            object_types = ACCEPTED_OBJECT_TYPES
         if auto_offset_reset not in ACCEPTED_OFFSET_RESET:
             raise ValueError(
                 "Option 'auto_offset_reset' only accepts %s, not %s"
                 % (ACCEPTED_OFFSET_RESET, auto_offset_reset)
             )
-        for object_type in object_types:
-            if object_type not in ACCEPTED_OBJECT_TYPES:
-                raise ValueError(
-                    "Option 'object_types' only accepts %s, not %s."
-                    % (ACCEPTED_OBJECT_TYPES, object_type)
-                )
         if batch_size <= 0:
             raise ValueError("Option 'batch_size' needs to be positive")
         self.value_deserializer = kafka_to_value
         if isinstance(brokers, str):
             brokers = [brokers]
… (42 lines not shown)
         self.stop_on_eof = stop_on_eof
         if self.stop_on_eof:
             consumer_settings["enable.partition.eof"] = True
         logger.debug("Consumer settings: %s", consumer_settings)
         self.consumer = Consumer(consumer_settings)
-        self.subscription = [
-            "%s.%s" % (prefix, object_type) for object_type in object_types
-        ]
+        existing_topics = self.consumer.list_topics(timeout=10).topics.keys()
+        if not any(topic.startswith(f"{prefix}.") for topic in existing_topics):
+            raise ValueError(
+                f"The prefix {prefix} does not match any existing topic "
+                "on the kafka broker"
+            )
+        if object_types:
+            unknown_topics = []
+            for object_type in object_types:
+                topic = f"{prefix}.{object_type}"
+                if topic not in existing_topics:
+                    unknown_topics.append(topic)
+            if unknown_topics:
+                raise ValueError(
+                    f"Topic(s) {', '.join(unknown_topics)} "
+                    "are unknown on the kafka broker"
+                )
+            self.subscription = [
+                f"{prefix}.{object_type}" for object_type in object_types
+            ]
+        else:
+            # subscribe to every topic under the prefix
+            self.subscription = [
+                topic
+                for topic in existing_topics
+                if topic.startswith(f"{prefix}.")
+            ]
+        logger.debug("Upstream topics: %s", existing_topics)
         self.subscribe()
         self.stop_after_objects = stop_after_objects
         self.process_timeout = process_timeout
         self.eof_reached: Set[Tuple[str, int]] = set()
         self.batch_size = batch_size
+        self._object_types = object_types
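
The discovery step added above can be exercised on its own with confluent-kafka; a rough sketch (broker address and group id are assumptions):

from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # assumption: local broker
    "group.id": "swh.journal.demo",
})
prefix = "swh.journal.objects"
# list_topics() returns ClusterMetadata; .topics maps topic names to metadata.
existing_topics = consumer.list_topics(timeout=10).topics.keys()
print([t for t in existing_topics if t.startswith(f"{prefix}.")])
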
     def subscribe(self):
-        logger.debug("Upstream topics: %s", self.consumer.list_topics(timeout=10))
-        logger.debug("Subscribing to: %s", self.subscription)
+        """Subscribe to the topics listed in self.subscription.
+
+        This can be overridden if you need, for instance, to manually
+        assign partitions.
+        """
+        logger.debug("Subscribing to: %s", self.subscription)
         self.consumer.subscribe(topics=self.subscription)
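
As the new docstring suggests, subscribe() can be overridden for manual partition assignment. A hypothetical subclass (the class name and the pin-to-partition-0 policy are illustrative only):

from confluent_kafka import TopicPartition

from swh.journal.client import JournalClient

class PinnedJournalClient(JournalClient):
    def subscribe(self):
        # Static assignment bypasses group-managed rebalancing.
        partitions = [TopicPartition(topic, 0) for topic in self.subscription]
        self.consumer.assign(partitions)
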
     def process(self, worker_fn):
         """Polls Kafka for a batch of messages, and calls the worker_fn
         with these messages.
         Args:
             worker_fn Callable[Dict[str, List[dict]]]: Function called with
… (50 lines not shown; in def handle_messages(self, messages, worker_fn))
             if error is not None:
                 if error.code() == KafkaError._PARTITION_EOF:
                     self.eof_reached.add((message.topic(), message.partition()))
                 else:
                     _error_cb(error)
                 continue
             nb_processed += 1
             object_type = message.topic().split(".")[-1]
+            # Sanity check: we should never receive a message from a topic
+            # we did not subscribe to.
+            if self._object_types:
+                assert object_type in self._object_types, object_type
             objects[object_type].append(self.deserialize_message(message))
         if objects:
             worker_fn(dict(objects))
         self.consumer.commit()
         at_eof = self.stop_on_eof and all(
             (tp.topic, tp.partition) in self.eof_reached
… (10 lines not shown)
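
Finally, the eof_reached bookkeeping above is what lets a client drain the currently available messages and return. A hedged sketch of that one-shot pattern (broker address and group id are assumptions):

from swh.journal.client import JournalClient

def print_batches(objects):
    for object_type, batch in objects.items():
        print(object_type, len(batch))

# stop_on_eof=True sets enable.partition.eof, so process() returns once
# every subscribed partition has reported EOF.
client = JournalClient(
    brokers=["localhost:9092"],    # assumption: local broker
    group_id="swh.journal.drain",  # assumption: throwaway group id
    stop_on_eof=True,
)
client.process(print_batches)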