Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/client.py
Show First 20 Lines • Show All 64 Lines • ▼ Show 20 Lines | class JournalClient: | ||||
Clients subscribe to events specific to each object type as listed in the | Clients subscribe to events specific to each object type as listed in the | ||||
`object_types` argument (if unset, defaults to all accepted object types). | `object_types` argument (if unset, defaults to all accepted object types). | ||||
Clients can be sharded by setting the `group_id` to a common | Clients can be sharded by setting the `group_id` to a common | ||||
value across instances. The journal will share the message | value across instances. The journal will share the message | ||||
throughput across the nodes sharing the same group_id. | throughput across the nodes sharing the same group_id. | ||||
Messages are processed by the `worker_fn` callback passed to the `process` | Messages are processed by the `worker_fn` callback passed to the `process` | ||||
method, in batches of maximum 20 messages (currently hardcoded). If set, | method, in batches of maximum `batch_size` messages (defaults to 200). | ||||
the processing stops after processing `stop_after_objects` messages in | |||||
total. | If set, the processing stops after processing `stop_after_objects` messages | ||||
in total. | |||||
`stop_on_eof` stops the processing when the client has reached the end of | `stop_on_eof` stops the processing when the client has reached the end of | ||||
each partition in turn. | each partition in turn. | ||||
`auto_offset_reset` sets the behavior of the client when the consumer group | `auto_offset_reset` sets the behavior of the client when the consumer group | ||||
initializes: `'earliest'` (the default) processes all objects since the | initializes: `'earliest'` (the default) processes all objects since the | ||||
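inception of the topics; `'latest'` processes only objects published after the consumer group initializes. | inception of the topics; `'latest'` processes only objects published after the consumer group initializes. | ||||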
Any other named argument is passed directly to KafkaConsumer(). | Any other named argument is passed directly to KafkaConsumer(). | ||||
""" | """ | ||||
def __init__( | def __init__( | ||||
self, | self, | ||||
brokers: Union[str, List[str]], | brokers: Union[str, List[str]], | ||||
group_id: str, | group_id: str, | ||||
prefix: Optional[str] = None, | prefix: Optional[str] = None, | ||||
object_types: Optional[List[str]] = None, | object_types: Optional[List[str]] = None, | ||||
stop_after_objects: Optional[int] = None, | stop_after_objects: Optional[int] = None, | ||||
batch_size: int = 200, | |||||
process_timeout: Optional[float] = None, | process_timeout: Optional[float] = None, | ||||
auto_offset_reset: str = 'earliest', | auto_offset_reset: str = 'earliest', | ||||
stop_on_eof: bool = False, | stop_on_eof: bool = False, | ||||
**kwargs | **kwargs | ||||
): | ): | ||||
if prefix is None: | if prefix is None: | ||||
prefix = DEFAULT_PREFIX | prefix = DEFAULT_PREFIX | ||||
if object_types is None: | if object_types is None: | ||||
object_types = ACCEPTED_OBJECT_TYPES | object_types = ACCEPTED_OBJECT_TYPES | ||||
if auto_offset_reset not in ACCEPTED_OFFSET_RESET: | if auto_offset_reset not in ACCEPTED_OFFSET_RESET: | ||||
raise ValueError( | raise ValueError( | ||||
'Option \'auto_offset_reset\' only accept %s, not %s' % | 'Option \'auto_offset_reset\' only accept %s, not %s' % | ||||
(ACCEPTED_OFFSET_RESET, auto_offset_reset)) | (ACCEPTED_OFFSET_RESET, auto_offset_reset)) | ||||
for object_type in object_types: | for object_type in object_types: | ||||
if object_type not in ACCEPTED_OBJECT_TYPES: | if object_type not in ACCEPTED_OBJECT_TYPES: | ||||
raise ValueError( | raise ValueError( | ||||
'Option \'object_types\' only accepts %s, not %s.' % | 'Option \'object_types\' only accepts %s, not %s.' % | ||||
(ACCEPTED_OBJECT_TYPES, object_type)) | (ACCEPTED_OBJECT_TYPES, object_type)) | ||||
if batch_size <= 0: | |||||
raise ValueError("Option 'batch_size' needs to be positive") | |||||
self.value_deserializer = kafka_to_value | self.value_deserializer = kafka_to_value | ||||
if isinstance(brokers, str): | if isinstance(brokers, str): | ||||
brokers = [brokers] | brokers = [brokers] | ||||
debug_logging = rdkafka_logger.isEnabledFor(logging.DEBUG) | debug_logging = rdkafka_logger.isEnabledFor(logging.DEBUG) | ||||
if debug_logging and 'debug' not in kwargs: | if debug_logging and 'debug' not in kwargs: | ||||
kwargs['debug'] = 'consumer' | kwargs['debug'] = 'consumer' | ||||
▲ Show 20 Lines • Show All 53 Lines • ▼ Show 20 Lines | ): | ||||
self.consumer.list_topics(timeout=10)) | self.consumer.list_topics(timeout=10)) | ||||
logger.debug('Subscribing to: %s', topics) | logger.debug('Subscribing to: %s', topics) | ||||
self.consumer.subscribe(topics=topics) | self.consumer.subscribe(topics=topics) | ||||
self.stop_after_objects = stop_after_objects | self.stop_after_objects = stop_after_objects | ||||
self.process_timeout = process_timeout | self.process_timeout = process_timeout | ||||
self.eof_reached: Set[Tuple[str, str]] = set() | self.eof_reached: Set[Tuple[str, str]] = set() | ||||
self.batch_size = batch_size | |||||
self._object_types = object_types | self._object_types = object_types | ||||
def process(self, worker_fn): | def process(self, worker_fn): | ||||
"""Polls Kafka for a batch of messages, and calls the worker_fn | """Polls Kafka for a batch of messages, and calls the worker_fn | ||||
with these messages. | with these messages. | ||||
Args: | Args: | ||||
Show All 16 Lines | def process(self, worker_fn): | ||||
# time, it would then be called with a timeout in the order | # time, it would then be called with a timeout in the order | ||||
# of milliseconds, therefore returning immediately, then be | # of milliseconds, therefore returning immediately, then be | ||||
# called again, etc. | # called again, etc. | ||||
if elapsed + 0.01 >= self.process_timeout: | if elapsed + 0.01 >= self.process_timeout: | ||||
break | break | ||||
timeout = self.process_timeout - elapsed | timeout = self.process_timeout - elapsed | ||||
batch_size = 20 | batch_size = self.batch_size | ||||
if self.stop_after_objects: | if self.stop_after_objects: | ||||
if total_objects_processed >= self.stop_after_objects: | if total_objects_processed >= self.stop_after_objects: | ||||
break | break | ||||
# clamp batch size to avoid overrunning stop_after_objects | # clamp batch size to avoid overrunning stop_after_objects | ||||
batch_size = min( | batch_size = min( | ||||
self.stop_after_objects-total_objects_processed, | self.stop_after_objects-total_objects_processed, | ||||
batch_size, | batch_size, | ||||
▲ Show 20 Lines • Show All 53 Lines • Show Last 20 Lines |