diff --git a/swh/journal/pytest_plugin.py b/swh/journal/pytest_plugin.py
index cb63c7a..44bbb74 100644
--- a/swh/journal/pytest_plugin.py
+++ b/swh/journal/pytest_plugin.py
@@ -1,261 +1,261 @@
 # Copyright (C) 2019-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from collections import defaultdict
 import random
 import string
 from typing import Any, Collection, Dict, Iterator, Optional

 import attr
 from confluent_kafka import Consumer, KafkaException, Producer
 from confluent_kafka.admin import AdminClient
 import pytest

 from swh.journal.serializers import kafka_to_key, kafka_to_value, pprint_key
 from swh.model.tests.swh_model_data import TEST_OBJECTS


 def ensure_lists(value: Any) -> Any:
     """
     >>> ensure_lists(["foo", 42])
     ['foo', 42]
     >>> ensure_lists(("foo", 42))
     ['foo', 42]
     >>> ensure_lists({"a": ["foo", 42]})
     {'a': ['foo', 42]}
     >>> ensure_lists({"a": ("foo", 42)})
     {'a': ['foo', 42]}
     """
     if isinstance(value, (tuple, list)):
         return list(map(ensure_lists, value))
     elif isinstance(value, dict):
         return dict(ensure_lists(list(value.items())))
     else:
         return value


 def consume_messages(consumer, kafka_prefix, expected_messages):
     """Consume expected_messages from the consumer;
     sort them all into a consumed_messages dict"""
     consumed_messages = defaultdict(list)
     fetched_messages = 0
     retries_left = 1000

     while fetched_messages < expected_messages:
         if retries_left == 0:
             raise ValueError(
                 "Timed out fetching messages from kafka. "
                 f"Only {fetched_messages}/{expected_messages} fetched"
             )

         msg = consumer.poll(timeout=0.01)

         if not msg:
             retries_left -= 1
             continue

         error = msg.error()
         if error is not None:
             if error.fatal():
                 raise KafkaException(error)
             retries_left -= 1
             continue

         fetched_messages += 1
         topic = msg.topic()
         assert topic.startswith(f"{kafka_prefix}.") or topic.startswith(
             f"{kafka_prefix}_privileged."
         ), "Unexpected topic"
         object_type = topic[len(kafka_prefix + ".") :]

         consumed_messages[object_type].append(
             (kafka_to_key(msg.key()), kafka_to_value(msg.value()))
         )

     return consumed_messages


 def assert_all_objects_consumed(
     consumed_messages: Dict, exclude: Optional[Collection] = None
 ):
     """Check whether all objects from TEST_OBJECTS have been consumed

     `exclude` can be a list of object types for which we do not want to compare
     the values (e.g. for anonymized objects).
     """
     for object_type, known_objects in TEST_OBJECTS.items():
         known_keys = [obj.unique_key() for obj in known_objects]

         if not consumed_messages[object_type]:
             return

         (received_keys, received_values) = zip(*consumed_messages[object_type])

         if object_type in ("content", "skipped_content"):
             for value in received_values:
                 value.pop("ctime", None)
         if object_type == "content":
             known_objects = [attr.evolve(o, data=None) for o in known_objects]

         for key in known_keys:
             assert key in received_keys, (
                 f"expected {object_type} key {pprint_key(key)} "
                 "absent from consumed messages"
             )

         if exclude and object_type in exclude:
             continue

         for value in known_objects:
             expected_value = value.to_dict()
             if value.object_type in ("content", "skipped_content"):
                 expected_value.pop("ctime", None)
             assert ensure_lists(expected_value) in received_values, (
                 f"expected {object_type} value {value!r} is "
                 "absent from consumed messages"
             )


 @pytest.fixture(scope="function")
 def kafka_prefix():
     """Pick a random prefix for kafka topics on each call"""
     return "".join(random.choice(string.ascii_lowercase) for _ in range(10))


 @pytest.fixture(scope="function")
 def kafka_consumer_group(kafka_prefix: str):
     """Pick a random consumer group for kafka consumers on each call"""
     return "test-consumer-%s" % kafka_prefix


 @pytest.fixture(scope="function")
 def object_types():
     """Set of object types to precreate topics for."""
     return set(TEST_OBJECTS.keys())


 @pytest.fixture(scope="function")
 def privileged_object_types():
     """Set of object types to precreate privileged topics for."""
     return {"revision", "release"}


 @pytest.fixture(scope="function")
 def kafka_server(
     kafka_server_base: str,
     kafka_prefix: str,
     object_types: Iterator[str],
     privileged_object_types: Iterator[str],
 ) -> str:
     """A kafka server with existing topics

     Unprivileged topics are built as ``{kafka_prefix}.{object_type}`` with
     object_type from the ``object_types`` list.

     Privileged topics are built as ``{kafka_prefix}_privileged.{object_type}``
     with object_type from the ``privileged_object_types`` list.
     """
     topics = [f"{kafka_prefix}.{obj}" for obj in object_types] + [
         f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types
     ]

     # unfortunately, the Mock broker does not support the CreateTopics admin
     # API, so we have to create topics using a Producer.
     producer = Producer(
         {
             "bootstrap.servers": kafka_server_base,
             "client.id": "bootstrap producer",
             "acks": "all",
         }
     )
     for topic in topics:
         producer.produce(topic=topic, value=None)
     for i in range(10):
         if producer.flush(0.1) == 0:
             break

     return kafka_server_base


 @pytest.fixture(scope="session")
 def kafka_server_base() -> Iterator[str]:
     """Create a mock kafka cluster suitable for tests.

     Yield a connection string.

     Note: this is a generator to keep the mock broker alive during the whole
     test session.

     see https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_mock.h
     """
     admin = AdminClient({"test.mock.num.brokers": "1"})

     metadata = admin.list_topics()
     brokers = [str(broker) for broker in metadata.brokers.values()]
     assert len(brokers) == 1, "More than one broker found in the kafka cluster?!"

     broker_connstr, broker_id = brokers[0].split("/")
     yield broker_connstr


 TEST_CONFIG = {
     "consumer_id": "swh.journal.consumer",
-    "stop_after_objects": 1,  # will read 1 object and stop
+    "stop_on_eof": True,
     "storage": {"cls": "memory", "args": {}},
 }


 @pytest.fixture
 def test_config(
     kafka_server_base: str,
     kafka_prefix: str,
     object_types: Iterator[str],
     privileged_object_types: Iterator[str],
 ):
     """Test configuration needed for producer/consumer
     """
     return {
         **TEST_CONFIG,
         "object_types": object_types,
         "privileged_object_types": privileged_object_types,
         "brokers": [kafka_server_base],
         "prefix": kafka_prefix,
     }


 @pytest.fixture
 def consumer(
     kafka_server: str, test_config: Dict, kafka_consumer_group: str
 ) -> Consumer:
     """Get a connected Kafka consumer.
     """
     consumer = Consumer(
         {
             "bootstrap.servers": kafka_server,
             "auto.offset.reset": "earliest",
             "enable.auto.commit": True,
             "group.id": kafka_consumer_group,
         }
     )

     prefix = test_config["prefix"]
     kafka_topics = [
         f"{prefix}.{object_type}" for object_type in test_config["object_types"]
     ] + [
         f"{prefix}_privileged.{object_type}"
         for object_type in test_config["privileged_object_types"]
     ]
     consumer.subscribe(kafka_topics)

     yield consumer

     # Explicitly perform the commit operation on the consumer before closing it
     # to avoid a possible hang since confluent-kafka v1.6.0
     consumer.commit()
     consumer.close()
diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py
index 3cbda30..82d6979 100644
--- a/swh/journal/tests/test_client.py
+++ b/swh/journal/tests/test_client.py
@@ -1,377 +1,395 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from typing import Dict, List, cast
 from unittest.mock import MagicMock

 from confluent_kafka import Producer
 import pytest

 from swh.journal.client import JournalClient
 from swh.journal.serializers import kafka_to_value, key_to_kafka, value_to_kafka
 from swh.model.model import Content, Revision
 from swh.model.tests.swh_model_data import TEST_OBJECTS

 REV = {
     "message": b"something cool",
     "author": {"fullname": b"Peter", "name": None, "email": b"peter@ouiche.lo"},
     "committer": {"fullname": b"Stephen", "name": b"From Outer Space", "email": None},
     "date": {
         "timestamp": {"seconds": 123456789, "microseconds": 123},
         "offset": 120,
         "negative_utc": False,
     },
     "committer_date": {
         "timestamp": {"seconds": 123123456, "microseconds": 0},
         "offset": 0,
         "negative_utc": False,
     },
     "type": "git",
     "directory": (
         b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
         b"\x01\x02\x03\x04\x05"
     ),
     "synthetic": False,
     "metadata": None,
     "parents": [],
     "id": b"\x8b\xeb\xd1\x9d\x07\xe2\x1e0\xe2 \x91X\x8d\xbd\x1c\xa8\x86\xdeB\x0c",
 }


 def test_client(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str):
     producer = Producer(
         {
             "bootstrap.servers": kafka_server,
             "client.id": "test producer",
             "acks": "all",
         }
     )

     # Fill Kafka
     producer.produce(
         topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV),
     )
     producer.flush()

     client = JournalClient(
         brokers=[kafka_server],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
-        stop_after_objects=1,
+        stop_on_eof=True,
     )
     worker_fn = MagicMock()
     client.process(worker_fn)

     worker_fn.assert_called_once_with({"revision": [REV]})


-def test_client_eof(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str):
+@pytest.mark.parametrize("count", [1, 2])
+def test_client_stop_after_objects(
+    kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, count: int
+):
     producer = Producer(
         {
             "bootstrap.servers": kafka_server,
             "client.id": "test producer",
             "acks": "all",
         }
     )

     # Fill Kafka
-    producer.produce(
-        topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV),
-    )
+    revisions = cast(List[Revision], TEST_OBJECTS["revision"])
+    for rev in revisions:
+        producer.produce(
+            topic=kafka_prefix + ".revision",
+            key=rev.id,
+            value=value_to_kafka(rev.to_dict()),
+        )
     producer.flush()

     client = JournalClient(
         brokers=[kafka_server],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
-        stop_after_objects=None,
-        stop_on_eof=True,
+        stop_on_eof=False,
+        stop_after_objects=count,
     )

     worker_fn = MagicMock()
     client.process(worker_fn)

-    worker_fn.assert_called_once_with({"revision": [REV]})
+    # The code below is not pretty, but it is needed: we deal with dicts (so
+    # no set), whose values may be lists vs tuples, and we do not know for
+    # sure how many worker_fn calls will happen while consuming the topic...
+    worker_fn.assert_called()
+    revs = []  # list of (unique) rev dicts we got from the client
+    for call in worker_fn.call_args_list:
+        callrevs = call[0][0]["revision"]
+        for rev in callrevs:
+            assert Revision.from_dict(rev) in revisions
+            if rev not in revs:
+                revs.append(rev)
+    assert len(revs) == count


 @pytest.mark.parametrize("batch_size", [1, 5, 100])
 def test_client_batch_size(
     kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, batch_size: int,
 ):
     num_objects = 2 * batch_size + 1
     assert num_objects < 256, "Too many objects, generation will fail"

     producer = Producer(
         {
             "bootstrap.servers": kafka_server,
             "client.id": "test producer",
             "acks": "all",
         }
     )

     contents = [Content.from_data(bytes([i])) for i in range(num_objects)]

     # Fill Kafka
     for content in contents:
         producer.produce(
             topic=kafka_prefix + ".content",
             key=key_to_kafka(content.sha1),
             value=value_to_kafka(content.to_dict()),
         )
     producer.flush()

     client = JournalClient(
         brokers=[kafka_server],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
-        stop_after_objects=num_objects,
+        stop_on_eof=True,
         batch_size=batch_size,
     )

     collected_output: List[Dict] = []

     def worker_fn(objects):
         received = objects["content"]
         assert len(received) <= batch_size
         collected_output.extend(received)

     client.process(worker_fn)

     expected_output = [content.to_dict() for content in contents]

     assert len(collected_output) == len(expected_output)
     for output in collected_output:
         assert output in expected_output


 @pytest.fixture()
 def kafka_producer(kafka_prefix: str, kafka_server_base: str):
     producer = Producer(
         {
             "bootstrap.servers": kafka_server_base,
             "client.id": "test producer",
             "acks": "all",
         }
     )

     # Fill Kafka
     producer.produce(
         topic=kafka_prefix + ".something",
         key=key_to_kafka(b"key1"),
         value=value_to_kafka("value1"),
     )
     producer.produce(
         topic=kafka_prefix + ".else",
         key=key_to_kafka(b"key1"),
         value=value_to_kafka("value2"),
     )
     producer.flush()
     return producer


 def test_client_subscribe_all(
     kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
 ):
     client = JournalClient(
         brokers=[kafka_server_base],
         group_id="whatever",
         prefix=kafka_prefix,
-        stop_after_objects=2,
+        stop_on_eof=True,
     )
     assert set(client.subscription) == {
         f"{kafka_prefix}.something",
         f"{kafka_prefix}.else",
     }

     worker_fn = MagicMock()
     client.process(worker_fn)
     worker_fn.assert_called_once_with(
         {"something": ["value1"], "else": ["value2"],}
     )


 def test_client_subscribe_one_topic(
     kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
 ):
     client = JournalClient(
         brokers=[kafka_server_base],
         group_id="whatever",
         prefix=kafka_prefix,
-        stop_after_objects=1,
+        stop_on_eof=True,
         object_types=["else"],
     )
     assert client.subscription == [f"{kafka_prefix}.else"]

     worker_fn = MagicMock()
     client.process(worker_fn)
     worker_fn.assert_called_once_with({"else": ["value2"]})


 def test_client_subscribe_absent_topic(
     kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
 ):
     with pytest.raises(ValueError):
         JournalClient(
             brokers=[kafka_server_base],
             group_id="whatever",
             prefix=kafka_prefix,
-            stop_after_objects=1,
+            stop_on_eof=True,
             object_types=["really"],
         )


 def test_client_subscribe_absent_prefix(
     kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str
 ):
     with pytest.raises(ValueError):
         JournalClient(
             brokers=[kafka_server_base],
             group_id="whatever",
             prefix="wrong.prefix",
-            stop_after_objects=1,
+            stop_on_eof=True,
         )
     with pytest.raises(ValueError):
         JournalClient(
             brokers=[kafka_server_base],
             group_id="whatever",
             prefix="wrong.prefix",
-            stop_after_objects=1,
+            stop_on_eof=True,
             object_types=["else"],
         )


 def test_client_subscriptions_with_anonymized_topics(
     kafka_prefix: str, kafka_consumer_group: str, kafka_server_base: str
 ):
     producer = Producer(
         {
             "bootstrap.servers": kafka_server_base,
             "client.id": "test producer",
             "acks": "all",
         }
     )

     # Fill Kafka with revision objects on both the regular prefix (normally
     # for anonymized objects in this case) and the privileged one
     producer.produce(
         topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV),
     )
     producer.produce(
         topic=kafka_prefix + "_privileged.revision",
         key=REV["id"],
         value=value_to_kafka(REV),
     )
     producer.flush()

     # without privileged "channels" activated on the client side
     client = JournalClient(
         brokers=[kafka_server_base],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
-        stop_after_objects=1,
+        stop_on_eof=True,
         privileged=False,
     )

     # we only subscribed to "standard" topics
     assert client.subscription == [kafka_prefix + ".revision"]

     # with privileged "channels" activated on the client side
     client = JournalClient(
         brokers=[kafka_server_base],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
-        stop_after_objects=1,
         privileged=True,
     )

     # we only subscribed to "privileged" topics
     assert client.subscription == [kafka_prefix + "_privileged.revision"]


 def test_client_subscriptions_without_anonymized_topics(
     kafka_prefix: str, kafka_consumer_group: str, kafka_server_base: str
 ):
     producer = Producer(
         {
             "bootstrap.servers": kafka_server_base,
             "client.id": "test producer",
             "acks": "all",
         }
     )

     # Fill Kafka with revision objects only on the standard prefix
     producer.produce(
         topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV),
     )
     producer.flush()

     # without privileged channel activated on the client side
     client = JournalClient(
         brokers=[kafka_server_base],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
-        stop_after_objects=1,
+        stop_on_eof=True,
         privileged=False,
     )

     # we only subscribed to the standard prefix
     assert client.subscription == [kafka_prefix + ".revision"]

     # with privileged channel activated on the client side
     client = JournalClient(
         brokers=[kafka_server_base],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
-        stop_after_objects=1,
+        stop_on_eof=True,
         privileged=True,
     )

     # we also only subscribed to the standard prefix, since there is no
     # privileged prefix on the kafka broker
     assert client.subscription == [kafka_prefix + ".revision"]


 def test_client_with_deserializer(
     kafka_prefix: str, kafka_consumer_group: str, kafka_server: str
 ):
     producer = Producer(
         {
             "bootstrap.servers": kafka_server,
             "client.id": "test producer",
             "acks": "all",
         }
     )

     # Fill Kafka
     revisions = cast(List[Revision], TEST_OBJECTS["revision"])
     for rev in revisions:
         producer.produce(
             topic=kafka_prefix + ".revision",
             key=rev.id,
             value=value_to_kafka(rev.to_dict()),
         )
     producer.flush()

     def custom_deserializer(object_type, msg):
         assert object_type == "revision"
         obj = kafka_to_value(msg)
         # filter out the first revision
         if obj["id"] == revisions[0].id:
             return None
         return Revision.from_dict(obj)

     client = JournalClient(
         brokers=[kafka_server],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_on_eof=True,
         value_deserializer=custom_deserializer,
     )
     worker_fn = MagicMock()
     client.process(worker_fn)

     # a commit seems to be needed to prevent some race condition situation
     # where the worker_fn has not yet been called at this point (not sure how)
     client.consumer.commit()

     # Check the first revision has not been passed to worker_fn
     worker_fn.assert_called_once_with({"revision": revisions[1:]})
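
A note on the accumulation loop added in test_client_stop_after_objects above: stop_after_objects bounds how many objects the client hands out, but nothing guarantees a single worker_fn call, and the revision dicts cannot be collected in a set (they are unhashable, and their values may drift between list and tuple through serialization). An equivalent check that keys on revision ids instead, sketched under the same constraints (unique_received_rev_ids is a hypothetical helper, not part of this patch):

    def unique_received_rev_ids(worker_fn):
        # rev["id"] is hashable bytes, so a set works here even though
        # the full revision dicts (list vs tuple values) do not
        ids = set()
        for call in worker_fn.call_args_list:
            for rev in call[0][0]["revision"]:
                ids.add(rev["id"])
        return ids

    # the final assertion then becomes:
    # assert len(unique_received_rev_ids(worker_fn)) == count
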
+ ".revision"] def test_client_with_deserializer( kafka_prefix: str, kafka_consumer_group: str, kafka_server: str ): producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test producer", "acks": "all", } ) # Fill Kafka revisions = cast(List[Revision], TEST_OBJECTS["revision"]) for rev in revisions: producer.produce( topic=kafka_prefix + ".revision", key=rev.id, value=value_to_kafka(rev.to_dict()), ) producer.flush() def custom_deserializer(object_type, msg): assert object_type == "revision" obj = kafka_to_value(msg) # filter the first revision if obj["id"] == revisions[0].id: return None return Revision.from_dict(obj) client = JournalClient( brokers=[kafka_server], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_on_eof=True, value_deserializer=custom_deserializer, ) worker_fn = MagicMock() client.process(worker_fn) # a commit seems to be needed to prevent some race condition situation # where the worker_fn has not yet been called at this point (not sure how) client.consumer.commit() # Check the first revision has not been passed to worker_fn worker_fn.assert_called_once_with({"revision": revisions[1:]}) diff --git a/swh/journal/tests/test_pytest_plugin.py b/swh/journal/tests/test_pytest_plugin.py index 4070714..ece39ba 100644 --- a/swh/journal/tests/test_pytest_plugin.py +++ b/swh/journal/tests/test_pytest_plugin.py @@ -1,73 +1,73 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Iterator from confluent_kafka.admin import AdminClient def test_kafka_server(kafka_server_base: str): ip, port_str = kafka_server_base.split(":") assert ip == "127.0.0.1" assert int(port_str) admin = AdminClient({"bootstrap.servers": kafka_server_base}) topics = admin.list_topics() assert len(topics.brokers) == 1 def test_kafka_server_with_topics( kafka_server: str, kafka_prefix: str, object_types: Iterator[str], privileged_object_types: Iterator[str], ): admin = AdminClient({"bootstrap.servers": kafka_server}) # check unprivileged topics are present topics = { topic for topic in admin.list_topics().topics if topic.startswith(f"{kafka_prefix}.") } assert topics == {f"{kafka_prefix}.{obj}" for obj in object_types} # check privileged topics are present topics = { topic for topic in admin.list_topics().topics if topic.startswith(f"{kafka_prefix}_privileged.") } assert topics == { f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types } def test_test_config(test_config: dict, kafka_prefix: str, kafka_server_base: str): assert test_config == { "consumer_id": "swh.journal.consumer", - "stop_after_objects": 1, + "stop_on_eof": True, "storage": {"cls": "memory", "args": {}}, "object_types": { "content", "directory", "extid", "metadata_authority", "metadata_fetcher", "origin", "origin_visit", "origin_visit_status", "raw_extrinsic_metadata", "release", "revision", "snapshot", "skipped_content", }, "privileged_object_types": {"release", "revision",}, "brokers": [kafka_server_base], "prefix": kafka_prefix, }