Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/client.py
# Copyright (C) 2017 The Software Heritage developers | # Copyright (C) 2017 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import defaultdict | from collections import defaultdict | ||||
import logging | import logging | ||||
import time | import time | ||||
from confluent_kafka import Consumer, KafkaException | from confluent_kafka import Consumer, KafkaException, KafkaError | ||||
from .serializers import kafka_to_value | from .serializers import kafka_to_value | ||||
from swh.journal import DEFAULT_PREFIX | from swh.journal import DEFAULT_PREFIX | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
rdkafka_logger = logging.getLogger(__name__ + '.rdkafka') | rdkafka_logger = logging.getLogger(__name__ + '.rdkafka') | ||||
Show All 10 Lines | ACCEPTED_OBJECT_TYPES = [ | ||||
'origin', | 'origin', | ||||
'origin_visit' | 'origin_visit' | ||||
] | ] | ||||
def _error_cb(error): | def _error_cb(error): | ||||
if error.fatal(): | if error.fatal(): | ||||
raise KafkaException(error) | raise KafkaException(error) | ||||
if error.code() == KafkaError._NO_OFFSET: | |||||
return # this "error" is useless and too spammy | |||||
logger.info('Received non-fatal kafka error: %s', error) | logger.info('Received non-fatal kafka error: %s', error) | ||||
def _on_commit(error, partitions): | def _on_commit(error, partitions): | ||||
if error is not None: | if error is not None: | ||||
_error_cb(error) | _error_cb(error) | ||||
▲ Show 20 Lines • Show All 143 Lines • Show Last 20 Lines |