Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/writer/kafka.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import logging | import logging | ||||
import time | import time | ||||
from typing import Any, Callable, Dict, Iterable, List, NamedTuple, Optional, Type | from typing import ( | ||||
Any, | |||||
Callable, | |||||
Dict, | |||||
Generic, | |||||
Iterable, | |||||
List, | |||||
NamedTuple, | |||||
Optional, | |||||
Type, | |||||
TypeVar, | |||||
) | |||||
from confluent_kafka import KafkaException, Producer | from confluent_kafka import KafkaException, Producer | ||||
from swh.journal.serializers import ( | from swh.journal.serializers import KeyType, key_to_kafka, pprint_key, value_to_kafka | ||||
KeyType, | |||||
ModelObject, | from . import ValueProtocol | ||||
key_to_kafka, | |||||
pprint_key, | |||||
value_to_kafka, | |||||
) | |||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
class DeliveryTag(NamedTuple): | class DeliveryTag(NamedTuple): | ||||
"""Unique tag allowing us to check for a message delivery""" | """Unique tag allowing us to check for a message delivery""" | ||||
topic: str | topic: str | ||||
Show All 26 Lines | def pretty_failures(self) -> str: | ||||
f"{f.object_type} {pprint_key(f.key)} ({f.message})" | f"{f.object_type} {pprint_key(f.key)} ({f.message})" | ||||
for f in self.delivery_failures | for f in self.delivery_failures | ||||
) | ) | ||||
def __str__(self): | def __str__(self): | ||||
return f"KafkaDeliveryError({self.message}, [{self.pretty_failures()}])" | return f"KafkaDeliveryError({self.message}, [{self.pretty_failures()}])" | ||||
class KafkaJournalWriter: | TValue = TypeVar("TValue", bound=ValueProtocol) | ||||
class KafkaJournalWriter(Generic[TValue]): | |||||
"""This class is used to write serialized versions of swh.model.model objects to a | """This class is used to write serialized versions of swh.model.model objects to a | ||||
douardda: The docstring should be updated to document the generic aspect of it. | |||||
series of Kafka topics. | series of Kafka topics. | ||||
Topics used to send objects representations are built from a ``prefix`` plus the | Topics used to send objects representations are built from a ``prefix`` plus the | ||||
type of the object: | type of the object: | ||||
``{prefix}.{object_type}`` | ``{prefix}.{object_type}`` | ||||
Objects can be sent as is, or can be anonymized. The anonymization feature, when | Objects can be sent as is, or can be anonymized. The anonymization feature, when | ||||
▲ Show 20 Lines • Show All 122 Lines • ▼ Show 20 Lines | def flush(self): | ||||
if self.deliveries_pending: | if self.deliveries_pending: | ||||
# Delivery timeout | # Delivery timeout | ||||
raise self.delivery_error( | raise self.delivery_error( | ||||
"flush() exceeded timeout (%ss)" % self.flush_timeout, | "flush() exceeded timeout (%ss)" % self.flush_timeout, | ||||
) | ) | ||||
elif self.delivery_failures: | elif self.delivery_failures: | ||||
raise self.delivery_error("Failed deliveries after flush()") | raise self.delivery_error("Failed deliveries after flush()") | ||||
def _write_addition(self, object_type: str, object_: ModelObject) -> None: | def _write_addition(self, object_type: str, object_: TValue) -> None: | ||||
"""Write a single object to the journal""" | """Write a single object to the journal""" | ||||
key = object_.unique_key() | key = object_.unique_key() | ||||
if self.anonymize: | if self.anonymize: | ||||
anon_object_ = object_.anonymize() | anon_object_ = object_.anonymize() | ||||
if anon_object_: # can be either None, or an anonymized object | if anon_object_: # can be either None, or an anonymized object | ||||
# if the object is anonymizable, send the non-anonymized version in the | # if the object is anonymizable, send the non-anonymized version in the | ||||
# privileged channel | # privileged channel | ||||
topic = f"{self._prefix_privileged}.{object_type}" | topic = f"{self._prefix_privileged}.{object_type}" | ||||
dict_ = self.value_sanitizer(object_type, object_.to_dict()) | dict_ = self.value_sanitizer(object_type, object_.to_dict()) | ||||
logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) | logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) | ||||
self.send(topic, key=key, value=dict_) | self.send(topic, key=key, value=dict_) | ||||
object_ = anon_object_ | object_ = anon_object_ | ||||
topic = f"{self._prefix}.{object_type}" | topic = f"{self._prefix}.{object_type}" | ||||
dict_ = self.value_sanitizer(object_type, object_.to_dict()) | dict_ = self.value_sanitizer(object_type, object_.to_dict()) | ||||
logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) | logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) | ||||
self.send(topic, key=key, value=dict_) | self.send(topic, key=key, value=dict_) | ||||
def write_addition(self, object_type: str, object_: ModelObject) -> None: | def write_addition(self, object_type: str, object_: TValue) -> None: | ||||
"""Write a single object to the journal""" | """Write a single object to the journal""" | ||||
self._write_addition(object_type, object_) | self._write_addition(object_type, object_) | ||||
self.flush() | self.flush() | ||||
write_update = write_addition | write_update = write_addition | ||||
def write_additions(self, object_type: str, objects: Iterable[ModelObject]) -> None: | def write_additions(self, object_type: str, objects: Iterable[TValue]) -> None: | ||||
"""Write a set of objects to the journal""" | """Write a set of objects to the journal""" | ||||
for object_ in objects: | for object_ in objects: | ||||
self._write_addition(object_type, object_) | self._write_addition(object_type, object_) | ||||
self.flush() | self.flush() |
The docstring should be updated to document the generic aspect of it.