Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/writer/kafka.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import logging | import logging | ||||
import time | import time | ||||
from typing import ( | from typing import ( | ||||
Any, | Any, | ||||
Callable, | Callable, | ||||
Dict, | Dict, | ||||
Generic, | Generic, | ||||
Iterable, | Iterable, | ||||
List, | List, | ||||
NamedTuple, | NamedTuple, | ||||
Optional, | Optional, | ||||
Type, | Type, | ||||
TypeVar, | |||||
) | ) | ||||
from confluent_kafka import KafkaException, Producer | from confluent_kafka import KafkaException, Producer | ||||
from swh.journal.serializers import KeyType, key_to_kafka, pprint_key, value_to_kafka | from swh.journal.serializers import KeyType, key_to_kafka, pprint_key, value_to_kafka | ||||
from . import ValueProtocol | from .interface import TValue | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
class DeliveryTag(NamedTuple): | class DeliveryTag(NamedTuple): | ||||
"""Unique tag allowing us to check for a message delivery""" | """Unique tag allowing us to check for a message delivery""" | ||||
topic: str | topic: str | ||||
Show All 26 Lines | def pretty_failures(self) -> str: | ||||
f"{f.object_type} {pprint_key(f.key)} ({f.message})" | f"{f.object_type} {pprint_key(f.key)} ({f.message})" | ||||
for f in self.delivery_failures | for f in self.delivery_failures | ||||
) | ) | ||||
def __str__(self): | def __str__(self): | ||||
return f"KafkaDeliveryError({self.message}, [{self.pretty_failures()}])" | return f"KafkaDeliveryError({self.message}, [{self.pretty_failures()}])" | ||||
TValue = TypeVar("TValue", bound=ValueProtocol) | |||||
class KafkaJournalWriter(Generic[TValue]): | class KafkaJournalWriter(Generic[TValue]): | ||||
"""This class is used to write serialized versions of value objects to a | """This class is used to write serialized versions of value objects to a | ||||
series of Kafka topics. The type parameter `TValue`, which must implement the | series of Kafka topics. The type parameter `TValue`, which must implement the | ||||
`ValueProtocol`, is the type of values this writer will write. | `ValueProtocol`, is the type of values this writer will write. | ||||
Typically, `TValue` will be `swh.model.model.BaseModel`. | Typically, `TValue` will be `swh.model.model.BaseModel`. | ||||
Topics used to send objects representations are built from a ``prefix`` plus the | Topics used to send objects representations are built from a ``prefix`` plus the | ||||
type of the object: | type of the object: | ||||
▲ Show 20 Lines • Show All 138 Lines • ▼ Show 20 Lines | def delivery_error(self, message) -> KafkaDeliveryError: | ||||
orig_key, | orig_key, | ||||
"No delivery before flush() timeout", | "No delivery before flush() timeout", | ||||
"SWH_FLUSH_TIMEOUT", | "SWH_FLUSH_TIMEOUT", | ||||
) | ) | ||||
) | ) | ||||
return KafkaDeliveryError(message, ret) | return KafkaDeliveryError(message, ret) | ||||
def flush(self): | def flush(self) -> None: | ||||
start = time.monotonic() | start = time.monotonic() | ||||
self.producer.flush(self.flush_timeout) | self.producer.flush(self.flush_timeout) | ||||
while self.deliveries_pending: | while self.deliveries_pending: | ||||
if time.monotonic() - start > self.flush_timeout: | if time.monotonic() - start > self.flush_timeout: | ||||
break | break | ||||
self.producer.poll(0.1) | self.producer.poll(0.1) | ||||
Show All 26 Lines | def _write_addition(self, object_type: str, object_: TValue) -> None: | ||||
logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) | logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) | ||||
self.send(topic, key=key, value=dict_) | self.send(topic, key=key, value=dict_) | ||||
def write_addition(self, object_type: str, object_: TValue) -> None: | def write_addition(self, object_type: str, object_: TValue) -> None: | ||||
"""Write a single object to the journal""" | """Write a single object to the journal""" | ||||
self._write_addition(object_type, object_) | self._write_addition(object_type, object_) | ||||
self.flush() | self.flush() | ||||
write_update = write_addition | |||||
def write_additions(self, object_type: str, objects: Iterable[TValue]) -> None: | def write_additions(self, object_type: str, objects: Iterable[TValue]) -> None: | ||||
"""Write a set of objects to the journal""" | """Write a set of objects to the journal""" | ||||
for object_ in objects: | for object_ in objects: | ||||
self._write_addition(object_type, object_) | self._write_addition(object_type, object_) | ||||
self.flush() | self.flush() |