Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/publisher.py
Show All 23 Lines | - producing those reified objects to output topics (1 topic per | ||||
object type) | object type) | ||||
The main entry point for this class is the 'poll' method. | The main entry point for this class is the 'poll' method. | ||||
""" | """ | ||||
DEFAULT_CONFIG = { | DEFAULT_CONFIG = { | ||||
'brokers': ('list[str]', ['getty.internal.softwareheritage.org']), | 'brokers': ('list[str]', ['getty.internal.softwareheritage.org']), | ||||
'temporary_prefix': ('str', 'swh.tmp_journal.new'), | 'temporary_prefix': ('str', 'swh.tmp_journal'), | ||||
'final_prefix': ('str', 'swh.journal.objects'), | 'final_prefix': ('str', 'swh.journal.objects'), | ||||
'consumer_id': ('str', 'swh.journal.publisher'), | 'consumer_id': ('str', 'swh.journal.publisher'), | ||||
'publisher_id': ('str', 'swh.journal.publisher'), | 'publisher_id': ('str', 'swh.journal.publisher'), | ||||
'object_types': ('list[str]', ['content', 'revision', 'release']), | 'object_types': ('list[str]', ['content', 'revision', 'release']), | ||||
'mutable_object_types': ('list[str]', []), | |||||
'storage': ('dict', { | 'storage': ('dict', { | ||||
'cls': 'remote', | 'cls': 'remote', | ||||
'args': { | 'args': { | ||||
'url': 'http://localhost:5002/', | 'url': 'http://localhost:5002/', | ||||
} | } | ||||
}), | }), | ||||
Show All 30 Lines | def _prepare_journal(self, config): | ||||
key_serializer=key_to_kafka, | key_serializer=key_to_kafka, | ||||
value_serializer=key_to_kafka, | value_serializer=key_to_kafka, | ||||
client_id=config['publisher_id'], | client_id=config['publisher_id'], | ||||
) | ) | ||||
logging.debug('Subscribing to object types event: %s' % ( | logging.debug('Subscribing to object types event: %s' % ( | ||||
config['object_types'], )) | config['object_types'], )) | ||||
self.consumer.subscribe( | self.consumer.subscribe( | ||||
topics=['%s.%s' % (config['temporary_prefix'], object_type) | topics=['%s.new.%s' % (config['temporary_prefix'], object_type) | ||||
for object_type in config['object_types']], | for object_type in config['object_types']], | ||||
) | ) | ||||
self.consumer.subscribe( | |||||
topics=['%s.changed.%s' % (config['temporary_prefix'], object_type) | |||||
for object_type in config['mutable_object_types']], | |||||
) | |||||
def _prepare_storage(self, config): | def _prepare_storage(self, config): | ||||
"""Prepare the storage instance needed for the publisher to be able to | """Prepare the storage instance needed for the publisher to be able to | ||||
discuss with the storage to retrieve the objects. | discuss with the storage to retrieve the objects. | ||||
""" | """ | ||||
self.storage = get_storage(**config['storage']) | self.storage = get_storage(**config['storage']) | ||||
▲ Show 20 Lines • Show All 144 Lines • Show Last 20 Lines |