Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/client.py
# Copyright (C) 2017 The Software Heritage developers | # Copyright (C) 2017 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from kafka import KafkaConsumer | from kafka import KafkaConsumer | ||||
import logging | import logging | ||||
from .serializers import kafka_to_key, kafka_to_value | from .serializers import kafka_to_key, kafka_to_value | ||||
from swh.journal import DEFAULT_PREFIX | |||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
# Accepted offset reset policies | # Accepted offset reset policies | ||||
ACCEPTED_OFFSET_RESET = ['earliest', 'latest'] | ACCEPTED_OFFSET_RESET = ['earliest', 'latest'] | ||||
# Only accepted object types | # Only accepted object types | ||||
Show All 22 Lines | class JournalClient: | ||||
value across instances. The journal will share the message | value across instances. The journal will share the message | ||||
throughput across the nodes sharing the same client_id. | throughput across the nodes sharing the same client_id. | ||||
Messages are processed by the `process_objects` method in batches | Messages are processed by the `process_objects` method in batches | ||||
of maximum `max_messages`. | of maximum `max_messages`. | ||||
""" | """ | ||||
def __init__(
        self, brokers, group_id, prefix=DEFAULT_PREFIX,
        object_types=ACCEPTED_OBJECT_TYPES,
        max_messages=0, auto_offset_reset='earliest'):
    """Build the Kafka consumer and subscribe it to the journal topics.

    Args:
        brokers: forwarded to KafkaConsumer as `bootstrap_servers`.
        group_id: forwarded to KafkaConsumer as `group_id`.
        prefix: topic name prefix; one topic named
            `<prefix>.<object_type>` is subscribed per object type.
        object_types: object types to subscribe to; every entry must
            be listed in ACCEPTED_OBJECT_TYPES.
        max_messages: stored as `self.max_messages`; this constructor
            does not otherwise use it.
        auto_offset_reset: offset reset policy; must be listed in
            ACCEPTED_OFFSET_RESET.

    Raises:
        ValueError: if `auto_offset_reset` or any entry of
            `object_types` is not an accepted value.

    """
    if auto_offset_reset not in ACCEPTED_OFFSET_RESET:
        # The 1-tuple keeps %-formatting correct whatever sequence type
        # the constant is.
        raise ValueError(
            'Option \'auto_offset_reset\' only accept %s.' %
            (ACCEPTED_OFFSET_RESET,))

    for object_type in object_types:
        if object_type not in ACCEPTED_OBJECT_TYPES:
            # Report the accepted object types (the message used to list
            # the offset reset policies by mistake).
            raise ValueError(
                'Option \'object_types\' only accepts %s.' %
                (ACCEPTED_OBJECT_TYPES,))

    self.consumer = KafkaConsumer(
        bootstrap_servers=brokers,
        key_deserializer=kafka_to_key,
        value_deserializer=kafka_to_value,
        auto_offset_reset=auto_offset_reset,
        # Offsets are committed explicitly through the consumer's
        # commit() rather than automatically.
        enable_auto_commit=False,
        group_id=group_id,
    )

    self.consumer.subscribe(
        topics=['%s.%s' % (prefix, object_type)
                for object_type in object_types],
    )

    self.max_messages = max_messages
    self._object_types = object_types
def poll(self):
    """Return whatever the underlying consumer's `poll()` returns."""
    polled = self.consumer.poll()
    return polled
def commit(self):
    """Call `commit()` on the underlying consumer; returns nothing."""
    consumer = self.consumer
    consumer.commit()
def process(self, worker_fn):
    """Poll the consumer once and feed each polled batch to `worker_fn`.

    `worker_fn` is called once per polled partition with a
    single-entry dict mapping the object type to the list of message
    values of that partition. The consumer's `commit()` is called once
    all batches have been handed over.

    Args:
        worker_fn Callable[Dict[str, List[dict]]]: Function called with
                                                   the messages as
                                                   argument.

    Returns:
        The total number of messages handed to `worker_fn`.

    """
    batches = self.consumer.poll()
    processed = 0

    for topic_partition, records in batches.items():
        # The object type is the last dot-separated part of the topic.
        object_type = topic_partition.topic.rpartition('.')[2]

        # Got a message from a topic we did not subscribe to.
        assert object_type in self._object_types, object_type

        values = [record.value for record in records]
        worker_fn({object_type: values})
        processed += len(records)

    self.consumer.commit()
    return processed