diff --git a/swh/journal/cli.py b/swh/journal/cli.py
index b498fbb..456f6d9 100644
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -1,89 +1,74 @@
 # Copyright (C) 2016-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import click
 import logging
 import os

 from swh.core import config
 from swh.journal.publisher import JournalPublisher

 CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])


 @click.group(context_settings=CONTEXT_SETTINGS)
 @click.option('--config-file', '-C', default=None,
               type=click.Path(exists=True, dir_okay=False,),
               help="Configuration file.")
 @click.option('--log-level', '-l', default='INFO',
               type=click.Choice(logging._nameToLevel.keys()),
               help="Log level (default to INFO)")
 @click.pass_context
 def cli(ctx, config_file, log_level):
     """Software Heritage Scheduler CLI interface

     Default to use the local scheduler instance (plugged to the
     main scheduler db).

     """
     if not config_file:
         config_file = os.environ.get('SWH_CONFIG_FILENAME')
     if not config_file:
         raise ValueError('You must either pass a config-file parameter '
                          'or set SWH_CONFIG_FILENAME to target '
                          'the config-file')

     if not os.path.exists(config_file):
         raise ValueError('%s does not exist' % config_file)

     conf = config.read(config_file)
     ctx.ensure_object(dict)

     logger = logging.getLogger(__name__)
     logger.setLevel(log_level)

     _log = logging.getLogger('kafka')
     _log.setLevel(logging.INFO)

     ctx.obj['config'] = conf
     ctx.obj['loglevel'] = log_level


 @cli.command()
 @click.pass_context
 def publisher(ctx):
     """Manipulate publisher

     """
-    mandatory_keys = [
-        'brokers', 'temporary_prefix', 'final_prefix', 'consumer_id',
-        'publisher_id', 'object_types', 'storage'
-    ]
-
     conf = ctx.obj['config']
-    missing_keys = []
-    for key in mandatory_keys:
-        if not conf.get(key):
-            missing_keys.append(key)
-
-    if missing_keys:
-        raise click.ClickException(
-            'Configuration error: The following keys must be'
-            ' provided: %s' % (','.join(missing_keys), ))
-
     publisher = JournalPublisher(conf)
     try:
         while True:
             publisher.poll()
     except KeyboardInterrupt:
         ctx.exit(0)


 def main():
     return cli(auto_envvar_prefix='SWH_JOURNAL')


 if __name__ == '__main__':
     main()
diff --git a/swh/journal/publisher.py b/swh/journal/publisher.py
index 5a9dd63..e9d2b45 100644
--- a/swh/journal/publisher.py
+++ b/swh/journal/publisher.py
@@ -1,199 +1,222 @@
 # Copyright (C) 2016-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from collections import defaultdict
 import logging

 from kafka import KafkaProducer, KafkaConsumer

 from swh.storage import get_storage
 from swh.storage.algos import snapshot

 from .serializers import kafka_to_key, key_to_kafka

 logger = logging.getLogger(__name__)

+MANDATORY_KEYS = [
+    'brokers', 'temporary_prefix', 'final_prefix', 'consumer_id',
+    'publisher_id', 'object_types', 'storage'
+]
+
+
 class JournalPublisher:
     """The journal publisher is a layer in charge of:

     - consuming messages from topics (1 topic per object_type)
     - reifying the object ids read from those topics (using the storage)
     - producing those reified objects to output topics (1 topic per
       object type)

     The main entry point for this class is the 'poll' method.

""" def __init__(self, config): self.config = config + self.check_config(config) self._prepare_storage(config) self._prepare_journal(config) self.max_messages = self.config['max_messages'] logger.setLevel(logging.DEBUG) + def check_config(self, config): + """Check the configuration is fine. + + If not raise an error. + + """ + missing_keys = [] + for key in MANDATORY_KEYS: + if not config.get(key): + missing_keys.append(key) + + if missing_keys: + raise ValueError( + 'Configuration error: The following keys must be' + ' provided: %s' % (','.join(missing_keys), )) + def _prepare_journal(self, config): """Prepare the consumer and subscriber instances for the publisher to actually be able to discuss with the journal. """ # yes, the temporary topics contain values that are actually _keys_ self.consumer = KafkaConsumer( bootstrap_servers=config['brokers'], value_deserializer=kafka_to_key, auto_offset_reset='earliest', enable_auto_commit=False, group_id=config['consumer_id'], ) self.producer = KafkaProducer( bootstrap_servers=config['brokers'], key_serializer=key_to_kafka, value_serializer=key_to_kafka, client_id=config['publisher_id'], ) logger.info('Subscribing to object types event: %s' % ( config['object_types'], )) self.consumer.subscribe( topics=['%s.%s' % (config['temporary_prefix'], object_type) for object_type in config['object_types']], ) def _prepare_storage(self, config): """Prepare the storage instance needed for the publisher to be able to discuss with the storage to retrieve the objects. """ self.storage = get_storage(**config['storage']) def poll(self, max_messages=None): """Process a batch of messages from the consumer's topics. Use the storage to reify those ids. Produces back those reified objects to the production topics. This method polls a given amount of message then stops. The number of messages to consume is either provided or configured as fallback. The following method is expected to be called from within a loop. """ messages = defaultdict(list) if max_messages is None: max_messages = self.max_messages for num, message in enumerate(self.consumer): object_type = message.topic.split('.')[-1] logger.debug('num: %s, object_type: %s, message: %s' % ( num, object_type, message)) messages[object_type].append(message.value) if num + 1 >= self.max_messages: break logger.debug('number of messages: %s', num) new_objects = self.process_objects(messages) self.produce_messages(new_objects) self.consumer.commit() def process_objects(self, messages): """Given a dict of messages {object type: [object id]}, reify those ids to swh object from the storage and returns a corresponding dict. Args: messages (dict): Dict of {object_type: [id-as-bytes]} Returns: Dict of {object_type: [tuple]}. object_type (str): content, revision, release tuple (bytes, dict): object id as bytes, object as swh dict. """ processors = { 'content': self.process_contents, 'revision': self.process_revisions, 'release': self.process_releases, 'snapshot': self.process_snapshots, 'origin': self.process_origins, 'origin_visit': self.process_origin_visits, } return { key: processors[key](value) for key, value in messages.items() } def produce_messages(self, messages): """Produce new swh object to the producer topic. Args: messages ([dict]): Dict of {object_type: [tuple]}. object_type (str): content, revision, release tuple (bytes, dict): object id as bytes, object as swh dict. 
""" for object_type, objects in messages.items(): topic = '%s.%s' % (self.config['final_prefix'], object_type) for key, object in objects: logger.debug('topic: %s, key: %s, value: %s' % ( topic, key, object)) self.producer.send(topic, key=key, value=object) self.producer.flush() def process_contents(self, content_objs): logger.debug('contents: %s' % content_objs) metadata = self.storage.content_get_metadata( (c[b'sha1'] for c in content_objs)) return [(content['sha1'], content) for content in metadata] def process_revisions(self, revision_objs): logger.debug('revisions: %s' % revision_objs) metadata = self.storage.revision_get((r[b'id'] for r in revision_objs)) return [(revision['id'], revision) for revision in metadata if revision] def process_releases(self, release_objs): logger.debug('releases: %s' % release_objs) metadata = self.storage.release_get((r[b'id'] for r in release_objs)) return [(release['id'], release) for release in metadata] def process_origins(self, origin_objs): logger.debug('origins: %s' % origin_objs) r = [] for o in origin_objs: origin = {'url': o[b'url'], 'type': o[b'type']} r.append((origin, origin)) return r def process_origin_visits(self, origin_visits): logger.debug('origin_visits: %s' % origin_visits) metadata = [] for ov in origin_visits: origin_visit = self.storage.origin_visit_get_by( ov[b'origin'], ov[b'visit']) if origin_visit: pk = ov[b'origin'], ov[b'visit'] origin_visit['date'] = str(origin_visit['date']) metadata.append((pk, origin_visit)) return metadata def process_snapshots(self, snapshot_objs): logger.debug('snapshots: %s' % snapshot_objs) metadata = [] for snap in snapshot_objs: full_obj = snapshot.snapshot_get_all_branches( self.storage, snap[b'id']) metadata.append((full_obj['id'], full_obj)) return metadata if __name__ == '__main__': print('Please use the "swh-journal publisher run" command') diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py index fd72443..85ad928 100644 --- a/swh/journal/tests/conftest.py +++ b/swh/journal/tests/conftest.py @@ -1,204 +1,204 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import pytest import logging from kafka import KafkaProducer from subprocess import Popen from typing import Tuple from pathlib import Path from pytest_kafka import ( make_zookeeper_process, make_kafka_server, make_kafka_consumer ) from swh.journal.publisher import JournalPublisher from swh.model.hashutil import hash_to_bytes from swh.journal.serializers import kafka_to_key, key_to_kafka, kafka_to_value TEST_CONFIG = { - 'brokers': [], + 'brokers': ['something'], # this will be overriden in publisher setup 'temporary_prefix': 'swh.tmp_journal.new', 'final_prefix': 'swh.journal.objects', 'consumer_id': 'swh.journal.publisher', 'publisher_id': 'swh.journal.publisher', - 'object_types': ['content'], + 'object_types': ['something'], # this will be overriden in publisher setup 'max_messages': 1, # will read 1 message and stops 'storage': {'cls': 'memory', 'args': {}} } CONTENTS = [ { 'length': 3, 'sha1': hash_to_bytes( '34973274ccef6ab4dfaaf86599792fa9c3fe4689'), 'sha1_git': b'foo', 'blake2s256': b'bar', 'sha256': b'baz', 'status': 'visible', }, ] COMMITTER = [ { 'id': 1, 'fullname': 'foo', }, { 'id': 2, 'fullname': 'bar', } ] REVISIONS = [ { 'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'), 'message': 
         'date': {
             'timestamp': {
                 'seconds': 1234567891,
                 'microseconds': 0,
             },
             'offset': 120,
             'negative_utc': None,
         },
         'committer': COMMITTER[0],
         'author': COMMITTER[0],
         'committer_date': None,
     },
     {
         'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'),
         'message': b'hello again',
         'date': {
             'timestamp': {
                 'seconds': 1234567892,
                 'microseconds': 0,
             },
             'offset': 120,
             'negative_utc': None,
         },
         'committer': COMMITTER[1],
         'author': COMMITTER[1],
         'committer_date': None,
     },
 ]

 RELEASES = [
     {
         'id': hash_to_bytes('d81cc0710eb6cf9efd5b920a8453e1e07157b6cd'),
         'name': b'v0.0.1',
         'date': {
             'timestamp': {
                 'seconds': 1234567890,
                 'microseconds': 0,
             },
             'offset': 120,
             'negative_utc': None,
         },
         'author': COMMITTER[0],
     },
 ]

 ORIGINS = [
     {
         'url': 'https://somewhere.org/den/fox',
         'type': 'git',
     },
     {
         'url': 'https://overtherainbow.org/fox/den',
         'type': 'svn',
     }
 ]

 ORIGIN_VISITS = [
     {
         'date': '2013-05-07T04:20:39.369271+00:00',
     },
     {
         'date': '2018-11-27T17:20:39.000000+00:00',
     }
 ]


 class JournalPublisherTest(JournalPublisher):
     """A journal publisher which overrides the default configuration
        parsing setup.

     """
     def _prepare_storage(self, config):
         super()._prepare_storage(config)
         self.storage.content_add({'data': b'42', **c} for c in CONTENTS)
         print('#### all contents: %s' % self.storage._contents)
         self.storage.revision_add(REVISIONS)
         self.storage.release_add(RELEASES)
         origins = self.storage.origin_add(ORIGINS)
         origin_visits = []
         for i, ov in enumerate(ORIGIN_VISITS):
             origin_id = origins[i]['id']
             ov = self.storage.origin_visit_add(origin_id, ov['date'])
             origin_visits.append(ov)
         self.origins = origins
         self.origin_visits = origin_visits

         print("publisher.origin-visits", self.origin_visits)


 KAFKA_ROOT = os.environ.get('SWH_KAFKA_ROOT', Path(__file__).parent)
 KAFKA_SCRIPTS = KAFKA_ROOT / 'kafka/bin/'

 KAFKA_BIN = str(KAFKA_SCRIPTS / 'kafka-server-start.sh')
 ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / 'zookeeper-server-start.sh')


 # These define fixtures
 zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN)
 kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc')

 logger = logging.getLogger('kafka')
 logger.setLevel(logging.WARN)


 @pytest.fixture
 def producer_to_publisher(
         request: 'SubRequest',  # noqa F821
         kafka_server: Tuple[Popen, int]) -> KafkaProducer:  # noqa
     """Producer to send messages to the publisher's consumer.

""" _, port = kafka_server producer = KafkaProducer( bootstrap_servers='localhost:{}'.format(port), key_serializer=key_to_kafka, value_serializer=key_to_kafka, client_id=TEST_CONFIG['consumer_id'], ) return producer # pytest fixture (no need for the annotation though or else it breaks) consumer_from_publisher = make_kafka_consumer( 'kafka_server', seek_to_beginning=True, key_deserializer=kafka_to_key, value_deserializer=kafka_to_value, auto_offset_reset='earliest', enable_auto_commit=True, client_id=TEST_CONFIG['publisher_id'], kafka_topics=['dummy']) # will be overriden during test setup @pytest.fixture def publisher( request: 'SubRequest', # noqa F821 kafka_server: Tuple[Popen, int]) -> JournalPublisher: # consumer and producer of the publisher needs to discuss with the # right instance _, port = kafka_server TEST_CONFIG['brokers'] = ['localhost:{}'.format(port)] return JournalPublisherTest(TEST_CONFIG) diff --git a/swh/journal/tests/test_publisher.py b/swh/journal/tests/test_publisher.py index e990693..f1792cf 100644 --- a/swh/journal/tests/test_publisher.py +++ b/swh/journal/tests/test_publisher.py @@ -1,107 +1,151 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import pytest import unittest from .conftest import ( JournalPublisherTest, TEST_CONFIG, CONTENTS, REVISIONS, RELEASES, ORIGINS ) +from swh.journal.publisher import MANDATORY_KEYS class JournalPublisherNoKafkaInMemoryStorage(JournalPublisherTest): """A journal publisher with: - no kafka dependency - in-memory storage """ + def check_config(self, config): + """No need to check the configuration here as we do not use kafka + + """ + pass + def _prepare_journal(self, config): """No journal for now """ pass class TestPublisherNoKafka(unittest.TestCase): """This tests only the part not using any kafka instance """ def setUp(self): self.publisher = JournalPublisherNoKafkaInMemoryStorage(TEST_CONFIG) self.contents = [{b'sha1': c['sha1']} for c in CONTENTS] self.revisions = [{b'id': c['id']} for c in REVISIONS] self.releases = [{b'id': c['id']} for c in RELEASES] # those needs id generation from the storage # so initialization is different than other entities self.origins = [{b'url': o['url'], b'type': o['type']} for o in self.publisher.origins] self.origin_visits = [{b'origin': ov['origin'], b'visit': ov['visit']} for ov in self.publisher.origin_visits] # full objects storage = self.publisher.storage ovs = [] for ov in self.origin_visits: _ov = storage.origin_visit_get_by( ov[b'origin'], ov[b'visit']) _ov['date'] = str(_ov['date']) ovs.append(_ov) self.expected_origin_visits = ovs def test_process_contents(self): actual_contents = self.publisher.process_contents(self.contents) expected_contents = [(c['sha1'], c) for c in CONTENTS] self.assertEqual(actual_contents, expected_contents) def test_process_revisions(self): actual_revisions = self.publisher.process_revisions(self.revisions) expected_revisions = [(c['id'], c) for c in REVISIONS] self.assertEqual(actual_revisions, expected_revisions) def test_process_releases(self): actual_releases = self.publisher.process_releases(self.releases) expected_releases = [(c['id'], c) for c in RELEASES] self.assertEqual(actual_releases, expected_releases) def test_process_origins(self): actual_origins = self.publisher.process_origins(self.origins) expected_origins = [({'url': o[b'url'], 'type': 
                              {'url': o[b'url'], 'type': o[b'type']})
                             for o in self.origins]
         self.assertEqual(actual_origins, expected_origins)

     def test_process_origin_visits(self):
         actual_ovs = self.publisher.process_origin_visits(self.origin_visits)
         expected_ovs = [((ov['origin'], ov['visit']), ov)
                        for ov in self.expected_origin_visits]
         self.assertEqual(actual_ovs, expected_ovs)

     def test_process_objects(self):
         messages = {
             'content': self.contents,
             'revision': self.revisions,
             'release': self.releases,
             'origin': self.origins,
             'origin_visit': self.origin_visits,
         }

         actual_objects = self.publisher.process_objects(messages)

         expected_contents = [(c['sha1'], c) for c in CONTENTS]
         expected_revisions = [(c['id'], c) for c in REVISIONS]
         expected_releases = [(c['id'], c) for c in RELEASES]
         expected_origins = [(o, o) for o in ORIGINS]
         expected_ovs = [((ov['origin'], ov['visit']), ov)
                        for ov in self.expected_origin_visits]
         expected_objects = {
             'content': expected_contents,
             'revision': expected_revisions,
             'release': expected_releases,
             'origin': expected_origins,
             'origin_visit': expected_ovs,
         }

         self.assertEqual(actual_objects, expected_objects)
+
+
+class JournalPublisherCheckTest(JournalPublisherTest):
+    """A journal publisher with:
+    - no kafka dependency
+    - in-memory storage
+
+    """
+    def _prepare_journal(self, config):
+        """No journal for now
+
+        """
+        pass
+
+
+def test_check_config_ok():
+    """Instantiating a publisher with a complete config is fine
+
+    """
+    publisher = JournalPublisherCheckTest(TEST_CONFIG)
+    assert publisher is not None
+
+
+def test_check_config_ko():
+    """Instantiating a publisher with an incomplete config should raise
+
+    """
+    for k in MANDATORY_KEYS:
+        conf = TEST_CONFIG.copy()
+        conf.pop(k)
+        with pytest.raises(ValueError) as e:
+            JournalPublisherCheckTest(conf)
+
+        error = ('Configuration error: The following keys must be'
+                 ' provided: %s' % (','.join([k]), ))
+        assert e.value.args[0] == error
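
Usage sketch (illustrative only, not part of the patch): because check_config() is now called first in __init__, an incomplete configuration fails fast with a ValueError before any storage or kafka setup is attempted. The config dict below is hypothetical.

    from swh.journal.publisher import JournalPublisher, MANDATORY_KEYS

    # Hypothetical config covering every mandatory key except 'storage'
    config = {key: 'some-value' for key in MANDATORY_KEYS if key != 'storage'}

    try:
        JournalPublisher(config)
    except ValueError as e:
        # "Configuration error: The following keys must be provided: storage"
        print(e)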