diff --git a/swh/journal/serializers.py b/swh/journal/serializers.py
index e60e863..1af8cdf 100644
--- a/swh/journal/serializers.py
+++ b/swh/journal/serializers.py
@@ -1,36 +1,90 @@
 # Copyright (C) 2016-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from typing import Any, Dict, Union
+from typing import Any, Dict, Union, overload
 
 import msgpack
 
 from swh.core.api.serializers import msgpack_dumps, msgpack_loads
+from swh.model.hashutil import DEFAULT_ALGORITHMS
+from swh.model.model import (
+    Content,
+    Directory,
+    Origin,
+    OriginVisit,
+    Release,
+    Revision,
+    SkippedContent,
+    Snapshot,
+)
+
+ModelObject = Union[
+    Content, Directory, Origin, OriginVisit, Release, Revision, SkippedContent, Snapshot
+]
 
 KeyType = Union[Dict[str, str], Dict[str, bytes], bytes]
 
 
+# These @overload'ed versions of the object_key function help mypy figure out
+# the correct typing.
+@overload
+def object_key(
+    object_type: str, object_: Union[Content, Directory, Revision, Release, Snapshot]
+) -> bytes:
+    ...
+
+
+@overload
+def object_key(
+    object_type: str, object_: Union[Origin, SkippedContent]
+) -> Dict[str, bytes]:
+    ...
+
+
+@overload
+def object_key(object_type: str, object_: OriginVisit) -> Dict[str, str]:
+    ...
+
+
+def object_key(object_type: str, object_) -> KeyType:
+    if object_type in ("revision", "release", "directory", "snapshot"):
+        return object_.id
+    elif object_type == "content":
+        return object_.sha1  # TODO: use a dict of hashes
+    elif object_type == "skipped_content":
+        return {hash: getattr(object_, hash) for hash in DEFAULT_ALGORITHMS}
+    elif object_type == "origin":
+        return {"url": object_.url}
+    elif object_type == "origin_visit":
+        return {
+            "origin": object_.origin,
+            "date": str(object_.date),
+        }
+    else:
+        raise ValueError("Unknown object type: %s." % object_type)
+
+
 def key_to_kafka(key: KeyType) -> bytes:
     """Serialize a key, possibly a dict, in a predictable way"""
     p = msgpack.Packer(use_bin_type=True)
     if isinstance(key, dict):
         return p.pack_map_pairs(sorted(key.items()))
     else:
         return p.pack(key)
 
 
 def kafka_to_key(kafka_key: bytes) -> KeyType:
     """Deserialize a key"""
     return msgpack.loads(kafka_key)
 
 
 def value_to_kafka(value: Any) -> bytes:
     """Serialize some data for storage in kafka"""
     return msgpack_dumps(value)
 
 
 def kafka_to_value(kafka_value: bytes) -> Any:
     """Deserialize some data stored in kafka"""
     return msgpack_loads(kafka_value)
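A minimal sketch (not part of the patch) of how the new module-level helpers combine: object_key picks the journal key for a model object, and key_to_kafka packs dict keys from sorted items, so insertion order does not affect the encoded Kafka key. The URLs and dates below are illustrative.

    # Illustrative usage of object_key / key_to_kafka / kafka_to_key; not part of the patch.
    from swh.model.model import Origin
    from swh.journal.serializers import object_key, key_to_kafka, kafka_to_key

    origin = Origin(url="https://example.org/repo.git")      # made-up URL
    key = object_key("origin", origin)                       # {"url": "https://example.org/repo.git"}
    assert kafka_to_key(key_to_kafka(key)) == key

    # dict keys are packed from sorted items, so ordering does not matter
    visit_key = {"origin": origin.url, "date": "2013-05-07 04:20:39+00:00"}
    reordered = {"date": visit_key["date"], "origin": visit_key["origin"]}
    assert key_to_kafka(visit_key) == key_to_kafka(reordered)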
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
index 5a01b06..1701ff9 100644
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -1,358 +1,358 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import datetime
 import os
 import pytest
 import logging
 import random
 import string
 
 from confluent_kafka import Consumer
 from confluent_kafka.admin import AdminClient, ConfigResource
 
 from hypothesis.strategies import one_of
 from subprocess import Popen
 from typing import Any, Dict, Iterator, List, Optional, Tuple
 from pathlib import Path
 from pytest_kafka import (
     make_zookeeper_process,
     make_kafka_server,
     KAFKA_SERVER_CONFIG_TEMPLATE,
     ZOOKEEPER_CONFIG_TEMPLATE,
 )
 
 from swh.model import hypothesis_strategies as strategies
 from swh.model.hashutil import MultiHash, hash_to_bytes
-
-from swh.journal.writer.kafka import OBJECT_TYPES, ModelObject
+from swh.journal.serializers import ModelObject
+from swh.journal.writer.kafka import OBJECT_TYPES
 
 logger = logging.getLogger(__name__)
 
 CONTENTS = [
     {**MultiHash.from_data(b"foo").digest(), "length": 3, "status": "visible",},
 ]
 
 duplicate_content1 = {
     "length": 4,
     "sha1": hash_to_bytes("44973274ccef6ab4dfaaf86599792fa9c3fe4689"),
     "sha1_git": b"another-foo",
     "blake2s256": b"another-bar",
     "sha256": b"another-baz",
     "status": "visible",
 }
 
 # Craft a sha1 collision
 duplicate_content2 = duplicate_content1.copy()
 sha1_array = bytearray(duplicate_content1["sha1_git"])
 sha1_array[0] += 1
 duplicate_content2["sha1_git"] = bytes(sha1_array)
 
 DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2]
 
 COMMITTERS = [
     {"fullname": b"foo", "name": b"foo", "email": b"",},
     {"fullname": b"bar", "name": b"bar", "email": b"",},
 ]
 
 DATES = [
     {
         "timestamp": {"seconds": 1234567891, "microseconds": 0,},
         "offset": 120,
         "negative_utc": False,
     },
     {
         "timestamp": {"seconds": 1234567892, "microseconds": 0,},
         "offset": 120,
         "negative_utc": False,
     },
 ]
 
 REVISIONS = [
     {
         "id": hash_to_bytes("7026b7c1a2af56521e951c01ed20f255fa054238"),
         "message": b"hello",
         "date": DATES[0],
         "committer": COMMITTERS[0],
         "author": COMMITTERS[0],
         "committer_date": DATES[0],
         "type": "git",
         "directory": b"\x01" * 20,
         "synthetic": False,
         "metadata": None,
         "parents": [],
     },
     {
         "id": hash_to_bytes("368a48fe15b7db2383775f97c6b247011b3f14f4"),
         "message": b"hello again",
         "date": DATES[1],
         "committer": COMMITTERS[1],
         "author": COMMITTERS[1],
         "committer_date": DATES[1],
         "type": "hg",
         "directory": b"\x02" * 20,
         "synthetic": False,
         "metadata": None,
         "parents": [],
     },
 ]
 
"target_type": "revision", "target": b"\x04" * 20, "message": b"foo", "synthetic": False, }, ] ORIGINS = [ {"url": "https://somewhere.org/den/fox",}, {"url": "https://overtherainbow.org/fox/den",}, ] ORIGIN_VISITS = [ { "origin": ORIGINS[0]["url"], "date": "2013-05-07 04:20:39.369271+00:00", "snapshot": None, # TODO "status": "ongoing", # TODO "metadata": {"foo": "bar"}, "type": "git", }, { "origin": ORIGINS[0]["url"], "date": "2018-11-27 17:20:39+00:00", "snapshot": None, # TODO "status": "ongoing", # TODO "metadata": {"baz": "qux"}, "type": "git", }, ] TEST_OBJECT_DICTS: Dict[str, Tuple[Optional[str], List[Dict[str, Any]]]] = { "content": ("sha1", CONTENTS), "revision": ("id", REVISIONS), "release": ("id", RELEASES), "origin": (None, ORIGINS), "origin_visit": (None, ORIGIN_VISITS), } MODEL_OBJECTS = {v: k for (k, v) in OBJECT_TYPES.items()} TEST_OBJECTS: Dict[str, List[ModelObject]] = {} for object_type, (_, objects) in TEST_OBJECT_DICTS.items(): converted_objects: List[ModelObject] = [] model = MODEL_OBJECTS[object_type] for (num, obj_d) in enumerate(objects): if object_type == "origin_visit": obj_d = {**obj_d, "visit": num} elif object_type == "content": obj_d = {**obj_d, "data": b"", "ctime": datetime.datetime.now()} converted_objects.append(model.from_dict(obj_d)) TEST_OBJECTS[object_type] = converted_objects KAFKA_ROOT = os.environ.get("SWH_KAFKA_ROOT") KAFKA_ROOT = KAFKA_ROOT if KAFKA_ROOT else os.path.dirname(__file__) + "/kafka" if not os.path.exists(KAFKA_ROOT): msg = ( "Development error: %s must exist and target an " "existing kafka installation" % KAFKA_ROOT ) raise ValueError(msg) KAFKA_SCRIPTS = Path(KAFKA_ROOT) / "bin" KAFKA_BIN = str(KAFKA_SCRIPTS / "kafka-server-start.sh") ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / "zookeeper-server-start.sh") ZK_CONFIG_TEMPLATE = ZOOKEEPER_CONFIG_TEMPLATE + "\nadmin.enableServer=false\n" KAFKA_CONFIG_TEMPLATE = KAFKA_SERVER_CONFIG_TEMPLATE + "\nmessage.max.bytes=104857600\n" # Those defines fixtures zookeeper_proc = make_zookeeper_process( ZOOKEEPER_BIN, zk_config_template=ZK_CONFIG_TEMPLATE, scope="session" ) os.environ[ "KAFKA_LOG4J_OPTS" ] = "-Dlog4j.configuration=file:%s/log4j.properties" % os.path.dirname(__file__) session_kafka_server = make_kafka_server( KAFKA_BIN, "zookeeper_proc", kafka_config_template=KAFKA_CONFIG_TEMPLATE, scope="session", ) kafka_logger = logging.getLogger("kafka") kafka_logger.setLevel(logging.WARN) @pytest.fixture(scope="function") def kafka_prefix(): """Pick a random prefix for kafka topics on each call""" return "".join(random.choice(string.ascii_lowercase) for _ in range(10)) @pytest.fixture(scope="function") def kafka_consumer_group(kafka_prefix: str): """Pick a random consumer group for kafka consumers on each call""" return "test-consumer-%s" % kafka_prefix @pytest.fixture(scope="session") def kafka_admin_client(session_kafka_server: Tuple[Popen, int]) -> AdminClient: return AdminClient({"bootstrap.servers": "localhost:%s" % session_kafka_server[1]}) @pytest.fixture(scope="function") def kafka_server_config_overrides() -> Dict[str, str]: return {} @pytest.fixture(scope="function") def kafka_server( session_kafka_server: Tuple[Popen, int], kafka_admin_client: AdminClient, kafka_server_config_overrides: Dict[str, str], ) -> Iterator[Tuple[Popen, int]]: # No overrides, we can just return the original broker connection if not kafka_server_config_overrides: yield session_kafka_server return # This is the minimal operation that the kafka_admin_client gives to # retrieve the cluster metadata, which we need to get 
 TEST_CONFIG = {
     "consumer_id": "swh.journal.consumer",
     "object_types": TEST_OBJECT_DICTS.keys(),
     "stop_after_objects": 1,  # will read 1 object and stop
     "storage": {"cls": "memory", "args": {}},
 }
 
 
 @pytest.fixture
 def test_config(kafka_server: Tuple[Popen, int], kafka_prefix: str):
     """Test configuration needed for producer/consumer
 
     """
     _, port = kafka_server
     return {
         **TEST_CONFIG,
         "brokers": ["127.0.0.1:{}".format(port)],
         "prefix": kafka_prefix + ".swh.journal.objects",
     }
 
 
 @pytest.fixture
 def consumer(
     kafka_server: Tuple[Popen, int], test_config: Dict, kafka_consumer_group: str,
 ) -> Consumer:
     """Get a connected Kafka consumer.
 
     """
     _, kafka_port = kafka_server
     consumer = Consumer(
         {
             "bootstrap.servers": "127.0.0.1:{}".format(kafka_port),
             "auto.offset.reset": "earliest",
             "enable.auto.commit": True,
             "group.id": kafka_consumer_group,
         }
     )
 
     kafka_topics = [
         "%s.%s" % (test_config["prefix"], object_type)
         for object_type in test_config["object_types"]
     ]
 
     consumer.subscribe(kafka_topics)
 
     yield consumer
 
     consumer.close()
 
 
 def objects_d():
     return one_of(
         strategies.origins().map(lambda x: ("origin", x.to_dict())),
         strategies.origin_visits().map(lambda x: ("origin_visit", x.to_dict())),
         strategies.snapshots().map(lambda x: ("snapshot", x.to_dict())),
         strategies.releases().map(lambda x: ("release", x.to_dict())),
         strategies.revisions().map(lambda x: ("revision", x.to_dict())),
         strategies.directories().map(lambda x: ("directory", x.to_dict())),
         strategies.skipped_contents().map(lambda x: ("skipped_content", x.to_dict())),
         strategies.present_contents().map(lambda x: ("content", x.to_dict())),
     )
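A sketch (not part of the patch) of how the kafka_server_config_overrides fixture is meant to be consumed: a test module can redefine it, and the kafka_server fixture then applies the overrides through the admin client and restores the original broker values afterwards. The override value shown is illustrative.

    # Hypothetical test module; the overridden setting and its value are examples only.
    import pytest

    @pytest.fixture(scope="function")
    def kafka_server_config_overrides():
        # ask for a 1 MiB message size limit on the shared broker
        return {"message.max.bytes": "1048576"}

    def test_small_message_limit(kafka_server):
        _, port = kafka_server  # broker now runs with the overridden setting
        ...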
""" _, kafka_port = kafka_server consumer = Consumer( { "bootstrap.servers": "127.0.0.1:{}".format(kafka_port), "auto.offset.reset": "earliest", "enable.auto.commit": True, "group.id": kafka_consumer_group, } ) kafka_topics = [ "%s.%s" % (test_config["prefix"], object_type) for object_type in test_config["object_types"] ] consumer.subscribe(kafka_topics) yield consumer consumer.close() def objects_d(): return one_of( strategies.origins().map(lambda x: ("origin", x.to_dict())), strategies.origin_visits().map(lambda x: ("origin_visit", x.to_dict())), strategies.snapshots().map(lambda x: ("snapshot", x.to_dict())), strategies.releases().map(lambda x: ("release", x.to_dict())), strategies.revisions().map(lambda x: ("revision", x.to_dict())), strategies.directories().map(lambda x: ("directory", x.to_dict())), strategies.skipped_contents().map(lambda x: ("skipped_content", x.to_dict())), strategies.present_contents().map(lambda x: ("content", x.to_dict())), ) diff --git a/swh/journal/tests/test_serializers.py b/swh/journal/tests/test_serializers.py index 0814bf4..376cb23 100644 --- a/swh/journal/tests/test_serializers.py +++ b/swh/journal/tests/test_serializers.py @@ -1,27 +1,36 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import OrderedDict import itertools from swh.journal import serializers +from .conftest import TEST_OBJECTS + def test_key_to_kafka_repeatable(): """Check the kafka key encoding is repeatable""" base_dict = { "a": "foo", "b": "bar", "c": "baz", } key = serializers.key_to_kafka(base_dict) for dict_keys in itertools.permutations(base_dict): d = OrderedDict() for k in dict_keys: d[k] = base_dict[k] assert key == serializers.key_to_kafka(d) + + +def test_get_key(): + """Test whether get_key works on all our objects""" + for object_type, objects in TEST_OBJECTS.items(): + for obj in objects: + assert serializers.object_key(object_type, obj) is not None diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py index e19484a..a5ba778 100644 --- a/swh/journal/writer/kafka.py +++ b/swh/journal/writer/kafka.py @@ -1,180 +1,140 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging -from typing import Dict, Iterable, Optional, Type, Union, overload +from typing import Dict, Iterable, Optional, Type from confluent_kafka import Producer, KafkaException -from swh.model.hashutil import DEFAULT_ALGORITHMS from swh.model.model import ( BaseModel, Content, Directory, Origin, OriginVisit, Release, Revision, SkippedContent, Snapshot, ) -from swh.journal.serializers import KeyType, key_to_kafka, value_to_kafka +from swh.journal.serializers import ( + KeyType, + ModelObject, + object_key, + key_to_kafka, + value_to_kafka, +) logger = logging.getLogger(__name__) OBJECT_TYPES: Dict[Type[BaseModel], str] = { Content: "content", Directory: "directory", Origin: "origin", OriginVisit: "origin_visit", Release: "release", Revision: "revision", SkippedContent: "skipped_content", Snapshot: "snapshot", } -ModelObject = Union[ - Content, Directory, Origin, OriginVisit, Release, Revision, SkippedContent, Snapshot -] - class KafkaJournalWriter: """This class is instantiated 
diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py
index e19484a..a5ba778 100644
--- a/swh/journal/writer/kafka.py
+++ b/swh/journal/writer/kafka.py
@@ -1,180 +1,140 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import logging
-from typing import Dict, Iterable, Optional, Type, Union, overload
+from typing import Dict, Iterable, Optional, Type
 
 from confluent_kafka import Producer, KafkaException
 
-from swh.model.hashutil import DEFAULT_ALGORITHMS
 from swh.model.model import (
     BaseModel,
     Content,
     Directory,
     Origin,
     OriginVisit,
     Release,
     Revision,
     SkippedContent,
     Snapshot,
 )
 
-from swh.journal.serializers import KeyType, key_to_kafka, value_to_kafka
+from swh.journal.serializers import (
+    KeyType,
+    ModelObject,
+    object_key,
+    key_to_kafka,
+    value_to_kafka,
+)
 
 logger = logging.getLogger(__name__)
 
 OBJECT_TYPES: Dict[Type[BaseModel], str] = {
     Content: "content",
     Directory: "directory",
     Origin: "origin",
     OriginVisit: "origin_visit",
     Release: "release",
     Revision: "revision",
     SkippedContent: "skipped_content",
     Snapshot: "snapshot",
 }
 
-ModelObject = Union[
-    Content, Directory, Origin, OriginVisit, Release, Revision, SkippedContent, Snapshot
-]
-
 
 class KafkaJournalWriter:
     """This class is instantiated and used by swh-storage to write incoming
    new objects to Kafka before adding them to the storage backend
    (e.g. postgresql) itself.
 
    Args:
      brokers: list of broker addresses and ports
      prefix: the prefix used to build the topic names for objects
      client_id: the id of the writer sent to kafka
      producer_config: extra configuration keys passed to the `Producer`
 
    """
 
     def __init__(
         self,
         brokers: Iterable[str],
         prefix: str,
         client_id: str,
         producer_config: Optional[Dict] = None,
     ):
         self._prefix = prefix
 
         if not producer_config:
             producer_config = {}
 
         if "message.max.bytes" not in producer_config:
             producer_config = {
                 "message.max.bytes": 100 * 1024 * 1024,
                 **producer_config,
             }
 
         self.producer = Producer(
             {
                 "bootstrap.servers": ",".join(brokers),
                 "client.id": client_id,
                 "on_delivery": self._on_delivery,
                 "error_cb": self._error_cb,
                 "logger": logger,
                 "acks": "all",
                 **producer_config,
             }
         )
 
     def _error_cb(self, error):
         if error.fatal():
             raise KafkaException(error)
         logger.info("Received non-fatal kafka error: %s", error)
 
     def _on_delivery(self, error, message):
         if error is not None:
             self._error_cb(error)
 
     def send(self, topic: str, key: KeyType, value):
         kafka_key = key_to_kafka(key)
         self.producer.produce(
             topic=topic, key=kafka_key, value=value_to_kafka(value),
         )
 
         # Need to service the callbacks regularly by calling poll
         self.producer.poll(0)
 
     def flush(self):
         self.producer.flush()
 
-    # these @overload'ed versions of the _get_key method aim at helping mypy figuring
-    # the correct type-ing.
-    @overload
-    def _get_key(
-        self, object_type: str, object_: Union[Revision, Release, Directory, Snapshot]
-    ) -> bytes:
-        ...
-
-    @overload
-    def _get_key(self, object_type: str, object_: Content) -> bytes:
-        ...
-
-    @overload
-    def _get_key(self, object_type: str, object_: SkippedContent) -> Dict[str, bytes]:
-        ...
-
-    @overload
-    def _get_key(self, object_type: str, object_: Origin) -> Dict[str, bytes]:
-        ...
-
-    @overload
-    def _get_key(self, object_type: str, object_: OriginVisit) -> Dict[str, str]:
-        ...
-
-    def _get_key(self, object_type: str, object_) -> KeyType:
-        if object_type in ("revision", "release", "directory", "snapshot"):
-            return object_.id
-        elif object_type == "content":
-            return object_.sha1  # TODO: use a dict of hashes
-        elif object_type == "skipped_content":
-            return {hash: getattr(object_, hash) for hash in DEFAULT_ALGORITHMS}
-        elif object_type == "origin":
-            return {"url": object_.url}
-        elif object_type == "origin_visit":
-            return {
-                "origin": object_.origin,
-                "date": str(object_.date),
-            }
-        else:
-            raise ValueError("Unknown object type: %s." % object_type)
-
     def _sanitize_object(
         self, object_type: str, object_: ModelObject
     ) -> Dict[str, str]:
         dict_ = object_.to_dict()
         if object_type == "origin_visit":
             # :(
             dict_["date"] = str(dict_["date"])
         if object_type == "content":
             dict_.pop("data", None)
         return dict_
 
     def _write_addition(self, object_type: str, object_: ModelObject) -> None:
         """Write a single object to the journal"""
         topic = f"{self._prefix}.{object_type}"
-        key = self._get_key(object_type, object_)
+        key = object_key(object_type, object_)
         dict_ = self._sanitize_object(object_type, object_)
         logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_)
         self.send(topic, key=key, value=dict_)
 
     def write_addition(self, object_type: str, object_: ModelObject) -> None:
         """Write a single object to the journal"""
         self._write_addition(object_type, object_)
         self.flush()
 
     write_update = write_addition
 
     def write_additions(self, object_type: str, objects: Iterable[ModelObject]) -> None:
         """Write a set of objects to the journal"""
         for object_ in objects:
             self._write_addition(object_type, object_)
 
         self.flush()
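A minimal sketch (not part of the patch) of driving the writer after this change; the broker address, prefix, client id, and origin URL are illustrative.

    # Hypothetical usage of KafkaJournalWriter with the refactored key helper.
    from swh.model.model import Origin
    from swh.journal.writer.kafka import KafkaJournalWriter

    writer = KafkaJournalWriter(
        brokers=["127.0.0.1:9092"],        # example broker address
        prefix="swh.journal.objects",
        client_id="swh.journal.example",
    )
    # _write_addition now derives the Kafka key via object_key() from
    # swh.journal.serializers instead of the removed _get_key method.
    writer.write_addition("origin", Origin(url="https://example.org/repo.git"))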