Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/client.py
# Copyright (C) 2017 The Software Heritage developers | # Copyright (C) 2017 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import defaultdict | from collections import defaultdict | ||||
import logging | import logging | ||||
import time | import time | ||||
from confluent_kafka import Consumer, KafkaException | from confluent_kafka import Consumer, KafkaException | ||||
from .serializers import kafka_to_value | from .serializers import kafka_to_value | ||||
from swh.journal import DEFAULT_PREFIX | from swh.journal import DEFAULT_PREFIX | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
rdkafka_logger = logging.getLogger(__name__ + '.rdkafka') | |||||
# Only accepted offset reset policy accepted | # Only accepted offset reset policy accepted | ||||
ACCEPTED_OFFSET_RESET = ['earliest', 'latest'] | ACCEPTED_OFFSET_RESET = ['earliest', 'latest'] | ||||
# Only accepted object types | # Only accepted object types | ||||
ACCEPTED_OBJECT_TYPES = [ | ACCEPTED_OBJECT_TYPES = [ | ||||
'content', | 'content', | ||||
▲ Show 20 Lines • Show All 66 Lines • ▼ Show 20 Lines | def __init__( | ||||
consumer_settings = { | consumer_settings = { | ||||
**kwargs, | **kwargs, | ||||
'bootstrap.servers': ','.join(brokers), | 'bootstrap.servers': ','.join(brokers), | ||||
'auto.offset.reset': auto_offset_reset, | 'auto.offset.reset': auto_offset_reset, | ||||
'group.id': group_id, | 'group.id': group_id, | ||||
'on_commit': _on_commit, | 'on_commit': _on_commit, | ||||
'error_cb': _error_cb, | 'error_cb': _error_cb, | ||||
'enable.auto.commit': False, | 'enable.auto.commit': False, | ||||
'logger': logger, | 'logger': rdkafka_logger, | ||||
} | } | ||||
logger.debug('Consumer settings: %s', consumer_settings) | logger.debug('Consumer settings: %s', consumer_settings) | ||||
self.consumer = Consumer(consumer_settings, logger=logger) | self.consumer = Consumer(consumer_settings) | ||||
topics = ['%s.%s' % (prefix, object_type) | topics = ['%s.%s' % (prefix, object_type) | ||||
for object_type in object_types] | for object_type in object_types] | ||||
logger.debug('Upstream topics: %s', | logger.debug('Upstream topics: %s', | ||||
self.consumer.list_topics(timeout=10)) | self.consumer.list_topics(timeout=10)) | ||||
logger.debug('Subscribing to: %s', topics) | logger.debug('Subscribing to: %s', topics) | ||||
▲ Show 20 Lines • Show All 71 Lines • Show Last 20 Lines |