Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/writer/kafka.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import logging | import logging | ||||
import time | import time | ||||
from typing import ( | from typing import Any, Callable, Dict, Iterable, List, NamedTuple, Optional, Type | ||||
Any, | |||||
Callable, | |||||
Dict, | |||||
Generic, | |||||
Iterable, | |||||
List, | |||||
NamedTuple, | |||||
Optional, | |||||
Type, | |||||
TypeVar, | |||||
) | |||||
from confluent_kafka import KafkaException, Producer | from confluent_kafka import KafkaException, Producer | ||||
from swh.journal.serializers import KeyType, key_to_kafka, pprint_key, value_to_kafka | from swh.journal.serializers import KeyType, key_to_kafka, pprint_key, value_to_kafka | ||||
from . import ValueProtocol | from .interface import ValueProtocol | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
class DeliveryTag(NamedTuple): | class DeliveryTag(NamedTuple): | ||||
"""Unique tag allowing us to check for a message delivery""" | """Unique tag allowing us to check for a message delivery""" | ||||
topic: str | topic: str | ||||
Show All 26 Lines | def pretty_failures(self) -> str: | ||||
f"{f.object_type} {pprint_key(f.key)} ({f.message})" | f"{f.object_type} {pprint_key(f.key)} ({f.message})" | ||||
for f in self.delivery_failures | for f in self.delivery_failures | ||||
) | ) | ||||
def __str__(self): | def __str__(self): | ||||
return f"KafkaDeliveryError({self.message}, [{self.pretty_failures()}])" | return f"KafkaDeliveryError({self.message}, [{self.pretty_failures()}])" | ||||
TValue = TypeVar("TValue", bound=ValueProtocol) | class KafkaJournalWriter: | ||||
"""This class is used to write serialized versions of value objects to a series | |||||
of Kafka topics. The type parameter of value objects, which must implement | |||||
class KafkaJournalWriter(Generic[TValue]): | the `ValueProtocol`, is the type of values this writer will write. | ||||
"""This class is used to write serialized versions of value objects to a | Typically, `ValueProtocol` will be `swh.model.model.BaseModel`. | ||||
series of Kafka topics. The type parameter `TValue`, which must implement the | |||||
`ValueProtocol`, is the type of values this writer will write. | |||||
Typically, `TValue` will be `swh.model.model.BaseModel`. | |||||
Topics used to send objects representations are built from a ``prefix`` plus the | Topics used to send objects representations are built from a ``prefix`` plus the | ||||
type of the object: | type of the object: | ||||
``{prefix}.{object_type}`` | ``{prefix}.{object_type}`` | ||||
Objects can be sent as is, or can be anonymized. The anonymization feature, when | Objects can be sent as is, or can be anonymized. The anonymization feature, when | ||||
activated, will write anonymized versions of value objects in the main topic, and | activated, will write anonymized versions of value objects in the main topic, and | ||||
▲ Show 20 Lines • Show All 133 Lines • ▼ Show 20 Lines | def delivery_error(self, message) -> KafkaDeliveryError: | ||||
orig_key, | orig_key, | ||||
"No delivery before flush() timeout", | "No delivery before flush() timeout", | ||||
"SWH_FLUSH_TIMEOUT", | "SWH_FLUSH_TIMEOUT", | ||||
) | ) | ||||
) | ) | ||||
return KafkaDeliveryError(message, ret) | return KafkaDeliveryError(message, ret) | ||||
def flush(self): | def flush(self) -> None: | ||||
start = time.monotonic() | start = time.monotonic() | ||||
self.producer.flush(self.flush_timeout) | self.producer.flush(self.flush_timeout) | ||||
while self.deliveries_pending: | while self.deliveries_pending: | ||||
if time.monotonic() - start > self.flush_timeout: | if time.monotonic() - start > self.flush_timeout: | ||||
break | break | ||||
self.producer.poll(0.1) | self.producer.poll(0.1) | ||||
if self.deliveries_pending: | if self.deliveries_pending: | ||||
# Delivery timeout | # Delivery timeout | ||||
raise self.delivery_error( | raise self.delivery_error( | ||||
"flush() exceeded timeout (%ss)" % self.flush_timeout, | "flush() exceeded timeout (%ss)" % self.flush_timeout, | ||||
) | ) | ||||
elif self.delivery_failures: | elif self.delivery_failures: | ||||
raise self.delivery_error("Failed deliveries after flush()") | raise self.delivery_error("Failed deliveries after flush()") | ||||
def _write_addition(self, object_type: str, object_: TValue) -> None: | def _write_addition(self, object_type: str, object_: ValueProtocol) -> None: | ||||
"""Write a single object to the journal""" | """Write a single object to the journal""" | ||||
key = object_.unique_key() | key = object_.unique_key() | ||||
if self.anonymize: | if self.anonymize: | ||||
anon_object_ = object_.anonymize() | anon_object_ = object_.anonymize() | ||||
if anon_object_: # can be either None, or an anonymized object | if anon_object_: # can be either None, or an anonymized object | ||||
# if the object is anonymizable, send the non-anonymized version in the | # if the object is anonymizable, send the non-anonymized version in the | ||||
# privileged channel | # privileged channel | ||||
topic = f"{self._prefix_privileged}.{object_type}" | topic = f"{self._prefix_privileged}.{object_type}" | ||||
dict_ = self.value_sanitizer(object_type, object_.to_dict()) | dict_ = self.value_sanitizer(object_type, object_.to_dict()) | ||||
logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) | logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) | ||||
self.send(topic, key=key, value=dict_) | self.send(topic, key=key, value=dict_) | ||||
object_ = anon_object_ | object_ = anon_object_ | ||||
topic = f"{self._prefix}.{object_type}" | topic = f"{self._prefix}.{object_type}" | ||||
dict_ = self.value_sanitizer(object_type, object_.to_dict()) | dict_ = self.value_sanitizer(object_type, object_.to_dict()) | ||||
logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) | logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) | ||||
self.send(topic, key=key, value=dict_) | self.send(topic, key=key, value=dict_) | ||||
def write_addition(self, object_type: str, object_: TValue) -> None: | def write_addition(self, object_type: str, object_: ValueProtocol) -> None: | ||||
"""Write a single object to the journal""" | """Write a single object to the journal""" | ||||
self._write_addition(object_type, object_) | self._write_addition(object_type, object_) | ||||
self.flush() | self.flush() | ||||
write_update = write_addition | def write_additions( | ||||
self, object_type: str, objects: Iterable[ValueProtocol] | |||||
def write_additions(self, object_type: str, objects: Iterable[TValue]) -> None: | ) -> None: | ||||
"""Write a set of objects to the journal""" | """Write a set of objects to the journal""" | ||||
for object_ in objects: | for object_ in objects: | ||||
self._write_addition(object_type, object_) | self._write_addition(object_type, object_) | ||||
self.flush() | self.flush() |