Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/client.py
Show All 24 Lines | ACCEPTED_OBJECT_TYPES = [ | ||||
'revision', | 'revision', | ||||
'release', | 'release', | ||||
'snapshot', | 'snapshot', | ||||
'origin', | 'origin', | ||||
'origin_visit' | 'origin_visit' | ||||
] | ] | ||||
def _error_cb(error): | |||||
if error.fatal(): | |||||
raise KafkaException(error) | |||||
logger.info('Received non-fatal kafka error: %s', error) | |||||
def _on_commit(error, partitions): | |||||
if error is not None: | |||||
_error_cb(error) | |||||
class JournalClient: | class JournalClient: | ||||
"""A base client for the Software Heritage journal. | """A base client for the Software Heritage journal. | ||||
The current implementation of the journal uses Apache Kafka | The current implementation of the journal uses Apache Kafka | ||||
brokers to publish messages under a given topic prefix, with each | brokers to publish messages under a given topic prefix, with each | ||||
object type using a specific topic under that prefix. If the 'prefix' | object type using a specific topic under that prefix. If the 'prefix' | ||||
argument is None (default value), it will take the default value | argument is None (default value), it will take the default value | ||||
'swh.journal.objects'. | 'swh.journal.objects'. | ||||
Show All 35 Lines | def __init__( | ||||
if isinstance(brokers, str): | if isinstance(brokers, str): | ||||
brokers = [brokers] | brokers = [brokers] | ||||
consumer_settings = { | consumer_settings = { | ||||
**kwargs, | **kwargs, | ||||
'bootstrap.servers': ','.join(brokers), | 'bootstrap.servers': ','.join(brokers), | ||||
'auto.offset.reset': auto_offset_reset, | 'auto.offset.reset': auto_offset_reset, | ||||
'group.id': group_id, | 'group.id': group_id, | ||||
'on_commit': self._on_commit, | 'on_commit': _on_commit, | ||||
'error_cb': self._error_cb, | 'error_cb': _error_cb, | ||||
'enable.auto.commit': False, | 'enable.auto.commit': False, | ||||
'logger': logger, | 'logger': logger, | ||||
} | } | ||||
logger.debug('Consumer settings: %s', consumer_settings) | logger.debug('Consumer settings: %s', consumer_settings) | ||||
self.consumer = Consumer(consumer_settings, logger=logger) | self.consumer = Consumer(consumer_settings, logger=logger) | ||||
topics = ['%s.%s' % (prefix, object_type) | topics = ['%s.%s' % (prefix, object_type) | ||||
for object_type in object_types] | for object_type in object_types] | ||||
logger.debug('Upstream topics: %s', | logger.debug('Upstream topics: %s', | ||||
self.consumer.list_topics(timeout=10)) | self.consumer.list_topics(timeout=10)) | ||||
logger.debug('Subscribing to: %s', topics) | logger.debug('Subscribing to: %s', topics) | ||||
self.consumer.subscribe(topics=topics) | self.consumer.subscribe(topics=topics) | ||||
self.max_messages = max_messages | self.max_messages = max_messages | ||||
self.process_timeout = process_timeout | self.process_timeout = process_timeout | ||||
self._object_types = object_types | self._object_types = object_types | ||||
def _error_cb(self, error): | |||||
if error.fatal(): | |||||
raise KafkaException(error) | |||||
logger.info('Received non-fatal kafka error: %s', error) | |||||
def _on_commit(self, error, partitions): | |||||
if error is not None: | |||||
self._error_cb(error) | |||||
def process(self, worker_fn): | def process(self, worker_fn): | ||||
"""Polls Kafka for a batch of messages, and calls the worker_fn | """Polls Kafka for a batch of messages, and calls the worker_fn | ||||
with these messages. | with these messages. | ||||
Args: | Args: | ||||
worker_fn Callable[Dict[str, List[dict]]]: Function called with | worker_fn Callable[Dict[str, List[dict]]]: Function called with | ||||
the messages as | the messages as | ||||
argument. | argument. | ||||
▲ Show 20 Lines • Show All 45 Lines • Show Last 20 Lines |