diff --git a/swh/journal/cli.py b/swh/journal/cli.py
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -44,11 +44,11 @@
     conf = config.read(config_file)
     ctx.ensure_object(dict)
 
-    logger = logging.getLogger(__name__)
-    logger.setLevel(log_level)
-
-    _log = logging.getLogger('kafka')
-    _log.setLevel(logging.INFO)
+    logging.basicConfig(
+        level=log_level,
+        format='%(asctime)s %(process)d %(levelname)s %(message)s'
+    )
+    logging.getLogger('kafka').setLevel(logging.INFO)
 
     ctx.obj['config'] = conf
     ctx.obj['loglevel'] = log_level
diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -73,7 +73,7 @@
         if extra_configuration:
             self.config.update(extra_configuration)
 
-        self.log = logging.getLogger('swh.journal.client.JournalClient')
+        self.log = logging.getLogger(__name__)
 
         auto_offset_reset = self.config['auto_offset_reset']
         if auto_offset_reset not in ACCEPTED_OFFSET_RESET:
diff --git a/swh/journal/direct_writer.py b/swh/journal/direct_writer.py
--- a/swh/journal/direct_writer.py
+++ b/swh/journal/direct_writer.py
@@ -9,14 +9,13 @@
 
 from .serializers import key_to_kafka, value_to_kafka
 
-logger = logging.getLogger(__name__)
-
 
 class DirectKafkaWriter:
     """This class is instantiated and used by swh-storage to write incoming
     new objects to Kafka before adding them to the storage backend
     (eg. postgresql) itself."""
     def __init__(self, brokers, prefix, client_id):
+        self.log = logging.getLogger(__name__)
         self._prefix = prefix
 
         self.producer = KafkaProducer(
@@ -54,7 +53,7 @@
         topic = '%s.%s' % (self._prefix, object_type)
         key = self._get_key(object_type, object_)
         object_ = self._sanitize_object(object_type, object_)
-        logger.debug('topic: %s, key: %s, value: %s' % (topic, key, object_))
+        self.log.debug('topic: %s, key: %s, value: %s' % (topic, key, object_))
         self.producer.send(topic, key=key, value=object_)
 
     write_update = write_addition
diff --git a/swh/journal/publisher.py b/swh/journal/publisher.py
--- a/swh/journal/publisher.py
+++ b/swh/journal/publisher.py
@@ -13,8 +13,6 @@
 
 from .serializers import kafka_to_key, key_to_kafka
 
-logger = logging.getLogger(__name__)
-
 
 MANDATORY_KEYS = [
     'brokers', 'temporary_prefix', 'final_prefix', 'consumer_id',
@@ -34,12 +32,12 @@
     """
 
     def __init__(self, config):
+        self.log = logging.getLogger(__name__)
         self.config = config
         self.check_config(config)
         self._prepare_storage(config)
         self._prepare_journal(config)
         self.max_messages = self.config['max_messages']
-        logger.setLevel(logging.DEBUG)
 
     def check_config(self, config):
         """Check the configuration is fine.
@@ -70,6 +68,7 @@
             enable_auto_commit=False,
             group_id=config['consumer_id'],
         )
+
         self.producer = KafkaProducer(
             bootstrap_servers=config['brokers'],
             key_serializer=key_to_kafka,
@@ -77,8 +76,9 @@
             client_id=config['publisher_id'],
         )
 
-        logger.info('Subscribing to object types event: %s' % (
-            config['object_types'], ))
+        self.log.info('Subscribing to object types event: %s' %
+                      config['object_types'])
+
         self.consumer.subscribe(
             topics=['%s.%s' % (config['temporary_prefix'], object_type)
                     for object_type in config['object_types']],
@@ -110,13 +110,13 @@
         for num, message in enumerate(self.consumer):
             object_type = message.topic.split('.')[-1]
-            logger.debug('num: %s, object_type: %s, message: %s' % (
-                num+1, object_type, message))
+            self.log.debug('num: %s, object_type: %s, message: %s' %
+                           (num+1, object_type, message))
             messages[object_type].append(message.value)
             if num + 1 >= self.max_messages:
                 break
 
-        logger.debug('number of messages: %s', num+1)
+        self.log.debug('number of messages: %s', num+1)
         new_objects = self.process_objects(messages)
         self.produce_messages(new_objects)
@@ -164,31 +164,31 @@
         for object_type, objects in messages.items():
             topic = '%s.%s' % (self.config['final_prefix'], object_type)
             for key, object in objects:
-                logger.debug('topic: %s, key: %s, value: %s' % (
-                    topic, key, object))
+                self.log.debug('topic: %s, key: %s, value: %s' %
+                               (topic, key, object))
                 self.producer.send(topic, key=key, value=object)
 
         self.producer.flush()
 
     def process_contents(self, content_objs):
-        logger.debug('contents: %s' % content_objs)
+        self.log.debug('contents: %s' % content_objs)
         metadata = self.storage.content_get_metadata(
             (c[b'sha1'] for c in content_objs))
         return [(content['sha1'], content) for content in metadata]
 
     def process_revisions(self, revision_objs):
-        logger.debug('revisions: %s' % revision_objs)
+        self.log.debug('revisions: %s' % revision_objs)
         metadata = self.storage.revision_get((r[b'id'] for r in revision_objs))
         return [(revision['id'], revision)
                 for revision in metadata if revision]
 
     def process_releases(self, release_objs):
-        logger.debug('releases: %s' % release_objs)
+        self.log.debug('releases: %s' % release_objs)
         metadata = self.storage.release_get((r[b'id'] for r in release_objs))
         return [(release['id'], release) for release in metadata]
 
     def process_origins(self, origin_objs):
-        logger.debug('origins: %s' % origin_objs)
+        self.log.debug('origins: %s' % origin_objs)
         r = []
         for o in origin_objs:
             origin = {'url': o[b'url'], 'type': o[b'type']}
@@ -196,7 +196,7 @@
         return r
 
     def process_origin_visits(self, origin_visits):
-        logger.debug('origin_visits: %s' % origin_visits)
+        self.log.debug('origin_visits: %s' % origin_visits)
         metadata = []
         for ov in origin_visits:
             origin_visit = self.storage.origin_visit_get_by(
@@ -208,7 +208,7 @@
         return metadata
 
     def process_snapshots(self, snapshot_objs):
-        logger.debug('snapshots: %s' % snapshot_objs)
+        self.log.debug('snapshots: %s' % snapshot_objs)
         metadata = []
         for snap in snapshot_objs:
             full_obj = snapshot.snapshot_get_all_branches(
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -9,8 +9,6 @@
 
 from .serializers import kafka_to_value
 
-logger = logging.getLogger(__name__)
-
 
 OBJECT_TYPES = frozenset([
     'origin', 'origin_visit', 'snapshot', 'release', 'revision',
@@ -24,7 +22,7 @@
         if not set(object_types).issubset(OBJECT_TYPES):
             raise ValueError('Unknown object types: %s' % ', '.join(
                 set(object_types) - OBJECT_TYPES))
-
+        self.log = logging.getLogger(__name__)
         self._object_types = object_types
         self.consumer = KafkaConsumer(
             bootstrap_servers=brokers,
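
For context, the logging pattern this diff converges on is the standard-library idiom: configure level, format and handlers exactly once at the CLI entry point with logging.basicConfig(); have every module or class fetch its own logger through logging.getLogger(__name__); and cap noisy third-party loggers such as 'kafka' at INFO. The standalone sketch below illustrates that pattern; the class and function names are illustrative only and are not part of swh-journal.

import logging

# One-time global configuration at the program's entry point, mirroring what
# the reworked cli.py does with the level passed on the command line.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(process)d %(levelname)s %(message)s',
)

# Keep a chatty third-party logger quieter so DEBUG runs stay readable.
logging.getLogger('kafka').setLevel(logging.INFO)


class ExampleWriter:
    """Illustrative stand-in for a class such as DirectKafkaWriter."""

    def __init__(self):
        # Per-instance logger named after the defining module; it inherits the
        # level and handlers installed by basicConfig() above.
        self.log = logging.getLogger(__name__)

    def write(self, topic, key, value):
        self.log.debug('topic: %s, key: %s, value: %s', topic, key, value)


if __name__ == '__main__':
    ExampleWriter().write('prefix.content', b'\x00' * 20, {'length': 42})

Note that logging.getLogger(__name__) returns the same logger object whether it is called at module level or inside __init__, so switching from a module-level logger to self.log does not change behaviour by itself; the substantive change is that level and formatting now come from a single basicConfig() call in the CLI rather than from scattered per-module setLevel() calls.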