diff --git a/swh/journal/publisher.py b/swh/journal/publisher.py --- a/swh/journal/publisher.py +++ b/swh/journal/publisher.py @@ -15,6 +15,16 @@ class SWHJournalPublisher(SWHConfig): + """The journal publisher is a layer in charge of: + + - consuming messages from topics (1 topic per object type) + - reifying the object ids read from those topics (using the storage) + - producing those reified objects to output topics (1 topic per + object type) + + The main entry point for this class is the 'poll' method. + + """ DEFAULT_CONFIG = { 'brokers': ('list[str]', ['getty.internal.softwareheritage.org']), @@ -49,7 +59,10 @@ self.max_messages = self.config['max_messages'] def _prepare_journal(self, config): + """Prepare the consumer and producer instances for the publisher to + actually be able to communicate with the journal. + """ # yes, the temporary topics contain values that are actually _keys_ self.consumer = KafkaConsumer( bootstrap_servers=config['brokers'], @@ -71,17 +84,33 @@ ) def _prepare_storage(self, config): + """Prepare the storage instance needed for the publisher to be able to + query the storage to retrieve the objects. + + """ self.storage = get_storage(**config['storage']) - def poll(self): - """Process a batch of messages""" - num = 0 + def poll(self, max_messages=None): + """Process a batch of messages from the consumer's topics. Use the + storage to reify the ids they contain, then produce the reified + objects to the production topics. + + This method stops after consuming max_messages messages. The + limit is either provided as argument or, as a fallback, read + from the 'max_messages' configuration. + + This method is expected to be called repeatedly from within a + loop.
+ + """ messages = defaultdict(list) + if max_messages is None: + max_messages = self.max_messages for num, message in enumerate(self.consumer): object_type = message.topic.split('.')[-1] messages[object_type].append(message.value) - if num >= self.max_messages: + if num + 1 >= max_messages:  # num is 0-based: stop after max_messages messages break new_objects = self.process_objects(messages) @@ -89,6 +118,20 @@ self.consumer.commit() def process_objects(self, messages): + """Given a dict of messages {object type: [object id]}, reify those + ids into swh objects using the storage and return a + corresponding dict. + + Args: + messages (dict): Dict of {object_type: [id-as-bytes]} + + Returns: + Dict of {object_type: [tuple]}. + + object_type (str): content, revision, release + tuple (bytes, dict): object id as bytes, object as swh dict. + + """ processors = { 'content': self.process_contents, 'revision': self.process_revisions, @@ -101,6 +144,15 @@ } def produce_messages(self, messages): + """Produce the new swh objects to the final topics (1 topic per + object type, prefixed with the 'final_prefix' configuration). + + Args: + messages (dict): Dict of {object_type: [tuple]}. + + object_type (str): content, revision, release + tuple (bytes, dict): object id as bytes, object as swh dict. + + """ for object_type, objects in messages.items(): topic = '%s.%s' % (self.config['final_prefix'], object_type) for key, object in objects: