Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/publisher.py
Show First 20 Lines • Show All 109 Lines • ▼ Show 20 Lines | def poll(self, max_messages=None): | ||||
if max_messages is None: | if max_messages is None: | ||||
max_messages = self.max_messages | max_messages = self.max_messages | ||||
for num, message in enumerate(self.consumer): | for num, message in enumerate(self.consumer): | ||||
object_type = message.topic.split('.')[-1] | object_type = message.topic.split('.')[-1] | ||||
logging.debug('num: %s, object_type: %s, message: %s' % ( | logging.debug('num: %s, object_type: %s, message: %s' % ( | ||||
num, object_type, message)) | num, object_type, message)) | ||||
messages[object_type].append(message.value) | messages[object_type].append(message.value) | ||||
if num >= max_messages: | if num + 1 >= self.max_messages: | ||||
break | break | ||||
new_objects = self.process_objects(messages) | new_objects = self.process_objects(messages) | ||||
self.produce_messages(new_objects) | self.produce_messages(new_objects) | ||||
self.consumer.commit() | self.consumer.commit() | ||||
def process_objects(self, messages): | def process_objects(self, messages): | ||||
"""Given a dict of messages {object type: [object id]}, reify those | """Given a dict of messages {object type: [object id]}, reify those | ||||
▲ Show 20 Lines • Show All 113 Lines • Show Last 20 Lines |