Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/client.py
# Copyright (C) 2017 The Software Heritage developers | # Copyright (C) 2017 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import defaultdict | from collections import defaultdict | ||||
from importlib import import_module | |||||
import logging | import logging | ||||
import os | import os | ||||
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union | from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union | ||||
from confluent_kafka import Consumer, KafkaError, KafkaException | from confluent_kafka import Consumer, KafkaError, KafkaException | ||||
from swh.journal import DEFAULT_PREFIX | from swh.journal import DEFAULT_PREFIX | ||||
Show All 14 Lines | |||||
def get_journal_client(cls: str, **kwargs: Any): | def get_journal_client(cls: str, **kwargs: Any): | ||||
"""Factory function to instantiate a journal client object. | """Factory function to instantiate a journal client object. | ||||
Currently, only the "kafka" journal client is supported. | Currently, only the "kafka" journal client is supported. | ||||
""" | """ | ||||
if cls == "kafka": | if cls == "kafka": | ||||
if "stats_cb" in kwargs: | |||||
stats_cb = kwargs["stats_cb"] | |||||
if isinstance(stats_cb, str): | |||||
try: | |||||
module_path, func_name = stats_cb.split(":") | |||||
except ValueError: | |||||
raise ValueError( | |||||
"Invalid stats_cb configuration option: " | |||||
"it should be a string like 'path.to.module:function'" | |||||
) | |||||
try: | |||||
module = import_module(module_path, package=__package__) | |||||
except ModuleNotFoundError: | |||||
raise ValueError( | |||||
"Invalid stats_cb configuration option: " | |||||
f"module {module_path} not found" | |||||
) | |||||
try: | |||||
kwargs["stats_cb"] = getattr(module, func_name) | |||||
except AttributeError: | |||||
raise ValueError( | |||||
"Invalid stats_cb configuration option: " | |||||
f"function {func_name} not found in module {module_path}" | |||||
) | |||||
return JournalClient(**kwargs) | return JournalClient(**kwargs) | ||||
raise ValueError("Unknown journal client class `%s`" % cls) | raise ValueError("Unknown journal client class `%s`" % cls) | ||||
def _error_cb(error): | def _error_cb(error): | ||||
if error.fatal(): | if error.fatal(): | ||||
raise KafkaException(error) | raise KafkaException(error) | ||||
if error.code() in _SPAMMY_ERRORS: | if error.code() in _SPAMMY_ERRORS: | ||||
▲ Show 20 Lines • Show All 269 Lines • Show Last 20 Lines |