Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/client.py
Show First 20 Lines • Show All 199 Lines • ▼ Show 20 Lines | ): | ||||
"enable.auto.commit": False, | "enable.auto.commit": False, | ||||
"logger": rdkafka_logger, | "logger": rdkafka_logger, | ||||
} | } | ||||
self.stop_on_eof = stop_on_eof | self.stop_on_eof = stop_on_eof | ||||
if self.stop_on_eof: | if self.stop_on_eof: | ||||
consumer_settings["enable.partition.eof"] = True | consumer_settings["enable.partition.eof"] = True | ||||
logger.debug("Consumer settings: %s", consumer_settings) | if logger.isEnabledFor(logging.DEBUG): | ||||
filtered_keys = {"sasl.password"} | |||||
logger.debug("Consumer settings:") | |||||
for k, v in consumer_settings.items(): | |||||
if k in filtered_keys: | |||||
v = "**filtered**" | |||||
logger.debug(" %s: %s", k, v) | |||||
self.consumer = Consumer(consumer_settings) | self.consumer = Consumer(consumer_settings) | ||||
if privileged: | if privileged: | ||||
privileged_prefix = f"{prefix}_privileged" | privileged_prefix = f"{prefix}_privileged" | ||||
else: # do not attempt to subscribe to privileged topics | else: # do not attempt to subscribe to privileged topics | ||||
privileged_prefix = f"{prefix}" | privileged_prefix = f"{prefix}" | ||||
existing_topics = [ | existing_topics = [ | ||||
topic | topic | ||||
▲ Show 20 Lines • Show All 154 Lines • Show Last 20 Lines |