Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/writer/kafka.py
Show First 20 Lines • Show All 154 Lines • ▼ Show 20 Lines | def _on_delivery(self, error, message): | ||||
if error is not None: | if error is not None: | ||||
self.delivery_failures.append( | self.delivery_failures.append( | ||||
DeliveryFailureInfo( | DeliveryFailureInfo( | ||||
get_object_type(topic), sent_key, error.str(), error.name() | get_object_type(topic), sent_key, error.str(), error.name() | ||||
) | ) | ||||
) | ) | ||||
def send(self, topic: str, key: KeyType, value): | def reliable_produce(self, topic: str, key: KeyType, kafka_value: Optional[bytes]): | ||||
kafka_key = key_to_kafka(key) | kafka_key = key_to_kafka(key) | ||||
max_attempts = 5 | max_attempts = 5 | ||||
last_exception: Optional[Exception] = None | last_exception: Optional[Exception] = None | ||||
for attempt in range(max_attempts): | for attempt in range(max_attempts): | ||||
try: | try: | ||||
self.producer.produce( | self.producer.produce( | ||||
topic=topic, | topic=topic, | ||||
key=kafka_key, | key=kafka_key, | ||||
value=value_to_kafka(value), | value=kafka_value, | ||||
) | ) | ||||
except BufferError as e: | except BufferError as e: | ||||
last_exception = e | last_exception = e | ||||
wait = 1 + 3 * attempt | wait = 1 + 3 * attempt | ||||
if logger.isEnabledFor(logging.DEBUG): # pprint_key is expensive | if logger.isEnabledFor(logging.DEBUG): # pprint_key is expensive | ||||
logger.debug( | logger.debug( | ||||
"BufferError producing %s %s; waiting for %ss", | "BufferError producing %s %s; waiting for %ss", | ||||
get_object_type(topic), | get_object_type(topic), | ||||
pprint_key(kafka_key), | pprint_key(kafka_key), | ||||
wait, | wait, | ||||
) | ) | ||||
self.producer.poll(wait) | self.producer.poll(wait) | ||||
else: | else: | ||||
self.deliveries_pending[DeliveryTag(topic, kafka_key)] = key | self.deliveries_pending[DeliveryTag(topic, kafka_key)] = key | ||||
return | return | ||||
# We reach this point if all delivery attempts have failed | # We reach this point if all delivery attempts have failed | ||||
self.delivery_failures.append( | self.delivery_failures.append( | ||||
DeliveryFailureInfo( | DeliveryFailureInfo( | ||||
get_object_type(topic), key, str(last_exception), "SWH_BUFFER_ERROR" | get_object_type(topic), key, str(last_exception), "SWH_BUFFER_ERROR" | ||||
) | ) | ||||
) | ) | ||||
def send(self, topic: str, key: KeyType, value): | |||||
kafka_value = value_to_kafka(value) | |||||
return self.reliable_produce(topic, key, kafka_value) | |||||
def delivery_error(self, message) -> KafkaDeliveryError: | def delivery_error(self, message) -> KafkaDeliveryError: | ||||
"""Get all failed deliveries, and clear them""" | """Get all failed deliveries, and clear them""" | ||||
ret = self.delivery_failures | ret = self.delivery_failures | ||||
self.delivery_failures = [] | self.delivery_failures = [] | ||||
while self.deliveries_pending: | while self.deliveries_pending: | ||||
delivery_tag, orig_key = self.deliveries_pending.popitem() | delivery_tag, orig_key = self.deliveries_pending.popitem() | ||||
(topic, kafka_key) = delivery_tag | (topic, kafka_key) = delivery_tag | ||||
▲ Show 20 Lines • Show All 56 Lines • ▼ Show 20 Lines | def write_additions( | ||||
self, object_type: str, objects: Iterable[ValueProtocol] | self, object_type: str, objects: Iterable[ValueProtocol] | ||||
) -> None: | ) -> None: | ||||
"""Write a set of objects to the journal""" | """Write a set of objects to the journal""" | ||||
for object_ in objects: | for object_ in objects: | ||||
self._write_addition(object_type, object_) | self._write_addition(object_type, object_) | ||||
if self.auto_flush: | if self.auto_flush: | ||||
self.flush() | self.flush() | ||||
def delete(self, object_type: str, object_keys: Iterable[KeyType]) -> None: | |||||
topic = f"{self._prefix}.{object_type}" | |||||
for key in object_keys: | |||||
self.reliable_produce(topic, key, None) |