Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/publisher.py
Show First 20 Lines • Show All 67 Lines • ▼ Show 20 Lines | def _prepare_journal(self, config): | ||||
value_deserializer=kafka_to_key, | value_deserializer=kafka_to_key, | ||||
auto_offset_reset='earliest', | auto_offset_reset='earliest', | ||||
enable_auto_commit=False, | enable_auto_commit=False, | ||||
group_id=config['consumer_id'], | group_id=config['consumer_id'], | ||||
) | ) | ||||
self.producer = KafkaProducer( | self.producer = KafkaProducer( | ||||
bootstrap_servers=config['brokers'], | bootstrap_servers=config['brokers'], | ||||
key_serializer=key_to_kafka, | key_serializer=key_to_kafka, | ||||
value_serializer=key_to_kafka, | value_serializer=value_to_kafka, | ||||
client_id=config['publisher_id'], | client_id=config['publisher_id'], | ||||
) | ) | ||||
logger.info('Subscribing to object types event: %s' % ( | logger.info('Subscribing to object types event: %s' % ( | ||||
config['object_types'], )) | config['object_types'], )) | ||||
self.consumer.subscribe( | self.consumer.subscribe( | ||||
topics=['%s.%s' % (config['temporary_prefix'], object_type) | topics=['%s.%s' % (config['temporary_prefix'], object_type) | ||||
for object_type in config['object_types']], | for object_type in config['object_types']], | ||||
▲ Show 20 Lines • Show All 138 Lines • Show Last 20 Lines |