diff --git a/bin/install-kafka.sh b/bin/install-kafka.sh index 73f5ce1..79fd5e6 100755 --- a/bin/install-kafka.sh +++ b/bin/install-kafka.sh @@ -1,56 +1,56 @@ #!/usr/bin/env bash set -xe SCALA_VERSION=2.12 -KAFKA_VERSION=2.1.1 -KAFKA_CHECKSUM=a2e8168e8de6b45e8fca1f2883f0744d3c5a939b70d8a47a5428b72188501d4c2fc11bc35759f2392680d4e8ecf2fa9d0e518e77fd28393afba22194ad018b10 +KAFKA_VERSION=2.4.0 +KAFKA_CHECKSUM=53b52f86ea56c9fac62046524f03f75665a089ea2dae554aefe3a3d2694f2da88b5ba8725d8be55f198ba80695443559ed9de7c0b2a2817f7a6141008ff79f49 KAFKA_APP="kafka_${SCALA_VERSION}-${KAFKA_VERSION}" TARBALL="${KAFKA_APP}.tgz" URL="http://apache.mirrors.ovh.net/ftp.apache.org/dist/kafka/${KAFKA_VERSION}/${TARBALL}" CHECKSUMS="${TARBALL}.sha512" KAFKA_ROOT_DIR=swh/journal/tests KAFKA_DIR="${KAFKA_ROOT_DIR}/${KAFKA_APP}" KAFKA_LINK="${KAFKA_ROOT_DIR}/kafka" case $1 in "install") echo "Kafka installation started." if [ ! -f $TARBALL ]; then echo "Kafka download" wget $URL echo "${KAFKA_CHECKSUM} ${TARBALL}" > $CHECKSUMS sha512sum -c $CHECKSUMS if [ $? -ne 0 ]; then echo "Kafka download: failed to retrieve ${TARBALL}"; exit 1 fi echo "Kafka download done" fi if [ ! -d $KAFKA_DIR ]; then echo "Kafka extraction" tar xvf $TARBALL -C $KAFKA_ROOT_DIR pushd $KAFKA_ROOT_DIR && ln -nsf $KAFKA_APP kafka && popd echo "Kafka extraction done" fi echo "Kafka installation done. Kafka is installed at $KAFKA_DIR" ;; "clean") echo "Kafka cleanup started." [ -d $KAFKA_DIR ] && rm -rf $KAFKA_DIR [ -L $KAFKA_LINK ] && rm $KAFKA_LINK echo "Kafka cleanup done." ;; "clean-cache") echo "Kafka cleanup cache started." [ -f $TARBALL ] && rm $TARBALL [ -f $CHECKSUMS ] && rm $CHECKSUMS echo "Kafka cleanup cache done." ;; *) echo "Unknown command, do nothing" exit 1; esac diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py index 5a67b62..35b3408 100644 --- a/swh/journal/tests/conftest.py +++ b/swh/journal/tests/conftest.py @@ -1,235 +1,238 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import pytest import logging import random import string from confluent_kafka import Consumer from subprocess import Popen from typing import Any, Dict, List, Optional, Tuple from pathlib import Path from pytest_kafka import ( - make_zookeeper_process, make_kafka_server + make_zookeeper_process, make_kafka_server, ZOOKEEPER_CONFIG_TEMPLATE, ) from swh.model.hashutil import hash_to_bytes logger = logging.getLogger(__name__) CONTENTS = [ { 'length': 3, 'sha1': hash_to_bytes( '34973274ccef6ab4dfaaf86599792fa9c3fe4689'), 'sha1_git': b'foo', 'blake2s256': b'bar', 'sha256': b'baz', 'status': 'visible', }, ] COMMITTERS = [ { 'fullname': b'foo', 'name': b'foo', 'email': b'', }, { 'fullname': b'bar', 'name': b'bar', 'email': b'', } ] DATES = [ { 'timestamp': { 'seconds': 1234567891, 'microseconds': 0, }, 'offset': 120, 'negative_utc': None, }, { 'timestamp': { 'seconds': 1234567892, 'microseconds': 0, }, 'offset': 120, 'negative_utc': None, } ] REVISIONS = [ { 'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'), 'message': b'hello', 'date': DATES[0], 'committer': COMMITTERS[0], 'author': COMMITTERS[0], 'committer_date': DATES[0], 'type': 'git', 'directory': '\x01'*20, 'synthetic': False, 'metadata': None, 'parents': [], }, { 'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'), 'message': b'hello again', 'date': DATES[1], 'committer': COMMITTERS[1], 'author': COMMITTERS[1], 'committer_date': DATES[1], 'type': 'hg', 'directory': '\x02'*20, 'synthetic': False, 'metadata': None, 'parents': [], }, ] RELEASES = [ { 'id': hash_to_bytes('d81cc0710eb6cf9efd5b920a8453e1e07157b6cd'), 'name': b'v0.0.1', 'date': { 'timestamp': { 'seconds': 1234567890, 'microseconds': 0, }, 'offset': 120, 'negative_utc': None, }, 'author': COMMITTERS[0], 'target_type': 'revision', 'target': b'\x04'*20, 'message': b'foo', 'synthetic': False, }, ] ORIGINS = [ { 'url': 'https://somewhere.org/den/fox', }, { 'url': 'https://overtherainbow.org/fox/den', } ] ORIGIN_VISITS = [ { 'origin': ORIGINS[0]['url'], 'date': '2013-05-07 04:20:39.369271+00:00', 'snapshot': None, # TODO 'status': 'ongoing', # TODO 'metadata': {'foo': 'bar'}, 'type': 'git', }, { 'origin': ORIGINS[0]['url'], 'date': '2018-11-27 17:20:39+00:00', 'snapshot': None, # TODO 'status': 'ongoing', # TODO 'metadata': {'baz': 'qux'}, 'type': 'git', } ] # From type to tuple (id, ) OBJECT_TYPE_KEYS = { 'content': ('sha1', CONTENTS), 'revision': ('id', REVISIONS), 'release': ('id', RELEASES), 'origin': (None, ORIGINS), 'origin_visit': (None, ORIGIN_VISITS), } # type: Dict[str, Tuple[Optional[str], List[Dict[str, Any]]]] KAFKA_ROOT = os.environ.get('SWH_KAFKA_ROOT') KAFKA_ROOT = KAFKA_ROOT if KAFKA_ROOT else os.path.dirname(__file__) + '/kafka' if not os.path.exists(KAFKA_ROOT): msg = ('Development error: %s must exist and target an ' 'existing kafka installation' % KAFKA_ROOT) raise ValueError(msg) KAFKA_SCRIPTS = Path(KAFKA_ROOT) / 'bin' KAFKA_BIN = str(KAFKA_SCRIPTS / 'kafka-server-start.sh') ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / 'zookeeper-server-start.sh') +ZK_CONFIG_TEMPLATE = ZOOKEEPER_CONFIG_TEMPLATE + '\nadmin.enableServer=false\n' # Those defines fixtures -zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN, scope='session') +zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN, + zk_config_template=ZK_CONFIG_TEMPLATE, + scope='session') os.environ['KAFKA_LOG4J_OPTS'] = \ '-Dlog4j.configuration=file:%s/log4j.properties' % \ os.path.dirname(__file__) kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc', scope='session') kafka_logger = logging.getLogger('kafka') kafka_logger.setLevel(logging.WARN) @pytest.fixture(scope='function') def kafka_prefix(): return ''.join(random.choice(string.ascii_lowercase) for _ in range(10)) TEST_CONFIG = { 'consumer_id': 'swh.journal.consumer', 'object_types': OBJECT_TYPE_KEYS.keys(), 'max_messages': 1, # will read 1 message and stops 'storage': {'cls': 'memory', 'args': {}}, } @pytest.fixture def test_config(kafka_server: Tuple[Popen, int], kafka_prefix: str): """Test configuration needed for producer/consumer """ _, port = kafka_server return { **TEST_CONFIG, 'brokers': ['127.0.0.1:{}'.format(port)], 'prefix': kafka_prefix + '.swh.journal.objects', } @pytest.fixture def consumer( kafka_server: Tuple[Popen, int], test_config: Dict, kafka_prefix: str, ) -> Consumer: """Get a connected Kafka consumer. """ _, kafka_port = kafka_server consumer = Consumer({ 'bootstrap.servers': '127.0.0.1:{}'.format(kafka_port), 'auto.offset.reset': 'earliest', 'enable.auto.commit': True, 'group.id': "test-consumer-%s" % kafka_prefix, }) kafka_topics = [ '%s.%s' % (test_config['prefix'], object_type) for object_type in test_config['object_types'] ] consumer.subscribe(kafka_topics) yield consumer consumer.close()