diff --git a/debian/control b/debian/control --- a/debian/control +++ b/debian/control @@ -9,12 +9,13 @@ python3-nose, python3-setuptools, python3-swh.core, - python3-swh.storage, + python3-swh.storage (>= 0.0.108~), python3-vcversioner Standards-Version: 3.9.6 Homepage: https://forge.softwareheritage.org/diffusion/DJNL/ Package: python3-swh.journal Architecture: all -Depends: python3-kafka (>= 1.3), ${misc:Depends}, ${python3:Depends} +Depends: python3-kafka (>= 1.3), python3-swh.storage (>= 0.0.108~), + ${misc:Depends}, ${python3:Depends} Description: Software Heritage Journal utilities diff --git a/requirements-swh.txt b/requirements-swh.txt --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,2 +1,2 @@ swh.core -swh.storage +swh.storage >= 0.0.108 diff --git a/swh/journal/publisher.py b/swh/journal/publisher.py --- a/swh/journal/publisher.py +++ b/swh/journal/publisher.py @@ -10,11 +10,22 @@ from swh.core.config import SWHConfig from swh.storage import get_storage +from swh.storage.algos import snapshot from .serializers import kafka_to_key, key_to_kafka class SWHJournalPublisher(SWHConfig): + """The journal publisher is a layer in charge of: + + - consuming messages from topics (1 topic per object_type) + - reifying the object ids read from those topics (using the storage) + - producing those reified objects to output topics (1 topic per + object type) + + The main entry point for this class is the 'poll' method. + + """ DEFAULT_CONFIG = { 'brokers': ('list[str]', ['getty.internal.softwareheritage.org']), @@ -49,7 +60,10 @@ self.max_messages = self.config['max_messages'] def _prepare_journal(self, config): + """Prepare the consumer and producer instances for the publisher to + actually be able to communicate with the journal. 
+ """ # yes, the temporary topics contain values that are actually _keys_ self.consumer = KafkaConsumer( bootstrap_servers=config['brokers'], @@ -71,17 +85,33 @@ ) def _prepare_storage(self, config): + """Prepare the storage instance needed for the publisher to be able to + communicate with the storage to retrieve the objects. + + """ self.storage = get_storage(**config['storage']) - def poll(self): - """Process a batch of messages""" - num = 0 + def poll(self, max_messages=None): + """Process a batch of messages from the consumer's topics. Use the + storage to reify those ids. Produce those reified + objects back to the production topics. + + This method polls a given number of messages, then stops. + The number of messages to consume is either provided or + falls back to the configured default. + + This method is expected to be called from within a + loop. + + """ messages = defaultdict(list) + if max_messages is None: + max_messages = self.max_messages for num, message in enumerate(self.consumer): object_type = message.topic.split('.')[-1] messages[object_type].append(message.value) - if num >= self.max_messages: + if num >= max_messages: break new_objects = self.process_objects(messages) @@ -89,10 +119,25 @@ self.consumer.commit() def process_objects(self, messages): + """Given a dict of messages {object type: [object id]}, reify those + ids into swh objects using the storage and return a + corresponding dict. + + Args: + messages (dict): Dict of {object_type: [id-as-bytes]} + + Returns: + Dict of {object_type: [tuple]}. + + object_type (str): content, revision, release or snapshot + tuple (bytes, dict): object id as bytes, object as swh dict. + + """ processors = { 'content': self.process_contents, 'revision': self.process_revisions, 'release': self.process_releases, + 'snapshot': self.process_snapshots, } return { @@ -101,6 +146,15 @@ } def produce_messages(self, messages): + """Produce new swh objects to the production topics. 
+ + Args: + messages (dict): Dict of {object_type: [tuple]}. + + object_type (str): content, revision, release or snapshot + tuple (bytes, dict): object id as bytes, object as swh dict. + + """ for object_type, objects in messages.items(): topic = '%s.%s' % (self.config['final_prefix'], object_type) for key, object in objects: @@ -121,6 +175,15 @@ metadata = self.storage.release_get((r[b'id'] for r in release_objs)) return [(release['id'], release) for release in metadata] + def process_snapshots(self, snapshot_objs): + metadata = [] + for snap in snapshot_objs: + full_obj = snapshot.snapshot_get_all_branches( + self.storage, snap[b'id']) + metadata.append((full_obj['id'], full_obj)) + + return metadata + if __name__ == '__main__': logging.basicConfig(