diff --git a/swh/journal/client.py b/swh/journal/client.py --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -4,6 +4,7 @@ # See top-level LICENSE file for more information from collections import defaultdict +from importlib import import_module import logging import os from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union @@ -34,6 +35,30 @@ Currently, only the "kafka" journal client is supported. """ if cls == "kafka": + if "stats_cb" in kwargs: + stats_cb = kwargs["stats_cb"] + if isinstance(stats_cb, str): + try: + module_path, func_name = stats_cb.split(":") + except ValueError: + raise ValueError( + "Invalid stats_cb configuration option: " + "it should be a string like 'path.to.module:function'" + ) + try: + module = import_module(module_path, package=__package__) + except ModuleNotFoundError: + raise ValueError( + "Invalid stats_cb configuration option: " + f"module {module_path} not found" + ) + try: + kwargs["stats_cb"] = getattr(module, func_name) + except AttributeError: + raise ValueError( + "Invalid stats_cb configuration option: " + f"function {func_name} not found in module {module_path}" + ) return JournalClient(**kwargs) raise ValueError("Unknown journal client class `%s`" % cls)