diff --git a/.gitignore b/.gitignore
--- a/.gitignore
+++ b/.gitignore
@@ -9,3 +9,7 @@
 dist/
 version.txt
 .tox/
+kafka/
+kafka*.tgz*
+kafka*.tar.gz*
+swh/journal/tests/kafka*
\ No newline at end of file
diff --git a/Makefile.local b/Makefile.local
new file mode 100644
--- /dev/null
+++ b/Makefile.local
@@ -0,0 +1,15 @@
+install:
+	bin/install-kafka.sh install
+
+clean:
+	bin/install-kafka.sh clean
+
+clean-all:
+	bin/install-kafka.sh clean
+	bin/install-kafka.sh clean-cache
+
+test-fast:
+	pytest
+
+test:
+	tox
diff --git a/README.md b/README.md
--- a/README.md
+++ b/README.md
@@ -6,3 +6,45 @@
 See the
 [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal)
 for more details.
+
+# Local tests
+
+As a prerequisite, you need a local kafka installation.
+The following target takes care of this:
+
+```
+make install
+```
+
+Then, provided you are in the right virtual environment as described
+in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup):
+
+```
+pytest
+```
+
+or:
+
+```
+tox
+```
+
+
+# Running
+
+## publisher
+
+Command:
+```
+$ swh-journal --config-file ~/.config/swh/journal/publisher.yml \
+    publisher
+```
+
+# Auto-completion
+
+To enable shell completion, add the following to your
+~/.virtualenvs/swh/bin/postactivate:
+
+```
+eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)"
+```
diff --git a/bin/install-kafka.sh b/bin/install-kafka.sh
new file mode 100755
--- /dev/null
+++ b/bin/install-kafka.sh
@@ -0,0 +1,55 @@
+#!/usr/bin/env bash
+
+set -xe
+
+SCALA_VERSION=2.12
+KAFKA_VERSION=2.1.1
+KAFKA_APP="kafka_${SCALA_VERSION}-${KAFKA_VERSION}"
+TARBALL="${KAFKA_APP}.tgz"
+CHECKSUMS="${TARBALL}.sha512"
+KAFKA_ROOT_DIR=swh/journal/tests
+KAFKA_DIR="${KAFKA_ROOT_DIR}/${KAFKA_APP}"
+KAFKA_LINK="${KAFKA_ROOT_DIR}/kafka"
+URL="http://apache.mirrors.ovh.net/ftp.apache.org/dist/kafka/${KAFKA_VERSION}/${TARBALL}"
+KAFKA_CHECKSUM=a2e8168e8de6b45e8fca1f2883f0744d3c5a939b70d8a47a5428b72188501d4c2fc11bc35759f2392680d4e8ecf2fa9d0e518e77fd28393afba22194ad018b10
+
+case $1 in
+    "install")
+        echo "Kafka installation started."
+        if [ ! -f $TARBALL ]; then
+            echo "Kafka download"
+            wget $URL
+            echo "${KAFKA_CHECKSUM} ${TARBALL}" > $CHECKSUMS
+
+            # with `set -e`, a bare `$?` check after the command would never
+            # run, so test the checksum verification directly
+            if ! sha512sum -c $CHECKSUMS; then
+                echo "Kafka download: checksum verification failed for ${TARBALL}";
+                exit 1
+            fi
+            echo "Kafka download done"
+        fi
+
+        if [ ! -d $KAFKA_DIR ]; then
+            echo "Kafka extraction"
+            tar xvf $TARBALL -C $KAFKA_ROOT_DIR
+            pushd $KAFKA_ROOT_DIR && ln -nsf $KAFKA_APP kafka && popd
+            echo "Kafka extraction done"
+        fi
+        echo "Kafka installation done. Kafka is installed at $KAFKA_DIR"
+        ;;
+    "clean")
+        echo "Kafka cleanup started."
+        [ -d $KAFKA_DIR ] && rm -rf $KAFKA_DIR
+        [ -L $KAFKA_LINK ] && rm $KAFKA_LINK
+        echo "Kafka cleanup done."
+        ;;
+    "clean-cache")
+        echo "Kafka cleanup cache started."
+        [ -f $TARBALL ] && rm $TARBALL
+        [ -f $CHECKSUMS ] && rm $CHECKSUMS
+        echo "Kafka cleanup cache done."
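+        # Note: "install" downloads the tarball only when it is missing, so a
+        # plain "clean" keeps the cached tarball and checksum file in the
+        # working directory; this "clean-cache" target removes them as well.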
+ ;; + *) + echo "Unknown command, do nothing" + exit 1; +esac diff --git a/requirements-test.txt b/requirements-test.txt --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,2 +1,3 @@ pytest swh.model +pytest-kafka diff --git a/swh/journal/cli.py b/swh/journal/cli.py --- a/swh/journal/cli.py +++ b/swh/journal/cli.py @@ -57,22 +57,7 @@ """Manipulate publisher """ - mandatory_keys = [ - 'brokers', 'temporary_prefix', 'final_prefix', 'consumer_id', - 'publisher_id', 'object_types', 'storage' - ] - conf = ctx.obj['config'] - missing_keys = [] - for key in mandatory_keys: - if not conf.get(key): - missing_keys.append(key) - - if missing_keys: - raise click.ClickException( - 'Configuration error: The following keys must be' - ' provided: %s' % (','.join(missing_keys), )) - publisher = JournalPublisher(conf) try: while True: diff --git a/swh/journal/publisher.py b/swh/journal/publisher.py --- a/swh/journal/publisher.py +++ b/swh/journal/publisher.py @@ -16,6 +16,12 @@ logger = logging.getLogger(__name__) +MANDATORY_KEYS = [ + 'brokers', 'temporary_prefix', 'final_prefix', 'consumer_id', + 'publisher_id', 'object_types', 'storage' +] + + class JournalPublisher: """The journal publisher is a layer in charge of: @@ -29,9 +35,27 @@ """ def __init__(self, config): self.config = config + self.check_config(config) self._prepare_storage(config) self._prepare_journal(config) self.max_messages = self.config['max_messages'] + logger.setLevel(logging.DEBUG) + + def check_config(self, config): + """Check the configuration is fine. + + If not raise an error. + + """ + missing_keys = [] + for key in MANDATORY_KEYS: + if not config.get(key): + missing_keys.append(key) + + if missing_keys: + raise ValueError( + 'Configuration error: The following keys must be' + ' provided: %s' % (','.join(missing_keys), )) def _prepare_journal(self, config): """Prepare the consumer and subscriber instances for the publisher to diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py new file mode 100644 --- /dev/null +++ b/swh/journal/tests/conftest.py @@ -0,0 +1,205 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import os +import pytest +import logging + +from kafka import KafkaProducer +from subprocess import Popen +from typing import Tuple, Dict + +from pathlib import Path +from pytest_kafka import ( + make_zookeeper_process, make_kafka_server, make_kafka_consumer +) + +from swh.journal.publisher import JournalPublisher +from swh.model.hashutil import hash_to_bytes + +from swh.journal.serializers import kafka_to_key, key_to_kafka, kafka_to_value + + +TEST_CONFIG = { + 'brokers': ['something'], # this will be overriden in publisher setup + 'temporary_prefix': 'swh.tmp_journal.new', + 'final_prefix': 'swh.journal.objects', + 'consumer_id': 'swh.journal.publisher', + 'publisher_id': 'swh.journal.publisher', + 'object_types': ['something'], # this will be overriden in publisher setup + 'max_messages': 1, # will read 1 message and stops + 'storage': {'cls': 'memory', 'args': {}} +} + +CONTENTS = [ + { + 'length': 3, + 'sha1': hash_to_bytes( + '34973274ccef6ab4dfaaf86599792fa9c3fe4689'), + 'sha1_git': b'foo', + 'blake2s256': b'bar', + 'sha256': b'baz', + 'status': 'visible', + }, +] + +COMMITTER = [ + { + 'id': 1, + 'fullname': 'foo', + }, + { + 'id': 2, + 'fullname': 'bar', + } +] + +REVISIONS = [ + 
{ + 'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'), + 'message': b'hello', + 'date': { + 'timestamp': { + 'seconds': 1234567891, + 'microseconds': 0, + }, + 'offset': 120, + 'negative_utc': None, + }, + 'committer': COMMITTER[0], + 'author': COMMITTER[0], + 'committer_date': None, + }, + { + 'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'), + 'message': b'hello again', + 'date': { + 'timestamp': { + 'seconds': 1234567892, + 'microseconds': 0, + }, + 'offset': 120, + 'negative_utc': None, + }, + 'committer': COMMITTER[1], + 'author': COMMITTER[1], + 'committer_date': None, + }, +] + +RELEASES = [ + { + 'id': hash_to_bytes('d81cc0710eb6cf9efd5b920a8453e1e07157b6cd'), + 'name': b'v0.0.1', + 'date': { + 'timestamp': { + 'seconds': 1234567890, + 'microseconds': 0, + }, + 'offset': 120, + 'negative_utc': None, + }, + 'author': COMMITTER[0], + }, +] + +ORIGINS = [ + { + 'url': 'https://somewhere.org/den/fox', + 'type': 'git', + }, + { + 'url': 'https://overtherainbow.org/fox/den', + 'type': 'svn', + } +] + +ORIGIN_VISITS = [ + { + 'date': '2013-05-07T04:20:39.369271+00:00', + }, + { + 'date': '2018-11-27T17:20:39.000000+00:00', + } +] + + +class JournalPublisherTest(JournalPublisher): + """A journal publisher which override the default configuration + parsing setup. + + """ + def _prepare_storage(self, config): + super()._prepare_storage(config) + self.storage.content_add({'data': b'42', **c} for c in CONTENTS) + self.storage.revision_add(REVISIONS) + self.storage.release_add(RELEASES) + origins = self.storage.origin_add(ORIGINS) + origin_visits = [] + for i, ov in enumerate(ORIGIN_VISITS): + origin_id = origins[i]['id'] + ov = self.storage.origin_visit_add(origin_id, ov['date']) + origin_visits.append(ov) + self.origins = origins + self.origin_visits = origin_visits + + +KAFKA_ROOT = os.environ.get('SWH_KAFKA_ROOT') +KAFKA_ROOT = KAFKA_ROOT if KAFKA_ROOT else os.path.dirname(__file__) + '/kafka' +if not os.path.exists(KAFKA_ROOT): + msg = ('Development error: %s must exist and target an ' + 'existing kafka installation' % KAFKA_ROOT) + raise ValueError(msg) + +KAFKA_SCRIPTS = Path(KAFKA_ROOT) / 'bin' + +KAFKA_BIN = str(KAFKA_SCRIPTS / 'kafka-server-start.sh') +ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / 'zookeeper-server-start.sh') + + +# Those defines fixtures +zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN) +kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc') + +logger = logging.getLogger('kafka') +logger.setLevel(logging.WARN) + + +@pytest.fixture +def producer_to_publisher( + request: 'SubRequest', # noqa F821 + kafka_server: Tuple[Popen, int]) -> KafkaProducer: # noqa + """Producer to send message to the publisher's consumer. 
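+
+    The bootstrap server address is built from the port exposed by the
+    kafka_server fixture, so the messages end up on the same kafka
+    instance the publisher under test consumes from.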
+ + """ + _, port = kafka_server + producer = KafkaProducer( + bootstrap_servers='localhost:{}'.format(port), + key_serializer=key_to_kafka, + value_serializer=key_to_kafka, + client_id=TEST_CONFIG['consumer_id'], + ) + return producer + + +# pytest fixture (no need for the annotation though or else it breaks) +consumer_from_publisher = make_kafka_consumer( + 'kafka_server', + seek_to_beginning=True, + key_deserializer=kafka_to_key, + value_deserializer=kafka_to_value, + auto_offset_reset='earliest', + enable_auto_commit=True, + client_id=TEST_CONFIG['publisher_id'], + kafka_topics=['dummy']) # will be overriden during test setup + + +def publisher(kafka_server: Tuple[Popen, int], + config: Dict) -> JournalPublisher: + # consumer and producer of the publisher needs to discuss with the + # right instance + _, port = kafka_server + config['brokers'] = ['localhost:{}'.format(port)] + return JournalPublisherTest(config) diff --git a/swh/journal/tests/test_publisher.py b/swh/journal/tests/test_publisher.py --- a/swh/journal/tests/test_publisher.py +++ b/swh/journal/tests/test_publisher.py @@ -1,223 +1,105 @@ -# Copyright (C) 2018 The Software Heritage developers +# Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import unittest -from swh.model.hashutil import hash_to_bytes +from kafka import KafkaConsumer, KafkaProducer +from subprocess import Popen +from typing import Tuple, Text +from swh.journal.serializers import value_to_kafka, kafka_to_value + from swh.journal.publisher import JournalPublisher -from swh.storage.in_memory import Storage - -CONTENTS = [ - { - 'length': 3, - 'sha1': hash_to_bytes( - '34973274ccef6ab4dfaaf86599792fa9c3fe4689'), - 'sha1_git': b'foo', - 'blake2s256': b'bar', - 'sha256': b'baz', - 'status': 'visible', - }, -] - -COMMITTER = [ - { - 'id': 1, - 'fullname': 'foo', - }, - { - 'id': 2, - 'fullname': 'bar', - } -] - -REVISIONS = [ - { - 'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'), - 'message': b'hello', - 'date': { - 'timestamp': { - 'seconds': 1234567891, - 'microseconds': 0, - }, - 'offset': 120, - 'negative_utc': None, - }, - 'committer': COMMITTER[0], - 'author': COMMITTER[0], - 'committer_date': None, - }, - { - 'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'), - 'message': b'hello again', - 'date': { - 'timestamp': { - 'seconds': 1234567892, - 'microseconds': 0, - }, - 'offset': 120, - 'negative_utc': None, - }, - 'committer': COMMITTER[1], - 'author': COMMITTER[1], - 'committer_date': None, - }, -] - -RELEASES = [ - { - 'id': hash_to_bytes('d81cc0710eb6cf9efd5b920a8453e1e07157b6cd'), - 'name': b'v0.0.1', - 'date': { - 'timestamp': { - 'seconds': 1234567890, - 'microseconds': 0, - }, - 'offset': 120, - 'negative_utc': None, - }, - 'author': COMMITTER[0], - }, -] - -ORIGINS = [ - { - 'url': 'https://somewhere.org/den/fox', - 'type': 'git', - }, - { - 'url': 'https://overtherainbow.org/fox/den', - 'type': 'svn', - } -] - -ORIGIN_VISITS = [ - { - 'date': '2013-05-07T04:20:39.369271+00:00', - }, - { - 'date': '2018-11-27T17:20:39.000000+00:00', - } -] - -TEST_CONFIG = { - 'brokers': ['localhost'], - 'temporary_prefix': 'swh.tmp_journal.new', - 'final_prefix': 'swh.journal.objects', - 'consumer_id': 'swh.journal.test.publisher', - 'publisher_id': 'swh.journal.test.publisher', - 'object_types': ['content'], - 'max_messages': 3, + +from 
.conftest import ( + TEST_CONFIG, CONTENTS, REVISIONS, RELEASES, publisher +) + + +OBJECT_TYPE_KEYS = { + 'content': (b'sha1', CONTENTS), + 'revision': (b'id', REVISIONS), + 'release': (b'id', RELEASES), } -class JournalPublisherTest(JournalPublisher): - def _prepare_storage(self, config): - self.storage = Storage() - self.storage.content_add({'data': b'42', **c} for c in CONTENTS) - self.storage.revision_add(REVISIONS) - self.storage.release_add(RELEASES) - origins = self.storage.origin_add(ORIGINS) - origin_visits = [] - for i, ov in enumerate(ORIGIN_VISITS): - origin_id = origins[i]['id'] - ov = self.storage.origin_visit_add(origin_id, ov['date']) - origin_visits.append(ov) - self.origins = origins - self.origin_visits = origin_visits - - print("publisher.origin-visits", self.origin_visits) - - -class JournalPublisherNoKafkaInMemoryStorage(JournalPublisherTest): - """A journal publisher with: - - no kafka dependency - - in-memory storage +def assert_publish(publisher: JournalPublisher, + consumer_from_publisher: KafkaConsumer, + producer_to_publisher: KafkaProducer, + object_type: Text): + """Assert that publishing object in the publisher is reified and + published in topics. + + Args: + journal_publisher (JournalPublisher): publisher to read and write data + kafka_consumer (KafkaConsumer): To read data from the publisher + kafka_producer (KafkaProducer): To send data to the publisher + + """ + # object type's id label key + object_key_id, expected_objects = OBJECT_TYPE_KEYS[object_type] + # objects to send to the publisher + objects = [{object_key_id: c[object_key_id.decode()]} + for c in expected_objects] + + # send message to the publisher + for obj in objects: + producer_to_publisher.send( + '%s.%s' % (TEST_CONFIG['temporary_prefix'], object_type), + obj + ) + + nb_messages = len(objects) + + # publisher should poll 1 message and send 1 reified object + publisher.poll(max_messages=nb_messages) + + # then (client reads from the messages from output topic) + msgs = [] + for num, msg in enumerate(consumer_from_publisher): + msgs.append((msg.topic, msg.key, msg.value)) + + expected_topic = '%s.%s' % (TEST_CONFIG['final_prefix'], object_type) + assert expected_topic == msg.topic + + expected_key = objects[num][object_key_id] + assert expected_key == msg.key + + # Transformation is needed due to our back and forth + # serialization to kafka + expected_value = kafka_to_value(value_to_kafka(expected_objects[num])) + assert expected_value == msg.value + + +def test_publish( + kafka_server: Tuple[Popen, int], + consumer_from_publisher: KafkaConsumer, + producer_to_publisher: KafkaProducer): """ + Reading from and writing to the journal publisher should work (contents) - def _prepare_journal(self, config): - """No journal for now - - """ - pass - - -class TestPublisher(unittest.TestCase): - def setUp(self): - self.publisher = JournalPublisherNoKafkaInMemoryStorage(TEST_CONFIG) - self.contents = [{b'sha1': c['sha1']} for c in CONTENTS] - self.revisions = [{b'id': c['id']} for c in REVISIONS] - self.releases = [{b'id': c['id']} for c in RELEASES] - # those needs id generation from the storage - # so initialization is different than other entities - self.origins = [{b'url': o['url'], - b'type': o['type']} - for o in self.publisher.origins] - self.origin_visits = [{b'origin': ov['origin'], - b'visit': ov['visit']} - for ov in self.publisher.origin_visits] - # full objects - storage = self.publisher.storage - ovs = [] - for ov in self.origin_visits: - _ov = storage.origin_visit_get_by( - 
ov[b'origin'], ov[b'visit']) - _ov['date'] = str(_ov['date']) - ovs.append(_ov) - self.expected_origin_visits = ovs - - def test_process_contents(self): - actual_contents = self.publisher.process_contents(self.contents) - expected_contents = [(c['sha1'], c) for c in CONTENTS] - self.assertEqual(actual_contents, expected_contents) - - def test_process_revisions(self): - actual_revisions = self.publisher.process_revisions(self.revisions) - expected_revisions = [(c['id'], c) for c in REVISIONS] - self.assertEqual(actual_revisions, expected_revisions) - - def test_process_releases(self): - actual_releases = self.publisher.process_releases(self.releases) - expected_releases = [(c['id'], c) for c in RELEASES] - self.assertEqual(actual_releases, expected_releases) - - def test_process_origins(self): - actual_origins = self.publisher.process_origins(self.origins) - expected_origins = [({'url': o[b'url'], 'type': o[b'type']}, - {'url': o[b'url'], 'type': o[b'type']}) - for o in self.origins] - self.assertEqual(actual_origins, expected_origins) - - def test_process_origin_visits(self): - actual_ovs = self.publisher.process_origin_visits(self.origin_visits) - expected_ovs = [((ov['origin'], ov['visit']), ov) - for ov in self.expected_origin_visits] - self.assertEqual(actual_ovs, expected_ovs) - - def test_process_objects(self): - messages = { - 'content': self.contents, - 'revision': self.revisions, - 'release': self.releases, - 'origin': self.origins, - 'origin_visit': self.origin_visits, - } - - actual_objects = self.publisher.process_objects(messages) - - expected_contents = [(c['sha1'], c) for c in CONTENTS] - expected_revisions = [(c['id'], c) for c in REVISIONS] - expected_releases = [(c['id'], c) for c in RELEASES] - expected_origins = [(o, o) for o in ORIGINS] - expected_ovs = [((ov['origin'], ov['visit']), ov) - for ov in self.expected_origin_visits] - expected_objects = { - 'content': expected_contents, - 'revision': expected_revisions, - 'release': expected_releases, - 'origin': expected_origins, - 'origin_visit': expected_ovs, - } - - self.assertEqual(actual_objects, expected_objects) + Args: + journal_publisher (JournalPublisher): publisher to read and write data + kafka_consumer (KafkaConsumer): To read data from the publisher + kafka_producer (KafkaProducer): To send data to the publisher + + """ + # retrieve the object types we want to test + object_types = OBJECT_TYPE_KEYS.keys() + # synchronize the publisher's config with the test + conf = TEST_CONFIG.copy() + conf['object_types'] = object_types + # instantiate the publisher (not a fixture due to initialization) + p = publisher(kafka_server, config=conf) + + # Subscribe to the publisher's output topics + consumer_from_publisher.subscribe( + topics=['%s.%s' % (conf['final_prefix'], object_type) + for object_type in object_types]) + + # Now for each object type, we'll send data to the publisher and + # check that data is indeed fetched and reified in the publisher's + # output topics + for object_type in object_types: + assert_publish(p, consumer_from_publisher, + producer_to_publisher, object_type) diff --git a/swh/journal/tests/test_publisher.py b/swh/journal/tests/test_publisher_no_kafka.py copy from swh/journal/tests/test_publisher.py copy to swh/journal/tests/test_publisher_no_kafka.py --- a/swh/journal/tests/test_publisher.py +++ b/swh/journal/tests/test_publisher_no_kafka.py @@ -1,140 +1,29 @@ -# Copyright (C) 2018 The Software Heritage developers +# Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS 
file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import pytest import unittest -from swh.model.hashutil import hash_to_bytes -from swh.journal.publisher import JournalPublisher -from swh.storage.in_memory import Storage - -CONTENTS = [ - { - 'length': 3, - 'sha1': hash_to_bytes( - '34973274ccef6ab4dfaaf86599792fa9c3fe4689'), - 'sha1_git': b'foo', - 'blake2s256': b'bar', - 'sha256': b'baz', - 'status': 'visible', - }, -] - -COMMITTER = [ - { - 'id': 1, - 'fullname': 'foo', - }, - { - 'id': 2, - 'fullname': 'bar', - } -] - -REVISIONS = [ - { - 'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'), - 'message': b'hello', - 'date': { - 'timestamp': { - 'seconds': 1234567891, - 'microseconds': 0, - }, - 'offset': 120, - 'negative_utc': None, - }, - 'committer': COMMITTER[0], - 'author': COMMITTER[0], - 'committer_date': None, - }, - { - 'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'), - 'message': b'hello again', - 'date': { - 'timestamp': { - 'seconds': 1234567892, - 'microseconds': 0, - }, - 'offset': 120, - 'negative_utc': None, - }, - 'committer': COMMITTER[1], - 'author': COMMITTER[1], - 'committer_date': None, - }, -] - -RELEASES = [ - { - 'id': hash_to_bytes('d81cc0710eb6cf9efd5b920a8453e1e07157b6cd'), - 'name': b'v0.0.1', - 'date': { - 'timestamp': { - 'seconds': 1234567890, - 'microseconds': 0, - }, - 'offset': 120, - 'negative_utc': None, - }, - 'author': COMMITTER[0], - }, -] - -ORIGINS = [ - { - 'url': 'https://somewhere.org/den/fox', - 'type': 'git', - }, - { - 'url': 'https://overtherainbow.org/fox/den', - 'type': 'svn', - } -] - -ORIGIN_VISITS = [ - { - 'date': '2013-05-07T04:20:39.369271+00:00', - }, - { - 'date': '2018-11-27T17:20:39.000000+00:00', - } -] - -TEST_CONFIG = { - 'brokers': ['localhost'], - 'temporary_prefix': 'swh.tmp_journal.new', - 'final_prefix': 'swh.journal.objects', - 'consumer_id': 'swh.journal.test.publisher', - 'publisher_id': 'swh.journal.test.publisher', - 'object_types': ['content'], - 'max_messages': 3, -} - - -class JournalPublisherTest(JournalPublisher): - def _prepare_storage(self, config): - self.storage = Storage() - self.storage.content_add({'data': b'42', **c} for c in CONTENTS) - self.storage.revision_add(REVISIONS) - self.storage.release_add(RELEASES) - origins = self.storage.origin_add(ORIGINS) - origin_visits = [] - for i, ov in enumerate(ORIGIN_VISITS): - origin_id = origins[i]['id'] - ov = self.storage.origin_visit_add(origin_id, ov['date']) - origin_visits.append(ov) - self.origins = origins - self.origin_visits = origin_visits - - print("publisher.origin-visits", self.origin_visits) +from .conftest import ( + JournalPublisherTest, TEST_CONFIG, + CONTENTS, REVISIONS, RELEASES, ORIGINS +) +from swh.journal.publisher import MANDATORY_KEYS class JournalPublisherNoKafkaInMemoryStorage(JournalPublisherTest): """A journal publisher with: - no kafka dependency - in-memory storage + """ + def check_config(self, config): + """No need to check the configuration here as we do not use kafka + + """ + pass def _prepare_journal(self, config): """No journal for now @@ -143,7 +32,10 @@ pass -class TestPublisher(unittest.TestCase): +class TestPublisherNoKafka(unittest.TestCase): + """This tests only the part not using any kafka instance + + """ def setUp(self): self.publisher = JournalPublisherNoKafkaInMemoryStorage(TEST_CONFIG) self.contents = [{b'sha1': c['sha1']} for c in CONTENTS] @@ -221,3 +113,39 @@ 
         }
 
         self.assertEqual(actual_objects, expected_objects)
+
+
+class JournalPublisherCheckTest(JournalPublisherTest):
+    """A journal publisher with:
+    - no kafka dependency
+    - in-memory storage
+
+    """
+    def _prepare_journal(self, config):
+        """No journal for now
+
+        """
+        pass
+
+
+def test_check_config_ok():
+    """Instantiating a publisher with a complete configuration should work
+
+    """
+    publisher = JournalPublisherCheckTest(TEST_CONFIG)
+    assert publisher is not None
+
+
+def test_check_config_ko():
+    """Instantiating a publisher with an incomplete configuration should raise
+
+    """
+    for k in MANDATORY_KEYS:
+        conf = TEST_CONFIG.copy()
+        conf.pop(k)
+        with pytest.raises(ValueError) as e:
+            JournalPublisherCheckTest(conf)
+
+        error = ('Configuration error: The following keys must be'
+                 ' provided: %s' % (','.join([k]), ))
+        assert e.value.args[0] == error
diff --git a/tox.ini b/tox.ini
--- a/tox.ini
+++ b/tox.ini
@@ -2,6 +2,7 @@
 envlist=flake8,py3
 
 [testenv:py3]
+passenv=SWH_KAFKA_ROOT
 deps =
   .[testing]
   pytest-cov