
diff --git a/bin/install-kafka.sh b/bin/install-kafka.sh
deleted file mode 100755
index 79fd5e6..0000000
--- a/bin/install-kafka.sh
+++ /dev/null
@@ -1,56 +0,0 @@
-#!/usr/bin/env bash
-
-set -xe
-
-SCALA_VERSION=2.12
-KAFKA_VERSION=2.4.0
-KAFKA_CHECKSUM=53b52f86ea56c9fac62046524f03f75665a089ea2dae554aefe3a3d2694f2da88b5ba8725d8be55f198ba80695443559ed9de7c0b2a2817f7a6141008ff79f49
-
-KAFKA_APP="kafka_${SCALA_VERSION}-${KAFKA_VERSION}"
-TARBALL="${KAFKA_APP}.tgz"
-URL="http://apache.mirrors.ovh.net/ftp.apache.org/dist/kafka/${KAFKA_VERSION}/${TARBALL}"
-CHECKSUMS="${TARBALL}.sha512"
-KAFKA_ROOT_DIR=swh/journal/tests
-KAFKA_DIR="${KAFKA_ROOT_DIR}/${KAFKA_APP}"
-KAFKA_LINK="${KAFKA_ROOT_DIR}/kafka"
-
-case $1 in
- "install")
- echo "Kafka installation started."
- if [ ! -f $TARBALL ]; then
- echo "Kafka download"
- wget $URL
- echo "${KAFKA_CHECKSUM} ${TARBALL}" > $CHECKSUMS
- sha512sum -c $CHECKSUMS
-
- if [ $? -ne 0 ]; then
- echo "Kafka download: failed to retrieve ${TARBALL}";
- exit 1
- fi
- echo "Kafka download done"
- fi
-
- if [ ! -d $KAFKA_DIR ]; then
- echo "Kafka extraction"
- tar xvf $TARBALL -C $KAFKA_ROOT_DIR
- pushd $KAFKA_ROOT_DIR && ln -nsf $KAFKA_APP kafka && popd
- echo "Kafka extraction done"
- fi
- echo "Kafka installation done. Kafka is installed at $KAFKA_DIR"
- ;;
- "clean")
- echo "Kafka cleanup started."
- [ -d $KAFKA_DIR ] && rm -rf $KAFKA_DIR
- [ -L $KAFKA_LINK ] && rm $KAFKA_LINK
- echo "Kafka cleanup done."
- ;;
- "clean-cache")
- echo "Kafka cleanup cache started."
- [ -f $TARBALL ] && rm $TARBALL
- [ -f $CHECKSUMS ] && rm $CHECKSUMS
- echo "Kafka cleanup cache done."
- ;;
- *)
- echo "Unknown command, do nothing"
- exit 1;
-esac
diff --git a/requirements-test.txt b/requirements-test.txt
index 7f2dc48..c727717 100644
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,4 +1,3 @@
pytest
swh.model >= 0.0.34
-pytest-kafka
hypothesis
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
index e7169e2..3b89a7b 100644
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -1,358 +1,252 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
-import os
import pytest
import logging
import random
import string
-from confluent_kafka import Consumer
-from confluent_kafka.admin import AdminClient, ConfigResource
+from confluent_kafka import Consumer, Producer
from hypothesis.strategies import one_of
-from subprocess import Popen
-from typing import Any, Dict, Iterator, List, Tuple
-
-from pathlib import Path
-from pytest_kafka import (
- make_zookeeper_process,
- make_kafka_server,
- KAFKA_SERVER_CONFIG_TEMPLATE,
- ZOOKEEPER_CONFIG_TEMPLATE,
-)
+from typing import Any, Dict, Iterator, List
from swh.model import hypothesis_strategies as strategies
from swh.model.hashutil import MultiHash, hash_to_bytes
from swh.journal.serializers import ModelObject
from swh.journal.writer.kafka import OBJECT_TYPES
logger = logging.getLogger(__name__)
CONTENTS = [
{**MultiHash.from_data(b"foo").digest(), "length": 3, "status": "visible",},
]
duplicate_content1 = {
"length": 4,
"sha1": hash_to_bytes("44973274ccef6ab4dfaaf86599792fa9c3fe4689"),
"sha1_git": b"another-foo",
"blake2s256": b"another-bar",
"sha256": b"another-baz",
"status": "visible",
}
# Craft a sha1 collision
duplicate_content2 = duplicate_content1.copy()
sha1_array = bytearray(duplicate_content1["sha1_git"])
sha1_array[0] += 1
duplicate_content2["sha1_git"] = bytes(sha1_array)
DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2]
COMMITTERS = [
{"fullname": b"foo", "name": b"foo", "email": b"",},
{"fullname": b"bar", "name": b"bar", "email": b"",},
]
DATES = [
{
"timestamp": {"seconds": 1234567891, "microseconds": 0,},
"offset": 120,
"negative_utc": False,
},
{
"timestamp": {"seconds": 1234567892, "microseconds": 0,},
"offset": 120,
"negative_utc": False,
},
]
REVISIONS = [
{
"id": hash_to_bytes("7026b7c1a2af56521e951c01ed20f255fa054238"),
"message": b"hello",
"date": DATES[0],
"committer": COMMITTERS[0],
"author": COMMITTERS[0],
"committer_date": DATES[0],
"type": "git",
"directory": b"\x01" * 20,
"synthetic": False,
"metadata": None,
"parents": [],
},
{
"id": hash_to_bytes("368a48fe15b7db2383775f97c6b247011b3f14f4"),
"message": b"hello again",
"date": DATES[1],
"committer": COMMITTERS[1],
"author": COMMITTERS[1],
"committer_date": DATES[1],
"type": "hg",
"directory": b"\x02" * 20,
"synthetic": False,
"metadata": None,
"parents": [],
},
]
RELEASES = [
{
"id": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
"name": b"v0.0.1",
"date": {
"timestamp": {"seconds": 1234567890, "microseconds": 0,},
"offset": 120,
"negative_utc": False,
},
"author": COMMITTERS[0],
"target_type": "revision",
"target": b"\x04" * 20,
"message": b"foo",
"synthetic": False,
},
]
ORIGINS = [
{"url": "https://somewhere.org/den/fox",},
{"url": "https://overtherainbow.org/fox/den",},
]
ORIGIN_VISITS = [
{
"origin": ORIGINS[0]["url"],
"date": "2013-05-07 04:20:39.369271+00:00",
"snapshot": None, # TODO
"status": "ongoing", # TODO
"metadata": {"foo": "bar"},
"type": "git",
},
{
"origin": ORIGINS[0]["url"],
"date": "2018-11-27 17:20:39+00:00",
"snapshot": None, # TODO
"status": "ongoing", # TODO
"metadata": {"baz": "qux"},
"type": "git",
},
]
TEST_OBJECT_DICTS: Dict[str, List[Dict[str, Any]]] = {
"content": CONTENTS,
"revision": REVISIONS,
"release": RELEASES,
"origin": ORIGINS,
"origin_visit": ORIGIN_VISITS,
}
MODEL_OBJECTS = {v: k for (k, v) in OBJECT_TYPES.items()}
TEST_OBJECTS: Dict[str, List[ModelObject]] = {}
for object_type, objects in TEST_OBJECT_DICTS.items():
converted_objects: List[ModelObject] = []
model = MODEL_OBJECTS[object_type]
for (num, obj_d) in enumerate(objects):
if object_type == "origin_visit":
obj_d = {**obj_d, "visit": num}
elif object_type == "content":
obj_d = {**obj_d, "data": b"", "ctime": datetime.datetime.now()}
converted_objects.append(model.from_dict(obj_d))
TEST_OBJECTS[object_type] = converted_objects
-KAFKA_ROOT = os.environ.get("SWH_KAFKA_ROOT")
-KAFKA_ROOT = KAFKA_ROOT if KAFKA_ROOT else os.path.dirname(__file__) + "/kafka"
-if not os.path.exists(KAFKA_ROOT):
- msg = (
- "Development error: %s must exist and target an "
- "existing kafka installation" % KAFKA_ROOT
- )
- raise ValueError(msg)
-
-KAFKA_SCRIPTS = Path(KAFKA_ROOT) / "bin"
-
-KAFKA_BIN = str(KAFKA_SCRIPTS / "kafka-server-start.sh")
-ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / "zookeeper-server-start.sh")
-
-ZK_CONFIG_TEMPLATE = ZOOKEEPER_CONFIG_TEMPLATE + "\nadmin.enableServer=false\n"
-KAFKA_CONFIG_TEMPLATE = KAFKA_SERVER_CONFIG_TEMPLATE + "\nmessage.max.bytes=104857600\n"
-
-# Those defines fixtures
-zookeeper_proc = make_zookeeper_process(
- ZOOKEEPER_BIN, zk_config_template=ZK_CONFIG_TEMPLATE, scope="session"
-)
-os.environ[
- "KAFKA_LOG4J_OPTS"
-] = "-Dlog4j.configuration=file:%s/log4j.properties" % os.path.dirname(__file__)
-session_kafka_server = make_kafka_server(
- KAFKA_BIN,
- "zookeeper_proc",
- kafka_config_template=KAFKA_CONFIG_TEMPLATE,
- scope="session",
-)
-
-kafka_logger = logging.getLogger("kafka")
-kafka_logger.setLevel(logging.WARN)
-
-
@pytest.fixture(scope="function")
def kafka_prefix():
"""Pick a random prefix for kafka topics on each call"""
return "".join(random.choice(string.ascii_lowercase) for _ in range(10))
@pytest.fixture(scope="function")
def kafka_consumer_group(kafka_prefix: str):
"""Pick a random consumer group for kafka consumers on each call"""
return "test-consumer-%s" % kafka_prefix
@pytest.fixture(scope="session")
-def kafka_admin_client(session_kafka_server: Tuple[Popen, int]) -> AdminClient:
- return AdminClient({"bootstrap.servers": "localhost:%s" % session_kafka_server[1]})
+def kafka_server() -> Iterator[str]:
+ p = Producer({"test.mock.num.brokers": "1"})
+ metadata = p.list_topics()
+ brokers = [str(broker) for broker in metadata.brokers.values()]
+ assert len(brokers) == 1, "More than one broker found in the kafka cluster?!"
-@pytest.fixture(scope="function")
-def kafka_server_config_overrides() -> Dict[str, str]:
- return {}
+ broker_connstr, broker_id = brokers[0].split("/")
+ ip, port_str = broker_connstr.split(":")
+ assert ip == "127.0.0.1"
+ assert int(port_str)
+ yield broker_connstr
-@pytest.fixture(scope="function")
-def kafka_server(
- session_kafka_server: Tuple[Popen, int],
- kafka_admin_client: AdminClient,
- kafka_server_config_overrides: Dict[str, str],
-) -> Iterator[Tuple[Popen, int]]:
- # No overrides, we can just return the original broker connection
- if not kafka_server_config_overrides:
- yield session_kafka_server
- return
-
- # This is the minimal operation that the kafka_admin_client gives to
- # retrieve the cluster metadata, which we need to get the numeric id of the
- # broker spawned by pytest_kafka.
- metadata = kafka_admin_client.list_topics("__consumer_offsets")
- broker_ids = [str(broker) for broker in metadata.brokers.keys()]
- assert len(broker_ids) == 1, "More than one broker found in the kafka cluster?!"
-
- # Pull the current broker configuration. describe_configs and alter_configs
- # generate a dict containing one concurrent.future per queried
- # ConfigResource, hence the use of .result()
- broker = ConfigResource("broker", broker_ids[0])
- futures = kafka_admin_client.describe_configs([broker])
- original_config = futures[broker].result()
-
- # Gather the list of settings we need to change in the broker
- # ConfigResource, and their original values in the to_restore dict
- to_restore = {}
- for key, new_value in kafka_server_config_overrides.items():
- if key not in original_config:
- raise ValueError(f"Cannot override unknown configuration {key}")
- orig_value = original_config[key].value
- if orig_value == new_value:
- continue
- if original_config[key].is_read_only:
- raise ValueError(f"Cannot override read-only configuration {key}")
-
- broker.set_config(key, new_value)
- to_restore[key] = orig_value
-
- # to_restore will be empty if all the config "overrides" are equal to the
- # original value. No need to wait for a config alteration if that's the
- # case. The result() will raise a KafkaException if the settings change
- # failed.
- if to_restore:
- futures = kafka_admin_client.alter_configs([broker])
- try:
- futures[broker].result()
- except Exception:
- raise
-
- yield session_kafka_server
-
- # Now we can restore the old setting values. Again, the result() will raise
- # a KafkaException if the settings change failed.
- if to_restore:
- for key, orig_value in to_restore.items():
- broker.set_config(key, orig_value)
-
- futures = kafka_admin_client.alter_configs([broker])
- try:
- futures[broker].result()
- except Exception:
- raise
+ p.flush()
TEST_CONFIG = {
"consumer_id": "swh.journal.consumer",
"object_types": TEST_OBJECT_DICTS.keys(),
"stop_after_objects": 1, # will read 1 object and stop
"storage": {"cls": "memory", "args": {}},
}
@pytest.fixture
-def test_config(kafka_server: Tuple[Popen, int], kafka_prefix: str):
+def test_config(kafka_server: str, kafka_prefix: str):
"""Test configuration needed for producer/consumer
"""
- _, port = kafka_server
return {
**TEST_CONFIG,
- "brokers": ["127.0.0.1:{}".format(port)],
+ "brokers": [kafka_server],
"prefix": kafka_prefix + ".swh.journal.objects",
}
@pytest.fixture
def consumer(
- kafka_server: Tuple[Popen, int], test_config: Dict, kafka_consumer_group: str,
+ kafka_server: str, test_config: Dict, kafka_consumer_group: str,
) -> Consumer:
"""Get a connected Kafka consumer.
"""
- _, kafka_port = kafka_server
consumer = Consumer(
{
- "bootstrap.servers": "127.0.0.1:{}".format(kafka_port),
+ "bootstrap.servers": kafka_server,
"auto.offset.reset": "earliest",
"enable.auto.commit": True,
"group.id": kafka_consumer_group,
}
)
kafka_topics = [
"%s.%s" % (test_config["prefix"], object_type)
for object_type in test_config["object_types"]
]
consumer.subscribe(kafka_topics)
yield consumer
consumer.close()
def objects_d():
return one_of(
strategies.origins().map(lambda x: ("origin", x.to_dict())),
strategies.origin_visits().map(lambda x: ("origin_visit", x.to_dict())),
strategies.snapshots().map(lambda x: ("snapshot", x.to_dict())),
strategies.releases().map(lambda x: ("release", x.to_dict())),
strategies.revisions().map(lambda x: ("revision", x.to_dict())),
strategies.directories().map(lambda x: ("directory", x.to_dict())),
strategies.skipped_contents().map(lambda x: ("skipped_content", x.to_dict())),
strategies.present_contents().map(lambda x: ("content", x.to_dict())),
)
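
Note on the replacement mechanism: the new kafka_server fixture above relies on librdkafka's built-in mock cluster, enabled through the test.mock.num.brokers client option, instead of the ZooKeeper and Kafka processes previously spawned via pytest-kafka. The mock broker listens on a real 127.0.0.1 port, so ordinary clients can connect to the yielded bootstrap address. The sketch below is not part of this diff; it is a minimal, hypothetical test (assuming the kafka_server, kafka_prefix and kafka_consumer_group fixtures from the conftest.py hunk above, and a made-up ".demo" topic) showing how such a test talks to the mock broker:

    from confluent_kafka import Consumer, Producer

    def test_mock_broker_round_trip(
        kafka_server: str, kafka_prefix: str, kafka_consumer_group: str
    ):
        # Produce one message to the in-process mock cluster.
        producer = Producer(
            {"bootstrap.servers": kafka_server, "client.id": "demo-producer"}
        )
        topic = kafka_prefix + ".demo"
        producer.produce(topic=topic, key=b"key", value=b"value")
        producer.flush()

        # Consume it back through a separate client connection.
        consumer = Consumer(
            {
                "bootstrap.servers": kafka_server,
                "group.id": kafka_consumer_group,
                "auto.offset.reset": "earliest",
                "enable.auto.commit": True,
            }
        )
        consumer.subscribe([topic])
        msg = None
        for _ in range(1000):  # retry loop, mirroring consume_messages() below
            msg = consumer.poll(timeout=0.01)
            if msg is not None and msg.error() is None:
                break
        assert msg is not None and msg.error() is None
        assert msg.key() == b"key"
        assert msg.value() == b"value"
        consumer.close()
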
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
index eb19869..fd4ce12 100644
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -1,631 +1,618 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import Counter
import copy
import functools
import logging
import re
import tempfile
-from subprocess import Popen
-from typing import Any, Dict, Tuple
+from typing import Any, Dict
from unittest.mock import patch
from click.testing import CliRunner
from confluent_kafka import Producer
import pytest
import yaml
from swh.model.hashutil import hash_to_hex
from swh.objstorage.backends.in_memory import InMemoryObjStorage
from swh.storage import get_storage
from swh.journal.cli import cli
from swh.journal.replay import CONTENT_REPLAY_RETRIES
from swh.journal.serializers import key_to_kafka, value_to_kafka
logger = logging.getLogger(__name__)
CLI_CONFIG = {
"storage": {"cls": "memory",},
"objstorage_src": {"cls": "mocked", "name": "src",},
"objstorage_dst": {"cls": "mocked", "name": "dst",},
}
@pytest.fixture
def storage():
"""An swh-storage object that gets injected into the CLI functions."""
storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]}
storage = get_storage(**storage_config)
with patch("swh.journal.cli.get_storage") as get_storage_mock:
get_storage_mock.return_value = storage
yield storage
@pytest.fixture
def monkeypatch_retry_sleep(monkeypatch):
from swh.journal.replay import copy_object, obj_in_objstorage
monkeypatch.setattr(copy_object.retry, "sleep", lambda x: None)
monkeypatch.setattr(obj_in_objstorage.retry, "sleep", lambda x: None)
def invoke(*args, env=None, journal_config=None):
config = copy.deepcopy(CLI_CONFIG)
if journal_config:
config["journal"] = journal_config
runner = CliRunner()
with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
yaml.dump(config, config_fd)
config_fd.seek(0)
args = ["-C" + config_fd.name] + list(args)
return runner.invoke(cli, args, obj={"log_level": logging.DEBUG}, env=env,)
def test_replay(
- storage,
- kafka_prefix: str,
- kafka_consumer_group: str,
- kafka_server: Tuple[Popen, int],
+ storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str,
):
- (_, kafka_port) = kafka_server
kafka_prefix += ".swh.journal.objects"
producer = Producer(
{
- "bootstrap.servers": "localhost:{}".format(kafka_port),
+ "bootstrap.servers": kafka_server,
"client.id": "test-producer",
"acks": "all",
}
)
snapshot = {
"id": b"foo",
"branches": {b"HEAD": {"target_type": "revision", "target": b"\x01" * 20,}},
} # type: Dict[str, Any]
producer.produce(
topic=kafka_prefix + ".snapshot",
key=key_to_kafka(snapshot["id"]),
value=value_to_kafka(snapshot),
)
producer.flush()
logger.debug("Flushed producer")
result = invoke(
"replay",
"--stop-after-objects",
"1",
journal_config={
- "brokers": ["127.0.0.1:%d" % kafka_port],
+ "brokers": [kafka_server],
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
assert storage.snapshot_get(snapshot["id"]) == {**snapshot, "next_branch": None}
def _patch_objstorages(names):
objstorages = {name: InMemoryObjStorage() for name in names}
def get_mock_objstorage(cls, **args):
assert cls == "mocked", cls
return objstorages[args["name"]]
def decorator(f):
@functools.wraps(f)
@patch("swh.journal.cli.get_objstorage")
def newf(get_objstorage_mock, *args, **kwargs):
get_objstorage_mock.side_effect = get_mock_objstorage
f(*args, objstorages=objstorages, **kwargs)
return newf
return decorator
NUM_CONTENTS = 10
-def _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages):
+def _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages):
producer = Producer(
{
- "bootstrap.servers": "127.0.0.1:{}".format(kafka_port),
+ "bootstrap.servers": kafka_server,
"client.id": "test-producer",
"acks": "all",
}
)
contents = {}
for i in range(NUM_CONTENTS):
content = b"\x00" * 19 + bytes([i])
sha1 = objstorages["src"].add(content)
contents[sha1] = content
producer.produce(
topic=kafka_prefix + ".content",
key=key_to_kafka(sha1),
value=key_to_kafka({"sha1": sha1, "status": "visible",}),
)
producer.flush()
return contents
@_patch_objstorages(["src", "dst"])
def test_replay_content(
objstorages,
storage,
kafka_prefix: str,
kafka_consumer_group: str,
- kafka_server: Tuple[Popen, int],
+ kafka_server: str,
):
- (_, kafka_port) = kafka_server
kafka_prefix += ".swh.journal.objects"
- contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages)
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
result = invoke(
"content-replay",
"--stop-after-objects",
str(NUM_CONTENTS),
journal_config={
- "brokers": ["127.0.0.1:%d" % kafka_port],
+ "brokers": [kafka_server],
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
for (sha1, content) in contents.items():
assert sha1 in objstorages["dst"], sha1
assert objstorages["dst"].get(sha1) == content
@_patch_objstorages(["src", "dst"])
def test_replay_content_structured_log(
objstorages,
storage,
kafka_prefix: str,
kafka_consumer_group: str,
- kafka_server: Tuple[Popen, int],
+ kafka_server: str,
caplog,
):
- (_, kafka_port) = kafka_server
kafka_prefix += ".swh.journal.objects"
- contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages)
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
caplog.set_level(logging.DEBUG, "swh.journal.replay")
expected_obj_ids = set(hash_to_hex(sha1) for sha1 in contents)
result = invoke(
"content-replay",
"--stop-after-objects",
str(NUM_CONTENTS),
journal_config={
- "brokers": ["127.0.0.1:%d" % kafka_port],
+ "brokers": [kafka_server],
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
copied = set()
for record in caplog.records:
logtext = record.getMessage()
if "copied" in logtext:
copied.add(record.args["obj_id"])
assert (
copied == expected_obj_ids
), "Mismatched logging; see captured log output for details."
@_patch_objstorages(["src", "dst"])
def test_replay_content_static_group_id(
objstorages,
storage,
kafka_prefix: str,
kafka_consumer_group: str,
- kafka_server: Tuple[Popen, int],
+ kafka_server: str,
caplog,
):
- (_, kafka_port) = kafka_server
kafka_prefix += ".swh.journal.objects"
- contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages)
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
# Setup log capture to fish the consumer settings out of the log messages
caplog.set_level(logging.DEBUG, "swh.journal.client")
result = invoke(
"content-replay",
"--stop-after-objects",
str(NUM_CONTENTS),
env={"KAFKA_GROUP_INSTANCE_ID": "static-group-instance-id"},
journal_config={
- "brokers": ["127.0.0.1:%d" % kafka_port],
+ "brokers": [kafka_server],
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
consumer_settings = None
for record in caplog.records:
if "Consumer settings" in record.message:
consumer_settings = record.args
break
assert consumer_settings is not None, (
"Failed to get consumer settings out of the consumer log. "
"See log capture for details."
)
assert consumer_settings["group.instance.id"] == "static-group-instance-id"
assert consumer_settings["session.timeout.ms"] == 60 * 10 * 1000
assert consumer_settings["max.poll.interval.ms"] == 90 * 10 * 1000
for (sha1, content) in contents.items():
assert sha1 in objstorages["dst"], sha1
assert objstorages["dst"].get(sha1) == content
@_patch_objstorages(["src", "dst"])
def test_replay_content_exclude(
objstorages,
storage,
kafka_prefix: str,
kafka_consumer_group: str,
- kafka_server: Tuple[Popen, int],
+ kafka_server: str,
):
- (_, kafka_port) = kafka_server
kafka_prefix += ".swh.journal.objects"
- contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages)
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
excluded_contents = list(contents)[0::2] # picking half of them
with tempfile.NamedTemporaryFile(mode="w+b") as fd:
fd.write(b"".join(sorted(excluded_contents)))
fd.seek(0)
result = invoke(
"content-replay",
"--stop-after-objects",
str(NUM_CONTENTS),
"--exclude-sha1-file",
fd.name,
journal_config={
- "brokers": ["127.0.0.1:%d" % kafka_port],
+ "brokers": [kafka_server],
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
for (sha1, content) in contents.items():
if sha1 in excluded_contents:
assert sha1 not in objstorages["dst"], sha1
else:
assert sha1 in objstorages["dst"], sha1
assert objstorages["dst"].get(sha1) == content
NUM_CONTENTS_DST = 5
@_patch_objstorages(["src", "dst"])
@pytest.mark.parametrize(
"check_dst,expected_copied,expected_in_dst",
[
(True, NUM_CONTENTS - NUM_CONTENTS_DST, NUM_CONTENTS_DST),
(False, NUM_CONTENTS, 0),
],
)
def test_replay_content_check_dst(
objstorages,
storage,
kafka_prefix: str,
kafka_consumer_group: str,
- kafka_server: Tuple[Popen, int],
+ kafka_server: str,
check_dst: bool,
expected_copied: int,
expected_in_dst: int,
caplog,
):
- (_, kafka_port) = kafka_server
kafka_prefix += ".swh.journal.objects"
- contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages)
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
for i, (sha1, content) in enumerate(contents.items()):
if i >= NUM_CONTENTS_DST:
break
objstorages["dst"].add(content, obj_id=sha1)
caplog.set_level(logging.DEBUG, "swh.journal.replay")
result = invoke(
"content-replay",
"--stop-after-objects",
str(NUM_CONTENTS),
"--check-dst" if check_dst else "--no-check-dst",
journal_config={
- "brokers": ["127.0.0.1:%d" % kafka_port],
+ "brokers": [kafka_server],
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
copied = 0
in_dst = 0
for record in caplog.records:
logtext = record.getMessage()
if "copied" in logtext:
copied += 1
elif "in dst" in logtext:
in_dst += 1
assert (
copied == expected_copied and in_dst == expected_in_dst
), "Unexpected amount of objects copied, see the captured log for details"
for (sha1, content) in contents.items():
assert sha1 in objstorages["dst"], sha1
assert objstorages["dst"].get(sha1) == content
class FlakyObjStorage(InMemoryObjStorage):
def __init__(self, *args, **kwargs):
state = kwargs.pop("state")
self.failures_left = Counter(kwargs.pop("failures"))
super().__init__(*args, **kwargs)
if state:
self.state = state
def flaky_operation(self, op, obj_id):
if self.failures_left[op, obj_id] > 0:
self.failures_left[op, obj_id] -= 1
raise RuntimeError("Failed %s on %s" % (op, hash_to_hex(obj_id)))
def get(self, obj_id):
self.flaky_operation("get", obj_id)
return super().get(obj_id)
def add(self, data, obj_id=None, check_presence=True):
self.flaky_operation("add", obj_id)
return super().add(data, obj_id=obj_id, check_presence=check_presence)
def __contains__(self, obj_id):
self.flaky_operation("in", obj_id)
return super().__contains__(obj_id)
@_patch_objstorages(["src", "dst"])
def test_replay_content_check_dst_retry(
objstorages,
storage,
kafka_prefix: str,
kafka_consumer_group: str,
- kafka_server: Tuple[Popen, int],
+ kafka_server: str,
monkeypatch_retry_sleep,
):
- (_, kafka_port) = kafka_server
kafka_prefix += ".swh.journal.objects"
- contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages)
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
failures = {}
for i, (sha1, content) in enumerate(contents.items()):
if i >= NUM_CONTENTS_DST:
break
objstorages["dst"].add(content, obj_id=sha1)
failures["in", sha1] = 1
orig_dst = objstorages["dst"]
objstorages["dst"] = FlakyObjStorage(state=orig_dst.state, failures=failures)
result = invoke(
"content-replay",
"--stop-after-objects",
str(NUM_CONTENTS),
"--check-dst",
journal_config={
- "brokers": ["127.0.0.1:%d" % kafka_port],
+ "brokers": [kafka_server],
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
for (sha1, content) in contents.items():
assert sha1 in objstorages["dst"], sha1
assert objstorages["dst"].get(sha1) == content
@_patch_objstorages(["src", "dst"])
def test_replay_content_failed_copy_retry(
objstorages,
storage,
kafka_prefix: str,
kafka_consumer_group: str,
- kafka_server: Tuple[Popen, int],
+ kafka_server: str,
caplog,
monkeypatch_retry_sleep,
):
- (_, kafka_port) = kafka_server
kafka_prefix += ".swh.journal.objects"
- contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages)
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
add_failures = {}
get_failures = {}
definitely_failed = set()
# We want to generate operations failing 1 to CONTENT_REPLAY_RETRIES times.
# We generate failures for 2 different operations, get and add.
num_retry_contents = 2 * CONTENT_REPLAY_RETRIES
assert (
num_retry_contents < NUM_CONTENTS
), "Need to generate more test contents to properly test retry behavior"
for i, sha1 in enumerate(contents):
if i >= num_retry_contents:
break
# This generates a number of failures, up to CONTENT_REPLAY_RETRIES
num_failures = (i % CONTENT_REPLAY_RETRIES) + 1
# This generates failures of add for the first CONTENT_REPLAY_RETRIES
# objects, then failures of get.
if i < CONTENT_REPLAY_RETRIES:
add_failures["add", sha1] = num_failures
else:
get_failures["get", sha1] = num_failures
# Only contents that have CONTENT_REPLAY_RETRIES or more are
# definitely failing
if num_failures >= CONTENT_REPLAY_RETRIES:
definitely_failed.add(hash_to_hex(sha1))
objstorages["dst"] = FlakyObjStorage(
state=objstorages["dst"].state, failures=add_failures,
)
objstorages["src"] = FlakyObjStorage(
state=objstorages["src"].state, failures=get_failures,
)
caplog.set_level(logging.DEBUG, "swh.journal.replay")
result = invoke(
"content-replay",
"--stop-after-objects",
str(NUM_CONTENTS),
journal_config={
- "brokers": ["127.0.0.1:%d" % kafka_port],
+ "brokers": [kafka_server],
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
copied = 0
actually_failed = set()
for record in caplog.records:
logtext = record.getMessage()
if "copied" in logtext:
copied += 1
elif "Failed operation" in logtext:
assert record.levelno == logging.ERROR
assert record.args["retries"] == CONTENT_REPLAY_RETRIES
actually_failed.add(record.args["obj_id"])
assert (
actually_failed == definitely_failed
), "Unexpected object copy failures; see captured log for details"
for (sha1, content) in contents.items():
if hash_to_hex(sha1) in definitely_failed:
assert sha1 not in objstorages["dst"]
continue
assert sha1 in objstorages["dst"], sha1
assert objstorages["dst"].get(sha1) == content
@_patch_objstorages(["src", "dst"])
def test_replay_content_objnotfound(
objstorages,
storage,
kafka_prefix: str,
kafka_consumer_group: str,
- kafka_server: Tuple[Popen, int],
+ kafka_server: str,
caplog,
):
- (_, kafka_port) = kafka_server
kafka_prefix += ".swh.journal.objects"
- contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages)
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
num_contents_deleted = 5
contents_deleted = set()
for i, sha1 in enumerate(contents):
if i >= num_contents_deleted:
break
del objstorages["src"].state[sha1]
contents_deleted.add(hash_to_hex(sha1))
caplog.set_level(logging.DEBUG, "swh.journal.replay")
result = invoke(
"content-replay",
"--stop-after-objects",
str(NUM_CONTENTS),
journal_config={
- "brokers": ["127.0.0.1:%d" % kafka_port],
+ "brokers": [kafka_server],
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
copied = 0
not_in_src = set()
for record in caplog.records:
logtext = record.getMessage()
if "copied" in logtext:
copied += 1
elif "object not found" in logtext:
# Check that the object id can be recovered from logs
assert record.levelno == logging.ERROR
not_in_src.add(record.args["obj_id"])
assert (
copied == NUM_CONTENTS - num_contents_deleted
), "Unexpected number of contents copied"
assert (
not_in_src == contents_deleted
), "Mismatch between deleted contents and not_in_src logs"
for (sha1, content) in contents.items():
if sha1 not in objstorages["src"]:
continue
assert sha1 in objstorages["dst"], sha1
assert objstorages["dst"].get(sha1) == content
diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py
index f0bd3cd..76d190c 100644
--- a/swh/journal/tests/test_client.py
+++ b/swh/journal/tests/test_client.py
@@ -1,144 +1,137 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from subprocess import Popen
-from typing import Dict, List, Tuple
+from typing import Dict, List
from unittest.mock import MagicMock
from confluent_kafka import Producer
import pytest
from swh.model.hypothesis_strategies import revisions
from swh.model.model import Content
from swh.journal.client import JournalClient
from swh.journal.serializers import key_to_kafka, value_to_kafka
-def test_client(
- kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int]
-):
- (_, port) = kafka_server
+def test_client(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str):
kafka_prefix += ".swh.journal.objects"
producer = Producer(
{
- "bootstrap.servers": "localhost:{}".format(port),
+ "bootstrap.servers": kafka_server,
"client.id": "test producer",
"acks": "all",
}
)
rev = revisions().example()
# Fill Kafka
producer.produce(
topic=kafka_prefix + ".revision",
key=key_to_kafka(rev.id),
value=value_to_kafka(rev.to_dict()),
)
producer.flush()
client = JournalClient(
- brokers="localhost:%d" % kafka_server[1],
+ brokers=kafka_server,
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=1,
)
worker_fn = MagicMock()
client.process(worker_fn)
worker_fn.assert_called_once_with({"revision": [rev.to_dict()]})
-def test_client_eof(
- kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int]
-):
- (_, port) = kafka_server
+def test_client_eof(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str):
kafka_prefix += ".swh.journal.objects"
producer = Producer(
{
- "bootstrap.servers": "localhost:{}".format(port),
+ "bootstrap.servers": kafka_server,
"client.id": "test producer",
"acks": "all",
}
)
rev = revisions().example()
# Fill Kafka
producer.produce(
topic=kafka_prefix + ".revision",
key=key_to_kafka(rev.id),
value=value_to_kafka(rev.to_dict()),
)
producer.flush()
client = JournalClient(
- brokers="localhost:%d" % kafka_server[1],
+ brokers=kafka_server,
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=None,
stop_on_eof=True,
)
worker_fn = MagicMock()
client.process(worker_fn)
worker_fn.assert_called_once_with({"revision": [rev.to_dict()]})
@pytest.mark.parametrize("batch_size", [1, 5, 100])
def test_client_batch_size(
- kafka_prefix: str,
- kafka_consumer_group: str,
- kafka_server: Tuple[Popen, int],
- batch_size: int,
+ kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, batch_size: int,
):
- (_, port) = kafka_server
kafka_prefix += ".swh.journal.objects"
num_objects = 2 * batch_size + 1
assert num_objects < 256, "Too many objects, generation will fail"
producer = Producer(
{
- "bootstrap.servers": "localhost:{}".format(port),
+ "bootstrap.servers": kafka_server,
"client.id": "test producer",
"acks": "all",
}
)
contents = [Content.from_data(bytes([i])) for i in range(num_objects)]
# Fill Kafka
for content in contents:
producer.produce(
topic=kafka_prefix + ".content",
key=key_to_kafka(content.sha1),
value=value_to_kafka(content.to_dict()),
)
producer.flush()
client = JournalClient(
- brokers=["localhost:%d" % kafka_server[1]],
+ brokers=[kafka_server],
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=num_objects,
batch_size=batch_size,
)
collected_output: List[Dict] = []
def worker_fn(objects):
received = objects["content"]
assert len(received) <= batch_size
collected_output.extend(received)
client.process(worker_fn)
- assert collected_output == [content.to_dict() for content in contents]
+ expected_output = [content.to_dict() for content in contents]
+ assert len(collected_output) == len(expected_output)
+
+ for output in collected_output:
+ assert output in expected_output
diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
index 0aa749e..1ff8712 100644
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -1,223 +1,212 @@
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
-from confluent_kafka import Consumer, Producer, KafkaException
-
import pytest
-from subprocess import Popen
-from typing import Tuple
+from confluent_kafka import Consumer, Producer, KafkaException
from swh.storage import get_storage
from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value
from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError
from swh.model.model import Directory, Origin, OriginVisit
from .conftest import TEST_OBJECTS, TEST_OBJECT_DICTS
def consume_messages(consumer, kafka_prefix, expected_messages):
"""Consume expected_messages from the consumer;
Sort them all into a consumed_objects dict"""
consumed_messages = defaultdict(list)
fetched_messages = 0
retries_left = 1000
while fetched_messages < expected_messages:
if retries_left == 0:
raise ValueError("Timed out fetching messages from kafka")
msg = consumer.poll(timeout=0.01)
if not msg:
retries_left -= 1
continue
error = msg.error()
if error is not None:
if error.fatal():
raise KafkaException(error)
retries_left -= 1
continue
fetched_messages += 1
topic = msg.topic()
assert topic.startswith(kafka_prefix + "."), "Unexpected topic"
object_type = topic[len(kafka_prefix + ".") :]
consumed_messages[object_type].append(
(kafka_to_key(msg.key()), kafka_to_value(msg.value()))
)
return consumed_messages
def assert_all_objects_consumed(consumed_messages):
"""Check whether all objects from TEST_OBJECT_DICTS have been consumed"""
for object_type, known_values in TEST_OBJECT_DICTS.items():
known_keys = [object_key(object_type, obj) for obj in TEST_OBJECTS[object_type]]
(received_keys, received_values) = zip(*consumed_messages[object_type])
if object_type == "origin_visit":
for value in received_values:
del value["visit"]
elif object_type == "content":
for value in received_values:
del value["ctime"]
for key in known_keys:
assert key in received_keys
for value in known_values:
assert value in received_values
-def test_kafka_writer(
- kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer
-):
+def test_kafka_writer(kafka_prefix: str, kafka_server: str, consumer: Consumer):
kafka_prefix += ".swh.journal.objects"
writer = KafkaJournalWriter(
- brokers=[f"localhost:{kafka_server[1]}"],
- client_id="kafka_writer",
- prefix=kafka_prefix,
+ brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix,
)
expected_messages = 0
for object_type, objects in TEST_OBJECTS.items():
writer.write_additions(object_type, objects)
expected_messages += len(objects)
consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
assert_all_objects_consumed(consumed_messages)
-def test_storage_direct_writer(
- kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer
-):
+def test_storage_direct_writer(kafka_prefix: str, kafka_server, consumer: Consumer):
kafka_prefix += ".swh.journal.objects"
writer_config = {
"cls": "kafka",
- "brokers": ["localhost:%d" % kafka_server[1]],
+ "brokers": [kafka_server],
"client_id": "kafka_writer",
"prefix": kafka_prefix,
}
storage_config = {
"cls": "pipeline",
"steps": [{"cls": "memory", "journal_writer": writer_config},],
}
storage = get_storage(**storage_config)
expected_messages = 0
for object_type, objects in TEST_OBJECTS.items():
method = getattr(storage, object_type + "_add")
if object_type in (
"content",
"directory",
"revision",
"release",
"snapshot",
"origin",
):
method(objects)
expected_messages += len(objects)
elif object_type in ("origin_visit",):
for obj in objects:
assert isinstance(obj, OriginVisit)
storage.origin_add_one(Origin(url=obj.origin))
visit = method(obj.origin, date=obj.date, type=obj.type)
expected_messages += 1
obj_d = obj.to_dict()
for k in ("visit", "origin", "date", "type"):
del obj_d[k]
storage.origin_visit_update(obj.origin, visit.visit, **obj_d)
expected_messages += 1
else:
assert False, object_type
consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
assert_all_objects_consumed(consumed_messages)
def test_write_delivery_failure(
- kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer,
+ kafka_prefix: str, kafka_server: str, consumer: Consumer
):
class MockKafkaError:
"""A mocked kafka error"""
def str(self):
return "Mocked Kafka Error"
def name(self):
return "SWH_MOCK_ERROR"
class KafkaJournalWriterFailDelivery(KafkaJournalWriter):
"""A journal writer which always fails delivering messages"""
def _on_delivery(self, error, message):
"""Replace the inbound error with a fake delivery error"""
super()._on_delivery(MockKafkaError(), message)
kafka_prefix += ".swh.journal.objects"
writer = KafkaJournalWriterFailDelivery(
- brokers=["localhost:%d" % kafka_server[1]],
- client_id="kafka_writer",
- prefix=kafka_prefix,
+ brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix,
)
empty_dir = Directory(entries=[])
with pytest.raises(KafkaDeliveryError) as exc:
writer.write_addition("directory", empty_dir)
assert "Failed deliveries" in exc.value.message
assert len(exc.value.delivery_failures) == 1
delivery_failure = exc.value.delivery_failures[0]
assert delivery_failure.key == empty_dir.id
assert delivery_failure.code == "SWH_MOCK_ERROR"
def test_write_delivery_timeout(
- kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer
+ kafka_prefix: str, kafka_server: str, consumer: Consumer
):
produced = []
class MockProducer(Producer):
"""A kafka producer which pretends to produce messages, but never sends any
delivery acknowledgements"""
def produce(self, **kwargs):
produced.append(kwargs)
kafka_prefix += ".swh.journal.objects"
writer = KafkaJournalWriter(
- brokers=["localhost:%d" % kafka_server[1]],
+ brokers=[kafka_server],
client_id="kafka_writer",
prefix=kafka_prefix,
flush_timeout=1,
producer_class=MockProducer,
)
empty_dir = Directory(entries=[])
with pytest.raises(KafkaDeliveryError) as exc:
writer.write_addition("directory", empty_dir)
assert len(produced) == 1
assert "timeout" in exc.value.message
assert len(exc.value.delivery_failures) == 1
delivery_failure = exc.value.delivery_failures[0]
assert delivery_failure.key == empty_dir.id
assert delivery_failure.code == "SWH_FLUSH_TIMEOUT"
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
index 5925049..796236d 100644
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -1,414 +1,405 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import functools
import logging
import random
-from subprocess import Popen
-from typing import Dict, List, Tuple
+from typing import Dict, List
import dateutil
import pytest
from confluent_kafka import Producer
from hypothesis import strategies, given, settings
from swh.storage import get_storage
from swh.journal.client import JournalClient
from swh.journal.serializers import key_to_kafka, value_to_kafka
from swh.journal.replay import process_replay_objects, is_hash_in_bytearray
from swh.model.hashutil import hash_to_hex
from swh.model.model import Content
from .conftest import TEST_OBJECT_DICTS, DUPLICATE_CONTENTS
from .utils import MockedJournalClient, MockedKafkaWriter
storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]}
def make_topic(kafka_prefix: str, object_type: str) -> str:
return kafka_prefix + "." + object_type
def test_storage_play(
- kafka_prefix: str,
- kafka_consumer_group: str,
- kafka_server: Tuple[Popen, int],
- caplog,
+ kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog,
):
"""Optimal replayer scenario.
This:
- writes objects to the topic
- replayer consumes objects from the topic and replay them
"""
- (_, port) = kafka_server
kafka_prefix += ".swh.journal.objects"
storage = get_storage(**storage_config)
producer = Producer(
{
- "bootstrap.servers": "localhost:{}".format(port),
+ "bootstrap.servers": kafka_server,
"client.id": "test producer",
"acks": "all",
}
)
now = datetime.datetime.now(tz=datetime.timezone.utc)
# Fill Kafka
nb_sent = 0
nb_visits = 0
for object_type, objects in TEST_OBJECT_DICTS.items():
topic = make_topic(kafka_prefix, object_type)
for object_ in objects:
key = bytes(random.randint(0, 255) for _ in range(40))
object_ = object_.copy()
if object_type == "content":
object_["ctime"] = now
elif object_type == "origin_visit":
nb_visits += 1
object_["visit"] = nb_visits
producer.produce(
topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_),
)
nb_sent += 1
producer.flush()
caplog.set_level(logging.ERROR, "swh.journal.replay")
# Fill the storage from Kafka
replayer = JournalClient(
- brokers="localhost:%d" % kafka_server[1],
+ brokers=kafka_server,
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=nb_sent,
)
worker_fn = functools.partial(process_replay_objects, storage=storage)
nb_inserted = 0
while nb_inserted < nb_sent:
nb_inserted += replayer.process(worker_fn)
assert nb_sent == nb_inserted
# Check the objects were actually inserted in the storage
assert TEST_OBJECT_DICTS["revision"] == list(
storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"]])
)
assert TEST_OBJECT_DICTS["release"] == list(
storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"]])
)
origins = list(storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"]]))
assert TEST_OBJECT_DICTS["origin"] == [{"url": orig["url"]} for orig in origins]
for origin in origins:
origin_url = origin["url"]
expected_visits = [
{
**visit,
"origin": origin_url,
"date": dateutil.parser.parse(visit["date"]),
}
for visit in TEST_OBJECT_DICTS["origin_visit"]
if visit["origin"] == origin["url"]
]
actual_visits = list(storage.origin_visit_get(origin_url))
for visit in actual_visits:
del visit["visit"] # opaque identifier
assert expected_visits == actual_visits
input_contents = TEST_OBJECT_DICTS["content"]
contents = storage.content_get_metadata([cont["sha1"] for cont in input_contents])
assert len(contents) == len(input_contents)
assert contents == {cont["sha1"]: [cont] for cont in input_contents}
collision = 0
for record in caplog.records:
logtext = record.getMessage()
if "Colliding contents:" in logtext:
collision += 1
assert collision == 0, "No collision should be detected"
def test_storage_play_with_collision(
- kafka_prefix: str,
- kafka_consumer_group: str,
- kafka_server: Tuple[Popen, int],
- caplog,
+ kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog,
):
"""Another replayer scenario with collisions.
This:
- writes objects to the topic, including colliding contents
- replayer consumes objects from the topic and replay them
- This drops the colliding contents from the replay when detected
"""
- (_, port) = kafka_server
kafka_prefix += ".swh.journal.objects"
storage = get_storage(**storage_config)
producer = Producer(
{
- "bootstrap.servers": "localhost:{}".format(port),
+ "bootstrap.servers": kafka_server,
"client.id": "test producer",
"enable.idempotence": "true",
}
)
now = datetime.datetime.now(tz=datetime.timezone.utc)
# Fill Kafka
nb_sent = 0
nb_visits = 0
for object_type, objects in TEST_OBJECT_DICTS.items():
topic = make_topic(kafka_prefix, object_type)
for object_ in objects:
key = bytes(random.randint(0, 255) for _ in range(40))
object_ = object_.copy()
if object_type == "content":
object_["ctime"] = now
elif object_type == "origin_visit":
nb_visits += 1
object_["visit"] = nb_visits
producer.produce(
topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_),
)
nb_sent += 1
# Create collision in input data
# They are not written in the destination
for content in DUPLICATE_CONTENTS:
topic = make_topic(kafka_prefix, "content")
producer.produce(
topic=topic, key=key_to_kafka(key), value=value_to_kafka(content),
)
nb_sent += 1
producer.flush()
caplog.set_level(logging.ERROR, "swh.journal.replay")
# Fill the storage from Kafka
replayer = JournalClient(
- brokers="localhost:%d" % kafka_server[1],
+ brokers=kafka_server,
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=nb_sent,
)
worker_fn = functools.partial(process_replay_objects, storage=storage)
nb_inserted = 0
while nb_inserted < nb_sent:
nb_inserted += replayer.process(worker_fn)
assert nb_sent == nb_inserted
# Check the objects were actually inserted in the storage
assert TEST_OBJECT_DICTS["revision"] == list(
storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"]])
)
assert TEST_OBJECT_DICTS["release"] == list(
storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"]])
)
origins = list(storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"]]))
assert TEST_OBJECT_DICTS["origin"] == [{"url": orig["url"]} for orig in origins]
for origin in origins:
origin_url = origin["url"]
expected_visits = [
{
**visit,
"origin": origin_url,
"date": dateutil.parser.parse(visit["date"]),
}
for visit in TEST_OBJECT_DICTS["origin_visit"]
if visit["origin"] == origin["url"]
]
actual_visits = list(storage.origin_visit_get(origin_url))
for visit in actual_visits:
del visit["visit"] # opaque identifier
assert expected_visits == actual_visits
input_contents = TEST_OBJECT_DICTS["content"]
contents = storage.content_get_metadata([cont["sha1"] for cont in input_contents])
assert len(contents) == len(input_contents)
assert contents == {cont["sha1"]: [cont] for cont in input_contents}
nb_collisions = 0
actual_collision: Dict
for record in caplog.records:
logtext = record.getMessage()
if "Collision detected:" in logtext:
nb_collisions += 1
actual_collision = record.args["collision"]
assert nb_collisions == 1, "1 collision should be detected"
algo = "sha1"
assert actual_collision["algo"] == algo
expected_colliding_hash = hash_to_hex(DUPLICATE_CONTENTS[0][algo])
assert actual_collision["hash"] == expected_colliding_hash
actual_colliding_hashes = actual_collision["objects"]
assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS)
for content in DUPLICATE_CONTENTS:
expected_content_hashes = {
k: hash_to_hex(v) for k, v in Content.from_dict(content).hashes().items()
}
assert expected_content_hashes in actual_colliding_hashes
def _test_write_replay_origin_visit(visits: List[Dict]):
"""Helper function to write tests for origin_visit.
Each visit (a dict) given in the 'visits' argument will be sent to
a (mocked) kafka queue, which a in-memory-storage backed replayer is
listening to.
Check that corresponding origin visits entities are present in the storage
and have correct values if they are not skipped.
"""
queue: List = []
replayer = MockedJournalClient(queue)
writer = MockedKafkaWriter(queue)
# Note that flipping the order of these two insertions will crash
# the test, because the legacy origin_format does not allow to create
# the origin when needed (type is missing)
writer.send(
"origin",
"foo",
{
"url": "http://example.com/",
"type": "git", # test the legacy origin format is accepted
},
)
for visit in visits:
writer.send("origin_visit", "foo", visit)
queue_size = len(queue)
assert replayer.stop_after_objects is None
replayer.stop_after_objects = queue_size
storage = get_storage(**storage_config)
worker_fn = functools.partial(process_replay_objects, storage=storage)
replayer.process(worker_fn)
actual_visits = list(storage.origin_visit_get("http://example.com/"))
assert len(actual_visits) == len(visits), actual_visits
for vin, vout in zip(visits, actual_visits):
vin = vin.copy()
vout = vout.copy()
assert vout.pop("origin") == "http://example.com/"
vin.pop("origin")
vin.setdefault("type", "git")
vin.setdefault("metadata", None)
assert vin == vout
def test_write_replay_origin_visit():
"""Test origin_visit when the 'origin' is just a string."""
now = datetime.datetime.now()
visits = [
{
"visit": 1,
"origin": "http://example.com/",
"date": now,
"type": "git",
"status": "partial",
"snapshot": None,
}
]
_test_write_replay_origin_visit(visits)
def test_write_replay_legacy_origin_visit1():
"""Origin_visit with no types should make the replayer crash
We expect the journal's origin_visit topic to no longer reference such
visits. If it does, the replayer must crash so we can fix the journal's
topic.
"""
now = datetime.datetime.now()
visit = {
"visit": 1,
"origin": "http://example.com/",
"date": now,
"status": "partial",
"snapshot": None,
}
now2 = datetime.datetime.now()
visit2 = {
"visit": 2,
"origin": {"url": "http://example.com/"},
"date": now2,
"status": "partial",
"snapshot": None,
}
for origin_visit in [visit, visit2]:
with pytest.raises(ValueError, match="Old origin visit format"):
_test_write_replay_origin_visit([origin_visit])
def test_write_replay_legacy_origin_visit2():
"""Test origin_visit when 'type' is missing from the visit, but not
from the origin."""
now = datetime.datetime.now()
visits = [
{
"visit": 1,
"origin": {"url": "http://example.com/", "type": "git",},
"date": now,
"type": "git",
"status": "partial",
"snapshot": None,
}
]
_test_write_replay_origin_visit(visits)
def test_write_replay_legacy_origin_visit3():
"""Test origin_visit when the origin is a dict"""
now = datetime.datetime.now()
visits = [
{
"visit": 1,
"origin": {"url": "http://example.com/",},
"date": now,
"type": "git",
"status": "partial",
"snapshot": None,
}
]
_test_write_replay_origin_visit(visits)
hash_strategy = strategies.binary(min_size=20, max_size=20)
@settings(max_examples=500)
@given(
strategies.sets(hash_strategy, min_size=0, max_size=500),
strategies.sets(hash_strategy, min_size=10),
)
def test_is_hash_in_bytearray(haystack, needles):
array = b"".join(sorted(haystack))
needles |= haystack # Exhaustively test for all objects in the array
for needle in needles:
assert is_hash_in_bytearray(needle, array, len(haystack)) == (
needle in haystack
)
diff --git a/tox.ini b/tox.ini
index e8aa391..9020b04 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,40 +1,37 @@
[tox]
envlist=black,flake8,mypy,py3
[testenv]
-passenv=SWH_KAFKA_ROOT
extras =
testing
deps =
pytest-cov
dev: pdbpp
-setenv =
- SWH_KAFKA_ROOT = {env:SWH_KAFKA_ROOT:swh/journal/tests/kafka}
commands =
pytest --cov={envsitepackagesdir}/swh/journal \
{envsitepackagesdir}/swh/journal \
--cov-branch \
--doctest-modules {posargs}
[testenv:black]
skip_install = true
deps =
black
commands =
{envpython} -m black --check swh
[testenv:flake8]
skip_install = true
deps =
git+https://github.com/PyCQA/pyflakes.git
flake8
commands =
{envpython} -m flake8
[testenv:mypy]
extras =
testing
deps =
mypy
commands =
mypy swh
