Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/client.py
Show First 20 Lines • Show All 159 Lines • ▼ Show 20 Lines | ): | ||||
self.stop_on_eof = stop_on_eof | self.stop_on_eof = stop_on_eof | ||||
if self.stop_on_eof: | if self.stop_on_eof: | ||||
consumer_settings["enable.partition.eof"] = True | consumer_settings["enable.partition.eof"] = True | ||||
logger.debug("Consumer settings: %s", consumer_settings) | logger.debug("Consumer settings: %s", consumer_settings) | ||||
self.consumer = Consumer(consumer_settings) | self.consumer = Consumer(consumer_settings) | ||||
existing_topics = self.consumer.list_topics(timeout=10).topics.keys() | privileged_prefix = f"{prefix}_privileged" | ||||
if not any(topic.startswith(f"{prefix}.") for topic in existing_topics): | existing_topics = [ | ||||
topic | |||||
for topic in self.consumer.list_topics(timeout=10).topics.keys() | |||||
if ( | |||||
topic.startswith(f"{prefix}.") | |||||
or topic.startswith(f"{privileged_prefix}.") | |||||
) | |||||
] | |||||
if not existing_topics: | |||||
raise ValueError( | raise ValueError( | ||||
f"The prefix {prefix} does not match any existing topic " | f"The prefix {prefix} does not match any existing topic " | ||||
"on the kafka broker" | "on the kafka broker" | ||||
) | ) | ||||
if object_types: | if not object_types: | ||||
unknown_topics = [] | object_types = list({topic.split(".")[-1] for topic in existing_topics}) | ||||
self.subscription = [] | |||||
unknown_types = [] | |||||
for object_type in object_types: | for object_type in object_types: | ||||
topic = f"{prefix}.{object_type}" | topics = (f"{privileged_prefix}.{object_type}", f"{prefix}.{object_type}") | ||||
if topic not in existing_topics: | for topic in topics: | ||||
unknown_topics.append(topic) | if topic in existing_topics: | ||||
if unknown_topics: | self.subscription.append(topic) | ||||
break | |||||
else: | |||||
unknown_types.append(object_type) | |||||
if unknown_types: | |||||
raise ValueError( | raise ValueError( | ||||
f"Topic(s) {','.join(unknown_topics)} " | f"Topic(s) for object types {','.join(unknown_types)} " | ||||
"are unknown on the kafka broker" | "are unknown on the kafka broker" | ||||
) | ) | ||||
self.subscription = [ | |||||
f"{prefix}.{object_type}" for object_type in object_types | |||||
] | |||||
else: | |||||
# subscribe to every topic under the prefix | |||||
self.subscription = [ | |||||
topic for topic in existing_topics if topic.startswith(prefix) | |||||
] | |||||
logger.debug(f"Upstream topics: {existing_topics}") | logger.debug(f"Upstream topics: {existing_topics}") | ||||
self.subscribe() | self.subscribe() | ||||
self.stop_after_objects = stop_after_objects | self.stop_after_objects = stop_after_objects | ||||
self.process_timeout = process_timeout | self.process_timeout = process_timeout | ||||
self.eof_reached: Set[Tuple[str, str]] = set() | self.eof_reached: Set[Tuple[str, str]] = set() | ||||
self.batch_size = batch_size | self.batch_size = batch_size | ||||
▲ Show 20 Lines • Show All 94 Lines • Show Last 20 Lines |