Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/client.py
Show First 20 Lines • Show All 168 Lines • ▼ Show 20 Lines | ): | ||||
self.stop_on_eof = stop_on_eof | self.stop_on_eof = stop_on_eof | ||||
if self.stop_on_eof: | if self.stop_on_eof: | ||||
consumer_settings["enable.partition.eof"] = True | consumer_settings["enable.partition.eof"] = True | ||||
logger.debug("Consumer settings: %s", consumer_settings) | logger.debug("Consumer settings: %s", consumer_settings) | ||||
self.consumer = Consumer(consumer_settings) | self.consumer = Consumer(consumer_settings) | ||||
topics = ["%s.%s" % (prefix, object_type) for object_type in object_types] | self.subscription = [ | ||||
"%s.%s" % (prefix, object_type) for object_type in object_types | |||||
logger.debug("Upstream topics: %s", self.consumer.list_topics(timeout=10)) | ] | ||||
logger.debug("Subscribing to: %s", topics) | self.subscribe() | ||||
self.consumer.subscribe(topics=topics) | |||||
self.stop_after_objects = stop_after_objects | self.stop_after_objects = stop_after_objects | ||||
self.process_timeout = process_timeout | self.process_timeout = process_timeout | ||||
self.eof_reached: Set[Tuple[str, str]] = set() | self.eof_reached: Set[Tuple[str, str]] = set() | ||||
self.batch_size = batch_size | self.batch_size = batch_size | ||||
self._object_types = object_types | self._object_types = object_types | ||||
def subscribe(self): | |||||
logger.debug("Upstream topics: %s", self.consumer.list_topics(timeout=10)) | |||||
logger.debug("Subscribing to: %s", self.subscription) | |||||
self.consumer.subscribe(topics=self.subscription) | |||||
def process(self, worker_fn): | def process(self, worker_fn): | ||||
"""Polls Kafka for a batch of messages, and calls the worker_fn | """Polls Kafka for a batch of messages, and calls the worker_fn | ||||
with these messages. | with these messages. | ||||
Args: | Args: | ||||
worker_fn Callable[Dict[str, List[dict]]]: Function called with | worker_fn Callable[Dict[str, List[dict]]]: Function called with | ||||
the messages as | the messages as | ||||
argument. | argument. | ||||
▲ Show 20 Lines • Show All 79 Lines • Show Last 20 Lines |