diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,2 +1,2 @@
 swh.core[db,http] >= 0.0.60
-swh.model >= 0.0.61
+swh.model >= 0.2
diff --git a/swh/journal/client.py b/swh/journal/client.py
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -92,6 +92,7 @@
         group_id: str,
         prefix: Optional[str] = None,
         object_types: Optional[List[str]] = None,
+        privileged: bool = False,
         stop_after_objects: Optional[int] = None,
         batch_size: int = 200,
         process_timeout: Optional[float] = None,
@@ -164,33 +165,42 @@
         logger.debug("Consumer settings: %s", consumer_settings)
         self.consumer = Consumer(consumer_settings)
-
-        existing_topics = self.consumer.list_topics(timeout=10).topics.keys()
-        if not any(topic.startswith(f"{prefix}.") for topic in existing_topics):
+        if privileged:
+            privileged_prefix = f"{prefix}_privileged"
+        else:  # do not attempt to subscribe to privileged topics
+            privileged_prefix = f"{prefix}"
+        existing_topics = [
+            topic
+            for topic in self.consumer.list_topics(timeout=10).topics.keys()
+            if (
+                topic.startswith(f"{prefix}.")
+                or topic.startswith(f"{privileged_prefix}.")
+            )
+        ]
+        if not existing_topics:
             raise ValueError(
                 f"The prefix {prefix} does not match any existing topic "
                 "on the kafka broker"
             )
-        if object_types:
-            unknown_topics = []
-            for object_type in object_types:
-                topic = f"{prefix}.{object_type}"
-                if topic not in existing_topics:
-                    unknown_topics.append(topic)
-            if unknown_topics:
-                raise ValueError(
-                    f"Topic(s) {','.join(unknown_topics)} "
-                    "are unknown on the kafka broker"
-                )
-            self.subscription = [
-                f"{prefix}.{object_type}" for object_type in object_types
-            ]
-        else:
-            # subscribe to every topic under the prefix
-            self.subscription = [
-                topic for topic in existing_topics if topic.startswith(prefix)
-            ]
+        if not object_types:
+            object_types = list({topic.split(".")[-1] for topic in existing_topics})
+
+        self.subscription = []
+        unknown_types = []
+        for object_type in object_types:
+            topics = (f"{privileged_prefix}.{object_type}", f"{prefix}.{object_type}")
+            for topic in topics:
+                if topic in existing_topics:
+                    self.subscription.append(topic)
+                    break
+            else:
+                unknown_types.append(object_type)
+        if unknown_types:
+            raise ValueError(
+                f"Topic(s) for object types {','.join(unknown_types)} "
+                "are unknown on the kafka broker"
+            )
 
         logger.debug(f"Upstream topics: {existing_topics}")
         self.subscribe()
diff --git a/swh/journal/pytest_plugin.py b/swh/journal/pytest_plugin.py
--- a/swh/journal/pytest_plugin.py
+++ b/swh/journal/pytest_plugin.py
@@ -48,7 +48,9 @@
         fetched_messages += 1
         topic = msg.topic()
-        assert topic.startswith(kafka_prefix + "."), "Unexpected topic"
+        assert topic.startswith(f"{kafka_prefix}.") or topic.startswith(
+            f"{kafka_prefix}_privileged."
+ ), "Unexpected topic" object_type = topic[len(kafka_prefix + ".") :] consumed_messages[object_type].append( @@ -100,14 +102,31 @@ return set(TEST_OBJECT_DICTS.keys()) +@pytest.fixture(scope="function") +def privileged_object_types(): + """Set of object types to precreate privileged topics for.""" + return {"revision", "release"} + + @pytest.fixture(scope="function") def kafka_server( - kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str] + kafka_server_base: str, + kafka_prefix: str, + object_types: Iterator[str], + privileged_object_types: Iterator[str], ) -> str: """A kafka server with existing topics - topics are built from the ``kafka_prefix`` and the ``object_types`` list""" - topics = [f"{kafka_prefix}.{obj}" for obj in object_types] + Unprivileged topics are built as ``{kafka_prefix}.{object_type}`` with object_type + from the ``object_types`` list. + + Privileged topics are built as ``{kafka_prefix}_privileged.{object_type}`` with + object_type from the ``privileged_object_types`` list. + + """ + topics = [f"{kafka_prefix}.{obj}" for obj in object_types] + [ + f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types + ] # unfortunately, the Mock broker does not support the CreatTopic admin API, so we # have to create topics using a Producer. @@ -156,13 +175,19 @@ @pytest.fixture -def test_config(kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str]): +def test_config( + kafka_server_base: str, + kafka_prefix: str, + object_types: Iterator[str], + privileged_object_types: Iterator[str], +): """Test configuration needed for producer/consumer """ return { **TEST_CONFIG, "object_types": object_types, + "privileged_object_types": privileged_object_types, "brokers": [kafka_server_base], "prefix": kafka_prefix, } @@ -170,7 +195,7 @@ @pytest.fixture def consumer( - kafka_server: str, test_config: Dict, kafka_consumer_group: str, + kafka_server: str, test_config: Dict, kafka_consumer_group: str ) -> Consumer: """Get a connected Kafka consumer. 
@@ -183,12 +208,13 @@
             "group.id": kafka_consumer_group,
         }
     )
