diff --git a/swh/journal/cli.py b/swh/journal/cli.py
index df44939..5c1a02a 100644
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -1,89 +1,88 @@
 # Copyright (C) 2016-2019  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import click
 import logging
 import os
 
 from swh.core import config
 from swh.storage import get_storage
 
 from swh.journal.replay import StorageReplayer
 
 CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
 
 
 @click.group(context_settings=CONTEXT_SETTINGS)
 @click.option('--config-file', '-C', default=None,
               type=click.Path(exists=True, dir_okay=False,),
               help="Configuration file.")
 @click.option('--log-level', '-l', default='INFO',
               type=click.Choice(logging._nameToLevel.keys()),
               help="Log level (defaults to INFO)")
 @click.pass_context
 def cli(ctx, config_file, log_level):
     """Software Heritage Journal CLI interface
 
     Reads its configuration from --config-file or the
     SWH_CONFIG_FILENAME environment variable.
     """
     if not config_file:
         config_file = os.environ.get('SWH_CONFIG_FILENAME')
     if not config_file:
         raise ValueError('You must either pass a config-file parameter '
                          'or set SWH_CONFIG_FILENAME to target '
                          'the config-file')
 
     if not os.path.exists(config_file):
         raise ValueError('%s does not exist' % config_file)
 
     conf = config.read(config_file)
     ctx.ensure_object(dict)
 
     logging.basicConfig(
         level=log_level,
         format='%(asctime)s %(levelname)s %(name)s %(message)s',
     )
-    _log = logging.getLogger('kafka')
-    _log.setLevel(logging.INFO)
+    logging.getLogger('kafka').setLevel(logging.INFO)
 
     ctx.obj['config'] = conf
     ctx.obj['loglevel'] = log_level
 
 
 @cli.command()
 @click.option('--max-messages', '-m', default=None, type=int,
               help='Maximum number of objects to replay. Default is to '
                    'run forever.')
 @click.option('--broker', 'brokers', type=str, multiple=True,
               help='Kafka broker to connect to.')
 @click.option('--prefix', type=str, default='swh.journal.objects',
               help='Prefix of Kafka topic names to read from.')
 @click.option('--consumer-id', type=str,
               help='Name of the consumer/group id for reading from Kafka.')
 @click.pass_context
 def replay(ctx, brokers, prefix, consumer_id, max_messages):
     """Fill a new storage by reading a journal.
 
     """
     conf = ctx.obj['config']
     storage = get_storage(**conf.pop('storage'))
     replayer = StorageReplayer(brokers, prefix, consumer_id)
     try:
         replayer.fill(storage, max_messages=max_messages)
     except KeyboardInterrupt:
         ctx.exit(0)
     else:
         print('Done.')
 
 
 def main():
     return cli(auto_envvar_prefix='SWH_JOURNAL')
 
 
 if __name__ == '__main__':
     main()
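
For reference, the replay command above boils down to the following; a minimal sketch assuming an in-memory swh.storage backend, with illustrative broker and consumer values:

    from swh.storage import get_storage
    from swh.journal.replay import StorageReplayer

    # 'memory' backend and the (cls, args) signature are assumptions here
    storage = get_storage(cls='memory', args={})
    replayer = StorageReplayer(
        brokers=['localhost:9092'],        # illustrative broker
        prefix='swh.journal.objects',
        consumer_id='swh.journal.replay',  # illustrative group id
    )
    replayer.fill(storage, max_messages=1000)
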
""" conf = ctx.obj['config'] storage = get_storage(**conf.pop('storage')) replayer = StorageReplayer(brokers, prefix, consumer_id) try: replayer.fill(storage, max_messages=max_messages) except KeyboardInterrupt: ctx.exit(0) else: print('Done.') def main(): return cli(auto_envvar_prefix='SWH_JOURNAL') if __name__ == '__main__': main() diff --git a/swh/journal/client.py b/swh/journal/client.py index fda8348..27dc2d9 100644 --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -1,134 +1,129 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import logging - from abc import ABCMeta, abstractmethod from collections import defaultdict from kafka import KafkaConsumer from swh.core.config import SWHConfig from .serializers import kafka_to_key, kafka_to_value - # Only accepted offset reset policy accepted ACCEPTED_OFFSET_RESET = ['earliest', 'latest'] # Only accepted object types ACCEPTED_OBJECT_TYPES = [ 'content', 'revision', 'release', 'occurrence', 'origin', 'origin_visit' ] class JournalClient(SWHConfig, metaclass=ABCMeta): """A base client for the Software Heritage journal. The current implementation of the journal uses Apache Kafka brokers to publish messages under a given topic prefix, with each object type using a specific topic under that prefix. Clients subscribe to events specific to each object type by using the `object_types` configuration variable. Clients can be sharded by setting the `client_id` to a common value across instances. The journal will share the message throughput across the nodes sharing the same client_id. Messages are processed by the `process_objects` method in batches of maximum `max_messages`. """ DEFAULT_CONFIG = { # Broker to connect to 'brokers': ('list[str]', ['localhost']), # Prefix topic to receive notification from 'topic_prefix': ('str', 'swh.journal.objects'), # Consumer identifier 'consumer_id': ('str', 'swh.journal.client'), # Object types to deal with (in a subscription manner) 'object_types': ('list[str]', [ 'content', 'revision', 'release', 'occurrence', 'origin', 'origin_visit' ]), # Number of messages to batch process 'max_messages': ('int', 100), 'auto_offset_reset': ('str', 'earliest') } CONFIG_BASE_FILENAME = 'journal/client' ADDITIONAL_CONFIG = {} def __init__(self, extra_configuration={}): self.config = self.parse_config_file( additional_configs=[self.ADDITIONAL_CONFIG]) if extra_configuration: self.config.update(extra_configuration) - self.log = logging.getLogger('swh.journal.client.JournalClient') - auto_offset_reset = self.config['auto_offset_reset'] if auto_offset_reset not in ACCEPTED_OFFSET_RESET: raise ValueError( 'Option \'auto_offset_reset\' only accept %s.' % ACCEPTED_OFFSET_RESET) object_types = self.config['object_types'] for object_type in object_types: if object_type not in ACCEPTED_OBJECT_TYPES: raise ValueError( 'Option \'object_types\' only accepts %s.' 
diff --git a/swh/journal/direct_writer.py b/swh/journal/direct_writer.py
index bff463c..f009abc 100644
--- a/swh/journal/direct_writer.py
+++ b/swh/journal/direct_writer.py
@@ -1,69 +1,69 @@
 # Copyright (C) 2019  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import logging
 
 from kafka import KafkaProducer
 
 from .serializers import key_to_kafka, value_to_kafka
 
 logger = logging.getLogger(__name__)
 
 
 class DirectKafkaWriter:
     """This class is instantiated and used by swh-storage to write incoming
     new objects to Kafka before adding them to the storage backend
     (e.g. postgresql) itself."""
     def __init__(self, brokers, prefix, client_id):
         self._prefix = prefix
 
         self.producer = KafkaProducer(
             bootstrap_servers=brokers,
             key_serializer=key_to_kafka,
             value_serializer=value_to_kafka,
             client_id=client_id,
         )
 
     def send(self, topic, key, value):
         self.producer.send(topic=topic, key=key, value=value)
 
     def _get_key(self, object_type, object_):
         if object_type in ('revision', 'release', 'directory', 'snapshot'):
             return object_['id']
         elif object_type == 'content':
             return object_['sha1']  # TODO: use a dict of hashes
         elif object_type == 'origin':
             return {'url': object_['url'], 'type': object_['type']}
         elif object_type == 'origin_visit':
             return {
                 'origin': object_['origin'],
                 'date': str(object_['date']),
             }
         else:
             raise ValueError('Unknown object type: %s.' % object_type)
 
     def _sanitize_object(self, object_type, object_):
         if object_type == 'origin_visit':
             # Compatibility with the publisher's format
             return {
                 **object_,
                 'date': str(object_['date']),
             }
         elif object_type == 'origin':
             assert 'id' not in object_
         return object_
 
     def write_addition(self, object_type, object_):
         topic = '%s.%s' % (self._prefix, object_type)
         key = self._get_key(object_type, object_)
         object_ = self._sanitize_object(object_type, object_)
-        logger.debug('topic: %s, key: %s, value: %s' % (topic, key, object_))
+        logger.debug('topic: %s, key: %s, value: %s', topic, key, object_)
         self.send(topic, key=key, value=object_)
 
     write_update = write_addition
 
     def write_additions(self, object_type, objects):
         for object_ in objects:
             self.write_addition(object_type, object_)
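
On the writing side, a hedged sketch of wiring up `DirectKafkaWriter` by hand; the broker address and client id are illustrative, and in practice swh-storage instantiates the writer itself:

    from swh.journal.direct_writer import DirectKafkaWriter

    writer = DirectKafkaWriter(
        brokers=['localhost:9092'],               # illustrative broker
        prefix='swh.journal.objects',
        client_id='swh.storage.journal_writer',   # illustrative id
    )

    # Publishes to the 'swh.journal.objects.origin' topic; the key is
    # derived by _get_key() from the origin's url and type.
    writer.write_addition('origin', {
        'url': 'https://example.com/repo.git',
        'type': 'git',
    })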