Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/writer/kafka.py
Show All 13 Lines | |||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
class KafkaJournalWriter: | class KafkaJournalWriter: | ||||
"""This class is instantiated and used by swh-storage to write incoming | """This class is instantiated and used by swh-storage to write incoming | ||||
new objects to Kafka before adding them to the storage backend | new objects to Kafka before adding them to the storage backend | ||||
(eg. postgresql) itself.""" | (eg. postgresql) itself.""" | ||||
def __init__(self, brokers, prefix, client_id): | def __init__(self, brokers, prefix, client_id, producer_config=None): | ||||
self._prefix = prefix | self._prefix = prefix | ||||
if isinstance(brokers, str): | if isinstance(brokers, str): | ||||
brokers = [brokers] | brokers = [brokers] | ||||
if not producer_config: | |||||
producer_config = {} | |||||
self.producer = Producer({ | self.producer = Producer({ | ||||
'bootstrap.servers': ','.join(brokers), | 'bootstrap.servers': ','.join(brokers), | ||||
'client.id': client_id, | 'client.id': client_id, | ||||
'on_delivery': self._on_delivery, | 'on_delivery': self._on_delivery, | ||||
'error_cb': self._error_cb, | 'error_cb': self._error_cb, | ||||
'logger': logger, | 'logger': logger, | ||||
'enable.idempotence': 'true', | 'enable.idempotence': 'true', | ||||
**producer_config, | |||||
}) | }) | ||||
def _error_cb(self, error): | def _error_cb(self, error): | ||||
if error.fatal(): | if error.fatal(): | ||||
raise KafkaException(error) | raise KafkaException(error) | ||||
logger.info('Received non-fatal kafka error: %s', error) | logger.info('Received non-fatal kafka error: %s', error) | ||||
def _on_delivery(self, error, message): | def _on_delivery(self, error, message): | ||||
▲ Show 20 Lines • Show All 68 Lines • Show Last 20 Lines |