diff --git a/swh/journal/publisher.py b/swh/journal/publisher.py
--- a/swh/journal/publisher.py
+++ b/swh/journal/publisher.py
@@ -54,6 +54,9 @@
         if extra_configuration:
             config.update(extra_configuration)
 
+        logging.debug('###### config: %s' % config)
+        logging.debug('###### object types: %s' % config['object_types'])
+
         self._prepare_storage(config)
         self._prepare_journal(config)
 
@@ -109,7 +112,10 @@
         max_messages = self.max_messages
         for num, message in enumerate(self.consumer):
+            logging.debug('num: %s, message: %s' % (num, message))
             object_type = message.topic.split('.')[-1]
+            logging.debug('num: %s, object_type: %s, message: %s' % (
+                num, object_type, message))
             messages[object_type].append(message.value)
             if num >= max_messages:
                 break
 
@@ -160,37 +166,51 @@
 
         for object_type, objects in messages.items():
             topic = '%s.%s' % (self.config['final_prefix'], object_type)
             for key, object in objects:
+                logging.debug('topic: %s, key: %s, value: %s' % (
+                    topic, key, object))
                 self.producer.send(topic, key=key, value=object)
         self.producer.flush()
 
     def process_contents(self, content_objs):
+        logging.debug('##### contents: %s' % content_objs)
         metadata = self.storage.content_get_metadata(
             (c[b'sha1'] for c in content_objs))
         return [(content['sha1'], content) for content in metadata]
 
     def process_revisions(self, revision_objs):
+        logging.debug('##### revisions: %s' % revision_objs)
         metadata = self.storage.revision_get((r[b'id'] for r in revision_objs))
-        return [(revision['id'], revision) for revision in metadata]
+        return [(revision['id'], revision)
+                for revision in metadata if revision]
 
     def process_releases(self, release_objs):
+        logging.debug('##### releases: %s' % release_objs)
         metadata = self.storage.release_get((r[b'id'] for r in release_objs))
         return [(release['id'], release) for release in metadata]
 
     def process_origins(self, origin_objs):
-        return origin_objs
+        logging.debug('##### origins: %s' % origin_objs)
+        r = []
+        for o in origin_objs:
+            origin = {'url': o[b'url'], 'type': o[b'type']}
+            r.append((origin, origin))
+        return r
 
     def process_origin_visits(self, origin_visits):
+        logging.debug('##### origin_visits: %s' % origin_visits)
         metadata = []
         for ov in origin_visits:
             origin_visit = self.storage.origin_visit_get_by(
-                ov['origin'], ov['visit'])
+                ov[b'origin'], ov[b'visit'])
             if origin_visit:
-                pk = ov['origin'], ov['visit']
+                pk = ov[b'origin'], ov[b'visit']
+                origin_visit['date'] = str(origin_visit['date'])
                 metadata.append((pk, origin_visit))
         return metadata
 
     def process_snapshots(self, snapshot_objs):
+        logging.debug('##### snapshots: %s' % snapshot_objs)
         metadata = []
         for snap in snapshot_objs:
             full_obj = snapshot.snapshot_get_all_branches(
@@ -201,10 +221,21 @@
 
 
 if __name__ == '__main__':
-    logging.basicConfig(
-        level=logging.INFO,
-        format='%(asctime)s %(process)d %(levelname)s %(message)s'
-    )
-    publisher = JournalPublisher()
-    while True:
-        publisher.poll()
+    import click
+
+    @click.command()
+    @click.option('--verbose', is_flag=True, default=False,
+                  help='Be verbose if asked.')
+    def main(verbose):
+        logging.basicConfig(
+            level=logging.DEBUG if verbose else logging.INFO,
+            format='%(asctime)s %(process)d %(levelname)s %(message)s'
+        )
+        _log = logging.getLogger('kafka')
+        _log.setLevel(logging.INFO)
+
+        publisher = JournalPublisher()
+        while True:
+            publisher.poll()
+
+    main()
diff --git a/swh/journal/tests/test_publisher.py b/swh/journal/tests/test_publisher.py
--- a/swh/journal/tests/test_publisher.py
+++ b/swh/journal/tests/test_publisher.py
@@ -129,17 +129,18 @@
         self.releases = [{b'id': c['id']} for c in RELEASES]
         # those needs id generation from the storage
         # so initialization is different than other entities
-        self.origins = [{'url': o['url'],
-                         'type': o['type']}
+        self.origins = [{b'url': o['url'],
+                         b'type': o['type']}
                         for o in self.publisher.origins]
-        self.origin_visits = [{'origin': ov['origin'],
-                               'visit': ov['visit']}
+        self.origin_visits = [{b'origin': ov['origin'],
+                               b'visit': ov['visit']}
                               for ov in self.publisher.origin_visits]
         # full objects
         storage = self.publisher.storage
         ovs = []
         for ov in self.origin_visits:
-            ovs.append(storage.origin_visit_get_by(**ov))
+            ovs.append(storage.origin_visit_get_by(
+                ov[b'origin'], ov[b'visit']))
         self.expected_origin_visits = ovs
 
     def test_process_contents(self):
@@ -159,7 +160,9 @@
 
     def test_process_origins(self):
         actual_origins = self.publisher.process_origins(self.origins)
-        expected_origins = self.origins
+        expected_origins = [({'url': o[b'url'], 'type': o[b'type']},
+                             {'url': o[b'url'], 'type': o[b'type']})
+                            for o in self.origins]
         self.assertEqual(actual_origins, expected_origins)
 
     def test_process_origin_visits(self):
@@ -182,10 +185,9 @@
         expected_contents = [(c['sha1'], c) for c in CONTENTS]
         expected_revisions = [(c['id'], c) for c in REVISIONS]
         expected_releases = [(c['id'], c) for c in RELEASES]
-        expected_origins = ORIGINS
+        expected_origins = [(o, o) for o in ORIGINS]
         expected_ovs = [((ov['origin'], ov['visit']), ov)
                         for ov in self.expected_origin_visits]
-
         expected_objects = {
             'content': expected_contents,
             'revision': expected_revisions,
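
Context for the switch to b'...' keys above (a sketch, not part of the patch):
swh.journal moves objects through Kafka as msgpack payloads, and decoding
without string coercion hands back bytes keys, hence the ov[b'origin'],
c[b'sha1'] and r[b'id'] lookups the publisher now uses. A minimal round-trip
illustrating that shape, assuming use_bin_type/raw msgpack settings (the
journal's exact deserializer options are an assumption here):

    import msgpack

    # Pack with use_bin_type=True (str and bytes kept distinct on the wire),
    # then unpack with raw=True: str keys come back as bytes, matching the
    # b'origin' / b'visit' lookups in process_origin_visits above.
    payload = msgpack.packb({'origin': 1, 'visit': 2}, use_bin_type=True)
    decoded = msgpack.unpackb(payload, raw=True)
    assert decoded == {b'origin': 1, b'visit': 2}

Once applied, the DEBUG traces added in this diff would be enabled by running
the new click entry point with --verbose (e.g. python3 -m
swh.journal.publisher --verbose; the module invocation path is assumed),
while the kafka logger stays pinned at INFO to keep consumer-poll noise down.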