diff --git a/setup.py b/setup.py
index 86921a0..c4dab38 100755
--- a/setup.py
+++ b/setup.py
@@ -1,72 +1,74 @@
 #!/usr/bin/env python3
 # Copyright (C) 2015-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from setuptools import setup, find_packages
 
 from os import path
 from io import open
 
 here = path.abspath(path.dirname(__file__))
 
 # Get the long description from the README file
 with open(path.join(here, "README.md"), encoding="utf-8") as f:
     long_description = f.read()
 
 
 def parse_requirements(name=None):
     if name:
         reqf = "requirements-%s.txt" % name
     else:
         reqf = "requirements.txt"
 
     requirements = []
     if not path.exists(reqf):
         return requirements
 
     with open(reqf) as f:
         for line in f.readlines():
             line = line.strip()
             if not line or line.startswith("#"):
                 continue
             requirements.append(line)
     return requirements
 
 
 setup(
     name="swh.journal",
     description="Software Heritage Journal utilities",
     long_description=long_description,
     long_description_content_type="text/markdown",
     python_requires=">=3.7",
     author="Software Heritage developers",
     author_email="swh-devel@inria.fr",
     url="https://forge.softwareheritage.org/diffusion/DJNL/",
     packages=find_packages(),
     scripts=[],
     entry_points="""
         [console_scripts]
         swh-journal=swh.journal.cli:main
         [swh.cli.subcommands]
         journal=swh.journal.cli:cli
+        [pytest11]
+        pytest_swh_journal = swh.journal.pytest_plugin
     """,
     install_requires=parse_requirements() + parse_requirements("swh"),
     setup_requires=["vcversioner"],
     extras_require={"testing": parse_requirements("test")},
     vcversioner={},
     include_package_data=True,
     classifiers=[
         "Programming Language :: Python :: 3",
         "Intended Audience :: Developers",
         "License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
         "Operating System :: OS Independent",
         "Development Status :: 5 - Production/Stable",
     ],
     project_urls={
         "Bug Reports": "https://forge.softwareheritage.org/maniphest",
         "Funding": "https://www.softwareheritage.org/donate",
         "Source": "https://forge.softwareheritage.org/source/swh-journal",
     },
 )
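
The [pytest11] entry point added above is what turns swh.journal into a pytest plugin: once the package is installed, pytest loads swh.journal.pytest_plugin automatically, so downstream packages get its fixtures without declaring pytest_plugins or importing anything in their conftest. A minimal sketch of a downstream test relying on that (the test module itself is hypothetical, not part of this diff):

# Hypothetical downstream test module; the fixtures are injected by pytest
# through the [pytest11] entry point declared in swh.journal's setup.py.
def test_fixtures_are_globally_available(kafka_server, kafka_prefix, consumer):
    assert kafka_server.startswith("127.0.0.1:")  # mock broker address
    assert len(kafka_prefix) == 10  # random lowercase topic prefix
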
diff --git a/swh/journal/pytest_plugin.py b/swh/journal/pytest_plugin.py
new file mode 100644
index 0000000..deea93d
--- /dev/null
+++ b/swh/journal/pytest_plugin.py
@@ -0,0 +1,153 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import random
+import string
+
+from typing import Dict, Iterator
+from collections import defaultdict
+
+import pytest
+
+from confluent_kafka import Consumer, Producer, KafkaException
+
+from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value
+from swh.journal.tests.journal_data import TEST_OBJECTS, TEST_OBJECT_DICTS
+
+
+def consume_messages(consumer, kafka_prefix, expected_messages):
+    """Consume expected_messages from the consumer;
+    Sort them all into a consumed_objects dict"""
+    consumed_messages = defaultdict(list)
+
+    fetched_messages = 0
+    retries_left = 1000
+
+    while fetched_messages < expected_messages:
+        if retries_left == 0:
+            raise ValueError("Timed out fetching messages from kafka")
+
+        msg = consumer.poll(timeout=0.01)
+
+        if not msg:
+            retries_left -= 1
+            continue
+
+        error = msg.error()
+        if error is not None:
+            if error.fatal():
+                raise KafkaException(error)
+            retries_left -= 1
+            continue
+
+        fetched_messages += 1
+        topic = msg.topic()
+        assert topic.startswith(kafka_prefix + "."), "Unexpected topic"
+        object_type = topic[len(kafka_prefix + ".") :]
+
+        consumed_messages[object_type].append(
+            (kafka_to_key(msg.key()), kafka_to_value(msg.value()))
+        )
+
+    return consumed_messages
+
+
+def assert_all_objects_consumed(consumed_messages):
+    """Check whether all objects from TEST_OBJECT_DICTS have been consumed"""
+    for object_type, known_values in TEST_OBJECT_DICTS.items():
+        known_keys = [object_key(object_type, obj) for obj in TEST_OBJECTS[object_type]]
+
+        (received_keys, received_values) = zip(*consumed_messages[object_type])
+
+        if object_type == "origin_visit":
+            for value in received_values:
+                del value["visit"]
+        elif object_type == "content":
+            for value in received_values:
+                del value["ctime"]
+
+        for key in known_keys:
+            assert key in received_keys
+
+        for value in known_values:
+            assert value in received_values
+
+
+@pytest.fixture(scope="function")
+def kafka_prefix():
+    """Pick a random prefix for kafka topics on each call"""
+    return "".join(random.choice(string.ascii_lowercase) for _ in range(10))
+
+
+@pytest.fixture(scope="function")
+def kafka_consumer_group(kafka_prefix: str):
+    """Pick a random consumer group for kafka consumers on each call"""
+    return "test-consumer-%s" % kafka_prefix
+
+
+@pytest.fixture(scope="session")
+def kafka_server() -> Iterator[str]:
+    p = Producer({"test.mock.num.brokers": "1"})
+
+    metadata = p.list_topics()
+    brokers = [str(broker) for broker in metadata.brokers.values()]
+    assert len(brokers) == 1, "More than one broker found in the kafka cluster?!"
+
+    broker_connstr, broker_id = brokers[0].split("/")
+    ip, port_str = broker_connstr.split(":")
+    assert ip == "127.0.0.1"
+    assert int(port_str)
+
+    yield broker_connstr
+
+    p.flush()
+
+
+TEST_CONFIG = {
+    "consumer_id": "swh.journal.consumer",
+    "object_types": TEST_OBJECT_DICTS.keys(),
+    "stop_after_objects": 1,  # will read 1 object and stop
+    "storage": {"cls": "memory", "args": {}},
+}
+
+
+@pytest.fixture
+def test_config(kafka_server: str, kafka_prefix: str):
+    """Test configuration needed for producer/consumer
+
+    """
+    return {
+        **TEST_CONFIG,
+        "brokers": [kafka_server],
+        "prefix": kafka_prefix + ".swh.journal.objects",
+    }
+
+
+@pytest.fixture
+def consumer(
+    kafka_server: str, test_config: Dict, kafka_consumer_group: str,
+) -> Consumer:
+    """Get a connected Kafka consumer.
+
+    """
+    consumer = Consumer(
+        {
+            "bootstrap.servers": kafka_server,
+            "auto.offset.reset": "earliest",
+            "enable.auto.commit": True,
+            "group.id": kafka_consumer_group,
+        }
+    )
+
+    kafka_topics = [
+        "%s.%s" % (test_config["prefix"], object_type)
+        for object_type in test_config["object_types"]
+    ]
+
+    consumer.subscribe(kafka_topics)
+
+    yield consumer
+
+    consumer.close()
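
For readers unfamiliar with librdkafka's mock cluster: the kafka_server fixture never talks to a real deployment. Setting test.mock.num.brokers on a Producer spins up an in-process broker whose address other clients can bootstrap against. A self-contained round-trip sketch of the mechanism the fixtures and consume_messages build on (topic name and payload are made up; a real test would poll in a retry loop as consume_messages does above):

from confluent_kafka import Consumer, Producer

producer = Producer({"test.mock.num.brokers": "1"})
# BrokerMetadata renders as "host:port/id"; keep the "host:port" part.
broker = str(list(producer.list_topics().brokers.values())[0]).split("/")[0]

producer.produce(topic="demo.topic", key=b"k", value=b"v")
producer.flush()

consumer = Consumer(
    {
        "bootstrap.servers": broker,
        "group.id": "demo-group",
        "auto.offset.reset": "earliest",
    }
)
consumer.subscribe(["demo.topic"])
msg = consumer.poll(timeout=5.0)  # may legitimately return None on the first polls
consumer.close()
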
+ + """ + consumer = Consumer( + { + "bootstrap.servers": kafka_server, + "auto.offset.reset": "earliest", + "enable.auto.commit": True, + "group.id": kafka_consumer_group, + } + ) + + kafka_topics = [ + "%s.%s" % (test_config["prefix"], object_type) + for object_type in test_config["object_types"] + ] + + consumer.subscribe(kafka_topics) + + yield consumer + + consumer.close() diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py index 3b89a7b..ec2a3cf 100644 --- a/swh/journal/tests/conftest.py +++ b/swh/journal/tests/conftest.py @@ -1,252 +1,28 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import datetime -import pytest import logging -import random -import string - -from confluent_kafka import Consumer, Producer from hypothesis.strategies import one_of -from typing import Any, Dict, Iterator, List from swh.model import hypothesis_strategies as strategies -from swh.model.hashutil import MultiHash, hash_to_bytes - -from swh.journal.serializers import ModelObject -from swh.journal.writer.kafka import OBJECT_TYPES +# for bw compat +from swh.journal.tests.journal_data import * # noqa logger = logging.getLogger(__name__) -CONTENTS = [ - {**MultiHash.from_data(b"foo").digest(), "length": 3, "status": "visible",}, -] - -duplicate_content1 = { - "length": 4, - "sha1": hash_to_bytes("44973274ccef6ab4dfaaf86599792fa9c3fe4689"), - "sha1_git": b"another-foo", - "blake2s256": b"another-bar", - "sha256": b"another-baz", - "status": "visible", -} - -# Craft a sha1 collision -duplicate_content2 = duplicate_content1.copy() -sha1_array = bytearray(duplicate_content1["sha1_git"]) -sha1_array[0] += 1 -duplicate_content2["sha1_git"] = bytes(sha1_array) - - -DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2] - - -COMMITTERS = [ - {"fullname": b"foo", "name": b"foo", "email": b"",}, - {"fullname": b"bar", "name": b"bar", "email": b"",}, -] - -DATES = [ - { - "timestamp": {"seconds": 1234567891, "microseconds": 0,}, - "offset": 120, - "negative_utc": False, - }, - { - "timestamp": {"seconds": 1234567892, "microseconds": 0,}, - "offset": 120, - "negative_utc": False, - }, -] - -REVISIONS = [ - { - "id": hash_to_bytes("7026b7c1a2af56521e951c01ed20f255fa054238"), - "message": b"hello", - "date": DATES[0], - "committer": COMMITTERS[0], - "author": COMMITTERS[0], - "committer_date": DATES[0], - "type": "git", - "directory": b"\x01" * 20, - "synthetic": False, - "metadata": None, - "parents": [], - }, - { - "id": hash_to_bytes("368a48fe15b7db2383775f97c6b247011b3f14f4"), - "message": b"hello again", - "date": DATES[1], - "committer": COMMITTERS[1], - "author": COMMITTERS[1], - "committer_date": DATES[1], - "type": "hg", - "directory": b"\x02" * 20, - "synthetic": False, - "metadata": None, - "parents": [], - }, -] - -RELEASES = [ - { - "id": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), - "name": b"v0.0.1", - "date": { - "timestamp": {"seconds": 1234567890, "microseconds": 0,}, - "offset": 120, - "negative_utc": False, - }, - "author": COMMITTERS[0], - "target_type": "revision", - "target": b"\x04" * 20, - "message": b"foo", - "synthetic": False, - }, -] - -ORIGINS = [ - {"url": "https://somewhere.org/den/fox",}, - {"url": "https://overtherainbow.org/fox/den",}, -] - -ORIGIN_VISITS = [ - { - "origin": 
ORIGINS[0]["url"], - "date": "2013-05-07 04:20:39.369271+00:00", - "snapshot": None, # TODO - "status": "ongoing", # TODO - "metadata": {"foo": "bar"}, - "type": "git", - }, - { - "origin": ORIGINS[0]["url"], - "date": "2018-11-27 17:20:39+00:00", - "snapshot": None, # TODO - "status": "ongoing", # TODO - "metadata": {"baz": "qux"}, - "type": "git", - }, -] - -TEST_OBJECT_DICTS: Dict[str, List[Dict[str, Any]]] = { - "content": CONTENTS, - "revision": REVISIONS, - "release": RELEASES, - "origin": ORIGINS, - "origin_visit": ORIGIN_VISITS, -} - -MODEL_OBJECTS = {v: k for (k, v) in OBJECT_TYPES.items()} - -TEST_OBJECTS: Dict[str, List[ModelObject]] = {} - -for object_type, objects in TEST_OBJECT_DICTS.items(): - converted_objects: List[ModelObject] = [] - model = MODEL_OBJECTS[object_type] - - for (num, obj_d) in enumerate(objects): - if object_type == "origin_visit": - obj_d = {**obj_d, "visit": num} - elif object_type == "content": - obj_d = {**obj_d, "data": b"", "ctime": datetime.datetime.now()} - - converted_objects.append(model.from_dict(obj_d)) - - TEST_OBJECTS[object_type] = converted_objects - - -@pytest.fixture(scope="function") -def kafka_prefix(): - """Pick a random prefix for kafka topics on each call""" - return "".join(random.choice(string.ascii_lowercase) for _ in range(10)) - - -@pytest.fixture(scope="function") -def kafka_consumer_group(kafka_prefix: str): - """Pick a random consumer group for kafka consumers on each call""" - return "test-consumer-%s" % kafka_prefix - - -@pytest.fixture(scope="session") -def kafka_server() -> Iterator[str]: - p = Producer({"test.mock.num.brokers": "1"}) - - metadata = p.list_topics() - brokers = [str(broker) for broker in metadata.brokers.values()] - assert len(brokers) == 1, "More than one broker found in the kafka cluster?!" - - broker_connstr, broker_id = brokers[0].split("/") - ip, port_str = broker_connstr.split(":") - assert ip == "127.0.0.1" - assert int(port_str) - - yield broker_connstr - - p.flush() - - -TEST_CONFIG = { - "consumer_id": "swh.journal.consumer", - "object_types": TEST_OBJECT_DICTS.keys(), - "stop_after_objects": 1, # will read 1 object and stop - "storage": {"cls": "memory", "args": {}}, -} - - -@pytest.fixture -def test_config(kafka_server: str, kafka_prefix: str): - """Test configuration needed for producer/consumer - - """ - return { - **TEST_CONFIG, - "brokers": [kafka_server], - "prefix": kafka_prefix + ".swh.journal.objects", - } - - -@pytest.fixture -def consumer( - kafka_server: str, test_config: Dict, kafka_consumer_group: str, -) -> Consumer: - """Get a connected Kafka consumer. 
- - """ - consumer = Consumer( - { - "bootstrap.servers": kafka_server, - "auto.offset.reset": "earliest", - "enable.auto.commit": True, - "group.id": kafka_consumer_group, - } - ) - - kafka_topics = [ - "%s.%s" % (test_config["prefix"], object_type) - for object_type in test_config["object_types"] - ] - - consumer.subscribe(kafka_topics) - - yield consumer - - consumer.close() - def objects_d(): return one_of( strategies.origins().map(lambda x: ("origin", x.to_dict())), strategies.origin_visits().map(lambda x: ("origin_visit", x.to_dict())), strategies.snapshots().map(lambda x: ("snapshot", x.to_dict())), strategies.releases().map(lambda x: ("release", x.to_dict())), strategies.revisions().map(lambda x: ("revision", x.to_dict())), strategies.directories().map(lambda x: ("directory", x.to_dict())), strategies.skipped_contents().map(lambda x: ("skipped_content", x.to_dict())), strategies.present_contents().map(lambda x: ("content", x.to_dict())), ) diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/journal_data.py similarity index 57% copy from swh/journal/tests/conftest.py copy to swh/journal/tests/journal_data.py index 3b89a7b..12d0c64 100644 --- a/swh/journal/tests/conftest.py +++ b/swh/journal/tests/journal_data.py @@ -1,252 +1,150 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime -import pytest -import logging -import random -import string -from confluent_kafka import Consumer, Producer +from typing import Any, Dict, List -from hypothesis.strategies import one_of -from typing import Any, Dict, Iterator, List - -from swh.model import hypothesis_strategies as strategies from swh.model.hashutil import MultiHash, hash_to_bytes - from swh.journal.serializers import ModelObject from swh.journal.writer.kafka import OBJECT_TYPES -logger = logging.getLogger(__name__) - CONTENTS = [ {**MultiHash.from_data(b"foo").digest(), "length": 3, "status": "visible",}, ] duplicate_content1 = { "length": 4, "sha1": hash_to_bytes("44973274ccef6ab4dfaaf86599792fa9c3fe4689"), "sha1_git": b"another-foo", "blake2s256": b"another-bar", "sha256": b"another-baz", "status": "visible", } # Craft a sha1 collision duplicate_content2 = duplicate_content1.copy() sha1_array = bytearray(duplicate_content1["sha1_git"]) sha1_array[0] += 1 duplicate_content2["sha1_git"] = bytes(sha1_array) DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2] COMMITTERS = [ {"fullname": b"foo", "name": b"foo", "email": b"",}, {"fullname": b"bar", "name": b"bar", "email": b"",}, ] DATES = [ { "timestamp": {"seconds": 1234567891, "microseconds": 0,}, "offset": 120, "negative_utc": False, }, { "timestamp": {"seconds": 1234567892, "microseconds": 0,}, "offset": 120, "negative_utc": False, }, ] REVISIONS = [ { "id": hash_to_bytes("7026b7c1a2af56521e951c01ed20f255fa054238"), "message": b"hello", "date": DATES[0], "committer": COMMITTERS[0], "author": COMMITTERS[0], "committer_date": DATES[0], "type": "git", "directory": b"\x01" * 20, "synthetic": False, "metadata": None, "parents": [], }, { "id": hash_to_bytes("368a48fe15b7db2383775f97c6b247011b3f14f4"), "message": b"hello again", "date": DATES[1], "committer": COMMITTERS[1], "author": COMMITTERS[1], "committer_date": DATES[1], "type": "hg", "directory": b"\x02" * 20, "synthetic": False, "metadata": None, 
"parents": [], }, ] RELEASES = [ { "id": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), "name": b"v0.0.1", "date": { "timestamp": {"seconds": 1234567890, "microseconds": 0,}, "offset": 120, "negative_utc": False, }, "author": COMMITTERS[0], "target_type": "revision", "target": b"\x04" * 20, "message": b"foo", "synthetic": False, }, ] ORIGINS = [ {"url": "https://somewhere.org/den/fox",}, {"url": "https://overtherainbow.org/fox/den",}, ] ORIGIN_VISITS = [ { "origin": ORIGINS[0]["url"], "date": "2013-05-07 04:20:39.369271+00:00", "snapshot": None, # TODO "status": "ongoing", # TODO "metadata": {"foo": "bar"}, "type": "git", }, { "origin": ORIGINS[0]["url"], "date": "2018-11-27 17:20:39+00:00", "snapshot": None, # TODO "status": "ongoing", # TODO "metadata": {"baz": "qux"}, "type": "git", }, ] TEST_OBJECT_DICTS: Dict[str, List[Dict[str, Any]]] = { "content": CONTENTS, "revision": REVISIONS, "release": RELEASES, "origin": ORIGINS, "origin_visit": ORIGIN_VISITS, } MODEL_OBJECTS = {v: k for (k, v) in OBJECT_TYPES.items()} TEST_OBJECTS: Dict[str, List[ModelObject]] = {} for object_type, objects in TEST_OBJECT_DICTS.items(): converted_objects: List[ModelObject] = [] model = MODEL_OBJECTS[object_type] for (num, obj_d) in enumerate(objects): if object_type == "origin_visit": obj_d = {**obj_d, "visit": num} elif object_type == "content": obj_d = {**obj_d, "data": b"", "ctime": datetime.datetime.now()} converted_objects.append(model.from_dict(obj_d)) TEST_OBJECTS[object_type] = converted_objects - - -@pytest.fixture(scope="function") -def kafka_prefix(): - """Pick a random prefix for kafka topics on each call""" - return "".join(random.choice(string.ascii_lowercase) for _ in range(10)) - - -@pytest.fixture(scope="function") -def kafka_consumer_group(kafka_prefix: str): - """Pick a random consumer group for kafka consumers on each call""" - return "test-consumer-%s" % kafka_prefix - - -@pytest.fixture(scope="session") -def kafka_server() -> Iterator[str]: - p = Producer({"test.mock.num.brokers": "1"}) - - metadata = p.list_topics() - brokers = [str(broker) for broker in metadata.brokers.values()] - assert len(brokers) == 1, "More than one broker found in the kafka cluster?!" - - broker_connstr, broker_id = brokers[0].split("/") - ip, port_str = broker_connstr.split(":") - assert ip == "127.0.0.1" - assert int(port_str) - - yield broker_connstr - - p.flush() - - -TEST_CONFIG = { - "consumer_id": "swh.journal.consumer", - "object_types": TEST_OBJECT_DICTS.keys(), - "stop_after_objects": 1, # will read 1 object and stop - "storage": {"cls": "memory", "args": {}}, -} - - -@pytest.fixture -def test_config(kafka_server: str, kafka_prefix: str): - """Test configuration needed for producer/consumer - - """ - return { - **TEST_CONFIG, - "brokers": [kafka_server], - "prefix": kafka_prefix + ".swh.journal.objects", - } - - -@pytest.fixture -def consumer( - kafka_server: str, test_config: Dict, kafka_consumer_group: str, -) -> Consumer: - """Get a connected Kafka consumer. 
- - """ - consumer = Consumer( - { - "bootstrap.servers": kafka_server, - "auto.offset.reset": "earliest", - "enable.auto.commit": True, - "group.id": kafka_consumer_group, - } - ) - - kafka_topics = [ - "%s.%s" % (test_config["prefix"], object_type) - for object_type in test_config["object_types"] - ] - - consumer.subscribe(kafka_topics) - - yield consumer - - consumer.close() - - -def objects_d(): - return one_of( - strategies.origins().map(lambda x: ("origin", x.to_dict())), - strategies.origin_visits().map(lambda x: ("origin_visit", x.to_dict())), - strategies.snapshots().map(lambda x: ("snapshot", x.to_dict())), - strategies.releases().map(lambda x: ("release", x.to_dict())), - strategies.revisions().map(lambda x: ("revision", x.to_dict())), - strategies.directories().map(lambda x: ("directory", x.to_dict())), - strategies.skipped_contents().map(lambda x: ("skipped_content", x.to_dict())), - strategies.present_contents().map(lambda x: ("content", x.to_dict())), - ) diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py index 1ff8712..fa5e7d3 100644 --- a/swh/journal/tests/test_kafka_writer.py +++ b/swh/journal/tests/test_kafka_writer.py @@ -1,212 +1,151 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from collections import defaultdict - import pytest -from confluent_kafka import Consumer, Producer, KafkaException +from confluent_kafka import Consumer, Producer from swh.storage import get_storage -from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value -from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError - from swh.model.model import Directory, Origin, OriginVisit -from .conftest import TEST_OBJECTS, TEST_OBJECT_DICTS - - -def consume_messages(consumer, kafka_prefix, expected_messages): - """Consume expected_messages from the consumer; - Sort them all into a consumed_objects dict""" - consumed_messages = defaultdict(list) - - fetched_messages = 0 - retries_left = 1000 - - while fetched_messages < expected_messages: - if retries_left == 0: - raise ValueError("Timed out fetching messages from kafka") - - msg = consumer.poll(timeout=0.01) - - if not msg: - retries_left -= 1 - continue - - error = msg.error() - if error is not None: - if error.fatal(): - raise KafkaException(error) - retries_left -= 1 - continue - - fetched_messages += 1 - topic = msg.topic() - assert topic.startswith(kafka_prefix + "."), "Unexpected topic" - object_type = topic[len(kafka_prefix + ".") :] - - consumed_messages[object_type].append( - (kafka_to_key(msg.key()), kafka_to_value(msg.value())) - ) - - return consumed_messages - - -def assert_all_objects_consumed(consumed_messages): - """Check whether all objects from TEST_OBJECT_DICTS have been consumed""" - for object_type, known_values in TEST_OBJECT_DICTS.items(): - known_keys = [object_key(object_type, obj) for obj in TEST_OBJECTS[object_type]] - - (received_keys, received_values) = zip(*consumed_messages[object_type]) - - if object_type == "origin_visit": - for value in received_values: - del value["visit"] - elif object_type == "content": - for value in received_values: - del value["ctime"] - - for key in known_keys: - assert key in received_keys - - for value in known_values: - assert value in received_values +from swh.journal.tests.journal_data import TEST_OBJECTS +from 
swh.journal.pytest_plugin import consume_messages, assert_all_objects_consumed +from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError def test_kafka_writer(kafka_prefix: str, kafka_server: str, consumer: Consumer): kafka_prefix += ".swh.journal.objects" writer = KafkaJournalWriter( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, ) expected_messages = 0 for object_type, objects in TEST_OBJECTS.items(): writer.write_additions(object_type, objects) expected_messages += len(objects) consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) assert_all_objects_consumed(consumed_messages) def test_storage_direct_writer(kafka_prefix: str, kafka_server, consumer: Consumer): kafka_prefix += ".swh.journal.objects" writer_config = { "cls": "kafka", "brokers": [kafka_server], "client_id": "kafka_writer", "prefix": kafka_prefix, } storage_config = { "cls": "pipeline", "steps": [{"cls": "memory", "journal_writer": writer_config},], } storage = get_storage(**storage_config) expected_messages = 0 for object_type, objects in TEST_OBJECTS.items(): method = getattr(storage, object_type + "_add") if object_type in ( "content", "directory", "revision", "release", "snapshot", "origin", ): method(objects) expected_messages += len(objects) elif object_type in ("origin_visit",): for obj in objects: assert isinstance(obj, OriginVisit) storage.origin_add_one(Origin(url=obj.origin)) visit = method(obj.origin, date=obj.date, type=obj.type) expected_messages += 1 obj_d = obj.to_dict() for k in ("visit", "origin", "date", "type"): del obj_d[k] storage.origin_visit_update(obj.origin, visit.visit, **obj_d) expected_messages += 1 else: assert False, object_type consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) assert_all_objects_consumed(consumed_messages) def test_write_delivery_failure( kafka_prefix: str, kafka_server: str, consumer: Consumer ): class MockKafkaError: """A mocked kafka error""" def str(self): return "Mocked Kafka Error" def name(self): return "SWH_MOCK_ERROR" class KafkaJournalWriterFailDelivery(KafkaJournalWriter): """A journal writer which always fails delivering messages""" def _on_delivery(self, error, message): """Replace the inbound error with a fake delivery error""" super()._on_delivery(MockKafkaError(), message) kafka_prefix += ".swh.journal.objects" writer = KafkaJournalWriterFailDelivery( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, ) empty_dir = Directory(entries=[]) with pytest.raises(KafkaDeliveryError) as exc: writer.write_addition("directory", empty_dir) assert "Failed deliveries" in exc.value.message assert len(exc.value.delivery_failures) == 1 delivery_failure = exc.value.delivery_failures[0] assert delivery_failure.key == empty_dir.id assert delivery_failure.code == "SWH_MOCK_ERROR" def test_write_delivery_timeout( kafka_prefix: str, kafka_server: str, consumer: Consumer ): produced = [] class MockProducer(Producer): """A kafka producer which pretends to produce messages, but never sends any delivery acknowledgements""" def produce(self, **kwargs): produced.append(kwargs) kafka_prefix += ".swh.journal.objects" writer = KafkaJournalWriter( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, flush_timeout=1, producer_class=MockProducer, ) empty_dir = Directory(entries=[]) with pytest.raises(KafkaDeliveryError) as exc: writer.write_addition("directory", empty_dir) assert len(produced) == 1 assert "timeout" in exc.value.message assert 
len(exc.value.delivery_failures) == 1 delivery_failure = exc.value.delivery_failures[0] assert delivery_failure.key == empty_dir.id assert delivery_failure.code == "SWH_FLUSH_TIMEOUT"
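
Taken together, the diff moves everything a downstream package needs (the fixtures plus the consume_messages / assert_all_objects_consumed helpers) out of swh.journal's own test tree and into an installable plugin. A sketch of the kind of test this enables outside swh.journal, closely modeled on test_kafka_writer above (the origin URL is made up; "origin" is used because the consumer fixture only subscribes to the object types listed in TEST_CONFIG):

from confluent_kafka import Consumer

from swh.model.model import Origin
from swh.journal.pytest_plugin import consume_messages
from swh.journal.writer.kafka import KafkaJournalWriter


def test_write_one_origin(kafka_prefix: str, kafka_server: str, consumer: Consumer):
    kafka_prefix += ".swh.journal.objects"
    writer = KafkaJournalWriter(
        brokers=[kafka_server], client_id="test_writer", prefix=kafka_prefix,
    )

    writer.write_addition("origin", Origin(url="https://example.org/some/repo"))

    consumed = consume_messages(consumer, kafka_prefix, 1)
    assert len(consumed["origin"]) == 1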