-
+    prefix = test_config["prefix"]
     kafka_topics = [
-        "%s.%s" % (test_config["prefix"], object_type)
-        for object_type in test_config["object_types"]
+        f"{prefix}.{object_type}" for object_type in test_config["object_types"]
+    ] + [
+        f"{prefix}_privileged.{object_type}"
+        for object_type in test_config["privileged_object_types"]
     ]
-
     consumer.subscribe(kafka_topics)
 
     yield consumer
diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py
--- a/swh/journal/tests/test_client.py
+++ b/swh/journal/tests/test_client.py
@@ -241,3 +241,90 @@
         stop_after_objects=1,
         object_types=["else"],
     )
+
+
+def test_client_subscriptions_with_anonymized_topics(
+    kafka_prefix: str, kafka_consumer_group: str, kafka_server_base: str
+):
+    producer = Producer(
+        {
+            "bootstrap.servers": kafka_server_base,
+            "client.id": "test producer",
+            "acks": "all",
+        }
+    )
+
+    # Fill Kafka with a revision object on both the regular prefix (normally used
+    # for anonymized objects in this case) and the privileged one
+    producer.produce(
+        topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV),
+    )
+    producer.produce(
+        topic=kafka_prefix + "_privileged.revision",
+        key=REV["id"],
+        value=value_to_kafka(REV),
+    )
+    producer.flush()
+
+    # without privileged "channels" activated on the client side
+    client = JournalClient(
+        brokers=[kafka_server_base],
+        group_id=kafka_consumer_group,
+        prefix=kafka_prefix,
+        stop_after_objects=1,
+        privileged=False,
+    )
+    # we only subscribe to the "standard" topics
+    assert client.subscription == [kafka_prefix + ".revision"]
+
+    # with privileged "channels" activated on the client side
+    client = JournalClient(
+        brokers=[kafka_server_base],
+        group_id=kafka_consumer_group,
+        prefix=kafka_prefix,
+        stop_after_objects=1,
+        privileged=True,
+    )
+    # we only subscribe to the "privileged" topics
+    assert client.subscription == [kafka_prefix + "_privileged.revision"]
+
+
+def test_client_subscriptions_without_anonymized_topics(
+    kafka_prefix: str, kafka_consumer_group: str, kafka_server_base: str
+):
+    producer = Producer(
+        {
+            "bootstrap.servers": kafka_server_base,
+            "client.id": "test producer",
+            "acks": "all",
+        }
+    )
+
+    # Fill Kafka with revision objects only on the standard prefix
+    producer.produce(
+        topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV),
+    )
+    producer.flush()
+
+    # without the privileged channel activated on the client side
+    client = JournalClient(
+        brokers=[kafka_server_base],
+        group_id=kafka_consumer_group,
+        prefix=kafka_prefix,
+        stop_after_objects=1,
+        privileged=False,
+    )
+    # we only subscribe to the standard prefix
+    assert client.subscription == [kafka_prefix + ".revision"]
+
+    # with the privileged channel activated on the client side
+    client = JournalClient(
+        brokers=[kafka_server_base],
+        group_id=kafka_consumer_group,
+        prefix=kafka_prefix,
+        stop_after_objects=1,
+        privileged=True,
+    )
+    # we still only subscribe to the standard prefix, since there is no privileged
+    # prefix on the kafka broker
+    assert client.subscription == [kafka_prefix + ".revision"]
diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -3,19 +3,29 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from typing import Iterable
+
 import pytest
 
 from confluent_kafka import Consumer, Producer
