diff --git a/bin/install-kafka.sh b/bin/install-kafka.sh deleted file mode 100755 index 79fd5e6..0000000 --- a/bin/install-kafka.sh +++ /dev/null @@ -1,56 +0,0 @@ -#!/usr/bin/env bash - -set -xe - -SCALA_VERSION=2.12 -KAFKA_VERSION=2.4.0 -KAFKA_CHECKSUM=53b52f86ea56c9fac62046524f03f75665a089ea2dae554aefe3a3d2694f2da88b5ba8725d8be55f198ba80695443559ed9de7c0b2a2817f7a6141008ff79f49 - -KAFKA_APP="kafka_${SCALA_VERSION}-${KAFKA_VERSION}" -TARBALL="${KAFKA_APP}.tgz" -URL="http://apache.mirrors.ovh.net/ftp.apache.org/dist/kafka/${KAFKA_VERSION}/${TARBALL}" -CHECKSUMS="${TARBALL}.sha512" -KAFKA_ROOT_DIR=swh/journal/tests -KAFKA_DIR="${KAFKA_ROOT_DIR}/${KAFKA_APP}" -KAFKA_LINK="${KAFKA_ROOT_DIR}/kafka" - -case $1 in - "install") - echo "Kafka installation started." - if [ ! -f $TARBALL ]; then - echo "Kafka download" - wget $URL - echo "${KAFKA_CHECKSUM} ${TARBALL}" > $CHECKSUMS - sha512sum -c $CHECKSUMS - - if [ $? -ne 0 ]; then - echo "Kafka download: failed to retrieve ${TARBALL}"; - exit 1 - fi - echo "Kafka download done" - fi - - if [ ! -d $KAFKA_DIR ]; then - echo "Kafka extraction" - tar xvf $TARBALL -C $KAFKA_ROOT_DIR - pushd $KAFKA_ROOT_DIR && ln -nsf $KAFKA_APP kafka && popd - echo "Kafka extraction done" - fi - echo "Kafka installation done. Kafka is installed at $KAFKA_DIR" - ;; - "clean") - echo "Kafka cleanup started." - [ -d $KAFKA_DIR ] && rm -rf $KAFKA_DIR - [ -L $KAFKA_LINK ] && rm $KAFKA_LINK - echo "Kafka cleanup done." - ;; - "clean-cache") - echo "Kafka cleanup cache started." - [ -f $TARBALL ] && rm $TARBALL - [ -f $CHECKSUMS ] && rm $CHECKSUMS - echo "Kafka cleanup cache done." - ;; - *) - echo "Unknown command, do nothing" - exit 1; -esac diff --git a/requirements-test.txt b/requirements-test.txt index 7f2dc48..c727717 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,4 +1,3 @@ pytest swh.model >= 0.0.34 -pytest-kafka hypothesis diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py index e7169e2..3b89a7b 100644 --- a/swh/journal/tests/conftest.py +++ b/swh/journal/tests/conftest.py @@ -1,358 +1,252 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime -import os import pytest import logging import random import string -from confluent_kafka import Consumer -from confluent_kafka.admin import AdminClient, ConfigResource +from confluent_kafka import Consumer, Producer from hypothesis.strategies import one_of -from subprocess import Popen -from typing import Any, Dict, Iterator, List, Tuple - -from pathlib import Path -from pytest_kafka import ( - make_zookeeper_process, - make_kafka_server, - KAFKA_SERVER_CONFIG_TEMPLATE, - ZOOKEEPER_CONFIG_TEMPLATE, -) +from typing import Any, Dict, Iterator, List from swh.model import hypothesis_strategies as strategies from swh.model.hashutil import MultiHash, hash_to_bytes from swh.journal.serializers import ModelObject from swh.journal.writer.kafka import OBJECT_TYPES logger = logging.getLogger(__name__) CONTENTS = [ {**MultiHash.from_data(b"foo").digest(), "length": 3, "status": "visible",}, ] duplicate_content1 = { "length": 4, "sha1": hash_to_bytes("44973274ccef6ab4dfaaf86599792fa9c3fe4689"), "sha1_git": b"another-foo", "blake2s256": b"another-bar", "sha256": b"another-baz", "status": "visible", } # Craft a sha1 collision duplicate_content2 = duplicate_content1.copy() sha1_array = 
bytearray(duplicate_content1["sha1_git"]) sha1_array[0] += 1 duplicate_content2["sha1_git"] = bytes(sha1_array) DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2] COMMITTERS = [ {"fullname": b"foo", "name": b"foo", "email": b"",}, {"fullname": b"bar", "name": b"bar", "email": b"",}, ] DATES = [ { "timestamp": {"seconds": 1234567891, "microseconds": 0,}, "offset": 120, "negative_utc": False, }, { "timestamp": {"seconds": 1234567892, "microseconds": 0,}, "offset": 120, "negative_utc": False, }, ] REVISIONS = [ { "id": hash_to_bytes("7026b7c1a2af56521e951c01ed20f255fa054238"), "message": b"hello", "date": DATES[0], "committer": COMMITTERS[0], "author": COMMITTERS[0], "committer_date": DATES[0], "type": "git", "directory": b"\x01" * 20, "synthetic": False, "metadata": None, "parents": [], }, { "id": hash_to_bytes("368a48fe15b7db2383775f97c6b247011b3f14f4"), "message": b"hello again", "date": DATES[1], "committer": COMMITTERS[1], "author": COMMITTERS[1], "committer_date": DATES[1], "type": "hg", "directory": b"\x02" * 20, "synthetic": False, "metadata": None, "parents": [], }, ] RELEASES = [ { "id": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), "name": b"v0.0.1", "date": { "timestamp": {"seconds": 1234567890, "microseconds": 0,}, "offset": 120, "negative_utc": False, }, "author": COMMITTERS[0], "target_type": "revision", "target": b"\x04" * 20, "message": b"foo", "synthetic": False, }, ] ORIGINS = [ {"url": "https://somewhere.org/den/fox",}, {"url": "https://overtherainbow.org/fox/den",}, ] ORIGIN_VISITS = [ { "origin": ORIGINS[0]["url"], "date": "2013-05-07 04:20:39.369271+00:00", "snapshot": None, # TODO "status": "ongoing", # TODO "metadata": {"foo": "bar"}, "type": "git", }, { "origin": ORIGINS[0]["url"], "date": "2018-11-27 17:20:39+00:00", "snapshot": None, # TODO "status": "ongoing", # TODO "metadata": {"baz": "qux"}, "type": "git", }, ] TEST_OBJECT_DICTS: Dict[str, List[Dict[str, Any]]] = { "content": CONTENTS, "revision": REVISIONS, "release": RELEASES, "origin": ORIGINS, "origin_visit": ORIGIN_VISITS, } MODEL_OBJECTS = {v: k for (k, v) in OBJECT_TYPES.items()} TEST_OBJECTS: Dict[str, List[ModelObject]] = {} for object_type, objects in TEST_OBJECT_DICTS.items(): converted_objects: List[ModelObject] = [] model = MODEL_OBJECTS[object_type] for (num, obj_d) in enumerate(objects): if object_type == "origin_visit": obj_d = {**obj_d, "visit": num} elif object_type == "content": obj_d = {**obj_d, "data": b"", "ctime": datetime.datetime.now()} converted_objects.append(model.from_dict(obj_d)) TEST_OBJECTS[object_type] = converted_objects -KAFKA_ROOT = os.environ.get("SWH_KAFKA_ROOT") -KAFKA_ROOT = KAFKA_ROOT if KAFKA_ROOT else os.path.dirname(__file__) + "/kafka" -if not os.path.exists(KAFKA_ROOT): - msg = ( - "Development error: %s must exist and target an " - "existing kafka installation" % KAFKA_ROOT - ) - raise ValueError(msg) - -KAFKA_SCRIPTS = Path(KAFKA_ROOT) / "bin" - -KAFKA_BIN = str(KAFKA_SCRIPTS / "kafka-server-start.sh") -ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / "zookeeper-server-start.sh") - -ZK_CONFIG_TEMPLATE = ZOOKEEPER_CONFIG_TEMPLATE + "\nadmin.enableServer=false\n" -KAFKA_CONFIG_TEMPLATE = KAFKA_SERVER_CONFIG_TEMPLATE + "\nmessage.max.bytes=104857600\n" - -# Those defines fixtures -zookeeper_proc = make_zookeeper_process( - ZOOKEEPER_BIN, zk_config_template=ZK_CONFIG_TEMPLATE, scope="session" -) -os.environ[ - "KAFKA_LOG4J_OPTS" -] = "-Dlog4j.configuration=file:%s/log4j.properties" % os.path.dirname(__file__) -session_kafka_server = make_kafka_server( - 
    KAFKA_BIN,
-    "zookeeper_proc",
-    kafka_config_template=KAFKA_CONFIG_TEMPLATE,
-    scope="session",
-)
-
-kafka_logger = logging.getLogger("kafka")
-kafka_logger.setLevel(logging.WARN)
-
-
 @pytest.fixture(scope="function")
 def kafka_prefix():
     """Pick a random prefix for kafka topics on each call"""
     return "".join(random.choice(string.ascii_lowercase) for _ in range(10))


 @pytest.fixture(scope="function")
 def kafka_consumer_group(kafka_prefix: str):
     """Pick a random consumer group for kafka consumers on each call"""
     return "test-consumer-%s" % kafka_prefix


 @pytest.fixture(scope="session")
-def kafka_admin_client(session_kafka_server: Tuple[Popen, int]) -> AdminClient:
-    return AdminClient({"bootstrap.servers": "localhost:%s" % session_kafka_server[1]})
+def kafka_server() -> Iterator[str]:
+    p = Producer({"test.mock.num.brokers": "1"})
+    metadata = p.list_topics()
+    brokers = [str(broker) for broker in metadata.brokers.values()]
+    assert len(brokers) == 1, "More than one broker found in the kafka cluster?!"
-@pytest.fixture(scope="function")
-def kafka_server_config_overrides() -> Dict[str, str]:
-    return {}
+    broker_connstr, broker_id = brokers[0].split("/")
+    ip, port_str = broker_connstr.split(":")
+    assert ip == "127.0.0.1"
+    assert int(port_str)
+    yield broker_connstr
-@pytest.fixture(scope="function")
-def kafka_server(
-    session_kafka_server: Tuple[Popen, int],
-    kafka_admin_client: AdminClient,
-    kafka_server_config_overrides: Dict[str, str],
-) -> Iterator[Tuple[Popen, int]]:
-    # No overrides, we can just return the original broker connection
-    if not kafka_server_config_overrides:
-        yield session_kafka_server
-        return
-
-    # This is the minimal operation that the kafka_admin_client gives to
-    # retrieve the cluster metadata, which we need to get the numeric id of the
-    # broker spawned by pytest_kafka.
-    metadata = kafka_admin_client.list_topics("__consumer_offsets")
-    broker_ids = [str(broker) for broker in metadata.brokers.keys()]
-    assert len(broker_ids) == 1, "More than one broker found in the kafka cluster?!"
-
-    # Pull the current broker configuration. describe_configs and alter_configs
-    # generate a dict containing one concurrent.future per queried
-    # ConfigResource, hence the use of .result()
-    broker = ConfigResource("broker", broker_ids[0])
-    futures = kafka_admin_client.describe_configs([broker])
-    original_config = futures[broker].result()
-
-    # Gather the list of settings we need to change in the broker
-    # ConfigResource, and their original values in the to_restore dict
-    to_restore = {}
-    for key, new_value in kafka_server_config_overrides.items():
-        if key not in original_config:
-            raise ValueError(f"Cannot override unknown configuration {key}")
-        orig_value = original_config[key].value
-        if orig_value == new_value:
-            continue
-        if original_config[key].is_read_only:
-            raise ValueError(f"Cannot override read-only configuration {key}")
-
-        broker.set_config(key, new_value)
-        to_restore[key] = orig_value
-
-    # to_restore will be empty if all the config "overrides" are equal to the
-    # original value. No need to wait for a config alteration if that's the
-    # case. The result() will raise a KafkaException if the settings change
-    # failed.
-    if to_restore:
-        futures = kafka_admin_client.alter_configs([broker])
-        try:
-            futures[broker].result()
-        except Exception:
-            raise
-
-    yield session_kafka_server
-
-    # Now we can restore the old setting values. Again, the result() will raise
-    # a KafkaException if the settings change failed.
- if to_restore: - for key, orig_value in to_restore.items(): - broker.set_config(key, orig_value) - - futures = kafka_admin_client.alter_configs([broker]) - try: - futures[broker].result() - except Exception: - raise + p.flush() TEST_CONFIG = { "consumer_id": "swh.journal.consumer", "object_types": TEST_OBJECT_DICTS.keys(), "stop_after_objects": 1, # will read 1 object and stop "storage": {"cls": "memory", "args": {}}, } @pytest.fixture -def test_config(kafka_server: Tuple[Popen, int], kafka_prefix: str): +def test_config(kafka_server: str, kafka_prefix: str): """Test configuration needed for producer/consumer """ - _, port = kafka_server return { **TEST_CONFIG, - "brokers": ["127.0.0.1:{}".format(port)], + "brokers": [kafka_server], "prefix": kafka_prefix + ".swh.journal.objects", } @pytest.fixture def consumer( - kafka_server: Tuple[Popen, int], test_config: Dict, kafka_consumer_group: str, + kafka_server: str, test_config: Dict, kafka_consumer_group: str, ) -> Consumer: """Get a connected Kafka consumer. """ - _, kafka_port = kafka_server consumer = Consumer( { - "bootstrap.servers": "127.0.0.1:{}".format(kafka_port), + "bootstrap.servers": kafka_server, "auto.offset.reset": "earliest", "enable.auto.commit": True, "group.id": kafka_consumer_group, } ) kafka_topics = [ "%s.%s" % (test_config["prefix"], object_type) for object_type in test_config["object_types"] ] consumer.subscribe(kafka_topics) yield consumer consumer.close() def objects_d(): return one_of( strategies.origins().map(lambda x: ("origin", x.to_dict())), strategies.origin_visits().map(lambda x: ("origin_visit", x.to_dict())), strategies.snapshots().map(lambda x: ("snapshot", x.to_dict())), strategies.releases().map(lambda x: ("release", x.to_dict())), strategies.revisions().map(lambda x: ("revision", x.to_dict())), strategies.directories().map(lambda x: ("directory", x.to_dict())), strategies.skipped_contents().map(lambda x: ("skipped_content", x.to_dict())), strategies.present_contents().map(lambda x: ("content", x.to_dict())), ) diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py index eb19869..fd4ce12 100644 --- a/swh/journal/tests/test_cli.py +++ b/swh/journal/tests/test_cli.py @@ -1,631 +1,618 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import Counter import copy import functools import logging import re import tempfile -from subprocess import Popen -from typing import Any, Dict, Tuple +from typing import Any, Dict from unittest.mock import patch from click.testing import CliRunner from confluent_kafka import Producer import pytest import yaml from swh.model.hashutil import hash_to_hex from swh.objstorage.backends.in_memory import InMemoryObjStorage from swh.storage import get_storage from swh.journal.cli import cli from swh.journal.replay import CONTENT_REPLAY_RETRIES from swh.journal.serializers import key_to_kafka, value_to_kafka logger = logging.getLogger(__name__) CLI_CONFIG = { "storage": {"cls": "memory",}, "objstorage_src": {"cls": "mocked", "name": "src",}, "objstorage_dst": {"cls": "mocked", "name": "dst",}, } @pytest.fixture def storage(): """An swh-storage object that gets injected into the CLI functions.""" storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]} storage = get_storage(**storage_config) with patch("swh.journal.cli.get_storage") as 
get_storage_mock: get_storage_mock.return_value = storage yield storage @pytest.fixture def monkeypatch_retry_sleep(monkeypatch): from swh.journal.replay import copy_object, obj_in_objstorage monkeypatch.setattr(copy_object.retry, "sleep", lambda x: None) monkeypatch.setattr(obj_in_objstorage.retry, "sleep", lambda x: None) def invoke(*args, env=None, journal_config=None): config = copy.deepcopy(CLI_CONFIG) if journal_config: config["journal"] = journal_config runner = CliRunner() with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: yaml.dump(config, config_fd) config_fd.seek(0) args = ["-C" + config_fd.name] + list(args) return runner.invoke(cli, args, obj={"log_level": logging.DEBUG}, env=env,) def test_replay( - storage, - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], + storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, ): - (_, kafka_port) = kafka_server kafka_prefix += ".swh.journal.objects" producer = Producer( { - "bootstrap.servers": "localhost:{}".format(kafka_port), + "bootstrap.servers": kafka_server, "client.id": "test-producer", "acks": "all", } ) snapshot = { "id": b"foo", "branches": {b"HEAD": {"target_type": "revision", "target": b"\x01" * 20,}}, } # type: Dict[str, Any] producer.produce( topic=kafka_prefix + ".snapshot", key=key_to_kafka(snapshot["id"]), value=value_to_kafka(snapshot), ) producer.flush() logger.debug("Flushed producer") result = invoke( "replay", "--stop-after-objects", "1", journal_config={ - "brokers": ["127.0.0.1:%d" % kafka_port], + "brokers": [kafka_server], "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output assert storage.snapshot_get(snapshot["id"]) == {**snapshot, "next_branch": None} def _patch_objstorages(names): objstorages = {name: InMemoryObjStorage() for name in names} def get_mock_objstorage(cls, **args): assert cls == "mocked", cls return objstorages[args["name"]] def decorator(f): @functools.wraps(f) @patch("swh.journal.cli.get_objstorage") def newf(get_objstorage_mock, *args, **kwargs): get_objstorage_mock.side_effect = get_mock_objstorage f(*args, objstorages=objstorages, **kwargs) return newf return decorator NUM_CONTENTS = 10 -def _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages): +def _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages): producer = Producer( { - "bootstrap.servers": "127.0.0.1:{}".format(kafka_port), + "bootstrap.servers": kafka_server, "client.id": "test-producer", "acks": "all", } ) contents = {} for i in range(NUM_CONTENTS): content = b"\x00" * 19 + bytes([i]) sha1 = objstorages["src"].add(content) contents[sha1] = content producer.produce( topic=kafka_prefix + ".content", key=key_to_kafka(sha1), value=key_to_kafka({"sha1": sha1, "status": "visible",}), ) producer.flush() return contents @_patch_objstorages(["src", "dst"]) def test_replay_content( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], + kafka_server: str, ): - (_, kafka_port) = kafka_server kafka_prefix += ".swh.journal.objects" - contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages) + contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) result = invoke( "content-replay", "--stop-after-objects", str(NUM_CONTENTS), journal_config={ - "brokers": ["127.0.0.1:%d" % kafka_port], + "brokers": 
[kafka_server], "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output for (sha1, content) in contents.items(): assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content @_patch_objstorages(["src", "dst"]) def test_replay_content_structured_log( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], + kafka_server: str, caplog, ): - (_, kafka_port) = kafka_server kafka_prefix += ".swh.journal.objects" - contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages) + contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) caplog.set_level(logging.DEBUG, "swh.journal.replay") expected_obj_ids = set(hash_to_hex(sha1) for sha1 in contents) result = invoke( "content-replay", "--stop-after-objects", str(NUM_CONTENTS), journal_config={ - "brokers": ["127.0.0.1:%d" % kafka_port], + "brokers": [kafka_server], "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output copied = set() for record in caplog.records: logtext = record.getMessage() if "copied" in logtext: copied.add(record.args["obj_id"]) assert ( copied == expected_obj_ids ), "Mismatched logging; see captured log output for details." @_patch_objstorages(["src", "dst"]) def test_replay_content_static_group_id( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], + kafka_server: str, caplog, ): - (_, kafka_port) = kafka_server kafka_prefix += ".swh.journal.objects" - contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages) + contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) # Setup log capture to fish the consumer settings out of the log messages caplog.set_level(logging.DEBUG, "swh.journal.client") result = invoke( "content-replay", "--stop-after-objects", str(NUM_CONTENTS), env={"KAFKA_GROUP_INSTANCE_ID": "static-group-instance-id"}, journal_config={ - "brokers": ["127.0.0.1:%d" % kafka_port], + "brokers": [kafka_server], "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output consumer_settings = None for record in caplog.records: if "Consumer settings" in record.message: consumer_settings = record.args break assert consumer_settings is not None, ( "Failed to get consumer settings out of the consumer log. " "See log capture for details." 
) assert consumer_settings["group.instance.id"] == "static-group-instance-id" assert consumer_settings["session.timeout.ms"] == 60 * 10 * 1000 assert consumer_settings["max.poll.interval.ms"] == 90 * 10 * 1000 for (sha1, content) in contents.items(): assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content @_patch_objstorages(["src", "dst"]) def test_replay_content_exclude( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], + kafka_server: str, ): - (_, kafka_port) = kafka_server kafka_prefix += ".swh.journal.objects" - contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages) + contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) excluded_contents = list(contents)[0::2] # picking half of them with tempfile.NamedTemporaryFile(mode="w+b") as fd: fd.write(b"".join(sorted(excluded_contents))) fd.seek(0) result = invoke( "content-replay", "--stop-after-objects", str(NUM_CONTENTS), "--exclude-sha1-file", fd.name, journal_config={ - "brokers": ["127.0.0.1:%d" % kafka_port], + "brokers": [kafka_server], "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output for (sha1, content) in contents.items(): if sha1 in excluded_contents: assert sha1 not in objstorages["dst"], sha1 else: assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content NUM_CONTENTS_DST = 5 @_patch_objstorages(["src", "dst"]) @pytest.mark.parametrize( "check_dst,expected_copied,expected_in_dst", [ (True, NUM_CONTENTS - NUM_CONTENTS_DST, NUM_CONTENTS_DST), (False, NUM_CONTENTS, 0), ], ) def test_replay_content_check_dst( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], + kafka_server: str, check_dst: bool, expected_copied: int, expected_in_dst: int, caplog, ): - (_, kafka_port) = kafka_server kafka_prefix += ".swh.journal.objects" - contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages) + contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) for i, (sha1, content) in enumerate(contents.items()): if i >= NUM_CONTENTS_DST: break objstorages["dst"].add(content, obj_id=sha1) caplog.set_level(logging.DEBUG, "swh.journal.replay") result = invoke( "content-replay", "--stop-after-objects", str(NUM_CONTENTS), "--check-dst" if check_dst else "--no-check-dst", journal_config={ - "brokers": ["127.0.0.1:%d" % kafka_port], + "brokers": [kafka_server], "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output copied = 0 in_dst = 0 for record in caplog.records: logtext = record.getMessage() if "copied" in logtext: copied += 1 elif "in dst" in logtext: in_dst += 1 assert ( copied == expected_copied and in_dst == expected_in_dst ), "Unexpected amount of objects copied, see the captured log for details" for (sha1, content) in contents.items(): assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content class FlakyObjStorage(InMemoryObjStorage): def __init__(self, *args, **kwargs): state = kwargs.pop("state") self.failures_left = Counter(kwargs.pop("failures")) super().__init__(*args, **kwargs) if state: self.state = state def flaky_operation(self, op, obj_id): if self.failures_left[op, 
obj_id] > 0: self.failures_left[op, obj_id] -= 1 raise RuntimeError("Failed %s on %s" % (op, hash_to_hex(obj_id))) def get(self, obj_id): self.flaky_operation("get", obj_id) return super().get(obj_id) def add(self, data, obj_id=None, check_presence=True): self.flaky_operation("add", obj_id) return super().add(data, obj_id=obj_id, check_presence=check_presence) def __contains__(self, obj_id): self.flaky_operation("in", obj_id) return super().__contains__(obj_id) @_patch_objstorages(["src", "dst"]) def test_replay_content_check_dst_retry( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], + kafka_server: str, monkeypatch_retry_sleep, ): - (_, kafka_port) = kafka_server kafka_prefix += ".swh.journal.objects" - contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages) + contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) failures = {} for i, (sha1, content) in enumerate(contents.items()): if i >= NUM_CONTENTS_DST: break objstorages["dst"].add(content, obj_id=sha1) failures["in", sha1] = 1 orig_dst = objstorages["dst"] objstorages["dst"] = FlakyObjStorage(state=orig_dst.state, failures=failures) result = invoke( "content-replay", "--stop-after-objects", str(NUM_CONTENTS), "--check-dst", journal_config={ - "brokers": ["127.0.0.1:%d" % kafka_port], + "brokers": [kafka_server], "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output for (sha1, content) in contents.items(): assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content @_patch_objstorages(["src", "dst"]) def test_replay_content_failed_copy_retry( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], + kafka_server: str, caplog, monkeypatch_retry_sleep, ): - (_, kafka_port) = kafka_server kafka_prefix += ".swh.journal.objects" - contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages) + contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) add_failures = {} get_failures = {} definitely_failed = set() # We want to generate operations failing 1 to CONTENT_REPLAY_RETRIES times. # We generate failures for 2 different operations, get and add. num_retry_contents = 2 * CONTENT_REPLAY_RETRIES assert ( num_retry_contents < NUM_CONTENTS ), "Need to generate more test contents to properly test retry behavior" for i, sha1 in enumerate(contents): if i >= num_retry_contents: break # This generates a number of failures, up to CONTENT_REPLAY_RETRIES num_failures = (i % CONTENT_REPLAY_RETRIES) + 1 # This generates failures of add for the first CONTENT_REPLAY_RETRIES # objects, then failures of get. 
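        # For illustration, if CONTENT_REPLAY_RETRIES were 3 (the actual value
        # lives in swh.journal.replay), contents 0-2 would fail "add" 1, 2 and
        # 3 times, contents 3-5 would fail "get" 1, 2 and 3 times, and only the
        # two contents failing 3 times would end up in definitely_failed.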
if i < CONTENT_REPLAY_RETRIES: add_failures["add", sha1] = num_failures else: get_failures["get", sha1] = num_failures # Only contents that have CONTENT_REPLAY_RETRIES or more are # definitely failing if num_failures >= CONTENT_REPLAY_RETRIES: definitely_failed.add(hash_to_hex(sha1)) objstorages["dst"] = FlakyObjStorage( state=objstorages["dst"].state, failures=add_failures, ) objstorages["src"] = FlakyObjStorage( state=objstorages["src"].state, failures=get_failures, ) caplog.set_level(logging.DEBUG, "swh.journal.replay") result = invoke( "content-replay", "--stop-after-objects", str(NUM_CONTENTS), journal_config={ - "brokers": ["127.0.0.1:%d" % kafka_port], + "brokers": [kafka_server], "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output copied = 0 actually_failed = set() for record in caplog.records: logtext = record.getMessage() if "copied" in logtext: copied += 1 elif "Failed operation" in logtext: assert record.levelno == logging.ERROR assert record.args["retries"] == CONTENT_REPLAY_RETRIES actually_failed.add(record.args["obj_id"]) assert ( actually_failed == definitely_failed ), "Unexpected object copy failures; see captured log for details" for (sha1, content) in contents.items(): if hash_to_hex(sha1) in definitely_failed: assert sha1 not in objstorages["dst"] continue assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content @_patch_objstorages(["src", "dst"]) def test_replay_content_objnotfound( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], + kafka_server: str, caplog, ): - (_, kafka_port) = kafka_server kafka_prefix += ".swh.journal.objects" - contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages) + contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) num_contents_deleted = 5 contents_deleted = set() for i, sha1 in enumerate(contents): if i >= num_contents_deleted: break del objstorages["src"].state[sha1] contents_deleted.add(hash_to_hex(sha1)) caplog.set_level(logging.DEBUG, "swh.journal.replay") result = invoke( "content-replay", "--stop-after-objects", str(NUM_CONTENTS), journal_config={ - "brokers": ["127.0.0.1:%d" % kafka_port], + "brokers": [kafka_server], "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output copied = 0 not_in_src = set() for record in caplog.records: logtext = record.getMessage() if "copied" in logtext: copied += 1 elif "object not found" in logtext: # Check that the object id can be recovered from logs assert record.levelno == logging.ERROR not_in_src.add(record.args["obj_id"]) assert ( copied == NUM_CONTENTS - num_contents_deleted ), "Unexpected number of contents copied" assert ( not_in_src == contents_deleted ), "Mismatch between deleted contents and not_in_src logs" for (sha1, content) in contents.items(): if sha1 not in objstorages["src"]: continue assert sha1 in objstorages["dst"], sha1 assert objstorages["dst"].get(sha1) == content diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py index f0bd3cd..76d190c 100644 --- a/swh/journal/tests/test_client.py +++ b/swh/journal/tests/test_client.py @@ -1,144 +1,137 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the 
top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from subprocess import Popen -from typing import Dict, List, Tuple +from typing import Dict, List from unittest.mock import MagicMock from confluent_kafka import Producer import pytest from swh.model.hypothesis_strategies import revisions from swh.model.model import Content from swh.journal.client import JournalClient from swh.journal.serializers import key_to_kafka, value_to_kafka -def test_client( - kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int] -): - (_, port) = kafka_server +def test_client(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str): kafka_prefix += ".swh.journal.objects" producer = Producer( { - "bootstrap.servers": "localhost:{}".format(port), + "bootstrap.servers": kafka_server, "client.id": "test producer", "acks": "all", } ) rev = revisions().example() # Fill Kafka producer.produce( topic=kafka_prefix + ".revision", key=key_to_kafka(rev.id), value=value_to_kafka(rev.to_dict()), ) producer.flush() client = JournalClient( - brokers="localhost:%d" % kafka_server[1], + brokers=kafka_server, group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=1, ) worker_fn = MagicMock() client.process(worker_fn) worker_fn.assert_called_once_with({"revision": [rev.to_dict()]}) -def test_client_eof( - kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int] -): - (_, port) = kafka_server +def test_client_eof(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str): kafka_prefix += ".swh.journal.objects" producer = Producer( { - "bootstrap.servers": "localhost:{}".format(port), + "bootstrap.servers": kafka_server, "client.id": "test producer", "acks": "all", } ) rev = revisions().example() # Fill Kafka producer.produce( topic=kafka_prefix + ".revision", key=key_to_kafka(rev.id), value=value_to_kafka(rev.to_dict()), ) producer.flush() client = JournalClient( - brokers="localhost:%d" % kafka_server[1], + brokers=kafka_server, group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=None, stop_on_eof=True, ) worker_fn = MagicMock() client.process(worker_fn) worker_fn.assert_called_once_with({"revision": [rev.to_dict()]}) @pytest.mark.parametrize("batch_size", [1, 5, 100]) def test_client_batch_size( - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], - batch_size: int, + kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, batch_size: int, ): - (_, port) = kafka_server kafka_prefix += ".swh.journal.objects" num_objects = 2 * batch_size + 1 assert num_objects < 256, "Too many objects, generation will fail" producer = Producer( { - "bootstrap.servers": "localhost:{}".format(port), + "bootstrap.servers": kafka_server, "client.id": "test producer", "acks": "all", } ) contents = [Content.from_data(bytes([i])) for i in range(num_objects)] # Fill Kafka for content in contents: producer.produce( topic=kafka_prefix + ".content", key=key_to_kafka(content.sha1), value=value_to_kafka(content.to_dict()), ) producer.flush() client = JournalClient( - brokers=["localhost:%d" % kafka_server[1]], + brokers=[kafka_server], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=num_objects, batch_size=batch_size, ) collected_output: List[Dict] = [] def worker_fn(objects): received = objects["content"] assert len(received) <= batch_size collected_output.extend(received) client.process(worker_fn) 
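    # The replacement assertions below are order-insensitive: Kafka only
    # guarantees message ordering within a single partition, so the contents
    # may be consumed in a different order than they were produced.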
- assert collected_output == [content.to_dict() for content in contents] + expected_output = [content.to_dict() for content in contents] + assert len(collected_output) == len(expected_output) + + for output in collected_output: + assert output in expected_output diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py index 0aa749e..1ff8712 100644 --- a/swh/journal/tests/test_kafka_writer.py +++ b/swh/journal/tests/test_kafka_writer.py @@ -1,223 +1,212 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict -from confluent_kafka import Consumer, Producer, KafkaException - import pytest -from subprocess import Popen -from typing import Tuple +from confluent_kafka import Consumer, Producer, KafkaException from swh.storage import get_storage from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError from swh.model.model import Directory, Origin, OriginVisit from .conftest import TEST_OBJECTS, TEST_OBJECT_DICTS def consume_messages(consumer, kafka_prefix, expected_messages): """Consume expected_messages from the consumer; Sort them all into a consumed_objects dict""" consumed_messages = defaultdict(list) fetched_messages = 0 retries_left = 1000 while fetched_messages < expected_messages: if retries_left == 0: raise ValueError("Timed out fetching messages from kafka") msg = consumer.poll(timeout=0.01) if not msg: retries_left -= 1 continue error = msg.error() if error is not None: if error.fatal(): raise KafkaException(error) retries_left -= 1 continue fetched_messages += 1 topic = msg.topic() assert topic.startswith(kafka_prefix + "."), "Unexpected topic" object_type = topic[len(kafka_prefix + ".") :] consumed_messages[object_type].append( (kafka_to_key(msg.key()), kafka_to_value(msg.value())) ) return consumed_messages def assert_all_objects_consumed(consumed_messages): """Check whether all objects from TEST_OBJECT_DICTS have been consumed""" for object_type, known_values in TEST_OBJECT_DICTS.items(): known_keys = [object_key(object_type, obj) for obj in TEST_OBJECTS[object_type]] (received_keys, received_values) = zip(*consumed_messages[object_type]) if object_type == "origin_visit": for value in received_values: del value["visit"] elif object_type == "content": for value in received_values: del value["ctime"] for key in known_keys: assert key in received_keys for value in known_values: assert value in received_values -def test_kafka_writer( - kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer -): +def test_kafka_writer(kafka_prefix: str, kafka_server: str, consumer: Consumer): kafka_prefix += ".swh.journal.objects" writer = KafkaJournalWriter( - brokers=[f"localhost:{kafka_server[1]}"], - client_id="kafka_writer", - prefix=kafka_prefix, + brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, ) expected_messages = 0 for object_type, objects in TEST_OBJECTS.items(): writer.write_additions(object_type, objects) expected_messages += len(objects) consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) assert_all_objects_consumed(consumed_messages) -def test_storage_direct_writer( - kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer -): +def 
test_storage_direct_writer(kafka_prefix: str, kafka_server, consumer: Consumer): kafka_prefix += ".swh.journal.objects" writer_config = { "cls": "kafka", - "brokers": ["localhost:%d" % kafka_server[1]], + "brokers": [kafka_server], "client_id": "kafka_writer", "prefix": kafka_prefix, } storage_config = { "cls": "pipeline", "steps": [{"cls": "memory", "journal_writer": writer_config},], } storage = get_storage(**storage_config) expected_messages = 0 for object_type, objects in TEST_OBJECTS.items(): method = getattr(storage, object_type + "_add") if object_type in ( "content", "directory", "revision", "release", "snapshot", "origin", ): method(objects) expected_messages += len(objects) elif object_type in ("origin_visit",): for obj in objects: assert isinstance(obj, OriginVisit) storage.origin_add_one(Origin(url=obj.origin)) visit = method(obj.origin, date=obj.date, type=obj.type) expected_messages += 1 obj_d = obj.to_dict() for k in ("visit", "origin", "date", "type"): del obj_d[k] storage.origin_visit_update(obj.origin, visit.visit, **obj_d) expected_messages += 1 else: assert False, object_type consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) assert_all_objects_consumed(consumed_messages) def test_write_delivery_failure( - kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer, + kafka_prefix: str, kafka_server: str, consumer: Consumer ): class MockKafkaError: """A mocked kafka error""" def str(self): return "Mocked Kafka Error" def name(self): return "SWH_MOCK_ERROR" class KafkaJournalWriterFailDelivery(KafkaJournalWriter): """A journal writer which always fails delivering messages""" def _on_delivery(self, error, message): """Replace the inbound error with a fake delivery error""" super()._on_delivery(MockKafkaError(), message) kafka_prefix += ".swh.journal.objects" writer = KafkaJournalWriterFailDelivery( - brokers=["localhost:%d" % kafka_server[1]], - client_id="kafka_writer", - prefix=kafka_prefix, + brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, ) empty_dir = Directory(entries=[]) with pytest.raises(KafkaDeliveryError) as exc: writer.write_addition("directory", empty_dir) assert "Failed deliveries" in exc.value.message assert len(exc.value.delivery_failures) == 1 delivery_failure = exc.value.delivery_failures[0] assert delivery_failure.key == empty_dir.id assert delivery_failure.code == "SWH_MOCK_ERROR" def test_write_delivery_timeout( - kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer + kafka_prefix: str, kafka_server: str, consumer: Consumer ): produced = [] class MockProducer(Producer): """A kafka producer which pretends to produce messages, but never sends any delivery acknowledgements""" def produce(self, **kwargs): produced.append(kwargs) kafka_prefix += ".swh.journal.objects" writer = KafkaJournalWriter( - brokers=["localhost:%d" % kafka_server[1]], + brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, flush_timeout=1, producer_class=MockProducer, ) empty_dir = Directory(entries=[]) with pytest.raises(KafkaDeliveryError) as exc: writer.write_addition("directory", empty_dir) assert len(produced) == 1 assert "timeout" in exc.value.message assert len(exc.value.delivery_failures) == 1 delivery_failure = exc.value.delivery_failures[0] assert delivery_failure.key == empty_dir.id assert delivery_failure.code == "SWH_FLUSH_TIMEOUT" diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py index 5925049..796236d 100644 --- 
a/swh/journal/tests/test_replay.py +++ b/swh/journal/tests/test_replay.py @@ -1,414 +1,405 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import functools import logging import random -from subprocess import Popen -from typing import Dict, List, Tuple +from typing import Dict, List import dateutil import pytest from confluent_kafka import Producer from hypothesis import strategies, given, settings from swh.storage import get_storage from swh.journal.client import JournalClient from swh.journal.serializers import key_to_kafka, value_to_kafka from swh.journal.replay import process_replay_objects, is_hash_in_bytearray from swh.model.hashutil import hash_to_hex from swh.model.model import Content from .conftest import TEST_OBJECT_DICTS, DUPLICATE_CONTENTS from .utils import MockedJournalClient, MockedKafkaWriter storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]} def make_topic(kafka_prefix: str, object_type: str) -> str: return kafka_prefix + "." + object_type def test_storage_play( - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], - caplog, + kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog, ): """Optimal replayer scenario. This: - writes objects to the topic - replayer consumes objects from the topic and replay them """ - (_, port) = kafka_server kafka_prefix += ".swh.journal.objects" storage = get_storage(**storage_config) producer = Producer( { - "bootstrap.servers": "localhost:{}".format(port), + "bootstrap.servers": kafka_server, "client.id": "test producer", "acks": "all", } ) now = datetime.datetime.now(tz=datetime.timezone.utc) # Fill Kafka nb_sent = 0 nb_visits = 0 for object_type, objects in TEST_OBJECT_DICTS.items(): topic = make_topic(kafka_prefix, object_type) for object_ in objects: key = bytes(random.randint(0, 255) for _ in range(40)) object_ = object_.copy() if object_type == "content": object_["ctime"] = now elif object_type == "origin_visit": nb_visits += 1 object_["visit"] = nb_visits producer.produce( topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_), ) nb_sent += 1 producer.flush() caplog.set_level(logging.ERROR, "swh.journal.replay") # Fill the storage from Kafka replayer = JournalClient( - brokers="localhost:%d" % kafka_server[1], + brokers=kafka_server, group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=nb_sent, ) worker_fn = functools.partial(process_replay_objects, storage=storage) nb_inserted = 0 while nb_inserted < nb_sent: nb_inserted += replayer.process(worker_fn) assert nb_sent == nb_inserted # Check the objects were actually inserted in the storage assert TEST_OBJECT_DICTS["revision"] == list( storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"]]) ) assert TEST_OBJECT_DICTS["release"] == list( storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"]]) ) origins = list(storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"]])) assert TEST_OBJECT_DICTS["origin"] == [{"url": orig["url"]} for orig in origins] for origin in origins: origin_url = origin["url"] expected_visits = [ { **visit, "origin": origin_url, "date": dateutil.parser.parse(visit["date"]), } for visit in TEST_OBJECT_DICTS["origin_visit"] if visit["origin"] == origin["url"] ] actual_visits = 
list(storage.origin_visit_get(origin_url)) for visit in actual_visits: del visit["visit"] # opaque identifier assert expected_visits == actual_visits input_contents = TEST_OBJECT_DICTS["content"] contents = storage.content_get_metadata([cont["sha1"] for cont in input_contents]) assert len(contents) == len(input_contents) assert contents == {cont["sha1"]: [cont] for cont in input_contents} collision = 0 for record in caplog.records: logtext = record.getMessage() if "Colliding contents:" in logtext: collision += 1 assert collision == 0, "No collision should be detected" def test_storage_play_with_collision( - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: Tuple[Popen, int], - caplog, + kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog, ): """Another replayer scenario with collisions. This: - writes objects to the topic, including colliding contents - replayer consumes objects from the topic and replay them - This drops the colliding contents from the replay when detected """ - (_, port) = kafka_server kafka_prefix += ".swh.journal.objects" storage = get_storage(**storage_config) producer = Producer( { - "bootstrap.servers": "localhost:{}".format(port), + "bootstrap.servers": kafka_server, "client.id": "test producer", "enable.idempotence": "true", } ) now = datetime.datetime.now(tz=datetime.timezone.utc) # Fill Kafka nb_sent = 0 nb_visits = 0 for object_type, objects in TEST_OBJECT_DICTS.items(): topic = make_topic(kafka_prefix, object_type) for object_ in objects: key = bytes(random.randint(0, 255) for _ in range(40)) object_ = object_.copy() if object_type == "content": object_["ctime"] = now elif object_type == "origin_visit": nb_visits += 1 object_["visit"] = nb_visits producer.produce( topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_), ) nb_sent += 1 # Create collision in input data # They are not written in the destination for content in DUPLICATE_CONTENTS: topic = make_topic(kafka_prefix, "content") producer.produce( topic=topic, key=key_to_kafka(key), value=value_to_kafka(content), ) nb_sent += 1 producer.flush() caplog.set_level(logging.ERROR, "swh.journal.replay") # Fill the storage from Kafka replayer = JournalClient( - brokers="localhost:%d" % kafka_server[1], + brokers=kafka_server, group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=nb_sent, ) worker_fn = functools.partial(process_replay_objects, storage=storage) nb_inserted = 0 while nb_inserted < nb_sent: nb_inserted += replayer.process(worker_fn) assert nb_sent == nb_inserted # Check the objects were actually inserted in the storage assert TEST_OBJECT_DICTS["revision"] == list( storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"]]) ) assert TEST_OBJECT_DICTS["release"] == list( storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"]]) ) origins = list(storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"]])) assert TEST_OBJECT_DICTS["origin"] == [{"url": orig["url"]} for orig in origins] for origin in origins: origin_url = origin["url"] expected_visits = [ { **visit, "origin": origin_url, "date": dateutil.parser.parse(visit["date"]), } for visit in TEST_OBJECT_DICTS["origin_visit"] if visit["origin"] == origin["url"] ] actual_visits = list(storage.origin_visit_get(origin_url)) for visit in actual_visits: del visit["visit"] # opaque identifier assert expected_visits == actual_visits input_contents = TEST_OBJECT_DICTS["content"] contents = storage.content_get_metadata([cont["sha1"] for cont in input_contents]) 
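    # content_get_metadata returns a dict mapping each queried sha1 to the list
    # of matching content rows; the assertions below compare it against the
    # objects that were originally sent to Kafka.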
assert len(contents) == len(input_contents) assert contents == {cont["sha1"]: [cont] for cont in input_contents} nb_collisions = 0 actual_collision: Dict for record in caplog.records: logtext = record.getMessage() if "Collision detected:" in logtext: nb_collisions += 1 actual_collision = record.args["collision"] assert nb_collisions == 1, "1 collision should be detected" algo = "sha1" assert actual_collision["algo"] == algo expected_colliding_hash = hash_to_hex(DUPLICATE_CONTENTS[0][algo]) assert actual_collision["hash"] == expected_colliding_hash actual_colliding_hashes = actual_collision["objects"] assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS) for content in DUPLICATE_CONTENTS: expected_content_hashes = { k: hash_to_hex(v) for k, v in Content.from_dict(content).hashes().items() } assert expected_content_hashes in actual_colliding_hashes def _test_write_replay_origin_visit(visits: List[Dict]): """Helper function to write tests for origin_visit. Each visit (a dict) given in the 'visits' argument will be sent to a (mocked) kafka queue, which a in-memory-storage backed replayer is listening to. Check that corresponding origin visits entities are present in the storage and have correct values if they are not skipped. """ queue: List = [] replayer = MockedJournalClient(queue) writer = MockedKafkaWriter(queue) # Note that flipping the order of these two insertions will crash # the test, because the legacy origin_format does not allow to create # the origin when needed (type is missing) writer.send( "origin", "foo", { "url": "http://example.com/", "type": "git", # test the legacy origin format is accepted }, ) for visit in visits: writer.send("origin_visit", "foo", visit) queue_size = len(queue) assert replayer.stop_after_objects is None replayer.stop_after_objects = queue_size storage = get_storage(**storage_config) worker_fn = functools.partial(process_replay_objects, storage=storage) replayer.process(worker_fn) actual_visits = list(storage.origin_visit_get("http://example.com/")) assert len(actual_visits) == len(visits), actual_visits for vin, vout in zip(visits, actual_visits): vin = vin.copy() vout = vout.copy() assert vout.pop("origin") == "http://example.com/" vin.pop("origin") vin.setdefault("type", "git") vin.setdefault("metadata", None) assert vin == vout def test_write_replay_origin_visit(): """Test origin_visit when the 'origin' is just a string.""" now = datetime.datetime.now() visits = [ { "visit": 1, "origin": "http://example.com/", "date": now, "type": "git", "status": "partial", "snapshot": None, } ] _test_write_replay_origin_visit(visits) def test_write_replay_legacy_origin_visit1(): """Origin_visit with no types should make the replayer crash We expect the journal's origin_visit topic to no longer reference such visits. If it does, the replayer must crash so we can fix the journal's topic. 
""" now = datetime.datetime.now() visit = { "visit": 1, "origin": "http://example.com/", "date": now, "status": "partial", "snapshot": None, } now2 = datetime.datetime.now() visit2 = { "visit": 2, "origin": {"url": "http://example.com/"}, "date": now2, "status": "partial", "snapshot": None, } for origin_visit in [visit, visit2]: with pytest.raises(ValueError, match="Old origin visit format"): _test_write_replay_origin_visit([origin_visit]) def test_write_replay_legacy_origin_visit2(): """Test origin_visit when 'type' is missing from the visit, but not from the origin.""" now = datetime.datetime.now() visits = [ { "visit": 1, "origin": {"url": "http://example.com/", "type": "git",}, "date": now, "type": "git", "status": "partial", "snapshot": None, } ] _test_write_replay_origin_visit(visits) def test_write_replay_legacy_origin_visit3(): """Test origin_visit when the origin is a dict""" now = datetime.datetime.now() visits = [ { "visit": 1, "origin": {"url": "http://example.com/",}, "date": now, "type": "git", "status": "partial", "snapshot": None, } ] _test_write_replay_origin_visit(visits) hash_strategy = strategies.binary(min_size=20, max_size=20) @settings(max_examples=500) @given( strategies.sets(hash_strategy, min_size=0, max_size=500), strategies.sets(hash_strategy, min_size=10), ) def test_is_hash_in_bytearray(haystack, needles): array = b"".join(sorted(haystack)) needles |= haystack # Exhaustively test for all objects in the array for needle in needles: assert is_hash_in_bytearray(needle, array, len(haystack)) == ( needle in haystack ) diff --git a/tox.ini b/tox.ini index e8aa391..9020b04 100644 --- a/tox.ini +++ b/tox.ini @@ -1,40 +1,37 @@ [tox] envlist=black,flake8,mypy,py3 [testenv] -passenv=SWH_KAFKA_ROOT extras = testing deps = pytest-cov dev: pdbpp -setenv = - SWH_KAFKA_ROOT = {env:SWH_KAFKA_ROOT:swh/journal/tests/kafka} commands = pytest --cov={envsitepackagesdir}/swh/journal \ {envsitepackagesdir}/swh/journal \ --cov-branch \ --doctest-modules {posargs} [testenv:black] skip_install = true deps = black commands = {envpython} -m black --check swh [testenv:flake8] skip_install = true deps = git+https://github.com/PyCQA/pyflakes.git flake8 commands = {envpython} -m flake8 [testenv:mypy] extras = testing deps = mypy commands = mypy swh