diff --git a/swh/journal/pytest_plugin.py b/swh/journal/pytest_plugin.py
index b8faf86..6a5f5f5 100644
--- a/swh/journal/pytest_plugin.py
+++ b/swh/journal/pytest_plugin.py
@@ -1,222 +1,236 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import random
 import string

-from typing import Dict, Iterator
+from typing import Collection, Dict, Iterator, Optional
 from collections import defaultdict

 import pytest

 from confluent_kafka import Consumer, KafkaException, Producer
 from confluent_kafka.admin import AdminClient

+from swh.model.hashutil import hash_to_hex
+
 from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value
 from swh.journal.tests.journal_data import TEST_OBJECTS, TEST_OBJECT_DICTS


 def consume_messages(consumer, kafka_prefix, expected_messages):
     """Consume expected_messages from the consumer;
     Sort them all into a consumed_messages dict"""
     consumed_messages = defaultdict(list)

     fetched_messages = 0
     retries_left = 1000

     while fetched_messages < expected_messages:
         if retries_left == 0:
             raise ValueError(
                 "Timed out fetching messages from kafka. "
                 f"Only {fetched_messages}/{expected_messages} fetched"
             )

         msg = consumer.poll(timeout=0.01)

         if not msg:
             retries_left -= 1
             continue

         error = msg.error()
         if error is not None:
             if error.fatal():
                 raise KafkaException(error)
             retries_left -= 1
             continue

         fetched_messages += 1
         topic = msg.topic()
         assert topic.startswith(f"{kafka_prefix}.") or topic.startswith(
             f"{kafka_prefix}_privileged."
         ), "Unexpected topic"
         object_type = topic[len(kafka_prefix + ".") :]

         consumed_messages[object_type].append(
             (kafka_to_key(msg.key()), kafka_to_value(msg.value()))
         )

     return consumed_messages


-def assert_all_objects_consumed(consumed_messages):
-    """Check whether all objects from TEST_OBJECT_DICTS have been consumed"""
+def assert_all_objects_consumed(
+    consumed_messages: Dict, exclude: Optional[Collection] = None
+):
+    """Check whether all objects from TEST_OBJECT_DICTS have been consumed
+
+    `exclude` can be a list of object types for which we do not want to compare the
+    values (e.g. for anonymized objects).
+
+    """
     for object_type, known_values in TEST_OBJECT_DICTS.items():
         known_keys = [
             object_key(object_type, obj) for obj in TEST_OBJECTS[object_type]
         ]

         if not consumed_messages[object_type]:
             return

         (received_keys, received_values) = zip(*consumed_messages[object_type])

-        if object_type == "origin_visit":
-            for value in received_values:
-                del value["visit"]
-        elif object_type == "content":
+        if object_type in ("content", "skipped_content"):
             for value in received_values:
                 del value["ctime"]

         for key in known_keys:
-            assert key in received_keys
+            assert key in received_keys, (
+                f"expected {object_type} key {hash_to_hex(key)} "
+                "absent from consumed messages"
+            )
+
+        if exclude and object_type in exclude:
+            continue

         for value in known_values:
-            assert value in received_values
+            assert value in received_values, (
+                f"expected {object_type} value {value!r} is "
+                "absent from consumed messages"
+            )


 @pytest.fixture(scope="function")
 def kafka_prefix():
     """Pick a random prefix for kafka topics on each call"""
     return "".join(random.choice(string.ascii_lowercase) for _ in range(10))


 @pytest.fixture(scope="function")
 def kafka_consumer_group(kafka_prefix: str):
     """Pick a random consumer group for kafka consumers on each call"""
     return "test-consumer-%s" % kafka_prefix


 @pytest.fixture(scope="function")
 def object_types():
     """Set of object types to precreate topics for."""
     return set(TEST_OBJECT_DICTS.keys())


 @pytest.fixture(scope="function")
 def privileged_object_types():
     """Set of object types to precreate privileged topics for."""
     return {"revision", "release"}


 @pytest.fixture(scope="function")
 def kafka_server(
     kafka_server_base: str,
     kafka_prefix: str,
     object_types: Iterator[str],
     privileged_object_types: Iterator[str],
 ) -> str:
     """A kafka server with existing topics

     Unprivileged topics are built as ``{kafka_prefix}.{object_type}`` with
     object_type from the ``object_types`` list.

     Privileged topics are built as ``{kafka_prefix}_privileged.{object_type}`` with
     object_type from the ``privileged_object_types`` list.

     """
     topics = [f"{kafka_prefix}.{obj}" for obj in object_types] + [
         f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types
     ]

     # unfortunately, the Mock broker does not support the CreateTopics admin API, so we
     # have to create topics using a Producer.
     producer = Producer(
         {
             "bootstrap.servers": kafka_server_base,
             "client.id": "bootstrap producer",
             "acks": "all",
         }
     )
     for topic in topics:
         producer.produce(topic=topic, value=None)
     for i in range(10):
         if producer.flush(0.1) == 0:
             break

     return kafka_server_base


 @pytest.fixture(scope="session")
 def kafka_server_base() -> Iterator[str]:
     """Create a mock kafka cluster suitable for tests.

     Yield a connection string.

     Note: this is a generator to keep the mock broker alive during the whole test
     session.

     see https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_mock.h
     """
     admin = AdminClient({"test.mock.num.brokers": "1"})

     metadata = admin.list_topics()
     brokers = [str(broker) for broker in metadata.brokers.values()]
     assert len(brokers) == 1, "More than one broker found in the kafka cluster?!"

     broker_connstr, broker_id = brokers[0].split("/")
     yield broker_connstr


 TEST_CONFIG = {
     "consumer_id": "swh.journal.consumer",
     "stop_after_objects": 1,  # will read 1 object and stop
     "storage": {"cls": "memory", "args": {}},
 }


 @pytest.fixture
 def test_config(
     kafka_server_base: str,
     kafka_prefix: str,
     object_types: Iterator[str],
     privileged_object_types: Iterator[str],
 ):
     """Test configuration needed for producer/consumer

     """
     return {
         **TEST_CONFIG,
         "object_types": object_types,
         "privileged_object_types": privileged_object_types,
         "brokers": [kafka_server_base],
         "prefix": kafka_prefix,
     }


 @pytest.fixture
 def consumer(
     kafka_server: str, test_config: Dict, kafka_consumer_group: str
 ) -> Consumer:
     """Get a connected Kafka consumer.

     """
     consumer = Consumer(
         {
             "bootstrap.servers": kafka_server,
             "auto.offset.reset": "earliest",
             "enable.auto.commit": True,
             "group.id": kafka_consumer_group,
         }
     )

     prefix = test_config["prefix"]
     kafka_topics = [
         f"{prefix}.{object_type}" for object_type in test_config["object_types"]
     ] + [
         f"{prefix}_privileged.{object_type}"
         for object_type in test_config["privileged_object_types"]
     ]
     consumer.subscribe(kafka_topics)

     yield consumer

     consumer.close()
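For reviewers unfamiliar with the plugin, the following condensed sketch shows how
these fixtures and the new `exclude` argument fit together in a consumer-side test.
It mirrors test_kafka_writer_anonymized further down in this patch; the test name
and client_id are illustrative:

    from typing import Iterable

    from confluent_kafka import Consumer

    from swh.journal.pytest_plugin import assert_all_objects_consumed, consume_messages
    from swh.journal.tests.journal_data import TEST_OBJECTS
    from swh.journal.writer.kafka import KafkaJournalWriter


    def test_example(
        kafka_prefix: str,
        kafka_server: str,
        consumer: Consumer,
        privileged_object_types: Iterable[str],
    ):
        writer = KafkaJournalWriter(
            brokers=[kafka_server],
            client_id="example",
            prefix=kafka_prefix,
            anonymize=True,
        )

        expected = 0
        for object_type, objects in TEST_OBJECTS.items():
            writer.write_additions(object_type, objects)
            expected += len(objects)
            # an anonymizing writer presumably sends a second, non-anonymized
            # copy of revision and release objects to the privileged topics
            if object_type in privileged_object_types:
                expected += len(objects)

        consumed = consume_messages(consumer, kafka_prefix, expected)
        # keys are still checked for every object type; value comparison is
        # skipped for the anonymized ones
        assert_all_objects_consumed(consumed, exclude=["revision", "release"])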
diff --git a/swh/journal/serializers.py b/swh/journal/serializers.py
index 986eeb3..30a5796 100644
--- a/swh/journal/serializers.py
+++ b/swh/journal/serializers.py
@@ -1,112 +1,121 @@
 # Copyright (C) 2016-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from typing import Any, Dict, Union, overload

 import msgpack

 from swh.core.api.serializers import msgpack_dumps, msgpack_loads
 from swh.model.hashutil import DEFAULT_ALGORITHMS

 from swh.model.model import (
     Content,
     Directory,
     Origin,
     OriginVisit,
     Release,
     Revision,
     SkippedContent,
     Snapshot,
 )

 ModelObject = Union[
     Content, Directory, Origin, OriginVisit, Release, Revision, SkippedContent, Snapshot
 ]


 KeyType = Union[Dict[str, str], Dict[str, bytes], bytes]


 # these @overload'ed versions of the object_key method aim at helping mypy figuring
 # the correct type-ing.
 @overload
 def object_key(
     object_type: str, object_: Union[Content, Directory, Revision, Release, Snapshot]
 ) -> bytes:
     ...


 @overload
 def object_key(
     object_type: str, object_: Union[Origin, SkippedContent]
 ) -> Dict[str, bytes]:
     ...


 @overload
 def object_key(object_type: str, object_: OriginVisit) -> Dict[str, str]:
     ...


 def object_key(object_type: str, object_) -> KeyType:
     if object_type in ("revision", "release", "directory", "snapshot"):
         return object_.id
     elif object_type == "content":
         return object_.sha1  # TODO: use a dict of hashes
     elif object_type == "skipped_content":
         return {hash: getattr(object_, hash) for hash in DEFAULT_ALGORITHMS}
     elif object_type == "origin":
         return {"url": object_.url}
     elif object_type == "origin_visit":
         return {
             "origin": object_.origin,
             "date": str(object_.date),
         }
     else:
         raise ValueError("Unknown object type: %s." % object_type)


 def stringify_key_item(k: str, v: Union[str, bytes]) -> str:
     """Turn the item of a dict key into a string"""
     if isinstance(v, str):
         return v
     if k == "url":
         return v.decode("utf-8")
     return v.hex()


 def pprint_key(key: KeyType) -> str:
     """Pretty-print a kafka key"""
     if isinstance(key, dict):
         return "{%s}" % ", ".join(
             f"{k}: {stringify_key_item(k, v)}" for k, v in key.items()
         )
     elif isinstance(key, bytes):
         return key.hex()
     else:
         return key


 def key_to_kafka(key: KeyType) -> bytes:
     """Serialize a key, possibly a dict, in a predictable way"""
     p = msgpack.Packer(use_bin_type=True)
     if isinstance(key, dict):
         return p.pack_map_pairs(sorted(key.items()))
     else:
         return p.pack(key)


 def kafka_to_key(kafka_key: bytes) -> KeyType:
     """Deserialize a key"""
     return msgpack.loads(kafka_key, raw=False)


 def value_to_kafka(value: Any) -> bytes:
     """Serialize some data for storage in kafka"""
     return msgpack_dumps(value)


 def kafka_to_value(kafka_value: bytes) -> Any:
     """Deserialize some data stored in kafka"""
-    return msgpack_loads(kafka_value)
+    value = msgpack_loads(kafka_value)
+    if isinstance(value, list):
+        return tuple(value)
+    if isinstance(value, dict):
+        return ensure_tuples(value)
+    return value
+
+
+def ensure_tuples(value: Dict) -> Dict:
+    return {k: tuple(v) if isinstance(v, list) else v for k, v in value.items()}
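The list-to-tuple normalization in kafka_to_value matters because msgpack
serializes Python tuples as arrays and deserializes them back as lists, so
round-tripped values would no longer compare equal to the tuples used in model
dicts (e.g. a revision's "parents"). A minimal sketch of the behavior this hunk
restores, runnable against this version of the module:

    from swh.journal.serializers import kafka_to_value, value_to_kafka

    # Model dicts represent immutable collections as tuples, e.g. "parents".
    value = {"parents": (b"\x01" * 20, b"\x02" * 20)}

    # Without ensure_tuples(), msgpack would hand back {"parents": [...]} and
    # this equality check (the one assert_all_objects_consumed relies on)
    # would fail, since a list never compares equal to a tuple.
    assert kafka_to_value(value_to_kafka(value)) == value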
diff --git a/swh/journal/tests/journal_data.py b/swh/journal/tests/journal_data.py
index 660c14f..ebd07f7 100644
--- a/swh/journal/tests/journal_data.py
+++ b/swh/journal/tests/journal_data.py
@@ -1,177 +1,276 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import datetime

 from typing import Any, Dict, List, Type

 from swh.model.hashutil import MultiHash, hash_to_bytes
 from swh.journal.serializers import ModelObject
 from swh.model.model import (
     BaseModel,
     Content,
     Directory,
     Origin,
     OriginVisit,
     Release,
     Revision,
     SkippedContent,
     Snapshot,
 )

 OBJECT_TYPES: Dict[Type[BaseModel], str] = {
     Content: "content",
     Directory: "directory",
     Origin: "origin",
     OriginVisit: "origin_visit",
     Release: "release",
     Revision: "revision",
     SkippedContent: "skipped_content",
     Snapshot: "snapshot",
 }

 UTC = datetime.timezone.utc

 CONTENTS = [
-    {**MultiHash.from_data(b"foo").digest(), "length": 3, "status": "visible",},
+    {
+        **MultiHash.from_data(f"foo{i}".encode()).digest(),
+        "length": 4,
+        "status": "visible",
+    }
+    for i in range(10)
+]
+SKIPPED_CONTENTS = [
+    {
+        **MultiHash.from_data(f"bar{i}".encode()).digest(),
+        "length": 4,
+        "status": "absent",
+        "reason": f"because chr({i}) != '*'",
+    }
+    for i in range(2)
 ]

 duplicate_content1 = {
     "length": 4,
     "sha1": hash_to_bytes("44973274ccef6ab4dfaaf86599792fa9c3fe4689"),
     "sha1_git": b"another-foo",
     "blake2s256": b"another-bar",
     "sha256": b"another-baz",
     "status": "visible",
 }

 # Craft a sha1 collision
 duplicate_content2 = duplicate_content1.copy()
 sha1_array = bytearray(duplicate_content1["sha1_git"])
 sha1_array[0] += 1
 duplicate_content2["sha1_git"] = bytes(sha1_array)

 DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2]

 COMMITTERS = [
     {"fullname": b"foo", "name": b"foo", "email": b"",},
     {"fullname": b"bar", "name": b"bar", "email": b"",},
 ]

 DATES = [
     {
         "timestamp": {"seconds": 1234567891, "microseconds": 0,},
         "offset": 120,
         "negative_utc": False,
     },
     {
         "timestamp": {"seconds": 1234567892, "microseconds": 0,},
         "offset": 120,
         "negative_utc": False,
     },
 ]

 REVISIONS = [
     {
         "id": hash_to_bytes("7026b7c1a2af56521e951c01ed20f255fa054238"),
         "message": b"hello",
         "date": DATES[0],
         "committer": COMMITTERS[0],
         "author": COMMITTERS[0],
         "committer_date": DATES[0],
         "type": "git",
         "directory": b"\x01" * 20,
         "synthetic": False,
         "metadata": None,
         "parents": (),
     },
     {
         "id": hash_to_bytes("368a48fe15b7db2383775f97c6b247011b3f14f4"),
         "message": b"hello again",
         "date": DATES[1],
         "committer": COMMITTERS[1],
         "author": COMMITTERS[1],
         "committer_date": DATES[1],
         "type": "hg",
         "directory": b"\x02" * 20,
         "synthetic": False,
         "metadata": None,
         "parents": (),
     },
 ]

 RELEASES = [
     {
         "id": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"),
         "name": b"v0.0.1",
         "date": {
             "timestamp": {"seconds": 1234567890, "microseconds": 0,},
             "offset": 120,
             "negative_utc": False,
         },
         "author": COMMITTERS[0],
         "target_type": "revision",
         "target": b"\x04" * 20,
         "message": b"foo",
         "synthetic": False,
     },
 ]

 ORIGINS = [
     {"url": "https://somewhere.org/den/fox",},
     {"url": "https://overtherainbow.org/fox/den",},
 ]

 ORIGIN_VISITS = [
     {
         "origin": ORIGINS[0]["url"],
         "date": "2013-05-07 04:20:39.369271+00:00",
-        "snapshot": None,  # TODO
-        "status": "ongoing",  # TODO
+        "snapshot": None,
+        "status": "ongoing",
         "metadata": {"foo": "bar"},
         "type": "git",
+        "visit": 1,
+    },
+    {
+        "origin": ORIGINS[1]["url"],
+        "date": "2014-11-27 17:20:39+00:00",
+        "snapshot": None,
+        "status": "ongoing",
+        "metadata": {"baz": "qux"},
+        "type": "hg",
+        "visit": 1,
+    },
+    {
+        "origin": ORIGINS[0]["url"],
+        "date": "2018-11-27 17:20:39+00:00",
+        "snapshot": None,
+        "status": "ongoing",
+        "metadata": {"baz": "qux"},
+        "type": "git",
+        "visit": 2,
     },
     {
         "origin": ORIGINS[0]["url"],
         "date": "2018-11-27 17:20:39+00:00",
-        "snapshot": None,  # TODO
-        "status": "ongoing",  # TODO
+        "snapshot": hash_to_bytes("742cdc6be7bf6e895b055227c2300070f056e07b"),
+        "status": "full",
         "metadata": {"baz": "qux"},
         "type": "git",
+        "visit": 3,
+    },
+    {
+        "origin": ORIGINS[1]["url"],
+        "date": "2015-11-27 17:20:39+00:00",
+        "snapshot": hash_to_bytes("ecee48397a92b0d034e9752a17459f3691a73ef9"),
+        "status": "partial",
+        "metadata": {"something": "wrong occurred"},
+        "type": "hg",
+        "visit": 2,
     },
 ]

+
+DIRECTORIES = [
+    {"id": hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"), "entries": ()},
+    {
+        "id": hash_to_bytes("cc13247a0d6584f297ca37b5868d2cbd242aea03"),
+        "entries": (
+            {
+                "name": b"file1.ext",
+                "perms": 0o644,
+                "type": "file",
+                "target": CONTENTS[0]["sha1_git"],
+            },
+            {
+                "name": b"dir1",
+                "perms": 0o755,
+                "type": "dir",
+                "target": hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"),
+            },
+            {
+                "name": b"subprepo1",
+                "perms": 0o160000,
+                "type": "rev",
+                "target": REVISIONS[1]["id"],
+            },
+        ),
+    },
+]
+
+
+SNAPSHOTS = [
+    {
+        "id": hash_to_bytes("742cdc6be7bf6e895b055227c2300070f056e07b"),
+        "branches": {
+            b"master": {"target_type": "revision", "target": REVISIONS[0]["id"]}
+        },
+    },
+    {
+        "id": hash_to_bytes("ecee48397a92b0d034e9752a17459f3691a73ef9"),
+        "branches": {
+            b"target/revision": {
+                "target_type": "revision",
+                "target": REVISIONS[0]["id"],
+            },
+            b"target/alias": {"target_type": "alias", "target": b"target/revision"},
+            b"target/directory": {
+                "target_type": "directory",
+                "target": DIRECTORIES[0]["id"],
+            },
+            b"target/release": {"target_type": "release", "target": RELEASES[0]["id"]},
+            b"target/snapshot": {
+                "target_type": "snapshot",
+                "target": hash_to_bytes("742cdc6be7bf6e895b055227c2300070f056e07b"),
+            },
+        },
+    },
+]
+
+
 TEST_OBJECT_DICTS: Dict[str, List[Dict[str, Any]]] = {
     "content": CONTENTS,
-    "directory": [],
+    "directory": DIRECTORIES,
     "origin": ORIGINS,
     "origin_visit": ORIGIN_VISITS,
     "release": RELEASES,
     "revision": REVISIONS,
-    "snapshot": [],
-    "skipped_content": [],
+    "snapshot": SNAPSHOTS,
+    "skipped_content": SKIPPED_CONTENTS,
 }

 MODEL_OBJECTS = {v: k for (k, v) in OBJECT_TYPES.items()}

 TEST_OBJECTS: Dict[str, List[ModelObject]] = {}

 for object_type, objects in TEST_OBJECT_DICTS.items():
     converted_objects: List[ModelObject] = []
     model = MODEL_OBJECTS[object_type]

     for (num, obj_d) in enumerate(objects):
-        if object_type == "origin_visit":
-            obj_d = {**obj_d, "visit": num}
-        elif object_type == "content":
+        if object_type == "content":
             obj_d = {**obj_d, "data": b"", "ctime": datetime.datetime.now(tz=UTC)}

         converted_objects.append(model.from_dict(obj_d))

     TEST_OBJECTS[object_type] = converted_objects
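Since the fixture data is now cross-referenced (visits point at snapshots,
directory entries at contents and revisions), a quick consistency check like the
sketch below can be useful when extending it; the checks are illustrative, not
part of the patch:

    from swh.journal.tests.journal_data import (
        ORIGIN_VISITS,
        SNAPSHOTS,
        TEST_OBJECT_DICTS,
        TEST_OBJECTS,
    )

    # Every non-None snapshot referenced by a visit must exist in SNAPSHOTS.
    snapshot_ids = {snapshot["id"] for snapshot in SNAPSHOTS}
    for visit in ORIGIN_VISITS:
        if visit["snapshot"] is not None:
            assert visit["snapshot"] in snapshot_ids

    # Every object type now carries test data, and each dict was converted to
    # its model class at import time.
    for object_type, objects in TEST_OBJECT_DICTS.items():
        assert objects, f"no test data for {object_type}"
        assert len(TEST_OBJECTS[object_type]) == len(objects)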
diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
index 79b2b31..1fe02cc 100644
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -1,166 +1,166 @@
 # Copyright (C) 2018-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from typing import Iterable

 import pytest

 from confluent_kafka import Consumer, Producer

 from swh.model.model import Directory, Revision, Release

 from swh.journal.tests.journal_data import TEST_OBJECTS
 from swh.journal.pytest_plugin import consume_messages, assert_all_objects_consumed
 from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError


 def test_kafka_writer(
     kafka_prefix: str,
     kafka_server: str,
     consumer: Consumer,
     privileged_object_types: Iterable[str],
 ):
     writer = KafkaJournalWriter(
         brokers=[kafka_server],
         client_id="kafka_writer",
         prefix=kafka_prefix,
         anonymize=False,
     )

     expected_messages = 0

     for object_type, objects in TEST_OBJECTS.items():
         writer.write_additions(object_type, objects)
         expected_messages += len(objects)

     consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
     assert_all_objects_consumed(consumed_messages)

     for key, obj_dict in consumed_messages["revision"]:
         obj = Revision.from_dict(obj_dict)
         for person in (obj.author, obj.committer):
             assert not (
                 len(person.fullname) == 32
                 and person.name is None
                 and person.email is None
             )

     for key, obj_dict in consumed_messages["release"]:
         obj = Release.from_dict(obj_dict)
         for person in (obj.author,):
             assert not (
                 len(person.fullname) == 32
                 and person.name is None
                 and person.email is None
             )


 def test_kafka_writer_anonymized(
     kafka_prefix: str,
     kafka_server: str,
     consumer: Consumer,
     privileged_object_types: Iterable[str],
 ):
     writer = KafkaJournalWriter(
         brokers=[kafka_server],
         client_id="kafka_writer",
         prefix=kafka_prefix,
         anonymize=True,
     )

     expected_messages = 0

     for object_type, objects in TEST_OBJECTS.items():
         writer.write_additions(object_type, objects)
         expected_messages += len(objects)
         if object_type in privileged_object_types:
             expected_messages += len(objects)

     consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
-    assert_all_objects_consumed(consumed_messages)
+    assert_all_objects_consumed(consumed_messages, exclude=["revision", "release"])

     for key, obj_dict in consumed_messages["revision"]:
         obj = Revision.from_dict(obj_dict)
         for person in (obj.author, obj.committer):
             assert (
                 len(person.fullname) == 32
                 and person.name is None
                 and person.email is None
             )

     for key, obj_dict in consumed_messages["release"]:
         obj = Release.from_dict(obj_dict)
         for person in (obj.author,):
             assert (
                 len(person.fullname) == 32
                 and person.name is None
                 and person.email is None
             )


 def test_write_delivery_failure(
     kafka_prefix: str, kafka_server: str, consumer: Consumer
 ):
     class MockKafkaError:
         """A mocked kafka error"""

         def str(self):
             return "Mocked Kafka Error"

         def name(self):
             return "SWH_MOCK_ERROR"

     class KafkaJournalWriterFailDelivery(KafkaJournalWriter):
         """A journal writer which always fails delivering messages"""

         def _on_delivery(self, error, message):
             """Replace the inbound error with a fake delivery error"""
             super()._on_delivery(MockKafkaError(), message)

     kafka_prefix += ".swh.journal.objects"
     writer = KafkaJournalWriterFailDelivery(
         brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix,
     )

     empty_dir = Directory(entries=())
     with pytest.raises(KafkaDeliveryError) as exc:
         writer.write_addition("directory", empty_dir)

     assert "Failed deliveries" in exc.value.message
     assert len(exc.value.delivery_failures) == 1
     delivery_failure = exc.value.delivery_failures[0]
     assert delivery_failure.key == empty_dir.id
     assert delivery_failure.code == "SWH_MOCK_ERROR"


 def test_write_delivery_timeout(
     kafka_prefix: str, kafka_server: str, consumer: Consumer
 ):
     produced = []

     class MockProducer(Producer):
         """A kafka producer which pretends to produce messages, but never sends any
         delivery acknowledgements"""

         def produce(self, **kwargs):
             produced.append(kwargs)

     writer = KafkaJournalWriter(
         brokers=[kafka_server],
         client_id="kafka_writer",
         prefix=kafka_prefix,
         flush_timeout=1,
         producer_class=MockProducer,
     )

     empty_dir = Directory(entries=())
     with pytest.raises(KafkaDeliveryError) as exc:
         writer.write_addition("directory", empty_dir)

     assert len(produced) == 1

     assert "timeout" in exc.value.message
     assert len(exc.value.delivery_failures) == 1
     delivery_failure = exc.value.delivery_failures[0]
     assert delivery_failure.key == empty_dir.id
     assert delivery_failure.code == "SWH_FLUSH_TIMEOUT"
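The `len(person.fullname) == 32` assertions above encode the anonymized
representation: name and email are dropped and the fullname is replaced by a
fixed-size (sha256-length) digest. A sketch of that contract, assuming
swh.model's Person.from_fullname() and Person.anonymize() behave as in
contemporary releases:

    from swh.model.model import Person

    person = Person.from_fullname(b"Jane Doe <jdoe@example.org>")
    anonymized = person.anonymize()

    # Anonymization keeps only a 32-byte (sha256) digest as the fullname.
    assert len(anonymized.fullname) == 32
    assert anonymized.name is None and anonymized.email is None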