-from swh.model.model import Directory
+from swh.model.model import Directory, Revision, Release
 
 from swh.journal.tests.journal_data import TEST_OBJECTS
 from swh.journal.pytest_plugin import consume_messages, assert_all_objects_consumed
 from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError
 
 
-def test_kafka_writer(kafka_prefix: str, kafka_server: str, consumer: Consumer):
+def test_kafka_writer(
+    kafka_prefix: str,
+    kafka_server: str,
+    consumer: Consumer,
+    privileged_object_types: Iterable[str],
+):
     writer = KafkaJournalWriter(
-        brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix,
+        brokers=[kafka_server],
+        client_id="kafka_writer",
+        prefix=kafka_prefix,
+        anonymize=False,
     )
 
     expected_messages = 0
@@ -27,6 +37,65 @@
     consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
     assert_all_objects_consumed(consumed_messages)
 
+    for key, obj_dict in consumed_messages["revision"]:
+        obj = Revision.from_dict(obj_dict)
+        for person in (obj.author, obj.committer):
+            assert not (
+                len(person.fullname) == 32
+                and person.name is None
+                and person.email is None
+            )
+    for key, obj_dict in consumed_messages["release"]:
+        obj = Release.from_dict(obj_dict)
+        for person in (obj.author,):
+            assert not (
+                len(person.fullname) == 32
+                and person.name is None
+                and person.email is None
+            )
+
+
+def test_kafka_writer_anonymized(
+    kafka_prefix: str,
+    kafka_server: str,
+    consumer: Consumer,
+    privileged_object_types: Iterable[str],
+):
+    writer = KafkaJournalWriter(
+        brokers=[kafka_server],
+        client_id="kafka_writer",
+        prefix=kafka_prefix,
+        anonymize=True,
+    )
+
+    expected_messages = 0
+
+    for object_type, objects in TEST_OBJECTS.items():
+        writer.write_additions(object_type, objects)
+        expected_messages += len(objects)
+        if object_type in privileged_object_types:
+            expected_messages += len(objects)
+
+    consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
+    assert_all_objects_consumed(consumed_messages)
+
+    for key, obj_dict in consumed_messages["revision"]:
+        obj = Revision.from_dict(obj_dict)
+        for person in (obj.author, obj.committer):
+            assert (
+                len(person.fullname) == 32
+                and person.name is None
+                and person.email is None
+            )
+    for key, obj_dict in consumed_messages["release"]:
+        obj = Release.from_dict(obj_dict)
+        for person in (obj.author,):
+            assert (
+                len(person.fullname) == 32
+                and person.name is None
+                and person.email is None
+            )
+
 
 def test_write_delivery_failure(
     kafka_prefix: str, kafka_server: str, consumer: Consumer
diff --git a/swh/journal/tests/test_pytest_plugin.py b/swh/journal/tests/test_pytest_plugin.py
--- a/swh/journal/tests/test_pytest_plugin.py
+++ b/swh/journal/tests/test_pytest_plugin.py
@@ -20,9 +20,14 @@
 
 def test_kafka_server_with_topics(
-    kafka_server: str, kafka_prefix: str, object_types: Iterator[str]
+    kafka_server: str,
+    kafka_prefix: str,
+    object_types: Iterator[str],
+    privileged_object_types: Iterator[str],
 ):
     admin = AdminClient({"bootstrap.servers": kafka_server})
+
+    # check unprivileged topics are present
     topics = {
         topic
         for topic in admin.list_topics().topics
@@ -30,6 +35,16 @@
     }
     assert topics == {f"{kafka_prefix}.{obj}" for obj in object_types}
 
+    # check privileged topics are present
+    topics = {
+        topic
+        for topic in admin.list_topics().topics
+        if topic.startswith(f"{kafka_prefix}_privileged.")
+    }
+    assert topics == {
+        f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types
+    }
+
 
 def test_test_config(test_config: dict, kafka_prefix: str, kafka_server_base: str):
     assert test_config == {
@@ -46,6 +61,7 @@
             "snapshot",
             "skipped_content",
         },
+        "privileged_object_types": {"release", "revision"},
         "brokers": [kafka_server_base],
         "prefix": kafka_prefix,
     }
