D610.diff
diff --git a/swh/journal/publisher.py b/swh/journal/publisher.py
--- a/swh/journal/publisher.py
+++ b/swh/journal/publisher.py
@@ -15,6 +15,16 @@
class SWHJournalPublisher(SWHConfig):
+ """The journal publisher is a layer in charge of:
+
+ - consuming messages from topics (1 topic per object type)
+ - reifying the object ids read from those topics (using the storage)
+ - producing those reified objects to output topics (1 topic per
+ object type)
+
+ The main entry point for this class is the 'poll' method.
+
+ """
DEFAULT_CONFIG = {
'brokers': ('list[str]', ['getty.internal.softwareheritage.org']),
@@ -49,7 +59,10 @@
self.max_messages = self.config['max_messages']
def _prepare_journal(self, config):
+ """Prepare the consumer and subscriber instances the publisher needs
+ to communicate with the journal.
+ """
# yes, the temporary topics contain values that are actually _keys_
self.consumer = KafkaConsumer(
bootstrap_servers=config['brokers'],
@@ -71,17 +84,33 @@
)
def _prepare_storage(self, config):
+ """Prepare the storage instance the publisher queries to reify object
+ ids into full swh objects.
+
+ """
self.storage = get_storage(**config['storage'])
- def poll(self):
- """Process a batch of messages"""
- num = 0
+ def poll(self, max_messages=None):
+ """Process a batch of messages from the consumer's topics, reify the
+ corresponding object ids through the storage, and produce the
+ reified objects to the production topics.
+
+ This method polls a given number of messages, then stops.
+ The number of messages to consume is either provided as an
+ argument or taken from the configuration as a fallback.
+
+ This method is expected to be called from within a
+ loop.
+
+ """
messages = defaultdict(list)
+ if max_messages is None:
+ max_messages = self.max_messages
for num, message in enumerate(self.consumer):
object_type = message.topic.split('.')[-1]
messages[object_type].append(message.value)
- if num >= self.max_messages:
+ if num >= max_messages:
break
new_objects = self.process_objects(messages)
@@ -89,6 +118,20 @@
self.consumer.commit()
def process_objects(self, messages):
+ """Given a dict of messages {object type: [object id]}, reify those
+ ids into swh objects using the storage and return a
+ corresponding dict.
+
+ Args:
+ messages (dict): Dict of {object_type: [id-as-bytes]}
+
+ Returns:
+ Dict of {object_type: [tuple]}.
+
+ object_type (str): content, revision, release
+ tuple (bytes, dict): object id as bytes, object as swh dict.
+
+ """
processors = {
'content': self.process_contents,
'revision': self.process_revisions,
@@ -101,6 +144,15 @@
}
def produce_messages(self, messages):
+ """Produce the reified swh objects to the output topics.
+
+ Args:
+ messages (dict): Dict of {object_type: [tuple]}.
+
+ object_type (str): content, revision, release
+ tuple (bytes, dict): object id as bytes, object as swh dict.
+
+ """
for object_type, objects in messages.items():
topic = '%s.%s' % (self.config['final_prefix'], object_type)
for key, object in objects:
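
The new poll() docstring says the method is expected to be called from within a loop. Below is a minimal sketch of such a driving loop, assuming an already configured SWHJournalPublisher instance; the constructor, configuration loading, and the run_publisher helper are not part of this diff and are only illustrative.

# Sketch only: drives SWHJournalPublisher.poll() repeatedly, as the new
# docstring describes. How `publisher` is instantiated and configured is
# outside this diff, so it is taken here as a parameter.
def run_publisher(publisher, batch_size=20):
    """Each poll() call consumes up to `batch_size` ids from the input
    topics, reifies them through the storage, produces the reified
    objects to the output topics, then commits the consumer offsets."""
    while True:
        publisher.poll(max_messages=batch_size)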

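The dict shapes documented in the process_objects() and produce_messages() docstrings can be summarized with placeholder values; the ids and object dicts below are illustrative only, not real data.

# Input to process_objects(): ids grouped by object type, as read from
# the consumer's topics (placeholder ids).
messages = {
    'content': [b'\x01' * 20],
    'revision': [b'\x02' * 20],
    'release': [b'\x03' * 20],
}

# Output of process_objects() and input to produce_messages(): each id
# is paired with the swh object reified from the storage (placeholder
# object dict).
reified = {
    'content': [(b'\x01' * 20, {'sha1': b'\x01' * 20, 'length': 42})],
    'revision': [],
    'release': [],
}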