diff --git a/swh/journal/publisher.py b/swh/journal/publisher.py
index 9774c44..5a9dd63 100644
--- a/swh/journal/publisher.py
+++ b/swh/journal/publisher.py
@@ -1,198 +1,199 @@
 # Copyright (C) 2016-2019  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from collections import defaultdict
 import logging
 
 from kafka import KafkaProducer, KafkaConsumer
 
 from swh.storage import get_storage
 from swh.storage.algos import snapshot
 
 from .serializers import kafka_to_key, key_to_kafka
 
 logger = logging.getLogger(__name__)
 
 
 class JournalPublisher:
     """The journal publisher is a layer in charge of:
 
     - consuming messages from topics (1 topic per object_type)
     - reifying the object ids read from those topics (using the storage)
     - producing the reified objects to output topics (1 topic per
       object type)
 
     The main entry point for this class is the 'poll' method.
 
     """
     def __init__(self, config):
         self.config = config
         self._prepare_storage(config)
         self._prepare_journal(config)
         self.max_messages = self.config['max_messages']
+        logger.setLevel(logging.DEBUG)
 
     def _prepare_journal(self, config):
         """Prepare the consumer and producer instances the publisher needs
           to communicate with the journal.
 
         """
         # yes, the temporary topics contain values that are actually _keys_
         self.consumer = KafkaConsumer(
             bootstrap_servers=config['brokers'],
             value_deserializer=kafka_to_key,
             auto_offset_reset='earliest',
             enable_auto_commit=False,
             group_id=config['consumer_id'],
         )
         self.producer = KafkaProducer(
             bootstrap_servers=config['brokers'],
             key_serializer=key_to_kafka,
             value_serializer=key_to_kafka,
             client_id=config['publisher_id'],
         )
 
         logger.info('Subscribing to object type events: %s' % (
             config['object_types'],))
 
         self.consumer.subscribe(
             topics=['%s.%s' % (config['temporary_prefix'], object_type)
                     for object_type in config['object_types']],
         )
 
     def _prepare_storage(self, config):
         """Prepare the storage instance the publisher needs to retrieve
           the objects.
 
         """
         self.storage = get_storage(**config['storage'])
 
     def poll(self, max_messages=None):
         """Process a batch of messages from the consumer's topics, use the
           storage to reify those ids, and produce the reified objects to
           the output topics.
 
           This method polls a given number of messages, then stops.
           The number of messages to consume is either provided by the
           caller or taken from the configuration as a fallback.
 
           This method is expected to be called from within a loop.
 
         """
         messages = defaultdict(list)
         if max_messages is None:
             max_messages = self.max_messages
 
         for num, message in enumerate(self.consumer):
             object_type = message.topic.split('.')[-1]
             logger.debug('num: %s, object_type: %s, message: %s' % (
                 num, object_type, message))
             messages[object_type].append(message.value)
             # honor the locally resolved max_messages (caller override or
             # configured fallback)
             if num + 1 >= max_messages:
                 break
 
         logger.debug('number of messages: %s', num)
 
         new_objects = self.process_objects(messages)
         self.produce_messages(new_objects)
         self.consumer.commit()
 
     def process_objects(self, messages):
         """Given a dict of messages {object type: [object id]}, reify those
           ids into swh objects using the storage and return a corresponding
           dict.
 
         Args:
             messages (dict): Dict of {object_type: [id-as-bytes]}
 
         Returns:
             Dict of {object_type: [tuple]}.
 
               object_type (str): content, revision, release
               tuple (bytes, dict): object id as bytes, object as swh dict.
 
""" processors = { 'content': self.process_contents, 'revision': self.process_revisions, 'release': self.process_releases, 'snapshot': self.process_snapshots, 'origin': self.process_origins, 'origin_visit': self.process_origin_visits, } return { key: processors[key](value) for key, value in messages.items() } def produce_messages(self, messages): """Produce new swh object to the producer topic. Args: messages ([dict]): Dict of {object_type: [tuple]}. object_type (str): content, revision, release tuple (bytes, dict): object id as bytes, object as swh dict. """ for object_type, objects in messages.items(): topic = '%s.%s' % (self.config['final_prefix'], object_type) for key, object in objects: logger.debug('topic: %s, key: %s, value: %s' % ( topic, key, object)) self.producer.send(topic, key=key, value=object) self.producer.flush() def process_contents(self, content_objs): logger.debug('contents: %s' % content_objs) metadata = self.storage.content_get_metadata( (c[b'sha1'] for c in content_objs)) return [(content['sha1'], content) for content in metadata] def process_revisions(self, revision_objs): logger.debug('revisions: %s' % revision_objs) metadata = self.storage.revision_get((r[b'id'] for r in revision_objs)) return [(revision['id'], revision) for revision in metadata if revision] def process_releases(self, release_objs): logger.debug('releases: %s' % release_objs) metadata = self.storage.release_get((r[b'id'] for r in release_objs)) return [(release['id'], release) for release in metadata] def process_origins(self, origin_objs): logger.debug('origins: %s' % origin_objs) r = [] for o in origin_objs: origin = {'url': o[b'url'], 'type': o[b'type']} r.append((origin, origin)) return r def process_origin_visits(self, origin_visits): logger.debug('origin_visits: %s' % origin_visits) metadata = [] for ov in origin_visits: origin_visit = self.storage.origin_visit_get_by( ov[b'origin'], ov[b'visit']) if origin_visit: pk = ov[b'origin'], ov[b'visit'] origin_visit['date'] = str(origin_visit['date']) metadata.append((pk, origin_visit)) return metadata def process_snapshots(self, snapshot_objs): logger.debug('snapshots: %s' % snapshot_objs) metadata = [] for snap in snapshot_objs: full_obj = snapshot.snapshot_get_all_branches( self.storage, snap[b'id']) metadata.append((full_obj['id'], full_obj)) return metadata if __name__ == '__main__': print('Please use the "swh-journal publisher run" command') diff --git a/swh/journal/tests/test_publisher2.py b/swh/journal/tests/test_publisher2.py index 5284a82..7e3af6f 100644 --- a/swh/journal/tests/test_publisher2.py +++ b/swh/journal/tests/test_publisher2.py @@ -1,101 +1,101 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from kafka import KafkaConsumer, KafkaProducer from typing import Dict, Text from swh.journal.publisher import JournalPublisher from .conftest import ( - TEST_CONFIG, CONTENTS, REVISIONS # , RELEASES, ORIGINS + TEST_CONFIG, CONTENTS, REVISIONS, RELEASES, ORIGINS ) OBJECT_TYPE_KEYS = { - 'content': (b'sha1', CONTENTS), - # 'revision': (b'id', REVISIONS), + # 'content': (b'sha1', CONTENTS), + 'revision': (b'id', REVISIONS), # 'release': (b'id', RELEASES), } def assert_publish(publisher: JournalPublisher, consumer_from_publisher: KafkaConsumer, producer_to_publisher: KafkaProducer, object_type: Text): """Assert that publishing object in the 
     published in the output topics.
 
     Args:
         publisher (JournalPublisher): publisher to read and write data
         consumer_from_publisher (KafkaConsumer): to read data from the
           publisher
         producer_to_publisher (KafkaProducer): to send data to the publisher
         object_type (str): object type under test
 
     """
     # object type's id label key
     object_key_id, expected_objects = OBJECT_TYPE_KEYS[object_type]
     # objects to send to the publisher
     objects = [{object_key_id: c[object_key_id.decode()]}
                for c in expected_objects]
 
     # send messages to the publisher
     for obj in objects:
         producer_to_publisher.send(
             '%s.%s' % (TEST_CONFIG['temporary_prefix'], object_type),
             obj
         )
 
     nb_messages = len(objects)
 
     # the publisher should poll those messages and send the reified objects
     publisher.poll(max_messages=nb_messages)
 
     # then (client reads the messages from the output topic)
     msgs = []
     for num, msg in enumerate(consumer_from_publisher):
         print('#### consumer_from_publisher: msg %s: %s ' % (num, msg))
         print('#### consumer_from_publisher: msg.value %s: %s ' % (
             num, msg.value))
         msgs.append((msg.topic, msg.key, msg.value))
         # the reified object must land on the final topic for its type
         expected_topic = '%s.%s' % (TEST_CONFIG['final_prefix'], object_type)
         assert expected_topic == msg.topic
 
         expected_key = objects[num][object_key_id]
         assert expected_key == msg.key
 
         expected_value = expected_objects[num]
         # Transformation is needed because msgpack encodes dict keys and
         # some values as bytes
         value = {}
         for k, v in msg.value.items():
             k = k.decode()
             if k == 'status':
                 v = v.decode()
             value[k] = v
         assert expected_value == value
 
 
 def test_publish(
         publisher: JournalPublisher,
         consumer_from_publisher: KafkaConsumer,
         producer_to_publisher: KafkaProducer):
     """Reading from and writing to the journal publisher should work for
     every enabled object type.
 
     Args:
         publisher (JournalPublisher): publisher to read and write data
         consumer_from_publisher (KafkaConsumer): to read data from the
           publisher
         producer_to_publisher (KafkaProducer): to send data to the publisher
 
     """
     object_types = OBJECT_TYPE_KEYS.keys()
     # Subscribe to the publisher's output topics
     consumer_from_publisher.subscribe(
         topics=['%s.%s' % (TEST_CONFIG['final_prefix'], object_type)
                 for object_type in object_types])
 
     for object_type in object_types:
         assert_publish(publisher, consumer_from_publisher,
                        producer_to_publisher, object_type)
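
For context on how the publisher above is meant to be driven, here is a minimal usage sketch. It is illustrative only: the broker address, topic prefixes and storage arguments are assumptions rather than values taken from this diff (the tests get theirs from conftest's TEST_CONFIG), but the config keys match the ones read by _prepare_journal and _prepare_storage.

    # Hypothetical driver script; every config value below is an assumption.
    from swh.journal.publisher import JournalPublisher

    config = {
        'brokers': ['localhost:9092'],              # assumed Kafka broker
        'temporary_prefix': 'swh.tmp_journal.new',  # assumed input topic prefix
        'final_prefix': 'swh.journal.objects',      # assumed output topic prefix
        'consumer_id': 'swh.journal.publisher',
        'publisher_id': 'swh.journal.publisher',
        'object_types': ['content', 'revision', 'release'],
        'max_messages': 100,
        'storage': {'cls': 'memory', 'args': {}},   # assumed storage config
    }

    publisher = JournalPublisher(config)
    # poll() consumes up to max_messages ids, reifies them via the storage
    # and produces the full objects to the final topics; it is meant to be
    # called from within a loop.
    while True:
        publisher.poll()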
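
The key-decoding loop in assert_publish exists because the journal's msgpack-based serializers hand dict keys (and the 'status' value) back as bytes. The snippet below is a standalone illustration of that round-trip behaviour using the msgpack library directly; it does not reproduce the actual kafka_to_key/key_to_kafka implementation.

    # Illustration only: shows why the test decodes keys after msgpack.
    import msgpack

    original = {'sha1': b'\x01' * 20, 'status': 'visible', 'length': 3}
    packed = msgpack.packb(original, use_bin_type=True)
    decoded = msgpack.unpackb(packed, raw=True)

    # dict keys and str values come back as bytes
    assert set(decoded) == {b'sha1', b'status', b'length'}
    assert decoded[b'status'] == b'visible'

    # re-encode keys (and the 'status' value) to str before comparing,
    # exactly as the test does
    value = {}
    for k, v in decoded.items():
        k = k.decode()
        if k == 'status':
            v = v.decode()
        value[k] = v
    assert value == original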