Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/client.py
Show First 20 Lines • Show All 86 Lines • ▼ Show 20 Lines | class JournalClient: | ||||
""" | """ | ||||
def __init__( | def __init__( | ||||
self, | self, | ||||
brokers: Union[str, List[str]], | brokers: Union[str, List[str]], | ||||
group_id: str, | group_id: str, | ||||
prefix: Optional[str] = None, | prefix: Optional[str] = None, | ||||
object_types: Optional[List[str]] = None, | object_types: Optional[List[str]] = None, | ||||
privileged: bool = False, | |||||
stop_after_objects: Optional[int] = None, | stop_after_objects: Optional[int] = None, | ||||
batch_size: int = 200, | batch_size: int = 200, | ||||
process_timeout: Optional[float] = None, | process_timeout: Optional[float] = None, | ||||
auto_offset_reset: str = "earliest", | auto_offset_reset: str = "earliest", | ||||
stop_on_eof: bool = False, | stop_on_eof: bool = False, | ||||
**kwargs, | **kwargs, | ||||
): | ): | ||||
if prefix is None: | if prefix is None: | ||||
▲ Show 20 Lines • Show All 56 Lines • ▼ Show 20 Lines | ): | ||||
self.stop_on_eof = stop_on_eof | self.stop_on_eof = stop_on_eof | ||||
if self.stop_on_eof: | if self.stop_on_eof: | ||||
consumer_settings["enable.partition.eof"] = True | consumer_settings["enable.partition.eof"] = True | ||||
logger.debug("Consumer settings: %s", consumer_settings) | logger.debug("Consumer settings: %s", consumer_settings) | ||||
self.consumer = Consumer(consumer_settings) | self.consumer = Consumer(consumer_settings) | ||||
if privileged: | |||||
existing_topics = self.consumer.list_topics(timeout=10).topics.keys() | privileged_prefix = f"{prefix}_privileged" | ||||
if not any(topic.startswith(f"{prefix}.") for topic in existing_topics): | else: # do not attempt to subscribe to privileged topics | ||||
privileged_prefix = f"{prefix}" | |||||
existing_topics = [ | |||||
topic | |||||
for topic in self.consumer.list_topics(timeout=10).topics.keys() | |||||
if ( | |||||
topic.startswith(f"{prefix}.") | |||||
or topic.startswith(f"{privileged_prefix}.") | |||||
) | |||||
] | |||||
if not existing_topics: | |||||
raise ValueError( | raise ValueError( | ||||
f"The prefix {prefix} does not match any existing topic " | f"The prefix {prefix} does not match any existing topic " | ||||
"on the kafka broker" | "on the kafka broker" | ||||
) | ) | ||||
if object_types: | if not object_types: | ||||
unknown_topics = [] | object_types = list({topic.split(".")[-1] for topic in existing_topics}) | ||||
self.subscription = [] | |||||
unknown_types = [] | |||||
for object_type in object_types: | for object_type in object_types: | ||||
topic = f"{prefix}.{object_type}" | topics = (f"{privileged_prefix}.{object_type}", f"{prefix}.{object_type}") | ||||
if topic not in existing_topics: | for topic in topics: | ||||
unknown_topics.append(topic) | if topic in existing_topics: | ||||
if unknown_topics: | self.subscription.append(topic) | ||||
break | |||||
else: | |||||
unknown_types.append(object_type) | |||||
if unknown_types: | |||||
raise ValueError( | raise ValueError( | ||||
f"Topic(s) {','.join(unknown_topics)} " | f"Topic(s) for object types {','.join(unknown_types)} " | ||||
"are unknown on the kafka broker" | "are unknown on the kafka broker" | ||||
) | ) | ||||
self.subscription = [ | |||||
f"{prefix}.{object_type}" for object_type in object_types | |||||
] | |||||
else: | |||||
# subscribe to every topic under the prefix | |||||
self.subscription = [ | |||||
topic for topic in existing_topics if topic.startswith(prefix) | |||||
] | |||||
logger.debug(f"Upstream topics: {existing_topics}") | logger.debug(f"Upstream topics: {existing_topics}") | ||||
self.subscribe() | self.subscribe() | ||||
self.stop_after_objects = stop_after_objects | self.stop_after_objects = stop_after_objects | ||||
self.process_timeout = process_timeout | self.process_timeout = process_timeout | ||||
self.eof_reached: Set[Tuple[str, str]] = set() | self.eof_reached: Set[Tuple[str, str]] = set() | ||||
self.batch_size = batch_size | self.batch_size = batch_size | ||||
▲ Show 20 Lines • Show All 94 Lines • Show Last 20 Lines |