diff --git a/swh/journal/cli.py b/swh/journal/cli.py
index 13455d6..9c12f18 100644
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -1,263 +1,264 @@
# Copyright (C) 2016-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import functools
import logging
import mmap
import os
import time

import click

try:
    from systemd.daemon import notify
except ImportError:
    notify = None

from swh.core import config
from swh.core.cli import CONTEXT_SETTINGS
from swh.model.model import SHA1_SIZE
from swh.storage import get_storage
from swh.objstorage import get_objstorage

from swh.journal.client import JournalClient
from swh.journal.replay import is_hash_in_bytearray
from swh.journal.replay import process_replay_objects
from swh.journal.replay import process_replay_objects_content
from swh.journal.backfill import JournalBackfiller


@click.group(name='journal', context_settings=CONTEXT_SETTINGS)
@click.option('--config-file', '-C', default=None,
              type=click.Path(exists=True, dir_okay=False,),
              help="Configuration file.")
@click.pass_context
def cli(ctx, config_file):
    """Software Heritage Journal tools.

    The journal is a persistent logger of changes to the archive, with
    publish-subscribe support.
    """
    if not config_file:
        config_file = os.environ.get('SWH_CONFIG_FILENAME')

    if config_file:
        if not os.path.exists(config_file):
            raise ValueError('%s does not exist' % config_file)
        conf = config.read(config_file)
    else:
        conf = {}

    ctx.ensure_object(dict)

    ctx.obj['config'] = conf


def get_journal_client(ctx, **kwargs):
    conf = ctx.obj['config'].get('journal', {})
    conf.update({k: v for (k, v) in kwargs.items() if v not in (None, ())})
    if not conf.get('brokers'):
        ctx.fail('You must specify at least one kafka broker.')
    if not isinstance(conf['brokers'], (list, tuple)):
        conf['brokers'] = [conf['brokers']]
    return JournalClient(**conf)


@cli.command()
@click.option('--max-messages', '-m', default=None, type=int,
              help='Maximum number of objects to replay. Default is to '
                   'run forever.')
@click.option('--broker', 'brokers', type=str, multiple=True,
              help='Kafka broker to connect to. '
                   '(deprecated, use the config file instead)')
@click.option('--prefix', type=str, default=None,
              help='Prefix of Kafka topic names to read from. '
                   '(deprecated, use the config file instead)')
@click.option('--group-id', type=str,
              help='Name of the group id for reading from Kafka. '
                   '(deprecated, use the config file instead)')
@click.pass_context
def replay(ctx, brokers, prefix, group_id, max_messages):
    """Fill a Storage by reading a Journal.

    There can be several 'replayers' filling a Storage as long as they use
    the same `group-id`.
    """
    logger = logging.getLogger(__name__)
    conf = ctx.obj['config']
    try:
        storage = get_storage(**conf.pop('storage'))
    except KeyError:
        ctx.fail('You must have a storage configured in your config file.')

    client = get_journal_client(
        ctx, brokers=brokers, prefix=prefix, group_id=group_id,
        max_messages=max_messages)
    worker_fn = functools.partial(process_replay_objects, storage=storage)

    if notify:
        notify('READY=1')

    try:
        nb_messages = 0
        last_log_time = 0
        while not max_messages or nb_messages < max_messages:
            nb_messages += client.process(worker_fn)
            if notify:
                notify('WATCHDOG=1')
            if time.time() - last_log_time >= 60:
                # Log at most once per minute.
                logger.info('Processed %d messages.'
                            % nb_messages)
                last_log_time = time.time()
    except KeyboardInterrupt:
        ctx.exit(0)
    else:
        print('Done.')
    finally:
        if notify:
            notify('STOPPING=1')
        client.close()


@cli.command()
@click.argument('object_type')
@click.option('--start-object', default=None)
@click.option('--end-object', default=None)
@click.option('--dry-run', is_flag=True, default=False)
@click.pass_context
def backfiller(ctx, object_type, start_object, end_object, dry_run):
    """Run the backfiller

    The backfiller lists objects from a Storage and produces journal entries
    from there.

    Typically used to rebuild a journal or compensate for missing objects in
    a journal (e.g. due to a downtime of the latter).

    The configuration file requires the following entries:
    - brokers: a list of kafka endpoints (the journal) in which entries will
      be added.
    - storage_dbconn: URL to connect to the storage DB.
    - prefix: the prefix of the topics (topics will be <prefix>.<object_type>).
    - client_id: the kafka client ID.
    """
    conf = ctx.obj['config']
    backfiller = JournalBackfiller(conf)

    if notify:
        notify('READY=1')

    try:
        backfiller.run(
            object_type=object_type,
            start_object=start_object, end_object=end_object,
            dry_run=dry_run)
    except KeyboardInterrupt:
        if notify:
            notify('STOPPING=1')
        ctx.exit(0)


@cli.command('content-replay')
@click.option('--max-messages', '-m', default=None, type=int,
              help='Maximum number of objects to replay. Default is to '
                   'run forever.')
@click.option('--broker', 'brokers', type=str, multiple=True,
              help='Kafka broker to connect to. '
                   '(deprecated, use the config file instead)')
@click.option('--prefix', type=str, default=None,
              help='Prefix of Kafka topic names to read from. '
                   '(deprecated, use the config file instead)')
@click.option('--group-id', type=str,
              help='Name of the group id for reading from Kafka. '
                   '(deprecated, use the config file instead)')
@click.option('--exclude-sha1-file', default=None, type=click.File('rb'),
              help='File containing a sorted array of hashes to be excluded.')
@click.pass_context
def content_replay(ctx, max_messages,
                   brokers, prefix, group_id, exclude_sha1_file):
    """Fill a destination Object Storage (typically a mirror) by reading a
    Journal and retrieving objects from an existing source ObjStorage.

    There can be several 'replayers' filling a given ObjStorage as long as they
-    use the same `group-id`.
+    use the same `group-id`. You can use the `KAFKA_GROUP_INSTANCE_ID`
+    environment variable to use KIP-345 static group membership.

    This service retrieves object ids to copy from the 'content' topic. It
    will only copy an object's content if the object's description in the
    kafka message has the status:visible set.

    `--exclude-sha1-file` may be used to exclude some hashes to speed up the
    replay in case many of the contents are already in the destination
    objstorage. It must contain a concatenation of all (sha1) hashes,
    and it must be sorted.
    This file will not be fully loaded into memory at any given time,
    so it can be arbitrarily large.
""" logger = logging.getLogger(__name__) conf = ctx.obj['config'] try: objstorage_src = get_objstorage(**conf.pop('objstorage_src')) except KeyError: ctx.fail('You must have a source objstorage configured in ' 'your config file.') try: objstorage_dst = get_objstorage(**conf.pop('objstorage_dst')) except KeyError: ctx.fail('You must have a destination objstorage configured ' 'in your config file.') if exclude_sha1_file: map_ = mmap.mmap(exclude_sha1_file.fileno(), 0, prot=mmap.PROT_READ) if map_.size() % SHA1_SIZE != 0: ctx.fail('--exclude-sha1 must link to a file whose size is an ' 'exact multiple of %d bytes.' % SHA1_SIZE) nb_excluded_hashes = int(map_.size()/SHA1_SIZE) def exclude_fn(obj): return is_hash_in_bytearray(obj['sha1'], map_, nb_excluded_hashes) else: exclude_fn = None client = get_journal_client( ctx, brokers=brokers, prefix=prefix, group_id=group_id, max_messages=max_messages, object_types=('content',)) worker_fn = functools.partial(process_replay_objects_content, src=objstorage_src, dst=objstorage_dst, exclude_fn=exclude_fn) if notify: notify('READY=1') try: nb_messages = 0 last_log_time = 0 while not max_messages or nb_messages < max_messages: nb_messages += client.process(worker_fn) if notify: notify('WATCHDOG=1') if time.time() - last_log_time >= 60: # Log at most once per minute. logger.info('Processed %d messages.' % nb_messages) last_log_time = time.time() except KeyboardInterrupt: ctx.exit(0) else: print('Done.') finally: if notify: notify('STOPPING=1') client.close() def main(): logging.basicConfig() return cli(auto_envvar_prefix='SWH_JOURNAL') if __name__ == '__main__': main() diff --git a/swh/journal/client.py b/swh/journal/client.py index 23aa734..a7a7b68 100644 --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -1,224 +1,254 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import logging +import os import time from confluent_kafka import Consumer, KafkaException, KafkaError from .serializers import kafka_to_value from swh.journal import DEFAULT_PREFIX logger = logging.getLogger(__name__) rdkafka_logger = logging.getLogger(__name__ + '.rdkafka') # Only accepted offset reset policy accepted ACCEPTED_OFFSET_RESET = ['earliest', 'latest'] # Only accepted object types ACCEPTED_OBJECT_TYPES = [ 'content', 'directory', 'revision', 'release', 'snapshot', 'origin', 'origin_visit' ] # Errors that Kafka raises too often and are not useful; therefore they # we lower their log level to DEBUG instead of INFO. _SPAMMY_ERRORS = [ KafkaError._NO_OFFSET, ] def _error_cb(error): if error.fatal(): raise KafkaException(error) if error.code() in _SPAMMY_ERRORS: logger.debug('Received non-fatal kafka error: %s', error) else: logger.info('Received non-fatal kafka error: %s', error) def _on_commit(error, partitions): if error is not None: _error_cb(error) class JournalClient: """A base client for the Software Heritage journal. The current implementation of the journal uses Apache Kafka brokers to publish messages under a given topic prefix, with each object type using a specific topic under that prefix. If the 'prefix' argument is None (default value), it will take the default value 'swh.journal.objects'. 

    Clients subscribe to events specific to each object type as listed in the
    `object_types` argument (if unset, defaults to all accepted object types).

    Clients can be sharded by setting the `group_id` to a common
    value across instances. The journal will share the message
    throughput across the nodes sharing the same group_id.

    Messages are processed by the `worker_fn` callback passed to the
    `process` method, in batches of maximum `max_messages`.

    Any other named argument is passed directly to the underlying Kafka
    Consumer().

    """
    def __init__(
            self, brokers, group_id, prefix=None, object_types=None,
            max_messages=0, process_timeout=0, auto_offset_reset='earliest',
            stop_on_eof=False, **kwargs):
        if prefix is None:
            prefix = DEFAULT_PREFIX
        if object_types is None:
            object_types = ACCEPTED_OBJECT_TYPES
        if auto_offset_reset not in ACCEPTED_OFFSET_RESET:
            raise ValueError(
                'Option \'auto_offset_reset\' only accepts %s, not %s' %
                (ACCEPTED_OFFSET_RESET, auto_offset_reset))

        for object_type in object_types:
            if object_type not in ACCEPTED_OBJECT_TYPES:
                raise ValueError(
                    'Option \'object_types\' only accepts %s, not %s.' %
                    (ACCEPTED_OBJECT_TYPES, object_type))

        self.value_deserializer = kafka_to_value

        if isinstance(brokers, str):
            brokers = [brokers]

        debug_logging = rdkafka_logger.isEnabledFor(logging.DEBUG)
        if debug_logging and 'debug' not in kwargs:
            kwargs['debug'] = 'consumer'

+        # Static group instance id management
+        group_instance_id = os.environ.get('KAFKA_GROUP_INSTANCE_ID')
+        if group_instance_id:
+            kwargs['group.instance.id'] = group_instance_id
+
+        if 'group.instance.id' in kwargs:
+            # When doing static consumer group membership, set a higher default
+            # session timeout. The session timeout is the duration after which
+            # the broker considers that a consumer has left the consumer group
+            # for good, and triggers a rebalance. Considering our current
+            # processing pattern, 10 minutes gives the consumer ample time to
+            # restart before that happens.
+            if 'session.timeout.ms' not in kwargs:
+                kwargs['session.timeout.ms'] = 10 * 60 * 1000  # 10 minutes
+
+        if 'session.timeout.ms' in kwargs:
+            # When the session timeout is set, rdkafka requires the max poll
+            # interval to be set to a higher value; the max poll interval is
+            # rdkafka's way of figuring out whether the client's message
+            # processing thread has stalled: when the max poll interval lapses
+            # between two calls to consumer.poll(), rdkafka leaves the consumer
+            # group and terminates the connection to the brokers.
+            #
+            # We default to 1.5 times the session timeout
+            if 'max.poll.interval.ms' not in kwargs:
+                kwargs['max.poll.interval.ms'] = (
+                    kwargs['session.timeout.ms'] // 2 * 3
+                )
+
        consumer_settings = {
            **kwargs,
            'bootstrap.servers': ','.join(brokers),
            'auto.offset.reset': auto_offset_reset,
            'group.id': group_id,
            'on_commit': _on_commit,
            'error_cb': _error_cb,
            'enable.auto.commit': False,
            'logger': rdkafka_logger,
        }

        self.stop_on_eof = stop_on_eof
        if self.stop_on_eof:
            consumer_settings['enable.partition.eof'] = True

        logger.debug('Consumer settings: %s', consumer_settings)

        self.consumer = Consumer(consumer_settings)

        topics = ['%s.%s' % (prefix, object_type)
                  for object_type in object_types]

        logger.debug('Upstream topics: %s',
                     self.consumer.list_topics(timeout=10))
        logger.debug('Subscribing to: %s', topics)

        self.consumer.subscribe(topics=topics)

        self.max_messages = max_messages
        self.process_timeout = process_timeout

        self.eof_reached = set()

        self._object_types = object_types

    def process(self, worker_fn):
        """Polls Kafka for a batch of messages, and calls the worker_fn
        with these messages.

        Args:
            worker_fn Callable[Dict[str, List[dict]]]: Function called with
                the messages as argument.
        """
        start_time = time.monotonic()
        nb_messages = 0

        while True:
            # timeout for message poll
            timeout = 1.0

            elapsed = time.monotonic() - start_time
            if self.process_timeout:
                # +0.01 to prevent busy-waiting on / spamming consumer.poll.
                # consumer.consume() returns shortly before the timeout expires
                # (a matter of milliseconds), so after it returns a first
                # time, it would then be called with a timeout in the order
                # of milliseconds, therefore returning immediately, then be
                # called again, etc.
                if elapsed + 0.01 >= self.process_timeout:
                    break

                timeout = self.process_timeout - elapsed

            num_messages = 20

            if self.max_messages:
                if nb_messages >= self.max_messages:
                    break
                num_messages = min(num_messages, self.max_messages-nb_messages)

            messages = self.consumer.consume(
                timeout=timeout, num_messages=num_messages)
            if not messages:
                continue

            nb_processed, at_eof = self.handle_messages(messages, worker_fn)
            nb_messages += nb_processed
            if at_eof:
                break

        return nb_messages

    def handle_messages(self, messages, worker_fn):
        objects = defaultdict(list)
        nb_processed = 0

        for message in messages:
            error = message.error()
            if error is not None:
                if error.code() == KafkaError._PARTITION_EOF:
                    self.eof_reached.add(
                        (message.topic(), message.partition())
                    )
                else:
                    _error_cb(error)
                continue

            nb_processed += 1

            object_type = message.topic().split('.')[-1]
            # Got a message from a topic we did not subscribe to.
            assert object_type in self._object_types, object_type
            objects[object_type].append(self.deserialize_message(message))

        if objects:
            worker_fn(dict(objects))
            self.consumer.commit()

        at_eof = (self.stop_on_eof and all(
            (tp.topic, tp.partition) in self.eof_reached
            for tp in self.consumer.assignment()
        ))

        return nb_processed, at_eof

    def deserialize_message(self, message):
        return self.value_deserializer(message.value())

    def close(self):
        self.consumer.close()
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
index a396f7f..1d00b89 100644
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -1,223 +1,271 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import functools
import logging
import re
import tempfile
from subprocess import Popen
from typing import Any, Dict, Tuple
from unittest.mock import patch

from click.testing import CliRunner
from confluent_kafka import Producer
import pytest

from swh.objstorage.backends.in_memory import InMemoryObjStorage
from swh.storage import get_storage

from swh.journal.cli import cli
from swh.journal.serializers import key_to_kafka, value_to_kafka


logger = logging.getLogger(__name__)


CLI_CONFIG = '''
storage:
    cls: memory
    args: {}
objstorage_src:
    cls: mocked
    args:
        name: src
objstorage_dst:
    cls: mocked
    args:
        name: dst
'''


@pytest.fixture
def storage():
    """An swh-storage object that gets injected into the CLI functions."""
    storage_config = {
        'cls': 'pipeline',
        'steps': [
            {'cls': 'validate'},
            {'cls': 'memory'},
        ]
    }
    storage = get_storage(**storage_config)

    with patch('swh.journal.cli.get_storage') as get_storage_mock:
        get_storage_mock.return_value = storage
        yield storage


-def invoke(catch_exceptions, args):
+def invoke(catch_exceptions, args, env=None):
    runner = CliRunner()
    with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd:
        config_fd.write(CLI_CONFIG)
        config_fd.seek(0)
        args = ['-C' + config_fd.name] + args
-        result = runner.invoke(cli, args, obj={'log_level': logging.DEBUG})
+        result = runner.invoke(
+            cli, args, obj={'log_level': logging.DEBUG}, env=env,
+        )
    if not catch_exceptions and result.exception:
        print(result.output)
        raise result.exception
    return result


def test_replay(
        storage,
        kafka_prefix: str,
        kafka_server: Tuple[Popen, int]):
    (_, port) = kafka_server
    kafka_prefix += '.swh.journal.objects'

    producer = Producer({
        'bootstrap.servers': 'localhost:{}'.format(port),
        'client.id': 'test-producer',
        'enable.idempotence': 'true',
    })

    snapshot = {'id': b'foo', 'branches': {
        b'HEAD': {
            'target_type': 'revision',
            'target': b'\x01'*20,
        }
    }}  # type: Dict[str, Any]
    producer.produce(
        topic=kafka_prefix+'.snapshot',
        key=key_to_kafka(snapshot['id']),
        value=value_to_kafka(snapshot),
    )
    producer.flush()
    logger.debug('Flushed producer')

    result = invoke(False, [
        'replay',
        '--broker', '127.0.0.1:%d' % port,
        '--group-id', 'test-cli-consumer',
        '--prefix', kafka_prefix,
        '--max-messages', '1',
    ])
    expected = r'Done.\n'
    assert result.exit_code == 0, result.output
    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output

    assert storage.snapshot_get(snapshot['id']) == {
        **snapshot, 'next_branch': None}


def _patch_objstorages(names):
    objstorages = {name: InMemoryObjStorage() for name in names}

    def get_mock_objstorage(cls, args):
        assert cls == 'mocked', cls
        return objstorages[args['name']]

    def decorator(f):
        @functools.wraps(f)
        @patch('swh.journal.cli.get_objstorage')
        def newf(get_objstorage_mock, *args, **kwargs):
            get_objstorage_mock.side_effect = get_mock_objstorage
            f(*args, objstorages=objstorages, **kwargs)
        return newf
    return decorator


def _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages):
    producer = Producer({
        'bootstrap.servers': '127.0.0.1:{}'.format(kafka_port),
        'client.id': 'test-producer',
        'enable.idempotence': 'true',
    })

    contents = {}
    for i in range(10):
        content = b'\x00'*19 + bytes([i])
        sha1 = objstorages['src'].add(content)
        contents[sha1] = content
        producer.produce(
            topic=kafka_prefix+'.content',
            key=key_to_kafka(sha1),
            value=key_to_kafka({
                'sha1': sha1,
                'status': 'visible',
            }),
        )

    producer.flush()

    return contents


@_patch_objstorages(['src', 'dst'])
def test_replay_content(
        objstorages,
        storage,
        kafka_prefix: str,
        kafka_server: Tuple[Popen, int]):
    (_, kafka_port) = kafka_server
    kafka_prefix += '.swh.journal.objects'

    contents = _fill_objstorage_and_kafka(
        kafka_port, kafka_prefix, objstorages)

    result = invoke(False, [
        'content-replay',
        '--broker', '127.0.0.1:%d' % kafka_port,
        '--group-id', 'test-cli-consumer',
        '--prefix', kafka_prefix,
        '--max-messages', '10',
    ])
    expected = r'Done.\n'
    assert result.exit_code == 0, result.output
    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output

    for (sha1, content) in contents.items():
        assert sha1 in objstorages['dst'], sha1
        assert objstorages['dst'].get(sha1) == content


+@_patch_objstorages(['src', 'dst'])
+def test_replay_content_static_group_id(
+        objstorages,
+        storage,
+        kafka_prefix: str,
+        kafka_server: Tuple[Popen, int],
+        caplog):
+    (_, kafka_port) = kafka_server
+    kafka_prefix += '.swh.journal.objects'
+
+    contents = _fill_objstorage_and_kafka(
+        kafka_port, kafka_prefix, objstorages)
+
+    # Set up log capture to fish the consumer settings out of the log messages
+    caplog.set_level(logging.DEBUG, 'swh.journal.client')
+
+    result = invoke(False, [
+        'content-replay',
+        '--broker', '127.0.0.1:%d' % kafka_port,
+        '--group-id', 'test-cli-consumer',
+        '--prefix', kafka_prefix,
+        '--max-messages', '10',
+    ], {'KAFKA_GROUP_INSTANCE_ID': 'static-group-instance-id'})
+    expected = r'Done.\n'
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+    consumer_settings = None
+    for record in caplog.records:
+        if 'Consumer settings' in record.message:
+            consumer_settings = record.args
+            break
+
+    assert consumer_settings is not None, (
+        'Failed to get consumer settings out of the consumer log. '
+        'See log capture for details.'
+    )
+    assert consumer_settings['group.instance.id'] == 'static-group-instance-id'
+    assert consumer_settings['session.timeout.ms'] == 60 * 10 * 1000
+    assert consumer_settings['max.poll.interval.ms'] == 90 * 10 * 1000
+
+    for (sha1, content) in contents.items():
+        assert sha1 in objstorages['dst'], sha1
+        assert objstorages['dst'].get(sha1) == content
+
+
@_patch_objstorages(['src', 'dst'])
def test_replay_content_exclude(
        objstorages,
        storage,
        kafka_prefix: str,
        kafka_server: Tuple[Popen, int]):
    (_, kafka_port) = kafka_server
    kafka_prefix += '.swh.journal.objects'

    contents = _fill_objstorage_and_kafka(
        kafka_port, kafka_prefix, objstorages)

    excluded_contents = list(contents)[0::2]  # picking half of them
    with tempfile.NamedTemporaryFile(mode='w+b') as fd:
        fd.write(b''.join(sorted(excluded_contents)))
        fd.seek(0)

        result = invoke(False, [
            'content-replay',
            '--broker', '127.0.0.1:%d' % kafka_port,
            '--group-id', 'test-cli-consumer',
            '--prefix', kafka_prefix,
            '--max-messages', '10',
            '--exclude-sha1-file', fd.name,
        ])
    expected = r'Done.\n'
    assert result.exit_code == 0, result.output
    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output

    for (sha1, content) in contents.items():
        if sha1 in excluded_contents:
            assert sha1 not in objstorages['dst'], sha1
        else:
            assert sha1 in objstorages['dst'], sha1
            assert objstorages['dst'].get(sha1) == content
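
Note on the `--exclude-sha1-file` format documented in the `content-replay` docstring above: the file is a sorted concatenation of raw 20-byte sha1 digests, which the replayer mmaps and probes with `is_hash_in_bytearray`. As a minimal sketch of how such a file could be produced (not part of this patch; the helper name and paths are hypothetical):

# Hypothetical helper, not part of this patch: build a file suitable for
# --exclude-sha1-file from an iterable of raw content bytes.
import hashlib


def write_exclude_sha1_file(path, contents):
    # Hash each content, sort the raw 20-byte digests, and concatenate them
    # so the replayer can binary-search the mmap'ed file.
    digests = sorted(hashlib.sha1(c).digest() for c in contents)
    with open(path, 'wb') as f:
        f.write(b''.join(digests))


# Usage sketch:
# write_exclude_sha1_file('/tmp/excluded-sha1s', [b'already mirrored blob'])
# swh journal content-replay --exclude-sha1-file /tmp/excluded-sha1s ...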
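
To make the static group membership behaviour concrete, here is a hedged usage sketch of the `KAFKA_GROUP_INSTANCE_ID` handling added to `JournalClient.__init__` above. It assumes a reachable Kafka broker; the broker address, group id, and instance id are placeholders.

# Hypothetical usage sketch for the static group membership added above.
# Assumes a Kafka broker at localhost:9092; names are placeholders.
import os

from swh.journal.client import JournalClient

os.environ['KAFKA_GROUP_INSTANCE_ID'] = 'content-replayer-0'

client = JournalClient(
    brokers=['localhost:9092'],
    group_id='test-replayer',
    object_types=['content'],
)
# With the defaults introduced in this patch, the consumer settings now
# include group.instance.id='content-replayer-0',
# session.timeout.ms=600000 (10 minutes) and
# max.poll.interval.ms=900000 (1.5 times the session timeout).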
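
The `JournalClient` docstring above states that messages are handed to `worker_fn` in batches keyed by object type. A tiny illustrative callback (hypothetical, not part of this patch) shows the expected shape:

# Hypothetical worker_fn: JournalClient.process() groups deserialized
# messages by object type and passes them as a dict of lists.
def print_objects(all_objects):
    for object_type, objects in all_objects.items():
        print('received %d %s objects' % (len(objects), object_type))

# Usage sketch (with a client configured as above):
# client.process(print_objects)  # returns the number of messages processed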