diff --git a/swh/journal/cli.py b/swh/journal/cli.py
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -21,7 +21,7 @@
 from swh.storage import get_storage
 from swh.objstorage import get_objstorage
 
-from swh.journal.client import JournalClient
+from swh.journal.client import get_journal_client as get_client
 from swh.journal.replay import is_hash_in_bytearray
 from swh.journal.replay import process_replay_objects
 from swh.journal.replay import process_replay_objects_content
@@ -60,13 +60,16 @@
 
 
 def get_journal_client(ctx, **kwargs):
-    conf = ctx.obj["config"].get("journal", {})
-    conf.update({k: v for (k, v) in kwargs.items() if v not in (None, ())})
-    if not conf.get("brokers"):
-        ctx.fail("You must specify at least one kafka broker.")
-    if not isinstance(conf["brokers"], (list, tuple)):
-        conf["brokers"] = [conf["brokers"]]
-    return JournalClient(**conf)
+    """Build a journal client from the click context's configuration.
+
+    Keyword arguments are forwarded to swh.journal.client.get_journal_client.
+    A ValueError raised there is reported through ctx.fail().
+    """
+    try:
+        return get_client(ctx.obj["config"], **kwargs)
+    except ValueError as exc:
+        # ctx.fail() expects the message text, not the exception object.
+        ctx.fail(str(exc))
 
 
 @cli.command()
diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -39,6 +39,26 @@
 ]
 
 
+def get_journal_client(cfg, **kwargs):
+    """Instantiate a JournalClient from a configuration mapping.
+
+    The "journal" section of ``cfg`` provides the base settings; keyword
+    arguments whose value is neither None nor an empty tuple override them.
+    A single (non list/tuple) "brokers" value is wrapped in a list.
+
+    Raises:
+        ValueError: if no kafka broker is configured.
+    """
+    # Work on a copy so overrides do not leak into the caller's configuration.
+    conf = dict(cfg.get("journal") or {})
+    conf.update({k: v for (k, v) in kwargs.items() if v not in (None, ())})
+    if not conf.get("brokers"):
+        raise ValueError("You must specify at least one kafka broker.")
+    if not isinstance(conf["brokers"], (list, tuple)):
+        conf["brokers"] = [conf["brokers"]]
+    return JournalClient(**conf)
+
+
 def _error_cb(error):
     if error.fatal():
         raise KafkaException(error)