diff --git a/bin/install-kafka.sh b/bin/install-kafka.sh
deleted file mode 100755
index 79fd5e6..0000000
--- a/bin/install-kafka.sh
+++ /dev/null
@@ -1,56 +0,0 @@
-#!/usr/bin/env bash
-
-set -xe
-
-SCALA_VERSION=2.12
-KAFKA_VERSION=2.4.0
-KAFKA_CHECKSUM=53b52f86ea56c9fac62046524f03f75665a089ea2dae554aefe3a3d2694f2da88b5ba8725d8be55f198ba80695443559ed9de7c0b2a2817f7a6141008ff79f49
-
-KAFKA_APP="kafka_${SCALA_VERSION}-${KAFKA_VERSION}"
-TARBALL="${KAFKA_APP}.tgz"
-URL="http://apache.mirrors.ovh.net/ftp.apache.org/dist/kafka/${KAFKA_VERSION}/${TARBALL}"
-CHECKSUMS="${TARBALL}.sha512"
-KAFKA_ROOT_DIR=swh/journal/tests
-KAFKA_DIR="${KAFKA_ROOT_DIR}/${KAFKA_APP}"
-KAFKA_LINK="${KAFKA_ROOT_DIR}/kafka"
-
-case $1 in
- "install")
- echo "Kafka installation started."
- if [ ! -f $TARBALL ]; then
- echo "Kafka download"
- wget $URL
- echo "${KAFKA_CHECKSUM} ${TARBALL}" > $CHECKSUMS
- sha512sum -c $CHECKSUMS
-
- if [ $? -ne 0 ]; then
- echo "Kafka download: failed to retrieve ${TARBALL}";
- exit 1
- fi
- echo "Kafka download done"
- fi
-
- if [ ! -d $KAFKA_DIR ]; then
- echo "Kafka extraction"
- tar xvf $TARBALL -C $KAFKA_ROOT_DIR
- pushd $KAFKA_ROOT_DIR && ln -nsf $KAFKA_APP kafka && popd
- echo "Kafka extraction done"
- fi
- echo "Kafka installation done. Kafka is installed at $KAFKA_DIR"
- ;;
- "clean")
- echo "Kafka cleanup started."
- [ -d $KAFKA_DIR ] && rm -rf $KAFKA_DIR
- [ -L $KAFKA_LINK ] && rm $KAFKA_LINK
- echo "Kafka cleanup done."
- ;;
- "clean-cache")
- echo "Kafka cleanup cache started."
- [ -f $TARBALL ] && rm $TARBALL
- [ -f $CHECKSUMS ] && rm $CHECKSUMS
- echo "Kafka cleanup cache done."
- ;;
- *)
- echo "Unknown command, do nothing"
- exit 1;
-esac
diff --git a/requirements-test.txt b/requirements-test.txt
index 7f2dc48..c727717 100644
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,4 +1,3 @@
pytest
swh.model >= 0.0.34
-pytest-kafka
hypothesis
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
index e7169e2..3b89a7b 100644
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -1,358 +1,252 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
-import os
import pytest
import logging
import random
import string
-from confluent_kafka import Consumer
-from confluent_kafka.admin import AdminClient, ConfigResource
+from confluent_kafka import Consumer, Producer
from hypothesis.strategies import one_of
-from subprocess import Popen
-from typing import Any, Dict, Iterator, List, Tuple
-
-from pathlib import Path
-from pytest_kafka import (
- make_zookeeper_process,
- make_kafka_server,
- KAFKA_SERVER_CONFIG_TEMPLATE,
- ZOOKEEPER_CONFIG_TEMPLATE,
-)
+from typing import Any, Dict, Iterator, List
from swh.model import hypothesis_strategies as strategies
from swh.model.hashutil import MultiHash, hash_to_bytes
from swh.journal.serializers import ModelObject
from swh.journal.writer.kafka import OBJECT_TYPES
logger = logging.getLogger(__name__)
CONTENTS = [
{**MultiHash.from_data(b"foo").digest(), "length": 3, "status": "visible",},
]
duplicate_content1 = {
"length": 4,
"sha1": hash_to_bytes("44973274ccef6ab4dfaaf86599792fa9c3fe4689"),
"sha1_git": b"another-foo",
"blake2s256": b"another-bar",
"sha256": b"another-baz",
"status": "visible",
}
# Craft a sha1 collision
duplicate_content2 = duplicate_content1.copy()
sha1_array = bytearray(duplicate_content1["sha1_git"])
sha1_array[0] += 1
duplicate_content2["sha1_git"] = bytes(sha1_array)
DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2]
COMMITTERS = [
{"fullname": b"foo", "name": b"foo", "email": b"",},
{"fullname": b"bar", "name": b"bar", "email": b"",},
]
DATES = [
{
"timestamp": {"seconds": 1234567891, "microseconds": 0,},
"offset": 120,
"negative_utc": False,
},
{
"timestamp": {"seconds": 1234567892, "microseconds": 0,},
"offset": 120,
"negative_utc": False,
},
]
REVISIONS = [
{
"id": hash_to_bytes("7026b7c1a2af56521e951c01ed20f255fa054238"),
"message": b"hello",
"date": DATES[0],
"committer": COMMITTERS[0],
"author": COMMITTERS[0],
"committer_date": DATES[0],
"type": "git",
"directory": b"\x01" * 20,
"synthetic": False,
"metadata": None,
"parents": [],
},
{
"id": hash_to_bytes("368a48fe15b7db2383775f97c6b247011b3f14f4"),
"message": b"hello again",
"date": DATES[1],
"committer": COMMITTERS[1],
"author": COMMITTERS[1],
"committer_date": DATES[1],
"type": "hg",
"directory": b"\x02" * 20,
"synthetic": False,
"metadata": None,
"parents": [],
},
]
RELEASES = [
{
"id": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
"name": b"v0.0.1",
"date": {
"timestamp": {"seconds": 1234567890, "microseconds": 0,},
"offset": 120,
"negative_utc": False,
},
"author": COMMITTERS[0],
"target_type": "revision",
"target": b"\x04" * 20,
"message": b"foo",
"synthetic": False,
},
]
ORIGINS = [
{"url": "https://somewhere.org/den/fox",},
{"url": "https://overtherainbow.org/fox/den",},
]
ORIGIN_VISITS = [
{
"origin": ORIGINS[0]["url"],
"date": "2013-05-07 04:20:39.369271+00:00",
"snapshot": None, # TODO
"status": "ongoing", # TODO
"metadata": {"foo": "bar"},
"type": "git",
},
{
"origin": ORIGINS[0]["url"],
"date": "2018-11-27 17:20:39+00:00",
"snapshot": None, # TODO
"status": "ongoing", # TODO
"metadata": {"baz": "qux"},
"type": "git",
},
]
TEST_OBJECT_DICTS: Dict[str, List[Dict[str, Any]]] = {
"content": CONTENTS,
"revision": REVISIONS,
"release": RELEASES,
"origin": ORIGINS,
"origin_visit": ORIGIN_VISITS,
}
MODEL_OBJECTS = {v: k for (k, v) in OBJECT_TYPES.items()}
TEST_OBJECTS: Dict[str, List[ModelObject]] = {}
for object_type, objects in TEST_OBJECT_DICTS.items():
converted_objects: List[ModelObject] = []
model = MODEL_OBJECTS[object_type]
for (num, obj_d) in enumerate(objects):
if object_type == "origin_visit":
obj_d = {**obj_d, "visit": num}
elif object_type == "content":
obj_d = {**obj_d, "data": b"", "ctime": datetime.datetime.now()}
converted_objects.append(model.from_dict(obj_d))
TEST_OBJECTS[object_type] = converted_objects
-KAFKA_ROOT = os.environ.get("SWH_KAFKA_ROOT")
-KAFKA_ROOT = KAFKA_ROOT if KAFKA_ROOT else os.path.dirname(__file__) + "/kafka"
-if not os.path.exists(KAFKA_ROOT):
- msg = (
- "Development error: %s must exist and target an "
- "existing kafka installation" % KAFKA_ROOT
- )
- raise ValueError(msg)
-
-KAFKA_SCRIPTS = Path(KAFKA_ROOT) / "bin"
-
-KAFKA_BIN = str(KAFKA_SCRIPTS / "kafka-server-start.sh")
-ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / "zookeeper-server-start.sh")
-
-ZK_CONFIG_TEMPLATE = ZOOKEEPER_CONFIG_TEMPLATE + "\nadmin.enableServer=false\n"
-KAFKA_CONFIG_TEMPLATE = KAFKA_SERVER_CONFIG_TEMPLATE + "\nmessage.max.bytes=104857600\n"
-
-# Those defines fixtures
-zookeeper_proc = make_zookeeper_process(
- ZOOKEEPER_BIN, zk_config_template=ZK_CONFIG_TEMPLATE, scope="session"
-)
-os.environ[
- "KAFKA_LOG4J_OPTS"
-] = "-Dlog4j.configuration=file:%s/log4j.properties" % os.path.dirname(__file__)
-session_kafka_server = make_kafka_server(
- KAFKA_BIN,
- "zookeeper_proc",
- kafka_config_template=KAFKA_CONFIG_TEMPLATE,
- scope="session",
-)
-
-kafka_logger = logging.getLogger("kafka")
-kafka_logger.setLevel(logging.WARN)
-
-
@pytest.fixture(scope="function")
def kafka_prefix():
"""Pick a random prefix for kafka topics on each call"""
return "".join(random.choice(string.ascii_lowercase) for _ in range(10))
@pytest.fixture(scope="function")
def kafka_consumer_group(kafka_prefix: str):
"""Pick a random consumer group for kafka consumers on each call"""
return "test-consumer-%s" % kafka_prefix
@pytest.fixture(scope="session")
-def kafka_admin_client(session_kafka_server: Tuple[Popen, int]) -> AdminClient:
- return AdminClient({"bootstrap.servers": "localhost:%s" % session_kafka_server[1]})
+def kafka_server() -> Iterator[str]:
+ p = Producer({"test.mock.num.brokers": "1"})
+ metadata = p.list_topics()
+ brokers = [str(broker) for broker in metadata.brokers.values()]
+ assert len(brokers) == 1, "More than one broker found in the kafka cluster?!"
-@pytest.fixture(scope="function")
-def kafka_server_config_overrides() -> Dict[str, str]:
- return {}
+ broker_connstr, broker_id = brokers[0].split("/")
+ ip, port_str = broker_connstr.split(":")
+ assert ip == "127.0.0.1"
+ assert int(port_str)
+ yield broker_connstr
-@pytest.fixture(scope="function")
-def kafka_server(
- session_kafka_server: Tuple[Popen, int],
- kafka_admin_client: AdminClient,
- kafka_server_config_overrides: Dict[str, str],
-) -> Iterator[Tuple[Popen, int]]:
- # No overrides, we can just return the original broker connection
- if not kafka_server_config_overrides:
- yield session_kafka_server
- return
-
- # This is the minimal operation that the kafka_admin_client gives to
- # retrieve the cluster metadata, which we need to get the numeric id of the
- # broker spawned by pytest_kafka.
- metadata = kafka_admin_client.list_topics("__consumer_offsets")
- broker_ids = [str(broker) for broker in metadata.brokers.keys()]
- assert len(broker_ids) == 1, "More than one broker found in the kafka cluster?!"
-
- # Pull the current broker configuration. describe_configs and alter_configs
- # generate a dict containing one concurrent.future per queried
- # ConfigResource, hence the use of .result()
- broker = ConfigResource("broker", broker_ids[0])
- futures = kafka_admin_client.describe_configs([broker])
- original_config = futures[broker].result()
-
- # Gather the list of settings we need to change in the broker
- # ConfigResource, and their original values in the to_restore dict
- to_restore = {}
- for key, new_value in kafka_server_config_overrides.items():
- if key not in original_config:
- raise ValueError(f"Cannot override unknown configuration {key}")
- orig_value = original_config[key].value
- if orig_value == new_value:
- continue
- if original_config[key].is_read_only:
- raise ValueError(f"Cannot override read-only configuration {key}")
-
- broker.set_config(key, new_value)
- to_restore[key] = orig_value
-
- # to_restore will be empty if all the config "overrides" are equal to the
- # original value. No need to wait for a config alteration if that's the
- # case. The result() will raise a KafkaException if the settings change
- # failed.
- if to_restore:
- futures = kafka_admin_client.alter_configs([broker])
- try:
- futures[broker].result()
- except Exception:
- raise
-
- yield session_kafka_server
-
- # Now we can restore the old setting values. Again, the result() will raise
- # a KafkaException if the settings change failed.
- if to_restore:
- for key, orig_value in to_restore.items():
- broker.set_config(key, orig_value)
-
- futures = kafka_admin_client.alter_configs([broker])
- try:
- futures[broker].result()
- except Exception:
- raise
+ p.flush()
TEST_CONFIG = {
"consumer_id": "swh.journal.consumer",
"object_types": TEST_OBJECT_DICTS.keys(),
"stop_after_objects": 1, # will read 1 object and stop
"storage": {"cls": "memory", "args": {}},
}
@pytest.fixture
-def test_config(kafka_server: Tuple[Popen, int], kafka_prefix: str):
+def test_config(kafka_server: str, kafka_prefix: str):
"""Test configuration needed for producer/consumer
"""
- _, port = kafka_server
return {
**TEST_CONFIG,
- "brokers": ["127.0.0.1:{}".format(port)],
+ "brokers": [kafka_server],
"prefix": kafka_prefix + ".swh.journal.objects",
}
@pytest.fixture
def consumer(
- kafka_server: Tuple[Popen, int], test_config: Dict, kafka_consumer_group: str,
+ kafka_server: str, test_config: Dict, kafka_consumer_group: str,
) -> Consumer:
"""Get a connected Kafka consumer.
"""
- _, kafka_port = kafka_server
consumer = Consumer(
{
- "bootstrap.servers": "127.0.0.1:{}".format(kafka_port),
+ "bootstrap.servers": kafka_server,
"auto.offset.reset": "earliest",
"enable.auto.commit": True,
"group.id": kafka_consumer_group,
}
)
kafka_topics = [
"%s.%s" % (test_config["prefix"], object_type)
for object_type in test_config["object_types"]
]
consumer.subscribe(kafka_topics)
yield consumer
consumer.close()
def objects_d():
return one_of(
strategies.origins().map(lambda x: ("origin", x.to_dict())),
strategies.origin_visits().map(lambda x: ("origin_visit", x.to_dict())),
strategies.snapshots().map(lambda x: ("snapshot", x.to_dict())),
strategies.releases().map(lambda x: ("release", x.to_dict())),
strategies.revisions().map(lambda x: ("revision", x.to_dict())),
strategies.directories().map(lambda x: ("directory", x.to_dict())),
strategies.skipped_contents().map(lambda x: ("skipped_content", x.to_dict())),
strategies.present_contents().map(lambda x: ("content", x.to_dict())),
)
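
(Illustration, not part of the patch.) The conftest.py hunk above is the core of
this change: instead of spawning ZooKeeper and a real Kafka broker via
pytest-kafka, the session-scoped kafka_server fixture asks librdkafka for an
in-process mock cluster (the test.mock.num.brokers property) and yields the mock
broker's bootstrap address to the other fixtures. A minimal sketch of that
technique, with made-up topic and group names:

    # Editor's sketch of the mock-cluster technique used by kafka_server above.
    from confluent_kafka import Consumer, Producer

    # Setting test.mock.num.brokers makes librdkafka start an in-process mock
    # cluster; the client's metadata then reports the mock broker's address.
    mock = Producer({"test.mock.num.brokers": "1"})
    broker = str(next(iter(mock.list_topics().brokers.values())))  # "127.0.0.1:<port>/1"
    bootstrap = broker.split("/")[0]

    # Any other client pointed at that address talks to the same mock cluster.
    producer = Producer({"bootstrap.servers": bootstrap, "acks": "all"})
    producer.produce(topic="demo.topic", key=b"key", value=b"value")
    producer.flush()

    consumer = Consumer(
        {
            "bootstrap.servers": bootstrap,
            "group.id": "demo-group",
            "auto.offset.reset": "earliest",
        }
    )
    consumer.subscribe(["demo.topic"])
    msg = consumer.poll(timeout=10.0)
    assert msg is not None and msg.error() is None
    assert msg.value() == b"value"
    consumer.close()

Because the mock broker listens on a real localhost port, the remaining hunks
below mostly just swap the "127.0.0.1:<port>" connection strings for the address
yielded by the fixture.
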
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
index eb19869..fd4ce12 100644
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -1,631 +1,618 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import Counter
import copy
import functools
import logging
import re
import tempfile
-from subprocess import Popen
-from typing import Any, Dict, Tuple
+from typing import Any, Dict
from unittest.mock import patch
from click.testing import CliRunner
from confluent_kafka import Producer
import pytest
import yaml
from swh.model.hashutil import hash_to_hex
from swh.objstorage.backends.in_memory import InMemoryObjStorage
from swh.storage import get_storage
from swh.journal.cli import cli
from swh.journal.replay import CONTENT_REPLAY_RETRIES
from swh.journal.serializers import key_to_kafka, value_to_kafka
logger = logging.getLogger(__name__)
CLI_CONFIG = {
"storage": {"cls": "memory",},
"objstorage_src": {"cls": "mocked", "name": "src",},
"objstorage_dst": {"cls": "mocked", "name": "dst",},
}
@pytest.fixture
def storage():
"""An swh-storage object that gets injected into the CLI functions."""
storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]}
storage = get_storage(**storage_config)
with patch("swh.journal.cli.get_storage") as get_storage_mock:
get_storage_mock.return_value = storage
yield storage
@pytest.fixture
def monkeypatch_retry_sleep(monkeypatch):
from swh.journal.replay import copy_object, obj_in_objstorage
monkeypatch.setattr(copy_object.retry, "sleep", lambda x: None)
monkeypatch.setattr(obj_in_objstorage.retry, "sleep", lambda x: None)
def invoke(*args, env=None, journal_config=None):
config = copy.deepcopy(CLI_CONFIG)
if journal_config:
config["journal"] = journal_config
runner = CliRunner()
with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
yaml.dump(config, config_fd)
config_fd.seek(0)
args = ["-C" + config_fd.name] + list(args)
return runner.invoke(cli, args, obj={"log_level": logging.DEBUG}, env=env,)
def test_replay(
- storage,
- kafka_prefix: str,
- kafka_consumer_group: str,
- kafka_server: Tuple[Popen, int],
+ storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str,
):
- (_, kafka_port) = kafka_server
kafka_prefix += ".swh.journal.objects"
producer = Producer(
{
- "bootstrap.servers": "localhost:{}".format(kafka_port),
+ "bootstrap.servers": kafka_server,
"client.id": "test-producer",
"acks": "all",
}
)
snapshot = {
"id": b"foo",
"branches": {b"HEAD": {"target_type": "revision", "target": b"\x01" * 20,}},
} # type: Dict[str, Any]
producer.produce(
topic=kafka_prefix + ".snapshot",
key=key_to_kafka(snapshot["id"]),
value=value_to_kafka(snapshot),
)
producer.flush()
logger.debug("Flushed producer")
result = invoke(
"replay",
"--stop-after-objects",
"1",
journal_config={
- "brokers": ["127.0.0.1:%d" % kafka_port],
+ "brokers": [kafka_server],
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
assert storage.snapshot_get(snapshot["id"]) == {**snapshot, "next_branch": None}
def _patch_objstorages(names):
objstorages = {name: InMemoryObjStorage() for name in names}
def get_mock_objstorage(cls, **args):
assert cls == "mocked", cls
return objstorages[args["name"]]
def decorator(f):
@functools.wraps(f)
@patch("swh.journal.cli.get_objstorage")
def newf(get_objstorage_mock, *args, **kwargs):
get_objstorage_mock.side_effect = get_mock_objstorage
f(*args, objstorages=objstorages, **kwargs)
return newf
return decorator
NUM_CONTENTS = 10
-def _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages):
+def _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages):
producer = Producer(
{
- "bootstrap.servers": "127.0.0.1:{}".format(kafka_port),
+ "bootstrap.servers": kafka_server,
"client.id": "test-producer",
"acks": "all",
}
)
contents = {}
for i in range(NUM_CONTENTS):
content = b"\x00" * 19 + bytes([i])
sha1 = objstorages["src"].add(content)
contents[sha1] = content
producer.produce(
topic=kafka_prefix + ".content",
key=key_to_kafka(sha1),
value=key_to_kafka({"sha1": sha1, "status": "visible",}),
)
producer.flush()
return contents
@_patch_objstorages(["src", "dst"])
def test_replay_content(
objstorages,
storage,
kafka_prefix: str,
kafka_consumer_group: str,
- kafka_server: Tuple[Popen, int],
+ kafka_server: str,
):
- (_, kafka_port) = kafka_server
kafka_prefix += ".swh.journal.objects"
- contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages)
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
result = invoke(
"content-replay",
"--stop-after-objects",
str(NUM_CONTENTS),
journal_config={
- "brokers": ["127.0.0.1:%d" % kafka_port],
+ "brokers": [kafka_server],
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
for (sha1, content) in contents.items():
assert sha1 in objstorages["dst"], sha1
assert objstorages["dst"].get(sha1) == content
@_patch_objstorages(["src", "dst"])
def test_replay_content_structured_log(
objstorages,
storage,
kafka_prefix: str,
kafka_consumer_group: str,
- kafka_server: Tuple[Popen, int],
+ kafka_server: str,
caplog,
):
- (_, kafka_port) = kafka_server
kafka_prefix += ".swh.journal.objects"
- contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages)
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
caplog.set_level(logging.DEBUG, "swh.journal.replay")
expected_obj_ids = set(hash_to_hex(sha1) for sha1 in contents)
result = invoke(
"content-replay",
"--stop-after-objects",
str(NUM_CONTENTS),
journal_config={
- "brokers": ["127.0.0.1:%d" % kafka_port],
+ "brokers": [kafka_server],
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
copied = set()
for record in caplog.records:
logtext = record.getMessage()
if "copied" in logtext:
copied.add(record.args["obj_id"])
assert (
copied == expected_obj_ids
), "Mismatched logging; see captured log output for details."
@_patch_objstorages(["src", "dst"])
def test_replay_content_static_group_id(
objstorages,
storage,
kafka_prefix: str,
kafka_consumer_group: str,
- kafka_server: Tuple[Popen, int],
+ kafka_server: str,
caplog,
):
- (_, kafka_port) = kafka_server
kafka_prefix += ".swh.journal.objects"
- contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages)
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
# Setup log capture to fish the consumer settings out of the log messages
caplog.set_level(logging.DEBUG, "swh.journal.client")
result = invoke(
"content-replay",
"--stop-after-objects",
str(NUM_CONTENTS),
env={"KAFKA_GROUP_INSTANCE_ID": "static-group-instance-id"},
journal_config={
- "brokers": ["127.0.0.1:%d" % kafka_port],
+ "brokers": [kafka_server],
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
consumer_settings = None
for record in caplog.records:
if "Consumer settings" in record.message:
consumer_settings = record.args
break
assert consumer_settings is not None, (
"Failed to get consumer settings out of the consumer log. "
"See log capture for details."
)
assert consumer_settings["group.instance.id"] == "static-group-instance-id"
assert consumer_settings["session.timeout.ms"] == 60 * 10 * 1000
assert consumer_settings["max.poll.interval.ms"] == 90 * 10 * 1000
for (sha1, content) in contents.items():
assert sha1 in objstorages["dst"], sha1
assert objstorages["dst"].get(sha1) == content
@_patch_objstorages(["src", "dst"])
def test_replay_content_exclude(
objstorages,
storage,
kafka_prefix: str,
kafka_consumer_group: str,
- kafka_server: Tuple[Popen, int],
+ kafka_server: str,
):
- (_, kafka_port) = kafka_server
kafka_prefix += ".swh.journal.objects"
- contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages)
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
excluded_contents = list(contents)[0::2] # picking half of them
with tempfile.NamedTemporaryFile(mode="w+b") as fd:
fd.write(b"".join(sorted(excluded_contents)))
fd.seek(0)
result = invoke(
"content-replay",
"--stop-after-objects",
str(NUM_CONTENTS),
"--exclude-sha1-file",
fd.name,
journal_config={
- "brokers": ["127.0.0.1:%d" % kafka_port],
+ "brokers": [kafka_server],
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
for (sha1, content) in contents.items():
if sha1 in excluded_contents:
assert sha1 not in objstorages["dst"], sha1
else:
assert sha1 in objstorages["dst"], sha1
assert objstorages["dst"].get(sha1) == content
NUM_CONTENTS_DST = 5
@_patch_objstorages(["src", "dst"])
@pytest.mark.parametrize(
"check_dst,expected_copied,expected_in_dst",
[
(True, NUM_CONTENTS - NUM_CONTENTS_DST, NUM_CONTENTS_DST),
(False, NUM_CONTENTS, 0),
],
)
def test_replay_content_check_dst(
objstorages,
storage,
kafka_prefix: str,
kafka_consumer_group: str,
- kafka_server: Tuple[Popen, int],
+ kafka_server: str,
check_dst: bool,
expected_copied: int,
expected_in_dst: int,
caplog,
):
- (_, kafka_port) = kafka_server
kafka_prefix += ".swh.journal.objects"
- contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages)
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
for i, (sha1, content) in enumerate(contents.items()):
if i >= NUM_CONTENTS_DST:
break
objstorages["dst"].add(content, obj_id=sha1)
caplog.set_level(logging.DEBUG, "swh.journal.replay")
result = invoke(
"content-replay",
"--stop-after-objects",
str(NUM_CONTENTS),
"--check-dst" if check_dst else "--no-check-dst",
journal_config={
- "brokers": ["127.0.0.1:%d" % kafka_port],
+ "brokers": [kafka_server],
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
copied = 0
in_dst = 0
for record in caplog.records:
logtext = record.getMessage()
if "copied" in logtext:
copied += 1
elif "in dst" in logtext:
in_dst += 1
assert (
copied == expected_copied and in_dst == expected_in_dst
), "Unexpected amount of objects copied, see the captured log for details"
for (sha1, content) in contents.items():
assert sha1 in objstorages["dst"], sha1
assert objstorages["dst"].get(sha1) == content
class FlakyObjStorage(InMemoryObjStorage):
def __init__(self, *args, **kwargs):
state = kwargs.pop("state")
self.failures_left = Counter(kwargs.pop("failures"))
super().__init__(*args, **kwargs)
if state:
self.state = state
def flaky_operation(self, op, obj_id):
if self.failures_left[op, obj_id] > 0:
self.failures_left[op, obj_id] -= 1
raise RuntimeError("Failed %s on %s" % (op, hash_to_hex(obj_id)))
def get(self, obj_id):
self.flaky_operation("get", obj_id)
return super().get(obj_id)
def add(self, data, obj_id=None, check_presence=True):
self.flaky_operation("add", obj_id)
return super().add(data, obj_id=obj_id, check_presence=check_presence)
def __contains__(self, obj_id):
self.flaky_operation("in", obj_id)
return super().__contains__(obj_id)
@_patch_objstorages(["src", "dst"])
def test_replay_content_check_dst_retry(
objstorages,
storage,
kafka_prefix: str,
kafka_consumer_group: str,
- kafka_server: Tuple[Popen, int],
+ kafka_server: str,
monkeypatch_retry_sleep,
):
- (_, kafka_port) = kafka_server
kafka_prefix += ".swh.journal.objects"
- contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages)
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
failures = {}
for i, (sha1, content) in enumerate(contents.items()):
if i >= NUM_CONTENTS_DST:
break
objstorages["dst"].add(content, obj_id=sha1)
failures["in", sha1] = 1
orig_dst = objstorages["dst"]
objstorages["dst"] = FlakyObjStorage(state=orig_dst.state, failures=failures)
result = invoke(
"content-replay",
"--stop-after-objects",
str(NUM_CONTENTS),
"--check-dst",
journal_config={
- "brokers": ["127.0.0.1:%d" % kafka_port],
+ "brokers": [kafka_server],
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
for (sha1, content) in contents.items():
assert sha1 in objstorages["dst"], sha1
assert objstorages["dst"].get(sha1) == content
@_patch_objstorages(["src", "dst"])
def test_replay_content_failed_copy_retry(
objstorages,
storage,
kafka_prefix: str,
kafka_consumer_group: str,
- kafka_server: Tuple[Popen, int],
+ kafka_server: str,
caplog,
monkeypatch_retry_sleep,
):
- (_, kafka_port) = kafka_server
kafka_prefix += ".swh.journal.objects"
- contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages)
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
add_failures = {}
get_failures = {}
definitely_failed = set()
# We want to generate operations failing 1 to CONTENT_REPLAY_RETRIES times.
# We generate failures for 2 different operations, get and add.
num_retry_contents = 2 * CONTENT_REPLAY_RETRIES
assert (
num_retry_contents < NUM_CONTENTS
), "Need to generate more test contents to properly test retry behavior"
for i, sha1 in enumerate(contents):
if i >= num_retry_contents:
break
# This generates a number of failures, up to CONTENT_REPLAY_RETRIES
num_failures = (i % CONTENT_REPLAY_RETRIES) + 1
# This generates failures of add for the first CONTENT_REPLAY_RETRIES
# objects, then failures of get.
if i < CONTENT_REPLAY_RETRIES:
add_failures["add", sha1] = num_failures
else:
get_failures["get", sha1] = num_failures
# Only contents that have CONTENT_REPLAY_RETRIES or more are
# definitely failing
if num_failures >= CONTENT_REPLAY_RETRIES:
definitely_failed.add(hash_to_hex(sha1))
objstorages["dst"] = FlakyObjStorage(
state=objstorages["dst"].state, failures=add_failures,
)
objstorages["src"] = FlakyObjStorage(
state=objstorages["src"].state, failures=get_failures,
)
caplog.set_level(logging.DEBUG, "swh.journal.replay")
result = invoke(
"content-replay",
"--stop-after-objects",
str(NUM_CONTENTS),
journal_config={
- "brokers": ["127.0.0.1:%d" % kafka_port],
+ "brokers": [kafka_server],
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
copied = 0
actually_failed = set()
for record in caplog.records:
logtext = record.getMessage()
if "copied" in logtext:
copied += 1
elif "Failed operation" in logtext:
assert record.levelno == logging.ERROR
assert record.args["retries"] == CONTENT_REPLAY_RETRIES
actually_failed.add(record.args["obj_id"])
assert (
actually_failed == definitely_failed
), "Unexpected object copy failures; see captured log for details"
for (sha1, content) in contents.items():
if hash_to_hex(sha1) in definitely_failed:
assert sha1 not in objstorages["dst"]
continue
assert sha1 in objstorages["dst"], sha1
assert objstorages["dst"].get(sha1) == content
@_patch_objstorages(["src", "dst"])
def test_replay_content_objnotfound(
objstorages,
storage,
kafka_prefix: str,
kafka_consumer_group: str,
- kafka_server: Tuple[Popen, int],
+ kafka_server: str,
caplog,
):
- (_, kafka_port) = kafka_server
kafka_prefix += ".swh.journal.objects"
- contents = _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages)
+ contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages)
num_contents_deleted = 5
contents_deleted = set()
for i, sha1 in enumerate(contents):
if i >= num_contents_deleted:
break
del objstorages["src"].state[sha1]
contents_deleted.add(hash_to_hex(sha1))
caplog.set_level(logging.DEBUG, "swh.journal.replay")
result = invoke(
"content-replay",
"--stop-after-objects",
str(NUM_CONTENTS),
journal_config={
- "brokers": ["127.0.0.1:%d" % kafka_port],
+ "brokers": [kafka_server],
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
copied = 0
not_in_src = set()
for record in caplog.records:
logtext = record.getMessage()
if "copied" in logtext:
copied += 1
elif "object not found" in logtext:
# Check that the object id can be recovered from logs
assert record.levelno == logging.ERROR
not_in_src.add(record.args["obj_id"])
assert (
copied == NUM_CONTENTS - num_contents_deleted
), "Unexpected number of contents copied"
assert (
not_in_src == contents_deleted
), "Mismatch between deleted contents and not_in_src logs"
for (sha1, content) in contents.items():
if sha1 not in objstorages["src"]:
continue
assert sha1 in objstorages["dst"], sha1
assert objstorages["dst"].get(sha1) == content
diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py
index f0bd3cd..76d190c 100644
--- a/swh/journal/tests/test_client.py
+++ b/swh/journal/tests/test_client.py
@@ -1,144 +1,137 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from subprocess import Popen
-from typing import Dict, List, Tuple
+from typing import Dict, List
from unittest.mock import MagicMock
from confluent_kafka import Producer
import pytest
from swh.model.hypothesis_strategies import revisions
from swh.model.model import Content
from swh.journal.client import JournalClient
from swh.journal.serializers import key_to_kafka, value_to_kafka
-def test_client(
- kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int]
-):
- (_, port) = kafka_server
+def test_client(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str):
kafka_prefix += ".swh.journal.objects"
producer = Producer(
{
- "bootstrap.servers": "localhost:{}".format(port),
+ "bootstrap.servers": kafka_server,
"client.id": "test producer",
"acks": "all",
}
)
rev = revisions().example()
# Fill Kafka
producer.produce(
topic=kafka_prefix + ".revision",
key=key_to_kafka(rev.id),
value=value_to_kafka(rev.to_dict()),
)
producer.flush()
client = JournalClient(
- brokers="localhost:%d" % kafka_server[1],
+ brokers=kafka_server,
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=1,
)
worker_fn = MagicMock()
client.process(worker_fn)
worker_fn.assert_called_once_with({"revision": [rev.to_dict()]})
-def test_client_eof(
- kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int]
-):
- (_, port) = kafka_server
+def test_client_eof(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str):
kafka_prefix += ".swh.journal.objects"
producer = Producer(
{
- "bootstrap.servers": "localhost:{}".format(port),
+ "bootstrap.servers": kafka_server,
"client.id": "test producer",
"acks": "all",
}
)
rev = revisions().example()
# Fill Kafka
producer.produce(
topic=kafka_prefix + ".revision",
key=key_to_kafka(rev.id),
value=value_to_kafka(rev.to_dict()),
)
producer.flush()
client = JournalClient(
- brokers="localhost:%d" % kafka_server[1],
+ brokers=kafka_server,
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=None,
stop_on_eof=True,
)
worker_fn = MagicMock()
client.process(worker_fn)
worker_fn.assert_called_once_with({"revision": [rev.to_dict()]})
@pytest.mark.parametrize("batch_size", [1, 5, 100])
def test_client_batch_size(
- kafka_prefix: str,
- kafka_consumer_group: str,
- kafka_server: Tuple[Popen, int],
- batch_size: int,
+ kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, batch_size: int,
):
- (_, port) = kafka_server
kafka_prefix += ".swh.journal.objects"
num_objects = 2 * batch_size + 1
assert num_objects < 256, "Too many objects, generation will fail"
producer = Producer(
{
- "bootstrap.servers": "localhost:{}".format(port),
+ "bootstrap.servers": kafka_server,
"client.id": "test producer",
"acks": "all",
}
)
contents = [Content.from_data(bytes([i])) for i in range(num_objects)]
# Fill Kafka
for content in contents:
producer.produce(
topic=kafka_prefix + ".content",
key=key_to_kafka(content.sha1),
value=value_to_kafka(content.to_dict()),
)
producer.flush()
client = JournalClient(
- brokers=["localhost:%d" % kafka_server[1]],
+ brokers=[kafka_server],
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=num_objects,
batch_size=batch_size,
)
collected_output: List[Dict] = []
def worker_fn(objects):
received = objects["content"]
assert len(received) <= batch_size
collected_output.extend(received)
client.process(worker_fn)
- assert collected_output == [content.to_dict() for content in contents]
+ expected_output = [content.to_dict() for content in contents]
+ assert len(collected_output) == len(expected_output)
+
+ for output in collected_output:
+ assert output in expected_output
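
(Illustration, not part of the patch.) The relaxed assertion above checks only
the count and membership of the consumed contents, presumably because delivery
order across topic partitions is not guaranteed. A stricter but still
order-insensitive check could sort both sides by a stable key first; a sketch,
assuming each dict carries a "sha1" key as produced by Content.to_dict():

    def sorted_by_sha1(dicts):
        # Editor's sketch: canonical ordering by content sha1 before comparing.
        return sorted(dicts, key=lambda d: d["sha1"])

    assert sorted_by_sha1(collected_output) == sorted_by_sha1(expected_output)
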
diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
index 0aa749e..1ff8712 100644
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -1,223 +1,212 @@
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
-from confluent_kafka import Consumer, Producer, KafkaException
-
import pytest
-from subprocess import Popen
-from typing import Tuple
+from confluent_kafka import Consumer, Producer, KafkaException
from swh.storage import get_storage
from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value
from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError
from swh.model.model import Directory, Origin, OriginVisit
from .conftest import TEST_OBJECTS, TEST_OBJECT_DICTS
def consume_messages(consumer, kafka_prefix, expected_messages):
"""Consume expected_messages from the consumer;
Sort them all into a consumed_objects dict"""
consumed_messages = defaultdict(list)
fetched_messages = 0
retries_left = 1000
while fetched_messages < expected_messages:
if retries_left == 0:
raise ValueError("Timed out fetching messages from kafka")
msg = consumer.poll(timeout=0.01)
if not msg:
retries_left -= 1
continue
error = msg.error()
if error is not None:
if error.fatal():
raise KafkaException(error)
retries_left -= 1
continue
fetched_messages += 1
topic = msg.topic()
assert topic.startswith(kafka_prefix + "."), "Unexpected topic"
object_type = topic[len(kafka_prefix + ".") :]
consumed_messages[object_type].append(
(kafka_to_key(msg.key()), kafka_to_value(msg.value()))
)
return consumed_messages
def assert_all_objects_consumed(consumed_messages):
"""Check whether all objects from TEST_OBJECT_DICTS have been consumed"""
for object_type, known_values in TEST_OBJECT_DICTS.items():
known_keys = [object_key(object_type, obj) for obj in TEST_OBJECTS[object_type]]
(received_keys, received_values) = zip(*consumed_messages[object_type])
if object_type == "origin_visit":
for value in received_values:
del value["visit"]
elif object_type == "content":
for value in received_values:
del value["ctime"]
for key in known_keys:
assert key in received_keys
for value in known_values:
assert value in received_values
-def test_kafka_writer(
- kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer
-):
+def test_kafka_writer(kafka_prefix: str, kafka_server: str, consumer: Consumer):
kafka_prefix += ".swh.journal.objects"
writer = KafkaJournalWriter(
- brokers=[f"localhost:{kafka_server[1]}"],
- client_id="kafka_writer",
- prefix=kafka_prefix,
+ brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix,
)
expected_messages = 0
for object_type, objects in TEST_OBJECTS.items():
writer.write_additions(object_type, objects)
expected_messages += len(objects)
consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
assert_all_objects_consumed(consumed_messages)
-def test_storage_direct_writer(
- kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer
-):
+def test_storage_direct_writer(kafka_prefix: str, kafka_server, consumer: Consumer):
kafka_prefix += ".swh.journal.objects"
writer_config = {
"cls": "kafka",
- "brokers": ["localhost:%d" % kafka_server[1]],
+ "brokers": [kafka_server],
"client_id": "kafka_writer",
"prefix": kafka_prefix,
}
storage_config = {
"cls": "pipeline",
"steps": [{"cls": "memory", "journal_writer": writer_config},],
}
storage = get_storage(**storage_config)
expected_messages = 0
for object_type, objects in TEST_OBJECTS.items():
method = getattr(storage, object_type + "_add")
if object_type in (
"content",
"directory",
"revision",
"release",
"snapshot",
"origin",
):
method(objects)
expected_messages += len(objects)
elif object_type in ("origin_visit",):
for obj in objects:
assert isinstance(obj, OriginVisit)
storage.origin_add_one(Origin(url=obj.origin))
visit = method(obj.origin, date=obj.date, type=obj.type)
expected_messages += 1
obj_d = obj.to_dict()
for k in ("visit", "origin", "date", "type"):
del obj_d[k]
storage.origin_visit_update(obj.origin, visit.visit, **obj_d)
expected_messages += 1
else:
assert False, object_type
consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
assert_all_objects_consumed(consumed_messages)
def test_write_delivery_failure(
- kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer,
+ kafka_prefix: str, kafka_server: str, consumer: Consumer
):
class MockKafkaError:
"""A mocked kafka error"""
def str(self):
return "Mocked Kafka Error"
def name(self):
return "SWH_MOCK_ERROR"
class KafkaJournalWriterFailDelivery(KafkaJournalWriter):
"""A journal writer which always fails delivering messages"""
def _on_delivery(self, error, message):
"""Replace the inbound error with a fake delivery error"""
super()._on_delivery(MockKafkaError(), message)
kafka_prefix += ".swh.journal.objects"
writer = KafkaJournalWriterFailDelivery(
- brokers=["localhost:%d" % kafka_server[1]],
- client_id="kafka_writer",
- prefix=kafka_prefix,
+ brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix,
)
empty_dir = Directory(entries=[])
with pytest.raises(KafkaDeliveryError) as exc:
writer.write_addition("directory", empty_dir)
assert "Failed deliveries" in exc.value.message
assert len(exc.value.delivery_failures) == 1
delivery_failure = exc.value.delivery_failures[0]
assert delivery_failure.key == empty_dir.id
assert delivery_failure.code == "SWH_MOCK_ERROR"
def test_write_delivery_timeout(
- kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer
+ kafka_prefix: str, kafka_server: str, consumer: Consumer
):
produced = []
class MockProducer(Producer):
"""A kafka producer which pretends to produce messages, but never sends any
delivery acknowledgements"""
def produce(self, **kwargs):
produced.append(kwargs)
kafka_prefix += ".swh.journal.objects"
writer = KafkaJournalWriter(
- brokers=["localhost:%d" % kafka_server[1]],
+ brokers=[kafka_server],
client_id="kafka_writer",
prefix=kafka_prefix,
flush_timeout=1,
producer_class=MockProducer,
)
empty_dir = Directory(entries=[])
with pytest.raises(KafkaDeliveryError) as exc:
writer.write_addition("directory", empty_dir)
assert len(produced) == 1
assert "timeout" in exc.value.message
assert len(exc.value.delivery_failures) == 1
delivery_failure = exc.value.delivery_failures[0]
assert delivery_failure.key == empty_dir.id
assert delivery_failure.code == "SWH_FLUSH_TIMEOUT"
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
index 5925049..796236d 100644
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -1,414 +1,405 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import functools
import logging
import random
-from subprocess import Popen
-from typing import Dict, List, Tuple
+from typing import Dict, List
import dateutil
import pytest
from confluent_kafka import Producer
from hypothesis import strategies, given, settings
from swh.storage import get_storage
from swh.journal.client import JournalClient
from swh.journal.serializers import key_to_kafka, value_to_kafka
from swh.journal.replay import process_replay_objects, is_hash_in_bytearray
from swh.model.hashutil import hash_to_hex
from swh.model.model import Content
from .conftest import TEST_OBJECT_DICTS, DUPLICATE_CONTENTS
from .utils import MockedJournalClient, MockedKafkaWriter
storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]}
def make_topic(kafka_prefix: str, object_type: str) -> str:
return kafka_prefix + "." + object_type
def test_storage_play(
- kafka_prefix: str,
- kafka_consumer_group: str,
- kafka_server: Tuple[Popen, int],
- caplog,
+ kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog,
):
"""Optimal replayer scenario.
This:
- writes objects to the topic
- replayer consumes objects from the topic and replay them
"""
- (_, port) = kafka_server
kafka_prefix += ".swh.journal.objects"
storage = get_storage(**storage_config)
producer = Producer(
{
- "bootstrap.servers": "localhost:{}".format(port),
+ "bootstrap.servers": kafka_server,
"client.id": "test producer",
"acks": "all",
}
)
now = datetime.datetime.now(tz=datetime.timezone.utc)
# Fill Kafka
nb_sent = 0
nb_visits = 0
for object_type, objects in TEST_OBJECT_DICTS.items():
topic = make_topic(kafka_prefix, object_type)
for object_ in objects:
key = bytes(random.randint(0, 255) for _ in range(40))
object_ = object_.copy()
if object_type == "content":
object_["ctime"] = now
elif object_type == "origin_visit":
nb_visits += 1
object_["visit"] = nb_visits
producer.produce(
topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_),
)
nb_sent += 1
producer.flush()
caplog.set_level(logging.ERROR, "swh.journal.replay")
# Fill the storage from Kafka
replayer = JournalClient(
- brokers="localhost:%d" % kafka_server[1],
+ brokers=kafka_server,
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=nb_sent,
)
worker_fn = functools.partial(process_replay_objects, storage=storage)
nb_inserted = 0
while nb_inserted < nb_sent:
nb_inserted += replayer.process(worker_fn)
assert nb_sent == nb_inserted
# Check the objects were actually inserted in the storage
assert TEST_OBJECT_DICTS["revision"] == list(
storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"]])
)
assert TEST_OBJECT_DICTS["release"] == list(
storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"]])
)
origins = list(storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"]]))
assert TEST_OBJECT_DICTS["origin"] == [{"url": orig["url"]} for orig in origins]
for origin in origins:
origin_url = origin["url"]
expected_visits = [
{
**visit,
"origin": origin_url,
"date": dateutil.parser.parse(visit["date"]),
}
for visit in TEST_OBJECT_DICTS["origin_visit"]
if visit["origin"] == origin["url"]
]
actual_visits = list(storage.origin_visit_get(origin_url))
for visit in actual_visits:
del visit["visit"] # opaque identifier
assert expected_visits == actual_visits
input_contents = TEST_OBJECT_DICTS["content"]
contents = storage.content_get_metadata([cont["sha1"] for cont in input_contents])
assert len(contents) == len(input_contents)
assert contents == {cont["sha1"]: [cont] for cont in input_contents}
collision = 0
for record in caplog.records:
logtext = record.getMessage()
if "Colliding contents:" in logtext:
collision += 1
assert collision == 0, "No collision should be detected"
def test_storage_play_with_collision(
- kafka_prefix: str,
- kafka_consumer_group: str,
- kafka_server: Tuple[Popen, int],
- caplog,
+ kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog,
):
"""Another replayer scenario with collisions.
This:
- writes objects to the topic, including colliding contents
- replayer consumes objects from the topic and replay them
- This drops the colliding contents from the replay when detected
"""
- (_, port) = kafka_server
kafka_prefix += ".swh.journal.objects"
storage = get_storage(**storage_config)
producer = Producer(
{
- "bootstrap.servers": "localhost:{}".format(port),
+ "bootstrap.servers": kafka_server,
"client.id": "test producer",
"enable.idempotence": "true",
}
)
now = datetime.datetime.now(tz=datetime.timezone.utc)
# Fill Kafka
nb_sent = 0
nb_visits = 0
for object_type, objects in TEST_OBJECT_DICTS.items():
topic = make_topic(kafka_prefix, object_type)
for object_ in objects:
key = bytes(random.randint(0, 255) for _ in range(40))
object_ = object_.copy()
if object_type == "content":
object_["ctime"] = now
elif object_type == "origin_visit":
nb_visits += 1
object_["visit"] = nb_visits
producer.produce(
topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_),
)
nb_sent += 1
# Create collision in input data
# They are not written in the destination
for content in DUPLICATE_CONTENTS:
topic = make_topic(kafka_prefix, "content")
producer.produce(
topic=topic, key=key_to_kafka(key), value=value_to_kafka(content),
)
nb_sent += 1
producer.flush()
caplog.set_level(logging.ERROR, "swh.journal.replay")
# Fill the storage from Kafka
replayer = JournalClient(
- brokers="localhost:%d" % kafka_server[1],
+ brokers=kafka_server,
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=nb_sent,
)
worker_fn = functools.partial(process_replay_objects, storage=storage)
nb_inserted = 0
while nb_inserted < nb_sent:
nb_inserted += replayer.process(worker_fn)
assert nb_sent == nb_inserted
# Check the objects were actually inserted in the storage
assert TEST_OBJECT_DICTS["revision"] == list(
storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"]])
)
assert TEST_OBJECT_DICTS["release"] == list(
storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"]])
)
origins = list(storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"]]))
assert TEST_OBJECT_DICTS["origin"] == [{"url": orig["url"]} for orig in origins]
for origin in origins:
origin_url = origin["url"]
expected_visits = [
{
**visit,
"origin": origin_url,
"date": dateutil.parser.parse(visit["date"]),
}
for visit in TEST_OBJECT_DICTS["origin_visit"]
if visit["origin"] == origin["url"]
]
actual_visits = list(storage.origin_visit_get(origin_url))
for visit in actual_visits:
del visit["visit"] # opaque identifier
assert expected_visits == actual_visits
input_contents = TEST_OBJECT_DICTS["content"]
contents = storage.content_get_metadata([cont["sha1"] for cont in input_contents])
assert len(contents) == len(input_contents)
assert contents == {cont["sha1"]: [cont] for cont in input_contents}
nb_collisions = 0
actual_collision: Dict
for record in caplog.records:
logtext = record.getMessage()
if "Collision detected:" in logtext:
nb_collisions += 1
actual_collision = record.args["collision"]
assert nb_collisions == 1, "1 collision should be detected"
algo = "sha1"
assert actual_collision["algo"] == algo
expected_colliding_hash = hash_to_hex(DUPLICATE_CONTENTS[0][algo])
assert actual_collision["hash"] == expected_colliding_hash
actual_colliding_hashes = actual_collision["objects"]
assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS)
for content in DUPLICATE_CONTENTS:
expected_content_hashes = {
k: hash_to_hex(v) for k, v in Content.from_dict(content).hashes().items()
}
assert expected_content_hashes in actual_colliding_hashes
def _test_write_replay_origin_visit(visits: List[Dict]):
"""Helper function to write tests for origin_visit.
Each visit (a dict) given in the 'visits' argument will be sent to
a (mocked) kafka queue, which a in-memory-storage backed replayer is
listening to.
Check that corresponding origin visits entities are present in the storage
and have correct values if they are not skipped.
"""
queue: List = []
replayer = MockedJournalClient(queue)
writer = MockedKafkaWriter(queue)
# Note that flipping the order of these two insertions will crash
# the test, because the legacy origin_format does not allow to create
# the origin when needed (type is missing)
writer.send(
"origin",
"foo",
{
"url": "http://example.com/",
"type": "git", # test the legacy origin format is accepted
},
)
for visit in visits:
writer.send("origin_visit", "foo", visit)
queue_size = len(queue)
assert replayer.stop_after_objects is None
replayer.stop_after_objects = queue_size
storage = get_storage(**storage_config)
worker_fn = functools.partial(process_replay_objects, storage=storage)
replayer.process(worker_fn)
actual_visits = list(storage.origin_visit_get("http://example.com/"))
assert len(actual_visits) == len(visits), actual_visits
for vin, vout in zip(visits, actual_visits):
vin = vin.copy()
vout = vout.copy()
assert vout.pop("origin") == "http://example.com/"
vin.pop("origin")
vin.setdefault("type", "git")
vin.setdefault("metadata", None)
assert vin == vout
def test_write_replay_origin_visit():
"""Test origin_visit when the 'origin' is just a string."""
now = datetime.datetime.now()
visits = [
{
"visit": 1,
"origin": "http://example.com/",
"date": now,
"type": "git",
"status": "partial",
"snapshot": None,
}
]
_test_write_replay_origin_visit(visits)
def test_write_replay_legacy_origin_visit1():
"""Origin_visit with no types should make the replayer crash
We expect the journal's origin_visit topic to no longer reference such
visits. If it does, the replayer must crash so we can fix the journal's
topic.
"""
now = datetime.datetime.now()
visit = {
"visit": 1,
"origin": "http://example.com/",
"date": now,
"status": "partial",
"snapshot": None,
}
now2 = datetime.datetime.now()
visit2 = {
"visit": 2,
"origin": {"url": "http://example.com/"},
"date": now2,
"status": "partial",
"snapshot": None,
}
for origin_visit in [visit, visit2]:
with pytest.raises(ValueError, match="Old origin visit format"):
_test_write_replay_origin_visit([origin_visit])
def test_write_replay_legacy_origin_visit2():
"""Test origin_visit when 'type' is missing from the visit, but not
from the origin."""
now = datetime.datetime.now()
visits = [
{
"visit": 1,
"origin": {"url": "http://example.com/", "type": "git",},
"date": now,
"type": "git",
"status": "partial",
"snapshot": None,
}
]
_test_write_replay_origin_visit(visits)
def test_write_replay_legacy_origin_visit3():
"""Test origin_visit when the origin is a dict"""
now = datetime.datetime.now()
visits = [
{
"visit": 1,
"origin": {"url": "http://example.com/",},
"date": now,
"type": "git",
"status": "partial",
"snapshot": None,
}
]
_test_write_replay_origin_visit(visits)
hash_strategy = strategies.binary(min_size=20, max_size=20)
@settings(max_examples=500)
@given(
strategies.sets(hash_strategy, min_size=0, max_size=500),
strategies.sets(hash_strategy, min_size=10),
)
def test_is_hash_in_bytearray(haystack, needles):
array = b"".join(sorted(haystack))
needles |= haystack # Exhaustively test for all objects in the array
for needle in needles:
assert is_hash_in_bytearray(needle, array, len(haystack)) == (
needle in haystack
)
diff --git a/tox.ini b/tox.ini
index e8aa391..9020b04 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,40 +1,37 @@
[tox]
envlist=black,flake8,mypy,py3
[testenv]
-passenv=SWH_KAFKA_ROOT
extras =
testing
deps =
pytest-cov
dev: pdbpp
-setenv =
- SWH_KAFKA_ROOT = {env:SWH_KAFKA_ROOT:swh/journal/tests/kafka}
commands =
pytest --cov={envsitepackagesdir}/swh/journal \
{envsitepackagesdir}/swh/journal \
--cov-branch \
--doctest-modules {posargs}
[testenv:black]
skip_install = true
deps =
black
commands =
{envpython} -m black --check swh
[testenv:flake8]
skip_install = true
deps =
git+https://github.com/PyCQA/pyflakes.git
flake8
commands =
{envpython} -m flake8
[testenv:mypy]
extras =
testing
deps =
mypy
commands =
mypy swh