diff --git a/swh/journal/publisher.py b/swh/journal/publisher.py index e9d2b45..1bf7f58 100644 --- a/swh/journal/publisher.py +++ b/swh/journal/publisher.py @@ -1,222 +1,222 @@ # Copyright (C) 2016-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import logging from kafka import KafkaProducer, KafkaConsumer from swh.storage import get_storage from swh.storage.algos import snapshot from .serializers import kafka_to_key, key_to_kafka logger = logging.getLogger(__name__) MANDATORY_KEYS = [ 'brokers', 'temporary_prefix', 'final_prefix', 'consumer_id', 'publisher_id', 'object_types', 'storage' ] class JournalPublisher: """The journal publisher is a layer in charge of: - consuming messages from topics (1 topic per object_type) - reify the object ids read from those topics (using the storage) - producing those reified objects to output topics (1 topic per object type) The main entry point for this class is the 'poll' method. """ def __init__(self, config): self.config = config self.check_config(config) self._prepare_storage(config) self._prepare_journal(config) self.max_messages = self.config['max_messages'] logger.setLevel(logging.DEBUG) def check_config(self, config): """Check the configuration is fine. If not raise an error. """ missing_keys = [] for key in MANDATORY_KEYS: if not config.get(key): missing_keys.append(key) if missing_keys: raise ValueError( 'Configuration error: The following keys must be' ' provided: %s' % (','.join(missing_keys), )) def _prepare_journal(self, config): """Prepare the consumer and subscriber instances for the publisher to actually be able to discuss with the journal. """ # yes, the temporary topics contain values that are actually _keys_ self.consumer = KafkaConsumer( bootstrap_servers=config['brokers'], value_deserializer=kafka_to_key, auto_offset_reset='earliest', enable_auto_commit=False, group_id=config['consumer_id'], ) self.producer = KafkaProducer( bootstrap_servers=config['brokers'], key_serializer=key_to_kafka, value_serializer=key_to_kafka, client_id=config['publisher_id'], ) logger.info('Subscribing to object types event: %s' % ( config['object_types'], )) self.consumer.subscribe( topics=['%s.%s' % (config['temporary_prefix'], object_type) for object_type in config['object_types']], ) def _prepare_storage(self, config): """Prepare the storage instance needed for the publisher to be able to discuss with the storage to retrieve the objects. """ self.storage = get_storage(**config['storage']) def poll(self, max_messages=None): """Process a batch of messages from the consumer's topics. Use the storage to reify those ids. Produces back those reified objects to the production topics. This method polls a given amount of message then stops. The number of messages to consume is either provided or configured as fallback. The following method is expected to be called from within a loop. """ messages = defaultdict(list) if max_messages is None: max_messages = self.max_messages for num, message in enumerate(self.consumer): object_type = message.topic.split('.')[-1] logger.debug('num: %s, object_type: %s, message: %s' % ( - num, object_type, message)) + num+1, object_type, message)) messages[object_type].append(message.value) if num + 1 >= self.max_messages: break - logger.debug('number of messages: %s', num) + logger.debug('number of messages: %s', num+1) new_objects = self.process_objects(messages) self.produce_messages(new_objects) self.consumer.commit() def process_objects(self, messages): """Given a dict of messages {object type: [object id]}, reify those ids to swh object from the storage and returns a corresponding dict. Args: messages (dict): Dict of {object_type: [id-as-bytes]} Returns: Dict of {object_type: [tuple]}. object_type (str): content, revision, release tuple (bytes, dict): object id as bytes, object as swh dict. """ processors = { 'content': self.process_contents, 'revision': self.process_revisions, 'release': self.process_releases, 'snapshot': self.process_snapshots, 'origin': self.process_origins, 'origin_visit': self.process_origin_visits, } return { key: processors[key](value) for key, value in messages.items() } def produce_messages(self, messages): """Produce new swh object to the producer topic. Args: messages ([dict]): Dict of {object_type: [tuple]}. object_type (str): content, revision, release tuple (bytes, dict): object id as bytes, object as swh dict. """ for object_type, objects in messages.items(): topic = '%s.%s' % (self.config['final_prefix'], object_type) for key, object in objects: logger.debug('topic: %s, key: %s, value: %s' % ( topic, key, object)) self.producer.send(topic, key=key, value=object) self.producer.flush() def process_contents(self, content_objs): logger.debug('contents: %s' % content_objs) metadata = self.storage.content_get_metadata( (c[b'sha1'] for c in content_objs)) return [(content['sha1'], content) for content in metadata] def process_revisions(self, revision_objs): logger.debug('revisions: %s' % revision_objs) metadata = self.storage.revision_get((r[b'id'] for r in revision_objs)) return [(revision['id'], revision) for revision in metadata if revision] def process_releases(self, release_objs): logger.debug('releases: %s' % release_objs) metadata = self.storage.release_get((r[b'id'] for r in release_objs)) return [(release['id'], release) for release in metadata] def process_origins(self, origin_objs): logger.debug('origins: %s' % origin_objs) r = [] for o in origin_objs: origin = {'url': o[b'url'], 'type': o[b'type']} r.append((origin, origin)) return r def process_origin_visits(self, origin_visits): logger.debug('origin_visits: %s' % origin_visits) metadata = [] for ov in origin_visits: origin_visit = self.storage.origin_visit_get_by( ov[b'origin'], ov[b'visit']) if origin_visit: pk = ov[b'origin'], ov[b'visit'] origin_visit['date'] = str(origin_visit['date']) metadata.append((pk, origin_visit)) return metadata def process_snapshots(self, snapshot_objs): logger.debug('snapshots: %s' % snapshot_objs) metadata = [] for snap in snapshot_objs: full_obj = snapshot.snapshot_get_all_branches( self.storage, snap[b'id']) metadata.append((full_obj['id'], full_obj)) return metadata if __name__ == '__main__': print('Please use the "swh-journal publisher run" command') diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py index 8078eec..26105f6 100644 --- a/swh/journal/tests/conftest.py +++ b/swh/journal/tests/conftest.py @@ -1,250 +1,257 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import pytest import logging from kafka import KafkaProducer, KafkaConsumer from subprocess import Popen from typing import Tuple, Dict from pathlib import Path from pytest_kafka import ( make_zookeeper_process, make_kafka_server, constants ) from swh.journal.publisher import JournalPublisher from swh.model.hashutil import hash_to_bytes from swh.journal.serializers import kafka_to_key, key_to_kafka, kafka_to_value CONTENTS = [ { 'length': 3, 'sha1': hash_to_bytes( '34973274ccef6ab4dfaaf86599792fa9c3fe4689'), 'sha1_git': b'foo', 'blake2s256': b'bar', 'sha256': b'baz', 'status': 'visible', }, ] COMMITTER = [ { 'id': 1, 'fullname': 'foo', }, { 'id': 2, 'fullname': 'bar', } ] REVISIONS = [ { 'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'), 'message': b'hello', 'date': { 'timestamp': { 'seconds': 1234567891, 'microseconds': 0, }, 'offset': 120, 'negative_utc': None, }, 'committer': COMMITTER[0], 'author': COMMITTER[0], 'committer_date': None, }, { 'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'), 'message': b'hello again', 'date': { 'timestamp': { 'seconds': 1234567892, 'microseconds': 0, }, 'offset': 120, 'negative_utc': None, }, 'committer': COMMITTER[1], 'author': COMMITTER[1], 'committer_date': None, }, ] RELEASES = [ { 'id': hash_to_bytes('d81cc0710eb6cf9efd5b920a8453e1e07157b6cd'), 'name': b'v0.0.1', 'date': { 'timestamp': { 'seconds': 1234567890, 'microseconds': 0, }, 'offset': 120, 'negative_utc': None, }, 'author': COMMITTER[0], }, ] ORIGINS = [ { 'url': 'https://somewhere.org/den/fox', 'type': 'git', }, { 'url': 'https://overtherainbow.org/fox/den', 'type': 'svn', } ] ORIGIN_VISITS = [ { 'date': '2013-05-07T04:20:39.369271+00:00', }, { 'date': '2018-11-27T17:20:39.000000+00:00', } ] # From type to tuple (id, ) OBJECT_TYPE_KEYS = { 'content': (b'sha1', CONTENTS), 'revision': (b'id', REVISIONS), 'release': (b'id', RELEASES), } TEST_CONFIG = { 'temporary_prefix': 'swh.tmp_journal.new', 'final_prefix': 'swh.journal.objects', 'consumer_id': 'swh.journal.publisher', 'publisher_id': 'swh.journal.publisher', 'object_types': OBJECT_TYPE_KEYS.keys(), 'max_messages': 1, # will read 1 message and stops 'storage': {'cls': 'memory', 'args': {}} } class JournalPublisherTest(JournalPublisher): """A journal publisher which override the default configuration parsing setup. """ def _prepare_storage(self, config): super()._prepare_storage(config) self.storage.content_add({'data': b'42', **c} for c in CONTENTS) self.storage.revision_add(REVISIONS) self.storage.release_add(RELEASES) origins = self.storage.origin_add(ORIGINS) origin_visits = [] for i, ov in enumerate(ORIGIN_VISITS): origin_id = origins[i]['id'] ov = self.storage.origin_visit_add(origin_id, ov['date']) origin_visits.append(ov) self.origins = origins self.origin_visits = origin_visits KAFKA_ROOT = os.environ.get('SWH_KAFKA_ROOT') KAFKA_ROOT = KAFKA_ROOT if KAFKA_ROOT else os.path.dirname(__file__) + '/kafka' if not os.path.exists(KAFKA_ROOT): msg = ('Development error: %s must exist and target an ' 'existing kafka installation' % KAFKA_ROOT) raise ValueError(msg) KAFKA_SCRIPTS = Path(KAFKA_ROOT) / 'bin' KAFKA_BIN = str(KAFKA_SCRIPTS / 'kafka-server-start.sh') ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / 'zookeeper-server-start.sh') # Those defines fixtures zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN) kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc') logger = logging.getLogger('kafka') logger.setLevel(logging.WARN) @pytest.fixture def test_config(): """Test configuration needed for publisher/producer/consumer """ return TEST_CONFIG @pytest.fixture def producer_to_publisher( request: 'SubRequest', # noqa F821 kafka_server: Tuple[Popen, int], test_config: Dict, ) -> KafkaProducer: # noqa """Producer to send message to the publisher's consumer. """ _, port = kafka_server producer = KafkaProducer( bootstrap_servers='localhost:{}'.format(port), key_serializer=key_to_kafka, value_serializer=key_to_kafka, client_id=test_config['consumer_id'], ) return producer @pytest.fixture def kafka_consumer( request: 'SubRequest', # noqa F821 test_config: Dict, ) -> KafkaConsumer: """Get a connected Kafka consumer. """ kafka_topics = ['%s.%s' % (test_config['final_prefix'], object_type) for object_type in test_config['object_types']] test_config['topics'] = kafka_topics consumer_kwargs = dict( key_deserializer=kafka_to_key, value_deserializer=kafka_to_value, auto_offset_reset='earliest', enable_auto_commit=True, - client_id=test_config['publisher_id']) + group_id="test-consumer") _, kafka_port = request.getfixturevalue('kafka_server') used_consumer_kwargs = consumer_kwargs.copy() used_consumer_kwargs.setdefault('consumer_timeout_ms', constants.DEFAULT_CONSUMER_TIMEOUT_MS) used_consumer_kwargs.setdefault( 'bootstrap_servers', 'localhost:{}'.format(kafka_port)) consumer = KafkaConsumer( *kafka_topics, **used_consumer_kwargs, ) + + # Enforce auto_offset_reset=earliest even if the consumer was created + # too soon wrt the server. + while len(consumer.assignment()) == 0: + consumer.poll(timeout_ms=20) + consumer.seek_to_beginning() + return consumer @pytest.fixture def publisher(kafka_server: Tuple[Popen, int], test_config: Dict) -> JournalPublisher: """Test Publisher factory. We cannot use a fixture here as we need to modify the sample. """ # consumer and producer of the publisher needs to discuss with the # right instance _, port = kafka_server test_config['brokers'] = ['localhost:{}'.format(port)] publisher = JournalPublisherTest(test_config) return publisher diff --git a/swh/journal/tests/test_publisher_kafka.py b/swh/journal/tests/test_publisher_kafka.py index 2fa4c6b..8eae576 100644 --- a/swh/journal/tests/test_publisher_kafka.py +++ b/swh/journal/tests/test_publisher_kafka.py @@ -1,88 +1,88 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from kafka import KafkaConsumer, KafkaProducer from subprocess import Popen from typing import Tuple, Text from swh.journal.serializers import value_to_kafka, kafka_to_value from swh.journal.publisher import JournalPublisher from .conftest import TEST_CONFIG, OBJECT_TYPE_KEYS def assert_publish_ok(publisher: JournalPublisher, consumer_from_publisher: KafkaConsumer, producer_to_publisher: KafkaProducer, object_type: Text): """Assert that publishing object in the publisher is reified and published in output topics. Args: publisher (JournalPublisher): publisher to read and write data consumer_from_publisher (KafkaConsumer): To read data from the publisher producer_to_publisher (KafkaProducer): To send data to the publisher object_type (str): Object type to look for (e.g content, revision, etc...) """ # object type's id label key object_key_id, expected_objects = OBJECT_TYPE_KEYS[object_type] # objects to send to the publisher objects = [{object_key_id: c[object_key_id.decode()]} for c in expected_objects] # send message to the publisher for obj in objects: producer_to_publisher.send( '%s.%s' % (TEST_CONFIG['temporary_prefix'], object_type), obj ) nb_messages = len(objects) - # publisher should poll 1 message and send 1 reified object - publisher.poll(max_messages=nb_messages) + for _ in range(nb_messages): + publisher.poll(max_messages=1) # then (client reads from the messages from output topic) - msgs = [] + num = -1 for num, msg in enumerate(consumer_from_publisher): - msgs.append((msg.topic, msg.key, msg.value)) - expected_topic = '%s.%s' % (TEST_CONFIG['final_prefix'], object_type) assert expected_topic == msg.topic expected_key = objects[num][object_key_id] assert expected_key == msg.key # Transformation is needed due to our back and forth # serialization to kafka expected_value = kafka_to_value(value_to_kafka(expected_objects[num])) assert expected_value == msg.value + assert num + 1 == len(expected_objects) + def test_publish( publisher: JournalPublisher, kafka_server: Tuple[Popen, int], kafka_consumer: KafkaConsumer, producer_to_publisher: KafkaProducer): """ Reading from and writing to the journal publisher should work (contents) Args: journal_publisher (JournalPublisher): publisher to read and write data kafka_consumer (KafkaConsumer): To read data from the publisher producer_to_publisher (KafkaProducer): To send data to the publisher """ # retrieve the object types we want to test object_types = OBJECT_TYPE_KEYS.keys() # Now for each object type, we'll send data to the publisher and # check that data is indeed fetched and reified in the publisher's # output topics for object_type in object_types: assert_publish_ok( publisher, kafka_consumer, producer_to_publisher, object_type)