diff --git a/swh/journal/cli.py b/swh/journal/cli.py
index 45a963d..d5aecdb 100644
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -1,206 +1,207 @@
 # Copyright (C) 2016-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import click
 import functools
 import logging
 import os

 from swh.core import config
 from swh.core.cli import CONTEXT_SETTINGS
 from swh.storage import get_storage
 from swh.objstorage import get_objstorage

-from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES
+from swh.journal.client import JournalClient
 from swh.journal.replay import process_replay_objects
 from swh.journal.replay import process_replay_objects_content
 from swh.journal.backfill import JournalBackfiller


 @click.group(name='journal', context_settings=CONTEXT_SETTINGS)
 @click.option('--config-file', '-C', default=None,
               type=click.Path(exists=True, dir_okay=False,),
               help="Configuration file.")
 @click.pass_context
 def cli(ctx, config_file):
     """Software Heritage Journal tools.

     The journal is a persistent logger of changes to the archive, with
     publish-subscribe support.

     """
     if not config_file:
         config_file = os.environ.get('SWH_CONFIG_FILENAME')

     if config_file:
         if not os.path.exists(config_file):
             raise ValueError('%s does not exist' % config_file)
         conf = config.read(config_file)
     else:
         conf = {}

     ctx.ensure_object(dict)

     log_level = ctx.obj.get('log_level', logging.INFO)
     logging.root.setLevel(log_level)
     logging.getLogger('kafka').setLevel(logging.INFO)

     ctx.obj['config'] = conf


-def get_journal_client(ctx, brokers, prefix, group_id,
-                       object_types=ACCEPTED_OBJECT_TYPES):
+def get_journal_client(ctx, brokers, prefix, group_id, object_types=None):
     conf = ctx.obj['config']
     if brokers is None:
         brokers = conf.get('journal', {}).get('brokers')
     if not brokers:
         ctx.fail('You must specify at least one kafka broker.')
     if not isinstance(brokers, (list, tuple)):
         brokers = [brokers]

     if prefix is None:
         prefix = conf.get('journal', {}).get('prefix')

     if group_id is None:
         group_id = conf.get('journal', {}).get('group_id')

-    return JournalClient(
-        brokers=brokers, group_id=group_id, prefix=prefix,
-        object_types=object_types)
+    kwargs = dict(brokers=brokers, group_id=group_id, prefix=prefix)
+    if object_types:
+        kwargs['object_types'] = object_types
+    return JournalClient(**kwargs)


 @cli.command()
 @click.option('--max-messages', '-m', default=None, type=int,
               help='Maximum number of objects to replay. Default is to '
                    'run forever.')
 @click.option('--broker', 'brokers', type=str, multiple=True,
               hidden=True,  # prefer config file
               help='Kafka broker to connect to.')
 @click.option('--prefix', type=str, default=None,
               hidden=True,  # prefer config file
               help='Prefix of Kafka topic names to read from.')
 @click.option('--group-id', '--consumer-id', type=str,
               hidden=True,  # prefer config file
               help='Name of the consumer/group id for reading from Kafka.')
 @click.pass_context
 def replay(ctx, brokers, prefix, group_id, max_messages):
     """Fill a Storage by reading a Journal.

     There can be several 'replayers' filling a Storage as long as they use
     the same `group-id`.
""" logger = logging.getLogger(__name__) conf = ctx.obj['config'] try: storage = get_storage(**conf.pop('storage')) except KeyError: ctx.fail('You must have a storage configured in your config file.') client = get_journal_client(ctx, brokers, prefix, group_id) worker_fn = functools.partial(process_replay_objects, storage=storage) try: nb_messages = 0 while not max_messages or nb_messages < max_messages: nb_messages += client.process(worker_fn) logger.info('Processed %d messages.' % nb_messages) except KeyboardInterrupt: ctx.exit(0) else: print('Done.') @cli.command() @click.argument('object_type') @click.option('--start-object', default=None) @click.option('--end-object', default=None) @click.option('--dry-run', is_flag=True, default=False) @click.pass_context def backfiller(ctx, object_type, start_object, end_object, dry_run): """Run the backfiller The backfiller list objects from a Storage and produce journal entries from there. Typically used to rebuild a journal or compensate for missing objects in a journal (eg. due to a downtime of this later). The configuration file requires the following entries: - brokers: a list of kafka endpoints (the journal) in which entries will be added. - storage_dbconn: URL to connect to the storage DB. - prefix: the prefix of the topics (topics will be .). - client_id: the kafka client ID. """ conf = ctx.obj['config'] backfiller = JournalBackfiller(conf) try: backfiller.run( object_type=object_type, start_object=start_object, end_object=end_object, dry_run=dry_run) except KeyboardInterrupt: ctx.exit(0) @cli.command() @click.option('--broker', 'brokers', type=str, multiple=True, hidden=True, # prefer config file help='Kafka broker to connect to.') @click.option('--prefix', type=str, default=None, hidden=True, # prefer config file help='Prefix of Kafka topic names to read from.') @click.option('--group-id', '--consumer-id', type=str, hidden=True, # prefer config file help='Name of the consumer/group id for reading from Kafka.') @click.pass_context def content_replay(ctx, brokers, prefix, group_id): """Fill a destination Object Storage (typically a mirror) by reading a Journal and retrieving objects from an existing source ObjStorage. There can be several 'replayers' filling a given ObjStorage as long as they use the same `group-id`. This service retrieves object ids to copy from the 'content' topic. It will only copy object's content if the object's description in the kafka nmessage has the status:visible set. """ logger = logging.getLogger(__name__) conf = ctx.obj['config'] try: objstorage_src = get_objstorage(**conf.pop('objstorage_src')) except KeyError: ctx.fail('You must have a source objstorage configured in ' 'your config file.') try: objstorage_dst = get_objstorage(**conf.pop('objstorage_dst')) except KeyError: ctx.fail('You must have a destination objstorage configured ' 'in your config file.') - client = get_journal_client(ctx, brokers, prefix, group_id) + client = get_journal_client(ctx, brokers, prefix, group_id, + object_types=('content',)) worker_fn = functools.partial(process_replay_objects_content, src=objstorage_src, dst=objstorage_dst) try: nb_messages = 0 while True: nb_messages += client.process(worker_fn) logger.info('Processed %d messages.' 
     except KeyboardInterrupt:
         ctx.exit(0)
     else:
         print('Done.')


 def main():
     logging.basicConfig()
     return cli(auto_envvar_prefix='SWH_JOURNAL')


 if __name__ == '__main__':
     main()
diff --git a/swh/journal/client.py b/swh/journal/client.py
index 9446944..2024e78 100644
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -1,105 +1,106 @@
 # Copyright (C) 2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from kafka import KafkaConsumer
 import logging

 from .serializers import kafka_to_key, kafka_to_value
 from swh.journal import DEFAULT_PREFIX

 logger = logging.getLogger(__name__)


 # Only accepted offset reset policies
 ACCEPTED_OFFSET_RESET = ['earliest', 'latest']

 # Only accepted object types
 ACCEPTED_OBJECT_TYPES = [
     'content',
     'directory',
     'revision',
     'release',
     'snapshot',
     'origin',
     'origin_visit'
 ]


 class JournalClient:
     """A base client for the Software Heritage journal.

     The current implementation of the journal uses Apache Kafka
     brokers to publish messages under a given topic prefix, with each
     object type using a specific topic under that prefix.

     If the 'prefix' argument is None (default value), it will take the
     default value 'swh.journal.objects'.

-    Clients subscribe to events specific to each object type by using
-    the `object_types` configuration variable.
+    Clients subscribe to events specific to each object type as listed in the
+    `object_types` argument (if unset, defaults to all accepted object types).

     Clients can be sharded by setting the `group_id` to a common
     value across instances. The journal will share the message
     throughput across the nodes sharing the same group_id.

     Messages are processed by the `process` method in batches
     of maximum `max_messages`.

     """
     def __init__(
-            self, brokers, group_id, prefix=None,
-            object_types=ACCEPTED_OBJECT_TYPES,
+            self, brokers, group_id, prefix=None, object_types=None,
             max_messages=0, auto_offset_reset='earliest'):
         if prefix is None:
             prefix = DEFAULT_PREFIX
+        if object_types is None:
+            object_types = ACCEPTED_OBJECT_TYPES
         if auto_offset_reset not in ACCEPTED_OFFSET_RESET:
             raise ValueError(
                 'Option \'auto_offset_reset\' only accepts %s.' %
                 ACCEPTED_OFFSET_RESET)

         for object_type in object_types:
             if object_type not in ACCEPTED_OBJECT_TYPES:
                 raise ValueError(
                     'Option \'object_types\' only accepts %s.' %
                     ACCEPTED_OBJECT_TYPES)

         self.consumer = KafkaConsumer(
             bootstrap_servers=brokers,
             key_deserializer=kafka_to_key,
             value_deserializer=kafka_to_value,
             auto_offset_reset=auto_offset_reset,
             enable_auto_commit=False,
             group_id=group_id,
         )

         self.consumer.subscribe(
             topics=['%s.%s' % (prefix, object_type)
                     for object_type in object_types],
         )

         self.max_messages = max_messages
         self._object_types = object_types

     def process(self, worker_fn):
         """Polls Kafka for a batch of messages, and calls the worker_fn
         with these messages.

         Args:
             worker_fn Callable[Dict[str, List[dict]]]: Function called with
                 the messages as argument.
         """
         nb_messages = 0

         polled = self.consumer.poll()
         for (partition, messages) in polled.items():
             object_type = partition.topic.split('.')[-1]
             # Got a message from a topic we did not subscribe to.
             assert object_type in self._object_types, object_type

             worker_fn({object_type: [msg.value for msg in messages]})

             nb_messages += len(messages)

         self.consumer.commit()
         return nb_messages
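
Note on the behaviour change above (not part of the patch): with object_types
now defaulting to None, a plain JournalClient subscribes to every accepted
object type, while the content_replay command narrows the subscription to the
'content' topic. The sketch below mirrors the topic-name computation done in
JournalClient.__init__; it only assumes swh.journal is importable, everything
else comes from the diff.

    from swh.journal import DEFAULT_PREFIX
    from swh.journal.client import ACCEPTED_OBJECT_TYPES

    def subscribed_topics(object_types=None, prefix=None):
        # Same defaulting as JournalClient.__init__ after this patch
        if prefix is None:
            prefix = DEFAULT_PREFIX
        if object_types is None:
            object_types = ACCEPTED_OBJECT_TYPES
        return ['%s.%s' % (prefix, object_type)
                for object_type in object_types]

    # replay command (no object_types): one topic per accepted object type
    print(subscribed_topics())
    # content_replay now forces object_types=('content',)
    print(subscribed_topics(object_types=('content',)))
    # -> ['swh.journal.objects.content']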
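
For reference, a minimal sketch (not part of the patch) of the worker_fn
contract that JournalClient.process() expects: a callable receiving a dict
mapping an object type to the list of deserialized messages from one poll.
The broker address and group id below are placeholders.

    def count_objects(all_objects):
        # all_objects: Dict[str, List[dict]], as passed by process()
        for object_type, objects in all_objects.items():
            print('%s: %d objects' % (object_type, len(objects)))

    # Hypothetical wiring, assuming a reachable Kafka broker:
    # client = JournalClient(brokers=['localhost:9092'], group_id='example',
    #                        object_types=['origin'])
    # client.process(count_objects)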
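
And a sketch (not part of the patch) of the configuration entries the
backfiller docstring asks for, expressed as the dict handed to
JournalBackfiller; every value is a placeholder to adapt to the local setup.

    from swh.journal.backfill import JournalBackfiller

    conf = {
        'brokers': ['localhost:9092'],        # journal kafka endpoints
        'storage_dbconn': 'postgresql:///swh',  # storage DB URL (placeholder)
        'prefix': 'swh.journal.objects',      # topic prefix
        'client_id': 'swh.journal.backfiller.example',
    }

    # backfiller = JournalBackfiller(conf)
    # backfiller.run(object_type='origin', start_object=None,
    #                end_object=None, dry_run=True)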