diff --git a/swh/journal/writer/inmemory.py b/swh/journal/writer/inmemory.py
--- a/swh/journal/writer/inmemory.py
+++ b/swh/journal/writer/inmemory.py
@@ -20,13 +20,21 @@
         # Share the list of objects across processes, for RemoteAPI tests.
         self.manager = Manager()
         self.objects = self.manager.list()
+        self.privileged_objects = self.manager.list()
 
-    def write_addition(self, object_type: str, object_: ModelObject) -> None:
+    def write_addition(
+        self, object_type: str, object_: ModelObject, privileged: bool = False
+    ) -> None:
         assert isinstance(object_, BaseModel)
-        self.objects.append((object_type, object_))
+        if privileged:
+            self.privileged_objects.append((object_type, object_))
+        else:
+            self.objects.append((object_type, object_))
 
     write_update = write_addition
 
-    def write_additions(self, object_type: str, objects: List[ModelObject]) -> None:
+    def write_additions(
+        self, object_type: str, objects: List[ModelObject], privileged: bool = False
+    ) -> None:
         for object_ in objects:
-            self.write_addition(object_type, object_)
+            self.write_addition(object_type, object_, privileged)
diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py
--- a/swh/journal/writer/kafka.py
+++ b/swh/journal/writer/kafka.py
@@ -60,18 +60,34 @@
 
 class KafkaJournalWriter:
-    """This class is instantiated and used by swh-storage to write incoming
-    new objects to Kafka before adding them to the storage backend
-    (eg. postgresql) itself.
+    """This class is used to write serialized versions of swh.model.model objects to a
+    series of Kafka topics.
+
+    Topics used to send object representations are built from a ``prefix`` plus the
+    type of the object:
+
+    ``{prefix}.{object_type}``
+
+    Objects can be sent as is, or can be anonymized. The anonymization feature, when
+    activated, will write anonymized versions of model objects to the main topics,
+    while stock (non-anonymized) objects are sent to a dedicated (privileged) set of
+    topics:
+
+    ``{prefix}_privileged.{object_type}``
+
+    The anonymization of a swh.model object is the result of calling its
+    ``BaseModel.anonymize()`` method. An object is considered anonymizable if this
+    method returns a (non-None) value.
 
     Args:
-      brokers: list of broker addresses and ports
-      prefix: the prefix used to build the topic names for objects
-      client_id: the id of the writer sent to kafka
-      producer_config: extra configuration keys passed to the `Producer`
+      brokers: list of broker addresses and ports.
+      prefix: the prefix used to build the topic names for objects.
+      client_id: the id of the writer sent to kafka.
+      producer_config: extra configuration keys passed to the `Producer`.
       flush_timeout: timeout, in seconds, after which the `flush` operation
        will fail if some message deliveries are still pending.
-      producer_class: override for the kafka producer class
+      producer_class: override for the kafka producer class.
+      anonymize: if True, activate the anonymization feature.
""" @@ -83,8 +99,11 @@ producer_config: Optional[Dict] = None, flush_timeout: float = 120, producer_class: Type[Producer] = Producer, + anonymize: bool = False, ): self._prefix = prefix + self._prefix_privileged = f"{self._prefix}_privileged" + self.anonymize = anonymize if not producer_config: producer_config = {} @@ -190,8 +209,20 @@ def _write_addition(self, object_type: str, object_: ModelObject) -> None: """Write a single object to the journal""" - topic = f"{self._prefix}.{object_type}" key = object_key(object_type, object_) + + if self.anonymize: + anon_object_ = object_.anonymize() + if anon_object_: # can be either None, or an anonymized object + # if the object is anonymizable, send the non-anonymized version in the + # privileged channel + topic = f"{self._prefix_privileged}.{object_type}" + dict_ = self._sanitize_object(object_type, object_) + logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) + self.send(topic, key=key, value=dict_) + object_ = anon_object_ + + topic = f"{self._prefix}.{object_type}" dict_ = self._sanitize_object(object_type, object_) logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) self.send(topic, key=key, value=dict_)