diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -48,6 +48,8 @@
     entry_points='''
        [console_scripts]
        swh-journal=swh.journal.cli:main
+        [swh.cli.subcommands]
+        journal=swh.journal.cli:cli
    ''',
    install_requires=parse_requirements() + parse_requirements('swh'),
    setup_requires=['vcversioner'],
diff --git a/swh/journal/__init__.py b/swh/journal/__init__.py
--- a/swh/journal/__init__.py
+++ b/swh/journal/__init__.py
@@ -0,0 +1,7 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+# the default prefix for kafka's topics
+DEFAULT_PREFIX = 'swh.journal.objects'
diff --git a/swh/journal/backfill.py b/swh/journal/backfill.py
--- a/swh/journal/backfill.py
+++ b/swh/journal/backfill.py
@@ -366,7 +366,7 @@
         yield record
 
 
-MANDATORY_KEYS = ['brokers', 'storage_dbconn', 'final_prefix', 'client_id']
+MANDATORY_KEYS = ['brokers', 'storage_dbconn', 'prefix', 'client_id']
 
 
 class JournalBackfiller:
@@ -430,7 +430,7 @@
         db = BaseDb.connect(self.config['storage_dbconn'])
         writer = DirectKafkaWriter(
             brokers=self.config['brokers'],
-            prefix=self.config['final_prefix'],
+            prefix=self.config['prefix'],
             client_id=self.config['client_id']
         )
         for range_start, range_end in RANGE_GENERATORS[object_type](
diff --git a/swh/journal/cli.py b/swh/journal/cli.py
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -9,52 +9,43 @@
 import os
 
 from swh.core import config
+from swh.core.cli import CONTEXT_SETTINGS
 from swh.storage import get_storage
 
 from swh.journal.client import JournalClient
 from swh.journal.replay import process_replay_objects
 from swh.journal.backfill import JournalBackfiller
 
-CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
-
 
-@click.group(context_settings=CONTEXT_SETTINGS)
+@click.group(name='journal', context_settings=CONTEXT_SETTINGS)
 @click.option('--config-file', '-C', default=None,
               type=click.Path(exists=True, dir_okay=False,),
               help="Configuration file.")
-@click.option('--log-level', '-l', default='INFO',
-              type=click.Choice(logging._nameToLevel.keys()),
-              help="Log level (default to INFO)")
 @click.pass_context
-def cli(ctx, config_file, log_level):
-    """Software Heritage Scheduler CLI interface
+def cli(ctx, config_file):
+    """Software Heritage Journal tools.
 
-    Default to use the the local scheduler instance (plugged to the
-    main scheduler db).
+    The journal is a persistent logger of changes to the archive, with
+    publish-subscribe support.
 
     """
     if not config_file:
         config_file = os.environ.get('SWH_CONFIG_FILENAME')
-    if not config_file:
-        raise ValueError('You must either pass a config-file parameter '
-                         'or set SWH_CONFIG_FILENAME to target '
-                         'the config-file')
 
-    if not os.path.exists(config_file):
-        raise ValueError('%s does not exist' % config_file)
+    if config_file:
+        if not os.path.exists(config_file):
+            raise ValueError('%s does not exist' % config_file)
+        conf = config.read(config_file)
+    else:
+        conf = {}
 
-    conf = config.read(config_file)
     ctx.ensure_object(dict)
 
-    logging.basicConfig(
-        level=log_level,
-        format='%(asctime)s %(levelname)s %(name)s %(message)s',
-    )
-
+    log_level = ctx.obj.get('log_level', logging.INFO)
+    logging.root.setLevel(log_level)
     logging.getLogger('kafka').setLevel(logging.INFO)
 
     ctx.obj['config'] = conf
-    ctx.obj['loglevel'] = log_level
 
 
 @cli.command()
@@ -62,22 +53,44 @@
               help='Maximum number of objects to replay. Default is to '
                    'run forever.')
 @click.option('--broker', 'brokers', type=str, multiple=True,
+              hidden=True,  # prefer config file
               help='Kafka broker to connect to.')
-@click.option('--prefix', type=str, default='swh.journal.objects',
+@click.option('--prefix', type=str, default=None,
+              hidden=True,  # prefer config file
               help='Prefix of Kafka topic names to read from.')
-@click.option('--consumer-id', type=str,
+@click.option('--group-id', '--consumer-id', type=str,
+              hidden=True,  # prefer config file
               help='Name of the consumer/group id for reading from Kafka.')
 @click.pass_context
-def replay(ctx, brokers, prefix, consumer_id, max_messages):
-    """Fill a new storage by reading a journal.
+def replay(ctx, brokers, prefix, group_id, max_messages):
+    """Fill a Storage by reading a Journal.
 
+    There can be several 'replayers' filling a Storage as long as they use
+    the same `group-id`.
     """
-    conf = ctx.obj['config']
     logger = logging.getLogger(__name__)
-    logger.setLevel(ctx.obj['loglevel'])
-    storage = get_storage(**conf.pop('storage'))
-    client = JournalClient(brokers, prefix, consumer_id)
+    conf = ctx.obj['config']
+    try:
+        storage = get_storage(**conf.pop('storage'))
+    except KeyError:
+        ctx.fail('You must have a storage configured in your config file.')
+
+    if brokers is None:
+        brokers = conf.get('journal', {}).get('brokers')
+    if not brokers:
+        ctx.fail('You must specify at least one kafka broker.')
+    if not isinstance(brokers, list):
+        brokers = [brokers]
+
+    if prefix is None:
+        prefix = conf.get('journal', {}).get('prefix')
+
+    if group_id is None:
+        group_id = conf.get('journal', {}).get('group_id')
+
+    client = JournalClient(brokers=brokers, group_id=group_id, prefix=prefix)
     worker_fn = functools.partial(process_replay_objects, storage=storage)
+
     try:
         nb_messages = 0
         while not max_messages or nb_messages < max_messages:
@@ -96,7 +109,20 @@
 @click.option('--dry-run', is_flag=True, default=False)
 @click.pass_context
 def backfiller(ctx, object_type, start_object, end_object, dry_run):
-    """Manipulate backfiller
+    """Run the backfiller
+
+    The backfiller lists objects from a Storage and produces journal entries
+    from there.
+
+    Typically used to rebuild a journal or compensate for missing objects in a
+    journal (e.g. due to a downtime of the latter).
+
+    The configuration file requires the following entries:
+    - brokers: a list of kafka endpoints (the journal) in which entries will be
+      added.
+    - storage_dbconn: URL to connect to the storage DB.
+    - prefix: the prefix of the topics (topics will be <prefix>.<object_type>).
+    - client_id: the kafka client ID.
""" conf = ctx.obj['config'] @@ -111,6 +137,7 @@ def main(): + logging.basicConfig() return cli(auto_envvar_prefix='SWH_JOURNAL') diff --git a/swh/journal/client.py b/swh/journal/client.py --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -7,7 +7,7 @@ import logging from .serializers import kafka_to_key, kafka_to_value - +from swh.journal import DEFAULT_PREFIX logger = logging.getLogger(__name__) @@ -46,7 +46,7 @@ """ def __init__( - self, brokers, topic_prefix, consumer_id, + self, brokers, group_id, prefix=DEFAULT_PREFIX, object_types=ACCEPTED_OBJECT_TYPES, max_messages=0, auto_offset_reset='earliest'): @@ -67,23 +67,17 @@ value_deserializer=kafka_to_value, auto_offset_reset=auto_offset_reset, enable_auto_commit=False, - group_id=consumer_id, + group_id=group_id, ) self.consumer.subscribe( - topics=['%s.%s' % (topic_prefix, object_type) + topics=['%s.%s' % (prefix, object_type) for object_type in object_types], ) self.max_messages = max_messages self._object_types = object_types - def poll(self): - return self.consumer.poll() - - def commit(self): - self.consumer.commit() - def process(self, worker_fn): """Polls Kafka for a batch of messages, and calls the worker_fn with these messages. @@ -94,7 +88,7 @@ argument. """ nb_messages = 0 - polled = self.poll() + polled = self.consumer.poll() for (partition, messages) in polled.items(): object_type = partition.topic.split('.')[-1] # Got a message from a topic we did not subscribe to. @@ -104,5 +98,5 @@ nb_messages += len(messages) - self.commit() + self.consumer.commit() return nb_messages diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py --- a/swh/journal/tests/conftest.py +++ b/swh/journal/tests/conftest.py @@ -163,8 +163,6 @@ TEST_CONFIG = { - 'temporary_prefix': 'swh.tmp_journal.new', - 'final_prefix': 'swh.journal.objects', 'consumer_id': 'swh.journal.consumer', 'object_types': OBJECT_TYPE_KEYS.keys(), 'max_messages': 1, # will read 1 message and stops @@ -182,8 +180,7 @@ return { **TEST_CONFIG, 'brokers': ['localhost:{}'.format(port)], - 'temporary_prefix': kafka_prefix + '.swh.tmp_journal.new', - 'final_prefix': kafka_prefix + '.swh.journal.objects', + 'prefix': kafka_prefix + '.swh.journal.objects', } @@ -194,7 +191,7 @@ """ kafka_topics = [ - '%s.%s' % (test_config['final_prefix'], object_type) + '%s.%s' % (test_config['prefix'], object_type) for object_type in test_config['object_types']] _, kafka_port = kafka_server consumer = KafkaConsumer( diff --git a/swh/journal/tests/test_backfill.py b/swh/journal/tests/test_backfill.py --- a/swh/journal/tests/test_backfill.py +++ b/swh/journal/tests/test_backfill.py @@ -12,7 +12,7 @@ TEST_CONFIG = { 'brokers': ['localhost'], - 'final_prefix': 'swh.tmp_journal.new', + 'prefix': 'swh.tmp_journal.new', 'client_id': 'swh.journal.client.test', 'storage_dbconn': 'service=swh-dev', } diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py --- a/swh/journal/tests/test_cli.py +++ b/swh/journal/tests/test_cli.py @@ -41,7 +41,7 @@ with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd: config_fd.write(CLI_CONFIG) config_fd.seek(0) - args = ['-C' + config_fd.name, '-l', 'DEBUG'] + args + args = ['-C' + config_fd.name] + args result = runner.invoke(cli, args) if not catch_exceptions and result.exception: print(result.output) @@ -75,7 +75,7 @@ result = invoke(False, [ 'replay', '--broker', 'localhost:%d' % port, - '--consumer-id', 'test-cli-consumer', + '--group-id', 'test-cli-consumer', '--prefix', kafka_prefix, '--max-messages', '1', ]) diff 
--git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py --- a/swh/journal/tests/test_replay.py +++ b/swh/journal/tests/test_replay.py @@ -57,8 +57,8 @@ # Fill the storage from Kafka config = { 'brokers': 'localhost:%d' % kafka_server[1], - 'consumer_id': 'replayer', - 'topic_prefix': kafka_prefix, + 'group_id': 'replayer', + 'prefix': kafka_prefix, 'max_messages': nb_sent, } replayer = JournalClient(**config) diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py --- a/swh/journal/tests/test_write_replay.py +++ b/swh/journal/tests/test_write_replay.py @@ -26,102 +26,41 @@ class MockedDirectKafkaWriter(DirectKafkaWriter): def __init__(self): self._prefix = 'prefix' + self.queue = [] + self.committed = False - -class MockedJournalClient(JournalClient): - def __init__(self, object_types=ACCEPTED_OBJECT_TYPES): - self._object_types = object_types - - -@given(lists(object_dicts(), min_size=1)) -@settings(suppress_health_check=[HealthCheck.too_slow]) -def test_write_replay_same_order(objects): - committed = False - queue = [] - - def send(topic, key, value): + def send(self, topic, key, value): key = kafka_to_key(key_to_kafka(key)) value = kafka_to_value(value_to_kafka(value)) - queue.append({ - FakeKafkaPartition(topic): - [FakeKafkaMessage(key=key, value=value)] - }) - - def poll(): - return queue.pop(0) - - def commit(): - nonlocal committed - if queue == []: - committed = True - - storage1 = Storage() - storage1.journal_writer = MockedDirectKafkaWriter() - storage1.journal_writer.send = send - - for (obj_type, obj) in objects: - obj = obj.copy() - if obj_type == 'origin_visit': - origin_id = storage1.origin_add_one(obj.pop('origin')) - if 'visit' in obj: - del obj['visit'] - storage1.origin_visit_add(origin_id, **obj) + partition = FakeKafkaPartition(topic) + msg = FakeKafkaMessage(key=key, value=value) + if self.queue and {partition} == set(self.queue[-1]): + # The last message is of the same object type, groupping them + self.queue[-1][partition].append(msg) else: - method = getattr(storage1, obj_type + '_add') - try: - method([obj]) - except HashCollision: - pass + self.queue.append({partition: [msg]}) - storage2 = Storage() - worker_fn = functools.partial(process_replay_objects, storage=storage2) - replayer = MockedJournalClient() - replayer.poll = poll - replayer.commit = commit - queue_size = len(queue) - nb_messages = 0 - while nb_messages < queue_size: - nb_messages += replayer.process(worker_fn) + def poll(self): + return self.queue.pop(0) - assert nb_messages == queue_size - assert committed + def commit(self): + if self.queue == []: + self.committed = True - for attr_name in ('_contents', '_directories', '_revisions', '_releases', - '_snapshots', '_origin_visits', '_origins'): - assert getattr(storage1, attr_name) == getattr(storage2, attr_name), \ - attr_name + +class MockedJournalClient(JournalClient): + def __init__(self, object_types=ACCEPTED_OBJECT_TYPES): + self._object_types = object_types + self.consumer = MockedDirectKafkaWriter() @given(lists(object_dicts(), min_size=1)) @settings(suppress_health_check=[HealthCheck.too_slow]) def test_write_replay_same_order_batches(objects): - committed = False - queue = [] - - def send(topic, key, value): - key = kafka_to_key(key_to_kafka(key)) - value = kafka_to_value(value_to_kafka(value)) - partition = FakeKafkaPartition(topic) - msg = FakeKafkaMessage(key=key, value=value) - if queue and {partition} == set(queue[-1]): - # The last message is of the same object type, 
groupping them - queue[-1][partition].append(msg) - else: - queue.append({ - FakeKafkaPartition(topic): [msg] - }) - - def poll(): - return queue.pop(0) - - def commit(): - nonlocal committed - if queue == []: - committed = True + replayer = MockedJournalClient() storage1 = Storage() - storage1.journal_writer = MockedDirectKafkaWriter() - storage1.journal_writer.send = send + storage1.journal_writer = replayer.consumer for (obj_type, obj) in objects: obj = obj.copy() @@ -138,19 +77,16 @@ pass queue_size = sum(len(partition) - for batch in queue + for batch in replayer.consumer.queue for partition in batch.values()) storage2 = Storage() worker_fn = functools.partial(process_replay_objects, storage=storage2) - replayer = MockedJournalClient() - replayer.poll = poll - replayer.commit = commit nb_messages = 0 while nb_messages < queue_size: nb_messages += replayer.process(worker_fn) - assert committed + assert replayer.consumer.committed for attr_name in ('_contents', '_directories', '_revisions', '_releases', '_snapshots', '_origin_visits', '_origins'):