Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/publisher.py
Show First 20 Lines • Show All 47 Lines • ▼ Show 20 Lines | class JournalPublisher(SWHConfig): | ||||
} | } | ||||
CONFIG_BASE_FILENAME = 'journal/publisher' | CONFIG_BASE_FILENAME = 'journal/publisher' | ||||
def __init__(self, extra_configuration=None): | def __init__(self, extra_configuration=None): | ||||
self.config = config = self.parse_config_file() | self.config = config = self.parse_config_file() | ||||
if extra_configuration: | if extra_configuration: | ||||
config.update(extra_configuration) | config.update(extra_configuration) | ||||
self._prepare_storage(config) | self._prepare_storage(config) | ||||
vlorentz: why `######`? | |||||
Done Inline Actions arf to see them, I forgot to remove those. ardumont: arf to see them, I forgot to remove those. | |||||
self._prepare_journal(config) | self._prepare_journal(config) | ||||
self.max_messages = self.config['max_messages'] | self.max_messages = self.config['max_messages'] | ||||
def _prepare_journal(self, config): | def _prepare_journal(self, config): | ||||
"""Prepare the consumer and subscriber instances for the publisher to | """Prepare the consumer and subscriber instances for the publisher to | ||||
actually be able to discuss with the journal. | actually be able to discuss with the journal. | ||||
Show All 38 Lines | def poll(self, max_messages=None): | ||||
loop. | loop. | ||||
""" | """ | ||||
messages = defaultdict(list) | messages = defaultdict(list) | ||||
if max_messages is None: | if max_messages is None: | ||||
max_messages = self.max_messages | max_messages = self.max_messages | ||||
for num, message in enumerate(self.consumer): | for num, message in enumerate(self.consumer): | ||||
logging.debug('num: %s, message: %s' % (num, message)) | |||||
object_type = message.topic.split('.')[-1] | object_type = message.topic.split('.')[-1] | ||||
logging.debug('num: %s, object_type: %s, message: %s' % ( | |||||
num, object_type, message)) | |||||
messages[object_type].append(message.value) | messages[object_type].append(message.value) | ||||
if num >= max_messages: | if num >= max_messages: | ||||
break | break | ||||
new_objects = self.process_objects(messages) | new_objects = self.process_objects(messages) | ||||
self.produce_messages(new_objects) | self.produce_messages(new_objects) | ||||
self.consumer.commit() | self.consumer.commit() | ||||
Show All 34 Lines | def produce_messages(self, messages): | ||||
object_type (str): content, revision, release | object_type (str): content, revision, release | ||||
tuple (bytes, dict): object id as bytes, object as swh dict. | tuple (bytes, dict): object id as bytes, object as swh dict. | ||||
""" | """ | ||||
for object_type, objects in messages.items(): | for object_type, objects in messages.items(): | ||||
topic = '%s.%s' % (self.config['final_prefix'], object_type) | topic = '%s.%s' % (self.config['final_prefix'], object_type) | ||||
for key, object in objects: | for key, object in objects: | ||||
logging.debug('topic: %s, key: %s, value: %s' % ( | |||||
topic, key, object)) | |||||
self.producer.send(topic, key=key, value=object) | self.producer.send(topic, key=key, value=object) | ||||
self.producer.flush() | self.producer.flush() | ||||
def process_contents(self, content_objs): | def process_contents(self, content_objs): | ||||
Not Done Inline Actions same question here and below vlorentz: same question here and below | |||||
metadata = self.storage.content_get_metadata( | metadata = self.storage.content_get_metadata( | ||||
(c[b'sha1'] for c in content_objs)) | (c[b'sha1'] for c in content_objs)) | ||||
return [(content['sha1'], content) for content in metadata] | return [(content['sha1'], content) for content in metadata] | ||||
def process_revisions(self, revision_objs): | def process_revisions(self, revision_objs): | ||||
metadata = self.storage.revision_get((r[b'id'] for r in revision_objs)) | metadata = self.storage.revision_get((r[b'id'] for r in revision_objs)) | ||||
return [(revision['id'], revision) for revision in metadata] | return [(revision['id'], revision) for revision in metadata] | ||||
Show All 20 Lines | def process_snapshots(self, snapshot_objs): | ||||
full_obj = snapshot.snapshot_get_all_branches( | full_obj = snapshot.snapshot_get_all_branches( | ||||
self.storage, snap[b'id']) | self.storage, snap[b'id']) | ||||
metadata.append((full_obj['id'], full_obj)) | metadata.append((full_obj['id'], full_obj)) | ||||
return metadata | return metadata | ||||
if __name__ == '__main__': | if __name__ == '__main__': | ||||
import click | |||||
@click.command() | |||||
@click.option('--verbose', is_flag=True, default=False, | |||||
help='Be verbose if asked.') | |||||
def main(verbose): | |||||
logging.basicConfig( | logging.basicConfig( | ||||
level=logging.INFO, | level=logging.DEBUG if verbose else logging.INFO, | ||||
format='%(asctime)s %(process)d %(levelname)s %(message)s' | format='%(asctime)s %(process)d %(levelname)s %(message)s' | ||||
) | ) | ||||
publisher = JournalPublisher() | publisher = JournalPublisher() | ||||
while True: | while True: | ||||
publisher.poll() | publisher.poll() | ||||
main() |
why ######?