Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/journal_data.py
- This file was copied from swh/journal/tests/conftest.py.
# Copyright (C) 2019-2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime | import datetime | ||||
import pytest | |||||
import logging | |||||
import random | |||||
import string | |||||
from confluent_kafka import Consumer, Producer | from typing import Any, Dict, List | ||||
from hypothesis.strategies import one_of | |||||
from typing import Any, Dict, Iterator, List | |||||
from swh.model import hypothesis_strategies as strategies | |||||
from swh.model.hashutil import MultiHash, hash_to_bytes | from swh.model.hashutil import MultiHash, hash_to_bytes | ||||
from swh.journal.serializers import ModelObject | from swh.journal.serializers import ModelObject | ||||
from swh.journal.writer.kafka import OBJECT_TYPES | from swh.journal.writer.kafka import OBJECT_TYPES | ||||
logger = logging.getLogger(__name__) | |||||
# Sample content dicts used as journal test fixtures.  The hashes are
# computed from a tiny literal payload so the values are deterministic
# across runs.
CONTENTS = [
    {**MultiHash.from_data(b"foo").digest(), "length": 3, "status": "visible"},
]
duplicate_content1 = { | duplicate_content1 = { | ||||
"length": 4, | "length": 4, | ||||
"sha1": hash_to_bytes("44973274ccef6ab4dfaaf86599792fa9c3fe4689"), | "sha1": hash_to_bytes("44973274ccef6ab4dfaaf86599792fa9c3fe4689"), | ||||
"sha1_git": b"another-foo", | "sha1_git": b"another-foo", | ||||
▲ Show 20 Lines • Show All 120 Lines • ▼ Show 20 Lines | for (num, obj_d) in enumerate(objects): | ||||
if object_type == "origin_visit": | if object_type == "origin_visit": | ||||
obj_d = {**obj_d, "visit": num} | obj_d = {**obj_d, "visit": num} | ||||
elif object_type == "content": | elif object_type == "content": | ||||
obj_d = {**obj_d, "data": b"", "ctime": datetime.datetime.now()} | obj_d = {**obj_d, "data": b"", "ctime": datetime.datetime.now()} | ||||
converted_objects.append(model.from_dict(obj_d)) | converted_objects.append(model.from_dict(obj_d)) | ||||
TEST_OBJECTS[object_type] = converted_objects | TEST_OBJECTS[object_type] = converted_objects | ||||
@pytest.fixture(scope="function")
def kafka_prefix():
    """Pick a random prefix for kafka topics on each call.

    Returns a 10-character lowercase ASCII string, so each test gets an
    isolated topic namespace on the shared broker.
    """
    return "".join(random.choice(string.ascii_lowercase) for _ in range(10))
@pytest.fixture(scope="function")
def kafka_consumer_group(kafka_prefix: str):
    """Pick a random consumer group for kafka consumers on each call.

    Derived from the per-test ``kafka_prefix`` so distinct tests never
    share committed offsets.
    """
    return "test-consumer-%s" % kafka_prefix
@pytest.fixture(scope="session")
def kafka_server() -> Iterator[str]:
    """Yield the connection string of an in-process mock kafka broker.

    Uses confluent-kafka's ``test.mock.num.brokers`` setting, so no real
    cluster is needed.  The producer used to bootstrap the mock cluster
    is flushed on fixture teardown.
    """
    p = Producer({"test.mock.num.brokers": "1"})
    metadata = p.list_topics()
    brokers = [str(broker) for broker in metadata.brokers.values()]
    assert len(brokers) == 1, "More than one broker found in the kafka cluster?!"
    # Broker repr looks like "host:port/id"; we only need the host:port part.
    broker_connstr, _ = brokers[0].split("/")
    ip, port_str = broker_connstr.split(":")
    # Sanity-check the mock broker is local and advertises a numeric port.
    assert ip == "127.0.0.1"
    assert int(port_str)
    yield broker_connstr
    p.flush()
# Base configuration shared by the journal client tests; the broker
# address and topic prefix are filled in per-test by the ``test_config``
# fixture.
TEST_CONFIG = {
    "consumer_id": "swh.journal.consumer",
    "object_types": TEST_OBJECT_DICTS.keys(),
    "stop_after_objects": 1,  # will read 1 object and stop
    "storage": {"cls": "memory", "args": {}},
}
@pytest.fixture
def test_config(kafka_server: str, kafka_prefix: str):
    """Test configuration needed for producer/consumer.

    Extends ``TEST_CONFIG`` with the per-session broker address and the
    per-test topic prefix.
    """
    return {
        **TEST_CONFIG,
        "brokers": [kafka_server],
        "prefix": kafka_prefix + ".swh.journal.objects",
    }
@pytest.fixture
def consumer(
    kafka_server: str, test_config: Dict, kafka_consumer_group: str,
) -> Iterator[Consumer]:
    """Get a connected Kafka consumer subscribed to all test topics.

    Note: the return annotation was ``Consumer`` but this is a generator
    fixture, so it is really an ``Iterator[Consumer]``.  The consumer is
    closed on fixture teardown.
    """
    consumer = Consumer(
        {
            "bootstrap.servers": kafka_server,
            "auto.offset.reset": "earliest",
            "enable.auto.commit": True,
            "group.id": kafka_consumer_group,
        }
    )
    # One topic per configured object type, under the per-test prefix.
    kafka_topics = [
        "%s.%s" % (test_config["prefix"], object_type)
        for object_type in test_config["object_types"]
    ]
    consumer.subscribe(kafka_topics)
    yield consumer
    consumer.close()
def objects_d():
    """Return a hypothesis strategy yielding ``(object_type, dict)`` pairs.

    Covers every swh model object type handled by the journal; each
    drawn model object is converted to its dict form via ``to_dict()``.
    """
    return one_of(
        strategies.origins().map(lambda x: ("origin", x.to_dict())),
        strategies.origin_visits().map(lambda x: ("origin_visit", x.to_dict())),
        strategies.snapshots().map(lambda x: ("snapshot", x.to_dict())),
        strategies.releases().map(lambda x: ("release", x.to_dict())),
        strategies.revisions().map(lambda x: ("revision", x.to_dict())),
        strategies.directories().map(lambda x: ("directory", x.to_dict())),
        strategies.skipped_contents().map(lambda x: ("skipped_content", x.to_dict())),
        strategies.present_contents().map(lambda x: ("content", x.to_dict())),
    )