diff --git a/swh/journal/cli.py b/swh/journal/cli.py --- a/swh/journal/cli.py +++ b/swh/journal/cli.py @@ -50,25 +50,14 @@ ctx.obj['config'] = conf -def get_journal_client(ctx, brokers, prefix, group_id, object_types=None): - conf = ctx.obj['config'] - if not brokers: - brokers = conf.get('journal', {}).get('brokers') - if not brokers: +def get_journal_client(ctx, **kwargs): + conf = ctx.obj['config'].get('journal', {}) + conf.update({k: v for (k, v) in kwargs.items() if v not in (None, ())}) + if not conf.get('brokers'): ctx.fail('You must specify at least one kafka broker.') - if not isinstance(brokers, (list, tuple)): - brokers = [brokers] - - if prefix is None: - prefix = conf.get('journal', {}).get('prefix') - - if group_id is None: - group_id = conf.get('journal', {}).get('group_id') - - kwargs = dict(brokers=brokers, group_id=group_id, prefix=prefix) - if object_types: - kwargs['object_types'] = object_types - return JournalClient(**kwargs) + if not isinstance(conf['brokers'], (list, tuple)): + conf['brokers'] = [conf['brokers']] + return JournalClient(**conf) @cli.command() @@ -98,7 +87,8 @@ except KeyError: ctx.fail('You must have a storage configured in your config file.') - client = get_journal_client(ctx, brokers, prefix, group_id) + client = get_journal_client( + ctx, brokers=brokers, prefix=prefix, group_id=group_id) worker_fn = functools.partial(process_replay_objects, storage=storage) try: @@ -181,8 +171,9 @@ ctx.fail('You must have a destination objstorage configured ' 'in your config file.') - client = get_journal_client(ctx, brokers, prefix, group_id, - object_types=('content',)) + client = get_journal_client( + ctx, brokers=brokers, prefix=prefix, group_id=group_id, + object_types=('content',)) worker_fn = functools.partial(process_replay_objects_content, src=objstorage_src, dst=objstorage_dst) diff --git a/swh/journal/client.py b/swh/journal/client.py --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -46,10 +46,12 @@ Messages are 
processed by the `process_objects` method in batches of maximum `max_messages`. + Any other keyword argument is passed directly to KafkaConsumer(). + """ def __init__( self, brokers, group_id, prefix=None, object_types=None, - max_messages=0, auto_offset_reset='earliest'): + max_messages=0, auto_offset_reset='earliest', **kwargs): if prefix is None: prefix = DEFAULT_PREFIX if object_types is None: @@ -72,7 +74,7 @@ auto_offset_reset=auto_offset_reset, enable_auto_commit=False, group_id=group_id, - ) + **kwargs) self.consumer.subscribe( topics=['%s.%s' % (prefix, object_type)