diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -3,6 +3,7 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import logging
 from typing import Iterable
 
 from confluent_kafka import Consumer, Producer
@@ -170,3 +171,64 @@
     delivery_failure = exc.value.delivery_failures[0]
     assert delivery_failure.key == empty_dir.id
     assert delivery_failure.code == "SWH_FLUSH_TIMEOUT"
+
+
+class MockBufferErrorProducer(Producer):
+    """A Kafka producer that raises a BufferError on the `n_buffererrors`
+    first calls to produce."""
+
+    def __init__(self, *args, **kwargs):
+        self.n_buffererrors = kwargs.pop("n_buffererrors", 0)
+        self.produce_calls = 0
+
+        super().__init__(*args, **kwargs)
+
+    def produce(self, **kwargs):
+        self.produce_calls += 1
+        if self.produce_calls <= self.n_buffererrors:
+            raise BufferError("Local: Queue full")
+
+        self.produce_calls = 0
+        return super().produce(**kwargs)
+
+
+def test_write_BufferError_retry(kafka_prefix: str, kafka_server: str, caplog):
+    writer = KafkaJournalWriter[BaseModel](
+        brokers=[kafka_server],
+        client_id="kafka_writer",
+        prefix=kafka_prefix,
+        value_sanitizer=model_object_dict_sanitizer,
+        flush_timeout=1,
+        producer_class=MockBufferErrorProducer,
+    )
+
+    writer.producer.n_buffererrors = 4
+
+    empty_dir = Directory(entries=())
+
+    caplog.set_level(logging.DEBUG, "swh.journal.writer.kafka")
+    writer.write_addition("directory", empty_dir)
+    records = []
+    for record in caplog.records:
+        if "BufferError" in record.getMessage():
+            records.append(record)
+
+    assert len(records) == writer.producer.n_buffererrors
+
+
+def test_write_BufferError_give_up(kafka_prefix: str, kafka_server: str, caplog):
+    writer = KafkaJournalWriter[BaseModel](
+        brokers=[kafka_server],
+        client_id="kafka_writer",
+        prefix=kafka_prefix,
+        value_sanitizer=model_object_dict_sanitizer,
+        flush_timeout=1,
+        producer_class=MockBufferErrorProducer,
+    )
+
+    writer.producer.n_buffererrors = 5
+
+    empty_dir = Directory(entries=())
+
+    with pytest.raises(KafkaDeliveryError):
+        writer.write_addition("directory", empty_dir)
diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py
--- a/swh/journal/writer/kafka.py
+++ b/swh/journal/writer/kafka.py
@@ -171,12 +171,36 @@
     def send(self, topic: str, key: KeyType, value):
         kafka_key = key_to_kafka(key)
-        self.producer.produce(
-            topic=topic, key=kafka_key, value=value_to_kafka(value),
+        max_attempts = 5
+        last_exception: Optional[Exception] = None
+        for attempt in range(max_attempts):
+            try:
+                self.producer.produce(
+                    topic=topic, key=kafka_key, value=value_to_kafka(value),
+                )
+            except BufferError as e:
+                last_exception = e
+                wait = 1 + 3 * attempt
+
+                if logger.isEnabledFor(logging.DEBUG):  # pprint_key is expensive
+                    logger.debug(
+                        "BufferError producing %s %s; waiting for %ss",
+                        get_object_type(topic),
+                        pprint_key(kafka_key),
+                        wait,
+                    )
+                self.producer.poll(wait)
+            else:
+                self.deliveries_pending[DeliveryTag(topic, kafka_key)] = key
+                return
+
+        # We reach this point if all delivery attempts have failed
+        self.delivery_failures.append(
+            DeliveryFailureInfo(
+                get_object_type(topic), key, str(last_exception), "SWH_BUFFER_ERROR"
+            )
         )
-        self.deliveries_pending[DeliveryTag(topic, kafka_key)] = key
-
     def delivery_error(self, message) -> KafkaDeliveryError:
         """Get all failed deliveries, and clear them"""
         ret = self.delivery_failures