Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/client.py
Show All 33 Lines | |||||
# Errors that Kafka raises too often and are not useful; therefore they | # Errors that Kafka raises too often and are not useful; therefore they | ||||
# we lower their log level to DEBUG instead of INFO. | # we lower their log level to DEBUG instead of INFO. | ||||
_SPAMMY_ERRORS = [ | _SPAMMY_ERRORS = [ | ||||
KafkaError._NO_OFFSET, | KafkaError._NO_OFFSET, | ||||
] | ] | ||||
def get_journal_client(cfg, **kwargs): | |||||
conf = cfg.get("journal", {}) | |||||
conf.update({k: v for (k, v) in kwargs.items() if v not in (None, ())}) | |||||
if not conf.get("brokers"): | |||||
raise ValueError("You must specify at least one kafka broker.") | |||||
if not isinstance(conf["brokers"], (list, tuple)): | |||||
conf["brokers"] = [conf["brokers"]] | |||||
return JournalClient(**conf) | |||||
def _error_cb(error): | def _error_cb(error): | ||||
if error.fatal(): | if error.fatal(): | ||||
raise KafkaException(error) | raise KafkaException(error) | ||||
if error.code() in _SPAMMY_ERRORS: | if error.code() in _SPAMMY_ERRORS: | ||||
logger.debug("Received non-fatal kafka error: %s", error) | logger.debug("Received non-fatal kafka error: %s", error) | ||||
else: | else: | ||||
logger.info("Received non-fatal kafka error: %s", error) | logger.info("Received non-fatal kafka error: %s", error) | ||||
▲ Show 20 Lines • Show All 232 Lines • Show Last 20 Lines |