diff --git a/setup.py b/setup.py --- a/setup.py +++ b/setup.py @@ -51,6 +51,8 @@ swh-journal=swh.journal.cli:main [swh.cli.subcommands] journal=swh.journal.cli:cli + [pytest11] + pytest_swh_journal = swh.journal.pytest_plugin """, install_requires=parse_requirements() + parse_requirements("swh"), setup_requires=["vcversioner"], diff --git a/swh/journal/pytest_plugin.py b/swh/journal/pytest_plugin.py new file mode 100644 --- /dev/null +++ b/swh/journal/pytest_plugin.py @@ -0,0 +1,153 @@ +# Copyright (C) 2019-2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import random +import string + +from typing import Dict, Iterator +from collections import defaultdict + +import pytest + +from confluent_kafka import Consumer, Producer, KafkaException + +from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value +from swh.journal.tests.journal_data import TEST_OBJECTS, TEST_OBJECT_DICTS + + +def consume_messages(consumer, kafka_prefix, expected_messages): + """Consume expected_messages from the consumer; + Sort them all into a consumed_objects dict""" + consumed_messages = defaultdict(list) + + fetched_messages = 0 + retries_left = 1000 + + while fetched_messages < expected_messages: + if retries_left == 0: + raise ValueError("Timed out fetching messages from kafka") + + msg = consumer.poll(timeout=0.01) + + if not msg: + retries_left -= 1 + continue + + error = msg.error() + if error is not None: + if error.fatal(): + raise KafkaException(error) + retries_left -= 1 + continue + + fetched_messages += 1 + topic = msg.topic() + assert topic.startswith(kafka_prefix + "."), "Unexpected topic" + object_type = topic[len(kafka_prefix + ".") :] + + consumed_messages[object_type].append( + (kafka_to_key(msg.key()), kafka_to_value(msg.value())) + ) + + return consumed_messages + + +def assert_all_objects_consumed(consumed_messages): + """Check whether all objects from TEST_OBJECT_DICTS have been consumed""" + for object_type, known_values in TEST_OBJECT_DICTS.items(): + known_keys = [object_key(object_type, obj) for obj in TEST_OBJECTS[object_type]] + + (received_keys, received_values) = zip(*consumed_messages[object_type]) + + if object_type == "origin_visit": + for value in received_values: + del value["visit"] + elif object_type == "content": + for value in received_values: + del value["ctime"] + + for key in known_keys: + assert key in received_keys + + for value in known_values: + assert value in received_values + + +@pytest.fixture(scope="function") +def kafka_prefix(): + """Pick a random prefix for kafka topics on each call""" + return "".join(random.choice(string.ascii_lowercase) for _ in range(10)) + + +@pytest.fixture(scope="function") +def kafka_consumer_group(kafka_prefix: str): + """Pick a random consumer group for kafka consumers on each call""" + return "test-consumer-%s" % kafka_prefix + + +@pytest.fixture(scope="session") +def kafka_server() -> Iterator[str]: + p = Producer({"test.mock.num.brokers": "1"}) + + metadata = p.list_topics() + brokers = [str(broker) for broker in metadata.brokers.values()] + assert len(brokers) == 1, "More than one broker found in the kafka cluster?!" 
+ + broker_connstr, broker_id = brokers[0].split("/") + ip, port_str = broker_connstr.split(":") + assert ip == "127.0.0.1" + assert int(port_str) + + yield broker_connstr + + p.flush() + + +TEST_CONFIG = { + "consumer_id": "swh.journal.consumer", + "object_types": TEST_OBJECT_DICTS.keys(), + "stop_after_objects": 1, # will read 1 object and stop + "storage": {"cls": "memory", "args": {}}, +} + + +@pytest.fixture +def test_config(kafka_server: str, kafka_prefix: str): + """Test configuration needed for producer/consumer + + """ + return { + **TEST_CONFIG, + "brokers": [kafka_server], + "prefix": kafka_prefix + ".swh.journal.objects", + } + + +@pytest.fixture +def consumer( + kafka_server: str, test_config: Dict, kafka_consumer_group: str, +) -> Consumer: + """Get a connected Kafka consumer. + + """ + consumer = Consumer( + { + "bootstrap.servers": kafka_server, + "auto.offset.reset": "earliest", + "enable.auto.commit": True, + "group.id": kafka_consumer_group, + } + ) + + kafka_topics = [ + "%s.%s" % (test_config["prefix"], object_type) + for object_type in test_config["object_types"] + ] + + consumer.subscribe(kafka_topics) + + yield consumer + + consumer.close() diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py --- a/swh/journal/tests/conftest.py +++ b/swh/journal/tests/conftest.py @@ -1,243 +1,19 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import datetime -import pytest import logging -import random -import string - -from confluent_kafka import Consumer, Producer from hypothesis.strategies import one_of -from typing import Any, Dict, Iterator, List from swh.model import hypothesis_strategies as strategies -from swh.model.hashutil import MultiHash, hash_to_bytes - -from swh.journal.serializers import ModelObject -from swh.journal.writer.kafka import OBJECT_TYPES +# for bw compat +from swh.journal.tests.journal_data import * # noqa logger = logging.getLogger(__name__) -CONTENTS = [ - {**MultiHash.from_data(b"foo").digest(), "length": 3, "status": "visible",}, -] - -duplicate_content1 = { - "length": 4, - "sha1": hash_to_bytes("44973274ccef6ab4dfaaf86599792fa9c3fe4689"), - "sha1_git": b"another-foo", - "blake2s256": b"another-bar", - "sha256": b"another-baz", - "status": "visible", -} - -# Craft a sha1 collision -duplicate_content2 = duplicate_content1.copy() -sha1_array = bytearray(duplicate_content1["sha1_git"]) -sha1_array[0] += 1 -duplicate_content2["sha1_git"] = bytes(sha1_array) - - -DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2] - - -COMMITTERS = [ - {"fullname": b"foo", "name": b"foo", "email": b"",}, - {"fullname": b"bar", "name": b"bar", "email": b"",}, -] - -DATES = [ - { - "timestamp": {"seconds": 1234567891, "microseconds": 0,}, - "offset": 120, - "negative_utc": False, - }, - { - "timestamp": {"seconds": 1234567892, "microseconds": 0,}, - "offset": 120, - "negative_utc": False, - }, -] - -REVISIONS = [ - { - "id": hash_to_bytes("7026b7c1a2af56521e951c01ed20f255fa054238"), - "message": b"hello", - "date": DATES[0], - "committer": COMMITTERS[0], - "author": COMMITTERS[0], - "committer_date": DATES[0], - "type": "git", - "directory": b"\x01" * 20, - "synthetic": False, - "metadata": None, - "parents": [], - }, - { - "id": 
hash_to_bytes("368a48fe15b7db2383775f97c6b247011b3f14f4"), - "message": b"hello again", - "date": DATES[1], - "committer": COMMITTERS[1], - "author": COMMITTERS[1], - "committer_date": DATES[1], - "type": "hg", - "directory": b"\x02" * 20, - "synthetic": False, - "metadata": None, - "parents": [], - }, -] - -RELEASES = [ - { - "id": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), - "name": b"v0.0.1", - "date": { - "timestamp": {"seconds": 1234567890, "microseconds": 0,}, - "offset": 120, - "negative_utc": False, - }, - "author": COMMITTERS[0], - "target_type": "revision", - "target": b"\x04" * 20, - "message": b"foo", - "synthetic": False, - }, -] - -ORIGINS = [ - {"url": "https://somewhere.org/den/fox",}, - {"url": "https://overtherainbow.org/fox/den",}, -] - -ORIGIN_VISITS = [ - { - "origin": ORIGINS[0]["url"], - "date": "2013-05-07 04:20:39.369271+00:00", - "snapshot": None, # TODO - "status": "ongoing", # TODO - "metadata": {"foo": "bar"}, - "type": "git", - }, - { - "origin": ORIGINS[0]["url"], - "date": "2018-11-27 17:20:39+00:00", - "snapshot": None, # TODO - "status": "ongoing", # TODO - "metadata": {"baz": "qux"}, - "type": "git", - }, -] - -TEST_OBJECT_DICTS: Dict[str, List[Dict[str, Any]]] = { - "content": CONTENTS, - "revision": REVISIONS, - "release": RELEASES, - "origin": ORIGINS, - "origin_visit": ORIGIN_VISITS, -} - -MODEL_OBJECTS = {v: k for (k, v) in OBJECT_TYPES.items()} - -TEST_OBJECTS: Dict[str, List[ModelObject]] = {} - -for object_type, objects in TEST_OBJECT_DICTS.items(): - converted_objects: List[ModelObject] = [] - model = MODEL_OBJECTS[object_type] - - for (num, obj_d) in enumerate(objects): - if object_type == "origin_visit": - obj_d = {**obj_d, "visit": num} - elif object_type == "content": - obj_d = {**obj_d, "data": b"", "ctime": datetime.datetime.now()} - - converted_objects.append(model.from_dict(obj_d)) - - TEST_OBJECTS[object_type] = converted_objects - - -@pytest.fixture(scope="function") -def kafka_prefix(): - """Pick a random prefix for kafka topics on each call""" - return "".join(random.choice(string.ascii_lowercase) for _ in range(10)) - - -@pytest.fixture(scope="function") -def kafka_consumer_group(kafka_prefix: str): - """Pick a random consumer group for kafka consumers on each call""" - return "test-consumer-%s" % kafka_prefix - - -@pytest.fixture(scope="session") -def kafka_server() -> Iterator[str]: - p = Producer({"test.mock.num.brokers": "1"}) - - metadata = p.list_topics() - brokers = [str(broker) for broker in metadata.brokers.values()] - assert len(brokers) == 1, "More than one broker found in the kafka cluster?!" - - broker_connstr, broker_id = brokers[0].split("/") - ip, port_str = broker_connstr.split(":") - assert ip == "127.0.0.1" - assert int(port_str) - - yield broker_connstr - - p.flush() - - -TEST_CONFIG = { - "consumer_id": "swh.journal.consumer", - "object_types": TEST_OBJECT_DICTS.keys(), - "stop_after_objects": 1, # will read 1 object and stop - "storage": {"cls": "memory", "args": {}}, -} - - -@pytest.fixture -def test_config(kafka_server: str, kafka_prefix: str): - """Test configuration needed for producer/consumer - - """ - return { - **TEST_CONFIG, - "brokers": [kafka_server], - "prefix": kafka_prefix + ".swh.journal.objects", - } - - -@pytest.fixture -def consumer( - kafka_server: str, test_config: Dict, kafka_consumer_group: str, -) -> Consumer: - """Get a connected Kafka consumer. 
- - """ - consumer = Consumer( - { - "bootstrap.servers": kafka_server, - "auto.offset.reset": "earliest", - "enable.auto.commit": True, - "group.id": kafka_consumer_group, - } - ) - - kafka_topics = [ - "%s.%s" % (test_config["prefix"], object_type) - for object_type in test_config["object_types"] - ] - - consumer.subscribe(kafka_topics) - - yield consumer - - consumer.close() - def objects_d(): return one_of( diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/journal_data.py copy from swh/journal/tests/conftest.py copy to swh/journal/tests/journal_data.py --- a/swh/journal/tests/conftest.py +++ b/swh/journal/tests/journal_data.py @@ -1,28 +1,17 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime -import pytest -import logging -import random -import string -from confluent_kafka import Consumer, Producer +from typing import Any, Dict, List -from hypothesis.strategies import one_of -from typing import Any, Dict, Iterator, List - -from swh.model import hypothesis_strategies as strategies from swh.model.hashutil import MultiHash, hash_to_bytes - from swh.journal.serializers import ModelObject from swh.journal.writer.kafka import OBJECT_TYPES -logger = logging.getLogger(__name__) - CONTENTS = [ {**MultiHash.from_data(b"foo").digest(), "length": 3, "status": "visible",}, ] @@ -159,94 +148,3 @@ converted_objects.append(model.from_dict(obj_d)) TEST_OBJECTS[object_type] = converted_objects - - -@pytest.fixture(scope="function") -def kafka_prefix(): - """Pick a random prefix for kafka topics on each call""" - return "".join(random.choice(string.ascii_lowercase) for _ in range(10)) - - -@pytest.fixture(scope="function") -def kafka_consumer_group(kafka_prefix: str): - """Pick a random consumer group for kafka consumers on each call""" - return "test-consumer-%s" % kafka_prefix - - -@pytest.fixture(scope="session") -def kafka_server() -> Iterator[str]: - p = Producer({"test.mock.num.brokers": "1"}) - - metadata = p.list_topics() - brokers = [str(broker) for broker in metadata.brokers.values()] - assert len(brokers) == 1, "More than one broker found in the kafka cluster?!" - - broker_connstr, broker_id = brokers[0].split("/") - ip, port_str = broker_connstr.split(":") - assert ip == "127.0.0.1" - assert int(port_str) - - yield broker_connstr - - p.flush() - - -TEST_CONFIG = { - "consumer_id": "swh.journal.consumer", - "object_types": TEST_OBJECT_DICTS.keys(), - "stop_after_objects": 1, # will read 1 object and stop - "storage": {"cls": "memory", "args": {}}, -} - - -@pytest.fixture -def test_config(kafka_server: str, kafka_prefix: str): - """Test configuration needed for producer/consumer - - """ - return { - **TEST_CONFIG, - "brokers": [kafka_server], - "prefix": kafka_prefix + ".swh.journal.objects", - } - - -@pytest.fixture -def consumer( - kafka_server: str, test_config: Dict, kafka_consumer_group: str, -) -> Consumer: - """Get a connected Kafka consumer. 
- - """ - consumer = Consumer( - { - "bootstrap.servers": kafka_server, - "auto.offset.reset": "earliest", - "enable.auto.commit": True, - "group.id": kafka_consumer_group, - } - ) - - kafka_topics = [ - "%s.%s" % (test_config["prefix"], object_type) - for object_type in test_config["object_types"] - ] - - consumer.subscribe(kafka_topics) - - yield consumer - - consumer.close() - - -def objects_d(): - return one_of( - strategies.origins().map(lambda x: ("origin", x.to_dict())), - strategies.origin_visits().map(lambda x: ("origin_visit", x.to_dict())), - strategies.snapshots().map(lambda x: ("snapshot", x.to_dict())), - strategies.releases().map(lambda x: ("release", x.to_dict())), - strategies.revisions().map(lambda x: ("revision", x.to_dict())), - strategies.directories().map(lambda x: ("directory", x.to_dict())), - strategies.skipped_contents().map(lambda x: ("skipped_content", x.to_dict())), - strategies.present_contents().map(lambda x: ("content", x.to_dict())), - ) diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py --- a/swh/journal/tests/test_kafka_writer.py +++ b/swh/journal/tests/test_kafka_writer.py @@ -3,77 +3,16 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from collections import defaultdict - import pytest -from confluent_kafka import Consumer, Producer, KafkaException +from confluent_kafka import Consumer, Producer from swh.storage import get_storage -from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value -from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError - from swh.model.model import Directory, Origin, OriginVisit -from .conftest import TEST_OBJECTS, TEST_OBJECT_DICTS - - -def consume_messages(consumer, kafka_prefix, expected_messages): - """Consume expected_messages from the consumer; - Sort them all into a consumed_objects dict""" - consumed_messages = defaultdict(list) - - fetched_messages = 0 - retries_left = 1000 - - while fetched_messages < expected_messages: - if retries_left == 0: - raise ValueError("Timed out fetching messages from kafka") - - msg = consumer.poll(timeout=0.01) - - if not msg: - retries_left -= 1 - continue - - error = msg.error() - if error is not None: - if error.fatal(): - raise KafkaException(error) - retries_left -= 1 - continue - - fetched_messages += 1 - topic = msg.topic() - assert topic.startswith(kafka_prefix + "."), "Unexpected topic" - object_type = topic[len(kafka_prefix + ".") :] - - consumed_messages[object_type].append( - (kafka_to_key(msg.key()), kafka_to_value(msg.value())) - ) - - return consumed_messages - - -def assert_all_objects_consumed(consumed_messages): - """Check whether all objects from TEST_OBJECT_DICTS have been consumed""" - for object_type, known_values in TEST_OBJECT_DICTS.items(): - known_keys = [object_key(object_type, obj) for obj in TEST_OBJECTS[object_type]] - - (received_keys, received_values) = zip(*consumed_messages[object_type]) - - if object_type == "origin_visit": - for value in received_values: - del value["visit"] - elif object_type == "content": - for value in received_values: - del value["ctime"] - - for key in known_keys: - assert key in received_keys - - for value in known_values: - assert value in received_values +from swh.journal.tests.journal_data import TEST_OBJECTS +from swh.journal.pytest_plugin import consume_messages, assert_all_objects_consumed +from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError def 
test_kafka_writer(kafka_prefix: str, kafka_server: str, consumer: Consumer):
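
Note: with the [pytest11] entry point added in setup.py, the fixtures (kafka_prefix, kafka_server, consumer, test_config) and the helpers (consume_messages, assert_all_objects_consumed) now ship with the package itself, so any downstream test suite gets them automatically once swh.journal is installed, without importing a conftest. A minimal sketch of such a downstream test follows; the KafkaJournalWriter constructor keywords and its write_additions method are assumptions based on the config keys used by the test_config fixture, not something shown in this diff, so check them against swh.journal.writer.kafka before copying:

    # Hypothetical downstream test module. The kafka_prefix, kafka_server and
    # consumer fixtures are injected by the pytest plugin registered above;
    # no conftest import is needed.
    from confluent_kafka import Consumer

    from swh.journal.pytest_plugin import assert_all_objects_consumed, consume_messages
    from swh.journal.tests.journal_data import TEST_OBJECTS
    from swh.journal.writer.kafka import KafkaJournalWriter


    def test_write_and_consume(
        kafka_prefix: str, kafka_server: str, consumer: Consumer
    ):
        # Match the topic prefix the consumer fixture subscribed to:
        # test_config sets it to kafka_prefix + ".swh.journal.objects".
        prefix = kafka_prefix + ".swh.journal.objects"

        # Assumed constructor keywords; the client_id value is arbitrary.
        writer = KafkaJournalWriter(
            brokers=[kafka_server], client_id="test_writer", prefix=prefix,
        )

        expected_messages = 0
        for object_type, objects in TEST_OBJECTS.items():
            writer.write_additions(object_type, objects)
            expected_messages += len(objects)

        # Poll until every written object has been read back, then check
        # the payloads against the known test data.
        consumed = consume_messages(consumer, prefix, expected_messages)
        assert_all_objects_consumed(consumed)

Since the kafka_server fixture builds on librdkafka's test.mock.num.brokers setting, such a test runs against an in-process mock cluster: no external Kafka broker or Zookeeper is required.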