diff --git a/swh/journal/publisher.py b/swh/journal/publisher.py
--- a/swh/journal/publisher.py
+++ b/swh/journal/publisher.py
@@ -13,6 +13,8 @@
 
 from .serializers import kafka_to_key, key_to_kafka
 
+logger = logging.getLogger(__name__)
+
 
 class JournalPublisher:
     """The journal publisher is a layer in charge of:
@@ -51,7 +53,7 @@
             client_id=config['publisher_id'],
         )
 
-        logging.debug('Subscribing to object types event: %s' % (
+        logger.info('Subscribing to object type events: %s' % (
             config['object_types'], ))
         self.consumer.subscribe(
             topics=['%s.%s' % (config['temporary_prefix'], object_type)
@@ -84,12 +86,14 @@
 
         for num, message in enumerate(self.consumer):
             object_type = message.topic.split('.')[-1]
-            logging.debug('num: %s, object_type: %s, message: %s' % (
+            logger.debug('num: %s, object_type: %s, message: %s' % (
                 num, object_type, message))
             messages[object_type].append(message.value)
             if num + 1 >= self.max_messages:
                 break
 
+        logger.debug('number of messages: %s', num + 1)
+
         new_objects = self.process_objects(messages)
         self.produce_messages(new_objects)
         self.consumer.commit()
@@ -136,31 +140,31 @@
         for object_type, objects in messages.items():
             topic = '%s.%s' % (self.config['final_prefix'], object_type)
             for key, object in objects:
-                logging.debug('topic: %s, key: %s, value: %s' % (
+                logger.debug('topic: %s, key: %s, value: %s' % (
                     topic, key, object))
                 self.producer.send(topic, key=key, value=object)
 
         self.producer.flush()
 
     def process_contents(self, content_objs):
-        logging.debug('contents: %s' % content_objs)
+        logger.debug('contents: %s' % content_objs)
         metadata = self.storage.content_get_metadata(
             (c[b'sha1'] for c in content_objs))
         return [(content['sha1'], content) for content in metadata]
 
     def process_revisions(self, revision_objs):
-        logging.debug('revisions: %s' % revision_objs)
+        logger.debug('revisions: %s' % revision_objs)
         metadata = self.storage.revision_get((r[b'id'] for r in revision_objs))
         return [(revision['id'], revision)
                 for revision in metadata if revision]
 
     def process_releases(self, release_objs):
-        logging.debug('releases: %s' % release_objs)
+        logger.debug('releases: %s' % release_objs)
         metadata = self.storage.release_get((r[b'id'] for r in release_objs))
         return [(release['id'], release) for release in metadata]
 
     def process_origins(self, origin_objs):
-        logging.debug('origins: %s' % origin_objs)
+        logger.debug('origins: %s' % origin_objs)
         r = []
         for o in origin_objs:
             origin = {'url': o[b'url'], 'type': o[b'type']}
@@ -168,7 +172,7 @@
         return r
 
     def process_origin_visits(self, origin_visits):
-        logging.debug('origin_visits: %s' % origin_visits)
+        logger.debug('origin_visits: %s' % origin_visits)
         metadata = []
         for ov in origin_visits:
             origin_visit = self.storage.origin_visit_get_by(
@@ -180,7 +184,7 @@
         return metadata
 
     def process_snapshots(self, snapshot_objs):
-        logging.debug('snapshots: %s' % snapshot_objs)
+        logger.debug('snapshots: %s' % snapshot_objs)
         metadata = []
         for snap in snapshot_objs:
             full_obj = snapshot.snapshot_get_all_branches(
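
Note: with the module-level logger above, the publisher's output is emitted under the
'swh.journal.publisher' name (from __name__) instead of the root logger, so callers can
tune its verbosity independently. A minimal sketch of how a caller might surface these
messages, assuming only the standard library logging module; the configuration below is
illustrative and not part of the patch:

    import logging

    # Illustrative setup, not part of the patch: show DEBUG output globally,
    # then raise the publisher's own logger to INFO so only its
    # "Subscribing to object type events" message (and above) is emitted.
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s %(name)s %(levelname)s %(message)s',
    )
    logging.getLogger('swh.journal.publisher').setLevel(logging.INFO)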