diff --git a/bin/install-kafka.sh b/bin/install-kafka.sh
deleted file mode 100755
--- a/bin/install-kafka.sh
+++ /dev/null
@@ -1,56 +0,0 @@
-#!/usr/bin/env bash
-
-set -xe
-
-SCALA_VERSION=2.12
-KAFKA_VERSION=2.4.0
-KAFKA_CHECKSUM=53b52f86ea56c9fac62046524f03f75665a089ea2dae554aefe3a3d2694f2da88b5ba8725d8be55f198ba80695443559ed9de7c0b2a2817f7a6141008ff79f49
-
-KAFKA_APP="kafka_${SCALA_VERSION}-${KAFKA_VERSION}"
-TARBALL="${KAFKA_APP}.tgz"
-URL="http://apache.mirrors.ovh.net/ftp.apache.org/dist/kafka/${KAFKA_VERSION}/${TARBALL}"
-CHECKSUMS="${TARBALL}.sha512"
-KAFKA_ROOT_DIR=swh/journal/tests
-KAFKA_DIR="${KAFKA_ROOT_DIR}/${KAFKA_APP}"
-KAFKA_LINK="${KAFKA_ROOT_DIR}/kafka"
-
-case $1 in
-    "install")
-        echo "Kafka installation started."
-        if [ ! -f $TARBALL ]; then
-            echo "Kafka download"
-            wget $URL
-            echo "${KAFKA_CHECKSUM}  ${TARBALL}" > $CHECKSUMS
-            sha512sum -c $CHECKSUMS
-
-            if [ $? -ne 0 ]; then
-                echo "Kafka download: failed to retrieve ${TARBALL}";
-                exit 1
-            fi
-            echo "Kafka download done"
-        fi
-
-        if [ ! -d $KAFKA_DIR ]; then
-            echo "Kafka extraction"
-            tar xvf $TARBALL -C $KAFKA_ROOT_DIR
-            pushd $KAFKA_ROOT_DIR && ln -nsf $KAFKA_APP kafka && popd
-            echo "Kafka extraction done"
-        fi
-        echo "Kafka installation done. Kafka is installed at $KAFKA_DIR"
-        ;;
-    "clean")
-        echo "Kafka cleanup started."
-        [ -d $KAFKA_DIR ] && rm -rf $KAFKA_DIR
-        [ -L $KAFKA_LINK ] && rm $KAFKA_LINK
-        echo "Kafka cleanup done."
-        ;;
-    "clean-cache")
-        echo "Kafka cleanup cache started."
-        [ -f $TARBALL ] && rm $TARBALL
-        [ -f $CHECKSUMS ] && rm $CHECKSUMS
-        echo "Kafka cleanup cache done."
-        ;;
-    *)
-        echo "Unknown command, do nothing"
-        exit 1;
-esac
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,4 +1,3 @@
 pytest
 swh.model >= 0.0.34
-pytest-kafka
 hypothesis
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -4,26 +4,15 @@
 # See top-level LICENSE file for more information
 
 import datetime
-import os
 import pytest
 import logging
 import random
 import string
 
-from confluent_kafka import Consumer
-from confluent_kafka.admin import AdminClient, ConfigResource
+from confluent_kafka import Consumer, Producer
 
 from hypothesis.strategies import one_of
-from subprocess import Popen
-from typing import Any, Dict, Iterator, List, Tuple
-
-from pathlib import Path
-from pytest_kafka import (
-    make_zookeeper_process,
-    make_kafka_server,
-    KAFKA_SERVER_CONFIG_TEMPLATE,
-    ZOOKEEPER_CONFIG_TEMPLATE,
-)
+from typing import Any, Dict, Iterator, List
 
 from swh.model import hypothesis_strategies as strategies
 from swh.model.hashutil import MultiHash, hash_to_bytes
@@ -172,41 +161,6 @@
     TEST_OBJECTS[object_type] = converted_objects
 
 
-KAFKA_ROOT = os.environ.get("SWH_KAFKA_ROOT")
-KAFKA_ROOT = KAFKA_ROOT if KAFKA_ROOT else os.path.dirname(__file__) + "/kafka"
-if not os.path.exists(KAFKA_ROOT):
-    msg = (
-        "Development error: %s must exist and target an "
-        "existing kafka installation" % KAFKA_ROOT
-    )
-    raise ValueError(msg)
-
-KAFKA_SCRIPTS = Path(KAFKA_ROOT) / "bin"
-
-KAFKA_BIN = str(KAFKA_SCRIPTS / "kafka-server-start.sh")
-ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / "zookeeper-server-start.sh")
-
-ZK_CONFIG_TEMPLATE = ZOOKEEPER_CONFIG_TEMPLATE + "\nadmin.enableServer=false\n"
-KAFKA_CONFIG_TEMPLATE = KAFKA_SERVER_CONFIG_TEMPLATE + "\nmessage.max.bytes=104857600\n"
-
-# Those defines fixtures
-zookeeper_proc = make_zookeeper_process(
-    ZOOKEEPER_BIN, zk_config_template=ZK_CONFIG_TEMPLATE, scope="session"
-)
-os.environ[
-    "KAFKA_LOG4J_OPTS"
-] = "-Dlog4j.configuration=file:%s/log4j.properties" % os.path.dirname(__file__)
-session_kafka_server = make_kafka_server(
-    KAFKA_BIN,
-    "zookeeper_proc",
-    kafka_config_template=KAFKA_CONFIG_TEMPLATE,
-    scope="session",
-)
-
-kafka_logger = logging.getLogger("kafka")
-kafka_logger.setLevel(logging.WARN)
-
-
 @pytest.fixture(scope="function")
 def kafka_prefix():
     """Pick a random prefix for kafka topics on each call"""
@@ -220,79 +174,21 @@
 @pytest.fixture(scope="session")
-def kafka_admin_client(session_kafka_server: Tuple[Popen, int]) -> AdminClient:
-    return AdminClient({"bootstrap.servers": "localhost:%s" % session_kafka_server[1]})
+def kafka_server() -> Iterator[str]:
+    p = Producer({"test.mock.num.brokers": "1"})
+    metadata = p.list_topics()
+    brokers = [str(broker) for broker in metadata.brokers.values()]
+    assert len(brokers) == 1, "More than one broker found in the kafka cluster?!"
 
-
-@pytest.fixture(scope="function")
-def kafka_server_config_overrides() -> Dict[str, str]:
-    return {}
+    broker_connstr, broker_id = brokers[0].split("/")
+    ip, port_str = broker_connstr.split(":")
+    assert ip == "127.0.0.1"
+    assert int(port_str)
 
+    yield broker_connstr
 
-@pytest.fixture(scope="function")
-def kafka_server(
-    session_kafka_server: Tuple[Popen, int],
-    kafka_admin_client: AdminClient,
-    kafka_server_config_overrides: Dict[str, str],
-) -> Iterator[Tuple[Popen, int]]:
-    # No overrides, we can just return the original broker connection
-    if not kafka_server_config_overrides:
-        yield session_kafka_server
-        return
-
-    # This is the minimal operation that the kafka_admin_client gives to
-    # retrieve the cluster metadata, which we need to get the numeric id of the
-    # broker spawned by pytest_kafka.
-    metadata = kafka_admin_client.list_topics("__consumer_offsets")
-    broker_ids = [str(broker) for broker in metadata.brokers.keys()]
-    assert len(broker_ids) == 1, "More than one broker found in the kafka cluster?!"
-
-    # Pull the current broker configuration. describe_configs and alter_configs
-    # generate a dict containing one concurrent.future per queried
-    # ConfigResource, hence the use of .result()
-    broker = ConfigResource("broker", broker_ids[0])
-    futures = kafka_admin_client.describe_configs([broker])
-    original_config = futures[broker].result()
-
-    # Gather the list of settings we need to change in the broker
-    # ConfigResource, and their original values in the to_restore dict
-    to_restore = {}
-    for key, new_value in kafka_server_config_overrides.items():
-        if key not in original_config:
-            raise ValueError(f"Cannot override unknown configuration {key}")
-        orig_value = original_config[key].value
-        if orig_value == new_value:
-            continue
-        if original_config[key].is_read_only:
-            raise ValueError(f"Cannot override read-only configuration {key}")
-
-        broker.set_config(key, new_value)
-        to_restore[key] = orig_value
-
-    # to_restore will be empty if all the config "overrides" are equal to the
-    # original value. No need to wait for a config alteration if that's the
-    # case. The result() will raise a KafkaException if the settings change
-    # failed.
-    if to_restore:
-        futures = kafka_admin_client.alter_configs([broker])
-        try:
-            futures[broker].result()
-        except Exception:
-            raise
-
-    yield session_kafka_server
-
-    # Now we can restore the old setting values. Again, the result() will raise
-    # a KafkaException if the settings change failed.
-    if to_restore:
-        for key, orig_value in to_restore.items():
-            broker.set_config(key, orig_value)
-
-        futures = kafka_admin_client.alter_configs([broker])
-        try:
-            futures[broker].result()
-        except Exception:
-            raise
+    p.flush()
 
 
 TEST_CONFIG = {
@@ -304,29 +200,27 @@
 @pytest.fixture
-def test_config(kafka_server: Tuple[Popen, int], kafka_prefix: str):
+def test_config(kafka_server: str, kafka_prefix: str):
     """Test configuration needed for producer/consumer
 
     """
-    _, port = kafka_server
     return {
         **TEST_CONFIG,
-        "brokers": ["127.0.0.1:{}".format(port)],
+        "brokers": [kafka_server],
        "prefix": kafka_prefix + ".swh.journal.objects",
     }
 
 
 @pytest.fixture
 def consumer(
-    kafka_server: Tuple[Popen, int], test_config: Dict, kafka_consumer_group: str,
+    kafka_server: str, test_config: Dict, kafka_consumer_group: str,
 ) -> Consumer:
     """Get a connected Kafka consumer.
 
     """
-    _, kafka_port = kafka_server
     consumer = Consumer(
         {
-            "bootstrap.servers": "127.0.0.1:{}".format(kafka_port),
+            "bootstrap.servers": kafka_server,
             "auto.offset.reset": "earliest",
             "enable.auto.commit": True,
             "group.id": kafka_consumer_group,
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -9,8 +9,7 @@
 import logging
 import re
 import tempfile
-from subprocess import Popen
-from typing import Any, Dict, Tuple
+from typing import Any, Dict
 from unittest.mock import patch
 
 from click.testing import CliRunner
@@ -69,17 +68,13 @@
 def test_replay(
-    storage,
-    kafka_prefix: str,
-    kafka_consumer_group: str,
-    kafka_server: Tuple[Popen, int],
+    storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str,
 ):
-    (_, kafka_port) = kafka_server
     kafka_prefix += ".swh.journal.objects"
     producer = Producer(
         {
-            "bootstrap.servers": "localhost:{}".format(kafka_port),
+            "bootstrap.servers": kafka_server,
             "client.id": "test-producer",
             "acks": "all",
         }
     )
@@ -103,7 +98,7 @@
         "--stop-after-objects",
         "1",
         journal_config={
-            "brokers": ["127.0.0.1:%d" % kafka_port],
+            "brokers": [kafka_server],
             "group_id": kafka_consumer_group,
             "prefix": kafka_prefix,
         },
@@ -138,10 +133,10 @@
 NUM_CONTENTS = 10
 
 
-def _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages):
+def _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages):
     producer = Producer(
         {
-            "bootstrap.servers": "127.0.0.1:{}".format(kafka_port),
+            "bootstrap.servers": kafka_server,
             "client.id": "test-producer",
             "acks": "all",
         }
     )
@@ -169,19 +164,18 @@
     storage,
     kafka_prefix: str,
     kafka_consumer_group: str,
-    kafka_server: Tuple[Popen, int],
+    kafka_server: str,
 ):
-    (_, kafka_port) = kafka_server
     kafka_prefix += ".swh.journal.objects"
-    contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages)
+    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
 
     result = invoke(
         "content-replay",
         "--stop-after-objects",
         str(NUM_CONTENTS),
         journal_config={
-            "brokers": ["127.0.0.1:%d" % kafka_port],
+            "brokers": [kafka_server],
             "group_id": kafka_consumer_group,
             "prefix": kafka_prefix,
         },
@@ -202,13 +196,12 @@
     storage,
     kafka_prefix: str,
     kafka_consumer_group: str,
-    kafka_server: Tuple[Popen, int],
+    kafka_server: str,
     caplog,
 ):
-    (_, kafka_port) = kafka_server
     kafka_prefix += ".swh.journal.objects"
-    contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages)
+    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
 
     caplog.set_level(logging.DEBUG, "swh.journal.replay")
@@ -219,7 +212,7 @@
         "--stop-after-objects",
         str(NUM_CONTENTS),
         journal_config={
-            "brokers": ["127.0.0.1:%d" % kafka_port],
+            "brokers": [kafka_server],
             "group_id": kafka_consumer_group,
             "prefix": kafka_prefix,
         },
@@ -245,13 +238,12 @@
     storage,
     kafka_prefix: str,
     kafka_consumer_group: str,
-    kafka_server: Tuple[Popen, int],
+    kafka_server: str,
     caplog,
 ):
-    (_, kafka_port) = kafka_server
     kafka_prefix += ".swh.journal.objects"
-    contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages)
+    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
 
     # Setup log capture to fish the consumer settings out of the log messages
     caplog.set_level(logging.DEBUG, "swh.journal.client")
@@ -262,7 +254,7 @@
         str(NUM_CONTENTS),
         env={"KAFKA_GROUP_INSTANCE_ID": "static-group-instance-id"},
         journal_config={
-            "brokers": ["127.0.0.1:%d" % kafka_port],
+            "brokers": [kafka_server],
             "group_id": kafka_consumer_group,
             "prefix": kafka_prefix,
         },
@@ -296,12 +288,11 @@
     storage,
     kafka_prefix: str,
     kafka_consumer_group: str,
-    kafka_server: Tuple[Popen, int],
+    kafka_server: str,
 ):
-    (_, kafka_port) = kafka_server
     kafka_prefix += ".swh.journal.objects"
-    contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages)
+    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
 
     excluded_contents = list(contents)[0::2]  # picking half of them
     with tempfile.NamedTemporaryFile(mode="w+b") as fd:
@@ -316,7 +307,7 @@
             "--exclude-sha1-file",
             fd.name,
             journal_config={
-                "brokers": ["127.0.0.1:%d" % kafka_port],
+                "brokers": [kafka_server],
                 "group_id": kafka_consumer_group,
                 "prefix": kafka_prefix,
             },
@@ -349,16 +340,15 @@
     storage,
     kafka_prefix: str,
     kafka_consumer_group: str,
-    kafka_server: Tuple[Popen, int],
+    kafka_server: str,
     check_dst: bool,
     expected_copied: int,
     expected_in_dst: int,
     caplog,
 ):
-    (_, kafka_port) = kafka_server
     kafka_prefix += ".swh.journal.objects"
-    contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages)
+    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
 
     for i, (sha1, content) in enumerate(contents.items()):
         if i >= NUM_CONTENTS_DST:
@@ -374,7 +364,7 @@
         str(NUM_CONTENTS),
         "--check-dst" if check_dst else "--no-check-dst",
         journal_config={
-            "brokers": ["127.0.0.1:%d" % kafka_port],
+            "brokers": [kafka_server],
             "group_id": kafka_consumer_group,
             "prefix": kafka_prefix,
         },
@@ -433,13 +423,12 @@
     storage,
     kafka_prefix: str,
     kafka_consumer_group: str,
-    kafka_server: Tuple[Popen, int],
+    kafka_server: str,
     monkeypatch_retry_sleep,
 ):
-    (_, kafka_port) = kafka_server
     kafka_prefix += ".swh.journal.objects"
-    contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages)
+    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
 
     failures = {}
     for i, (sha1, content) in enumerate(contents.items()):
@@ -458,7 +447,7 @@
         str(NUM_CONTENTS),
         "--check-dst",
         journal_config={
-            "brokers": ["127.0.0.1:%d" % kafka_port],
+            "brokers": [kafka_server],
             "group_id": kafka_consumer_group,
             "prefix": kafka_prefix,
         },
@@ -478,14 +467,13 @@
     storage,
     kafka_prefix: str,
     kafka_consumer_group: str,
-    kafka_server: Tuple[Popen, int],
+    kafka_server: str,
     caplog,
     monkeypatch_retry_sleep,
 ):
-    (_, kafka_port) = kafka_server
     kafka_prefix += ".swh.journal.objects"
-    contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages)
+    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
 
     add_failures = {}
     get_failures = {}
@@ -532,7 +520,7 @@
         "--stop-after-objects",
         str(NUM_CONTENTS),
         journal_config={
-            "brokers": ["127.0.0.1:%d" % kafka_port],
+            "brokers": [kafka_server],
             "group_id": kafka_consumer_group,
             "prefix": kafka_prefix,
         },
@@ -571,13 +559,12 @@
     storage,
     kafka_prefix: str,
     kafka_consumer_group: str,
-    kafka_server: Tuple[Popen, int],
+    kafka_server: str,
     caplog,
 ):
-    (_, kafka_port) = kafka_server
     kafka_prefix += ".swh.journal.objects"
-    contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages)
+    contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
 
     num_contents_deleted = 5
     contents_deleted = set()
@@ -596,7 +583,7 @@
         "--stop-after-objects",
         str(NUM_CONTENTS),
         journal_config={
-            "brokers": ["127.0.0.1:%d" % kafka_port],
+            "brokers": [kafka_server],
             "group_id": kafka_consumer_group,
             "prefix": kafka_prefix,
         },
diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py
--- a/swh/journal/tests/test_client.py
+++ b/swh/journal/tests/test_client.py
@@ -3,8 +3,7 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from subprocess import Popen
-from typing import Dict, List, Tuple
+from typing import Dict, List
 from unittest.mock import MagicMock
 
 from confluent_kafka import Producer
@@ -17,15 +16,12 @@
 from swh.journal.serializers import key_to_kafka, value_to_kafka
 
 
-def test_client(
-    kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int]
-):
-    (_, port) = kafka_server
+def test_client(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str):
     kafka_prefix += ".swh.journal.objects"
 
     producer = Producer(
         {
-            "bootstrap.servers": "localhost:{}".format(port),
+            "bootstrap.servers": kafka_server,
             "client.id": "test producer",
             "acks": "all",
         }
@@ -42,7 +38,7 @@
     producer.flush()
 
     client = JournalClient(
-        brokers="localhost:%d" % kafka_server[1],
+        brokers=kafka_server,
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_after_objects=1,
@@ -53,15 +49,12 @@
     worker_fn.assert_called_once_with({"revision": [rev.to_dict()]})
 
 
-def test_client_eof(
-    kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int]
-):
-    (_, port) = kafka_server
+def test_client_eof(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str):
     kafka_prefix += ".swh.journal.objects"
 
     producer = Producer(
         {
-            "bootstrap.servers": "localhost:{}".format(port),
+            "bootstrap.servers": kafka_server,
             "client.id": "test producer",
             "acks": "all",
         }
@@ -78,7 +71,7 @@
     producer.flush()
 
     client = JournalClient(
-        brokers="localhost:%d" % kafka_server[1],
+        brokers=kafka_server,
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_after_objects=None,
@@ -93,12 +86,8 @@
 
 @pytest.mark.parametrize("batch_size", [1, 5, 100])
 def test_client_batch_size(
-    kafka_prefix: str,
-    kafka_consumer_group: str,
-    kafka_server: Tuple[Popen, int],
-    batch_size: int,
+    kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, batch_size: int,
 ):
-    (_, port) = kafka_server
     kafka_prefix += ".swh.journal.objects"
 
     num_objects = 2 * batch_size + 1
@@ -106,7 +95,7 @@
 
     producer = Producer(
         {
-            "bootstrap.servers": "localhost:{}".format(port),
+            "bootstrap.servers": kafka_server,
             "client.id": "test producer",
             "acks": "all",
         }
@@ -125,7 +114,7 @@
     producer.flush()
 
     client = JournalClient(
-        brokers=["localhost:%d" % kafka_server[1]],
+        brokers=[kafka_server],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_after_objects=num_objects,
@@ -141,4 +130,8 @@
 
     client.process(worker_fn)
 
-    assert collected_output == [content.to_dict() for content in contents]
+    expected_output = [content.to_dict() for content in contents]
+    assert len(collected_output) == len(expected_output)
+
+    for output in collected_output:
+        assert output in expected_output
diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -5,18 +5,15 @@
 
 from collections import defaultdict
 
-from confluent_kafka import Consumer, Producer, KafkaException
-
 import pytest
 
-from subprocess import Popen
-from typing import List, Tuple
+from confluent_kafka import Consumer, Producer, KafkaException
 
 from swh.storage import get_storage
 
 from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value
 from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError
-from swh.model.model import Directory, DirectoryEntry, Origin, OriginVisit
+from swh.model.model import Directory, Origin, OriginVisit
 
 from .conftest import TEST_OBJECTS, TEST_OBJECT_DICTS
@@ -79,15 +76,11 @@
             assert value in received_values
 
 
-def test_kafka_writer(
-    kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer
-):
+def test_kafka_writer(kafka_prefix: str, kafka_server: str, consumer: Consumer):
     kafka_prefix += ".swh.journal.objects"
 
     writer = KafkaJournalWriter(
-        brokers=[f"localhost:{kafka_server[1]}"],
-        client_id="kafka_writer",
-        prefix=kafka_prefix,
+        brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix,
     )
 
     expected_messages = 0
@@ -100,14 +93,12 @@
     assert_all_objects_consumed(consumed_messages)
 
 
-def test_storage_direct_writer(
-    kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer
-):
+def test_storage_direct_writer(kafka_prefix: str, kafka_server, consumer: Consumer):
     kafka_prefix += ".swh.journal.objects"
 
     writer_config = {
         "cls": "kafka",
-        "brokers": ["localhost:%d" % kafka_server[1]],
+        "brokers": [kafka_server],
         "client_id": "kafka_writer",
         "prefix": kafka_prefix,
     }
@@ -151,150 +142,57 @@
     assert_all_objects_consumed(consumed_messages)
 
 
-@pytest.fixture(scope="session")
-def large_directories() -> List[Directory]:
-    dir_sizes = [1 << n for n in range(21)]  # 2**0 = 1 to 2**20 = 1024 * 1024
-
-    dir_entries = [
-        DirectoryEntry(
-            name=("%09d" % i).encode(),
-            type="file",
-            perms=0o100644,
-            target=b"\x00" * 20,
-        )
-        for i in range(max(dir_sizes))
-    ]
-
-    return [Directory(entries=dir_entries[:size]) for size in dir_sizes]
-
-
-def test_write_large_objects(
-    kafka_prefix: str,
-    kafka_server: Tuple[Popen, int],
-    consumer: Consumer,
-    large_directories: List[Directory],
+def test_write_delivery_failure(
+    kafka_prefix: str, kafka_server: str, consumer: Consumer
 ):
-    kafka_prefix += ".swh.journal.objects"
-
-    # Needed as there is no directories in TEST_OBJECT_DICTS, the consumer
-    # isn't autosubscribed to directories.
-    consumer.subscribe([kafka_prefix + ".directory"])
-
-    writer = KafkaJournalWriter(
-        brokers=["localhost:%d" % kafka_server[1]],
-        client_id="kafka_writer",
-        prefix=kafka_prefix,
-    )
+    class MockKafkaError:
+        """A mocked kafka error"""
 
-    writer.write_additions("directory", large_directories)
-
-    consumed_messages = consume_messages(consumer, kafka_prefix, len(large_directories))
-
-    for dir, message in zip(large_directories, consumed_messages["directory"]):
-        (dir_id, consumed_dir) = message
-        assert dir_id == dir.id
-        assert consumed_dir == dir.to_dict()
-
-
-def dir_message_size(directory: Directory) -> int:
-    """Estimate the size of a directory kafka message.
-
-    We could just do it with `len(value_to_kafka(directory.to_dict()))`,
-    but the serialization is a substantial chunk of the test time here.
-
-    """
-    n_entries = len(directory.entries)
-    return (
-        # fmt: off
-        0
-        + 1  # header of a 2-element fixmap
-        + 1 + 2  # fixstr("id")
-        + 2 + 20  # bin8(directory.id of length 20)
-        + 1 + 7  # fixstr("entries")
-        + 4  # array header
-        + n_entries
-        * (
-            0
-            + 1  # header of a 4-element fixmap
-            + 1 + 6  # fixstr("target")
-            + 2 + 20  # bin8(target of length 20)
-            + 1 + 4  # fixstr("name")
-            + 2 + 9  # bin8(name of length 9)
-            + 1 + 5  # fixstr("perms")
-            + 5  # uint32(perms)
-            + 1 + 4  # fixstr("type")
-            + 1 + 3  # fixstr(type)
-        )
-        # fmt: on
-    )
+        def str(self):
+            return "Mocked Kafka Error"
 
+        def name(self):
+            return "SWH_MOCK_ERROR"
 
-SMALL_MESSAGE_SIZE = 1024 * 1024
+    class KafkaJournalWriterFailDelivery(KafkaJournalWriter):
+        """A journal writer which always fails delivering messages"""
 
+        def _on_delivery(self, error, message):
+            """Replace the inbound error with a fake delivery error"""
+            super()._on_delivery(MockKafkaError(), message)
 
-@pytest.mark.parametrize(
-    "kafka_server_config_overrides", [{"message.max.bytes": str(SMALL_MESSAGE_SIZE)}]
-)
-def test_fail_write_large_objects(
-    kafka_prefix: str,
-    kafka_server: Tuple[Popen, int],
-    consumer: Consumer,
-    large_directories: List[Directory],
-):
     kafka_prefix += ".swh.journal.objects"
-
-    # Needed as there is no directories in TEST_OBJECT_DICTS, the consumer
-    # isn't autosubscribed to directories.
-    consumer.subscribe([kafka_prefix + ".directory"])
-
-    writer = KafkaJournalWriter(
-        brokers=["localhost:%d" % kafka_server[1]],
-        client_id="kafka_writer",
-        prefix=kafka_prefix,
+    writer = KafkaJournalWriterFailDelivery(
+        brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix,
     )
 
-    expected_dirs = []
-
-    for directory in large_directories:
-        if dir_message_size(directory) < SMALL_MESSAGE_SIZE:
-            # No error; write anyway, but continue
-            writer.write_addition("directory", directory)
-            expected_dirs.append(directory)
-            continue
-
-        with pytest.raises(KafkaDeliveryError) as exc:
-            writer.write_addition("directory", directory)
-
-        assert "Failed deliveries" in exc.value.message
-        assert len(exc.value.delivery_failures) == 1
-
-        object_type, key, msg, code = exc.value.delivery_failures[0]
-
-        assert object_type == "directory"
-        assert key == directory.id
-        assert code == "MSG_SIZE_TOO_LARGE"
-
-    consumed_messages = consume_messages(consumer, kafka_prefix, len(expected_dirs))
+    empty_dir = Directory(entries=[])
+    with pytest.raises(KafkaDeliveryError) as exc:
+        writer.write_addition("directory", empty_dir)
 
-    for dir, message in zip(expected_dirs, consumed_messages["directory"]):
-        (dir_id, consumed_dir) = message
-        assert dir_id == dir.id
-        assert consumed_dir == dir.to_dict()
+    assert "Failed deliveries" in exc.value.message
+    assert len(exc.value.delivery_failures) == 1
+    delivery_failure = exc.value.delivery_failures[0]
+    assert delivery_failure.key == empty_dir.id
+    assert delivery_failure.code == "SWH_MOCK_ERROR"
 
 
 def test_write_delivery_timeout(
-    kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer
+    kafka_prefix: str, kafka_server: str, consumer: Consumer
 ):
     produced = []
 
     class MockProducer(Producer):
+        """A kafka producer which pretends to produce messages, but never sends any
+        delivery acknowledgements"""
+
         def produce(self, **kwargs):
             produced.append(kwargs)
 
     kafka_prefix += ".swh.journal.objects"
     writer = KafkaJournalWriter(
-        brokers=["localhost:%d" % kafka_server[1]],
+        brokers=[kafka_server],
         client_id="kafka_writer",
         prefix=kafka_prefix,
         flush_timeout=1,
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -7,8 +7,7 @@
 import functools
 import logging
 import random
-from subprocess import Popen
-from typing import Dict, List, Tuple
+from typing import Dict, List
 
 import dateutil
 import pytest
@@ -36,10 +35,7 @@
 def test_storage_play(
-    kafka_prefix: str,
-    kafka_consumer_group: str,
-    kafka_server: Tuple[Popen, int],
-    caplog,
+    kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog,
 ):
     """Optimal replayer scenario.
 
@@ -48,14 +44,13 @@
     - replayer consumes objects from the topic and replay them
 
     """
-    (_, port) = kafka_server
     kafka_prefix += ".swh.journal.objects"
 
     storage = get_storage(**storage_config)
 
     producer = Producer(
         {
-            "bootstrap.servers": "localhost:{}".format(port),
+            "bootstrap.servers": kafka_server,
             "client.id": "test producer",
             "acks": "all",
         }
     )
@@ -86,7 +81,7 @@
     caplog.set_level(logging.ERROR, "swh.journal.replay")
     # Fill the storage from Kafka
     replayer = JournalClient(
-        brokers="localhost:%d" % kafka_server[1],
+        brokers=kafka_server,
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_after_objects=nb_sent,
@@ -138,10 +133,7 @@
 def test_storage_play_with_collision(
-    kafka_prefix: str,
-    kafka_consumer_group: str,
-    kafka_server: Tuple[Popen, int],
-    caplog,
+    kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog,
 ):
     """Another replayer scenario with collisions.
 
@@ -151,14 +143,13 @@
     - This drops the colliding contents from the replay when detected
 
     """
-    (_, port) = kafka_server
     kafka_prefix += ".swh.journal.objects"
 
     storage = get_storage(**storage_config)
 
     producer = Producer(
         {
-            "bootstrap.servers": "localhost:{}".format(port),
+            "bootstrap.servers": kafka_server,
             "client.id": "test producer",
             "enable.idempotence": "true",
         }
     )
@@ -199,7 +190,7 @@
     caplog.set_level(logging.ERROR, "swh.journal.replay")
     # Fill the storage from Kafka
     replayer = JournalClient(
-        brokers="localhost:%d" % kafka_server[1],
+        brokers=kafka_server,
        group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_after_objects=nb_sent,
diff --git a/tox.ini b/tox.ini
--- a/tox.ini
+++ b/tox.ini
@@ -2,14 +2,11 @@
 envlist=black,flake8,mypy,py3
 
 [testenv]
-passenv=SWH_KAFKA_ROOT
 extras =
   testing
 deps =
   pytest-cov
   dev: pdbpp
-setenv =
-  SWH_KAFKA_ROOT = {env:SWH_KAFKA_ROOT:swh/journal/tests/kafka}
 commands =
   pytest --cov={envsitepackagesdir}/swh/journal \
          {envsitepackagesdir}/swh/journal \
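
Note on the new test setup: the session-scoped `kafka_server` fixture introduced in `conftest.py` relies on librdkafka's built-in mock cluster, enabled by instantiating a client with the `test.mock.num.brokers` property. The mock broker listens on a real local port, so the existing `Producer`/`Consumer`-based tests keep working once they are pointed at the bootstrap address the fixture yields, and the install-kafka.sh script, the pytest-kafka dependency and the `SWH_KAFKA_ROOT` plumbing all become unnecessary. The snippet below is a minimal sketch (not part of the patch) of how that mock cluster behaves from a client's point of view; the topic name and consumer group id are made up for illustration.

```python
from confluent_kafka import Consumer, Producer

# Instantiating any client with test.mock.num.brokers makes librdkafka spin up
# an in-process mock cluster listening on a real 127.0.0.1 port.
producer = Producer({"test.mock.num.brokers": "1"})

# The cluster metadata describes the mock broker as "host:port/id"; splitting
# on "/" recovers a bootstrap address, exactly as the kafka_server fixture does.
broker = str(next(iter(producer.list_topics().brokers.values())))
bootstrap_servers, _broker_id = broker.split("/")

# Topics are auto-created by the mock cluster on first use (illustrative name).
producer.produce("mock.test.topic", key=b"key", value=b"value")
producer.flush()

consumer = Consumer(
    {
        "bootstrap.servers": bootstrap_servers,
        "group.id": "mock-demo",  # illustrative group id
        "auto.offset.reset": "earliest",
    }
)
consumer.subscribe(["mock.test.topic"])

# Poll until the message written above comes back from the mock broker.
msg = None
while msg is None:
    msg = consumer.poll(timeout=1.0)
assert msg.error() is None and msg.value() == b"value"
consumer.close()
```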