diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -48,6 +48,8 @@
     entry_points='''
         [console_scripts]
        swh-journal=swh.journal.cli:main
+        [swh.cli.subcommands]
+        journal=swh.journal.cli:cli
    ''',
     install_requires=parse_requirements() + parse_requirements('swh'),
     setup_requires=['vcversioner'],
diff --git a/swh/journal/__init__.py b/swh/journal/__init__.py
--- a/swh/journal/__init__.py
+++ b/swh/journal/__init__.py
@@ -0,0 +1,7 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+# the default prefix for kafka's topics
+DEFAULT_PREFIX = 'swh.journal.objects'
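
Note: `DEFAULT_PREFIX` replaces the previous `temporary_prefix`/`final_prefix`
pair (see the test changes below); topic names are built by joining a prefix
and an object type, as in the `'%s.%s' % (prefix, object_type)` construction
kept in conftest.py. A minimal sketch of the naming scheme, where `topic_for`
is a hypothetical helper introduced here for illustration only, not part of
this patch:

    # Hypothetical helper illustrating how journal topic names are formed;
    # it mirrors the "'%s.%s' % (prefix, object_type)" pattern used in the
    # tests below.
    DEFAULT_PREFIX = 'swh.journal.objects'  # as added in swh/journal/__init__.py

    def topic_for(object_type: str, prefix: str = DEFAULT_PREFIX) -> str:
        # e.g. 'swh.journal.objects.revision'
        return '%s.%s' % (prefix, object_type)

    assert topic_for('revision') == 'swh.journal.objects.revision'
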
""" if not config_file: config_file = os.environ.get('SWH_CONFIG_FILENAME') - if not config_file: - raise ValueError('You must either pass a config-file parameter ' - 'or set SWH_CONFIG_FILENAME to target ' - 'the config-file') - if not os.path.exists(config_file): - raise ValueError('%s does not exist' % config_file) + if config_file: + if not os.path.exists(config_file): + raise ValueError('%s does not exist' % config_file) + conf = config.read(config_file) + else: + conf = {} - conf = config.read(config_file) ctx.ensure_object(dict) - logging.basicConfig( - level=log_level, - format='%(asctime)s %(levelname)s %(name)s %(message)s', - ) - + log_level = ctx.obj.get('log_level', logging.INFO) + logging.root.setLevel(log_level) logging.getLogger('kafka').setLevel(logging.INFO) ctx.obj['config'] = conf - ctx.obj['loglevel'] = log_level @cli.command() @@ -62,22 +54,42 @@ help='Maximum number of objects to replay. Default is to ' 'run forever.') @click.option('--broker', 'brokers', type=str, multiple=True, + hidden=True, # prefer config file help='Kafka broker to connect to.') -@click.option('--prefix', type=str, default='swh.journal.objects', +@click.option('--prefix', type=str, default=DEFAULT_PREFIX, + hidden=True, # prefer config file help='Prefix of Kafka topic names to read from.') -@click.option('--consumer-id', type=str, +@click.option('--group-id', '--consumer-id', type=str, + hidden=True, # prefer config file help='Name of the consumer/group id for reading from Kafka.') @click.pass_context -def replay(ctx, brokers, prefix, consumer_id, max_messages): - """Fill a new storage by reading a journal. +def replay(ctx, brokers, prefix, group_id, max_messages): + """Fill a Storage by reading a Journal. + There can be several 'replayers' filling a Storage as long as they use + the same `group-id`. """ - conf = ctx.obj['config'] logger = logging.getLogger(__name__) - logger.setLevel(ctx.obj['loglevel']) - storage = get_storage(**conf.pop('storage')) - client = JournalClient(brokers, prefix, consumer_id) + conf = ctx.obj['config'] + try: + storage = get_storage(**conf.pop('storage')) + except KeyError: + ctx.fail('You must have a storage configured in your config file.') + + if brokers is None: + brokers = conf.get('journal', {}).get('brokers') + if not brokers: + ctx.fail('You must specify at least one kafka broker.') + + if prefix is None: + prefix = conf.get('journal', {}).get('prefix') + + if group_id is None: + group_id = conf.get('journal', {}).get('group_id') + + client = JournalClient(brokers, prefix, group_id) worker_fn = functools.partial(process_replay_objects, storage=storage) + try: nb_messages = 0 while not max_messages or nb_messages < max_messages: @@ -96,7 +108,20 @@ @click.option('--dry-run', is_flag=True, default=False) @click.pass_context def backfiller(ctx, object_type, start_object, end_object, dry_run): - """Manipulate backfiller + """Run the backfiller + + The backfiller list objects from a Storage and produce journal entries from + there. + + Typically used to rebuild a journal or compensate for missing objects in a + journal (eg. due to a downtime of this later). + + The configuration file requires the following entries: + - brokers: a list of kafka endpoints (the journal) in which entries will be + added. + - storage_dbconn: URL to connect to the storage DB. + - prefix: the prefix of the topics (topics will be .). + - client_id: the kafka client ID. 
""" conf = ctx.obj['config'] @@ -111,6 +136,7 @@ def main(): + logging.basicConfig() return cli(auto_envvar_prefix='SWH_JOURNAL') diff --git a/swh/journal/replay.py b/swh/journal/replay.py --- a/swh/journal/replay.py +++ b/swh/journal/replay.py @@ -7,7 +7,6 @@ from swh.storage import HashCollision - logger = logging.getLogger(__name__) diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py --- a/swh/journal/tests/conftest.py +++ b/swh/journal/tests/conftest.py @@ -163,8 +163,6 @@ TEST_CONFIG = { - 'temporary_prefix': 'swh.tmp_journal.new', - 'final_prefix': 'swh.journal.objects', 'consumer_id': 'swh.journal.consumer', 'object_types': OBJECT_TYPE_KEYS.keys(), 'max_messages': 1, # will read 1 message and stops @@ -182,8 +180,7 @@ return { **TEST_CONFIG, 'brokers': ['localhost:{}'.format(port)], - 'temporary_prefix': kafka_prefix + '.swh.tmp_journal.new', - 'final_prefix': kafka_prefix + '.swh.journal.objects', + 'prefix': kafka_prefix + '.swh.journal.objects', } @@ -194,7 +191,7 @@ """ kafka_topics = [ - '%s.%s' % (test_config['final_prefix'], object_type) + '%s.%s' % (test_config['prefix'], object_type) for object_type in test_config['object_types']] _, kafka_port = kafka_server consumer = KafkaConsumer( diff --git a/swh/journal/tests/test_backfill.py b/swh/journal/tests/test_backfill.py --- a/swh/journal/tests/test_backfill.py +++ b/swh/journal/tests/test_backfill.py @@ -12,7 +12,7 @@ TEST_CONFIG = { 'brokers': ['localhost'], - 'final_prefix': 'swh.tmp_journal.new', + 'prefix': 'swh.tmp_journal.new', 'client_id': 'swh.journal.client.test', 'storage_dbconn': 'service=swh-dev', } diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py --- a/swh/journal/tests/test_cli.py +++ b/swh/journal/tests/test_cli.py @@ -41,7 +41,7 @@ with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd: config_fd.write(CLI_CONFIG) config_fd.seek(0) - args = ['-C' + config_fd.name, '-l', 'DEBUG'] + args + args = ['-C' + config_fd.name] + args result = runner.invoke(cli, args) if not catch_exceptions and result.exception: print(result.output)