diff --git a/PKG-INFO b/PKG-INFO index bf26bc5..9ab960a 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,72 +1,72 @@ Metadata-Version: 2.1 Name: swh.journal -Version: 0.1.0 +Version: 0.2.0 Summary: Software Heritage Journal utilities Home-page: https://forge.softwareheritage.org/diffusion/DJNL/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-journal/ Description: swh-journal =========== Persistent logger of changes to the archive, with publish-subscribe support. See the [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) for more details. # Local test As a pre-requisite, you need a kafka installation path. The following target will take care of this: ``` make install ``` Then, provided you are in the right virtual environment as described in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): ``` pytest ``` or: ``` tox ``` # Running ## publisher Command: ``` $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ publisher ``` # Auto-completion To have the completion, add the following in your ~/.virtualenvs/swh/bin/postactivate: ``` eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/requirements-swh.txt b/requirements-swh.txt index 8f55b09..3e9b72d 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,2 +1,2 @@ swh.core[db,http] >= 0.0.60 -swh.model >= 0.0.61 +swh.model >= 0.3 diff --git a/swh.journal.egg-info/PKG-INFO b/swh.journal.egg-info/PKG-INFO index bf26bc5..9ab960a 100644 --- a/swh.journal.egg-info/PKG-INFO +++ b/swh.journal.egg-info/PKG-INFO @@ -1,72 +1,72 @@ Metadata-Version: 2.1 Name: swh.journal -Version: 0.1.0 +Version: 0.2.0 Summary: Software Heritage Journal utilities Home-page: https://forge.softwareheritage.org/diffusion/DJNL/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-journal/ Description: swh-journal =========== Persistent logger of changes to the archive, with publish-subscribe support. See the [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) for more details. # Local test As a pre-requisite, you need a kafka installation path.
The following target will take care of this: ``` make install ``` Then, provided you are in the right virtual environment as described in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): ``` pytest ``` or: ``` tox ``` # Running ## publisher Command: ``` $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ publisher ``` # Auto-completion To have the completion, add the following in your ~/.virtualenvs/swh/bin/postactivate: ``` eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.journal.egg-info/requires.txt b/swh.journal.egg-info/requires.txt index 7e018c5..e0ca22d 100644 --- a/swh.journal.egg-info/requires.txt +++ b/swh.journal.egg-info/requires.txt @@ -1,10 +1,10 @@ confluent-kafka msgpack tenacity vcversioner swh.core[db,http]>=0.0.60 -swh.model>=0.0.61 +swh.model>=0.3 [testing] pytest hypothesis diff --git a/swh/journal/client.py b/swh/journal/client.py index 039025d..68b9a22 100644 --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -1,295 +1,305 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import logging import os import time from typing import Any, Dict, List, Optional, Set, Tuple, Union from confluent_kafka import Consumer, KafkaException, KafkaError from .serializers import kafka_to_value from swh.journal import DEFAULT_PREFIX logger = logging.getLogger(__name__) rdkafka_logger = logging.getLogger(__name__ + ".rdkafka") # Only these offset reset policies are accepted ACCEPTED_OFFSET_RESET = ["earliest", "latest"] # Errors that Kafka raises too often and that are not useful; we lower # their log level to DEBUG instead of INFO. _SPAMMY_ERRORS = [ KafkaError._NO_OFFSET, ] def get_journal_client(cls: str, **kwargs: Any): """Factory function to instantiate a journal client object. Currently, only the "kafka" journal client is supported. """ if cls == "kafka": return JournalClient(**kwargs) raise ValueError("Unknown journal client class `%s`" % cls) def _error_cb(error): if error.fatal(): raise KafkaException(error) if error.code() in _SPAMMY_ERRORS: logger.debug("Received non-fatal kafka error: %s", error) else: logger.info("Received non-fatal kafka error: %s", error) def _on_commit(error, partitions): if error is not None: _error_cb(error) class JournalClient: """A base client for the Software Heritage journal. The current implementation of the journal uses Apache Kafka brokers to publish messages under a given topic prefix, with each object type using a specific topic under that prefix. If the `prefix` argument is None (the default), the value `'swh.journal.objects'` is used. Clients subscribe to events specific to each object type as listed in the `object_types` argument (if unset, defaults to all existing kafka topics under the prefix). Clients can be sharded by setting the `group_id` to a common value across instances.
The journal will share the message throughput across the nodes sharing the same group_id. Messages are processed by the `worker_fn` callback passed to the `process` method, in batches of at most `batch_size` messages (defaults to 200). If set, processing stops once `stop_after_objects` messages have been consumed in total. `stop_on_eof` stops the processing when the client has reached the end of each partition in turn. `auto_offset_reset` sets the behavior of the client when the consumer group initializes: `'earliest'` (the default) processes all objects since the inception of the topics; `'latest'` only processes objects published after the consumer group has been created. Any other named argument is passed directly to KafkaConsumer(). """ def __init__( self, brokers: Union[str, List[str]], group_id: str, prefix: Optional[str] = None, object_types: Optional[List[str]] = None, + privileged: bool = False, stop_after_objects: Optional[int] = None, batch_size: int = 200, process_timeout: Optional[float] = None, auto_offset_reset: str = "earliest", stop_on_eof: bool = False, **kwargs, ): if prefix is None: prefix = DEFAULT_PREFIX if auto_offset_reset not in ACCEPTED_OFFSET_RESET: raise ValueError( "Option 'auto_offset_reset' only accept %s, not %s" % (ACCEPTED_OFFSET_RESET, auto_offset_reset) ) if batch_size <= 0: raise ValueError("Option 'batch_size' needs to be positive") self.value_deserializer = kafka_to_value if isinstance(brokers, str): brokers = [brokers] debug_logging = rdkafka_logger.isEnabledFor(logging.DEBUG) if debug_logging and "debug" not in kwargs: kwargs["debug"] = "consumer" # Static group instance id management group_instance_id = os.environ.get("KAFKA_GROUP_INSTANCE_ID") if group_instance_id: kwargs["group.instance.id"] = group_instance_id if "group.instance.id" in kwargs: # When doing static consumer group membership, set a higher default # session timeout. The session timeout is the duration after which # the broker considers that a consumer has left the consumer group # for good, and triggers a rebalance. Considering our current # processing pattern, 10 minutes gives the consumer ample time to # restart before that happens. if "session.timeout.ms" not in kwargs: kwargs["session.timeout.ms"] = 10 * 60 * 1000 # 10 minutes if "session.timeout.ms" in kwargs: # When the session timeout is set, rdkafka requires the max poll # interval to be set to a higher value; the max poll interval is # rdkafka's way of figuring out whether the client's message # processing thread has stalled: when the max poll interval lapses # between two calls to consumer.poll(), rdkafka leaves the consumer # group and terminates the connection to the brokers.
# # We default to 1.5 times the session timeout if "max.poll.interval.ms" not in kwargs: kwargs["max.poll.interval.ms"] = kwargs["session.timeout.ms"] // 2 * 3 consumer_settings = { **kwargs, "bootstrap.servers": ",".join(brokers), "auto.offset.reset": auto_offset_reset, "group.id": group_id, "on_commit": _on_commit, "error_cb": _error_cb, "enable.auto.commit": False, "logger": rdkafka_logger, } self.stop_on_eof = stop_on_eof if self.stop_on_eof: consumer_settings["enable.partition.eof"] = True logger.debug("Consumer settings: %s", consumer_settings) self.consumer = Consumer(consumer_settings) - - existing_topics = self.consumer.list_topics(timeout=10).topics.keys() - if not any(topic.startswith(f"{prefix}.") for topic in existing_topics): + if privileged: + privileged_prefix = f"{prefix}_privileged" + else: # do not attempt to subscribe to privileged topics + privileged_prefix = f"{prefix}" + existing_topics = [ + topic + for topic in self.consumer.list_topics(timeout=10).topics.keys() + if ( + topic.startswith(f"{prefix}.") + or topic.startswith(f"{privileged_prefix}.") + ) + ] + if not existing_topics: raise ValueError( f"The prefix {prefix} does not match any existing topic " "on the kafka broker" ) - if object_types: - unknown_topics = [] - for object_type in object_types: - topic = f"{prefix}.{object_type}" - if topic not in existing_topics: - unknown_topics.append(topic) - if unknown_topics: - raise ValueError( - f"Topic(s) {','.join(unknown_topics)} " - "are unknown on the kafka broker" - ) - self.subscription = [ - f"{prefix}.{object_type}" for object_type in object_types - ] - else: - # subscribe to every topic under the prefix - self.subscription = [ - topic for topic in existing_topics if topic.startswith(prefix) - ] + if not object_types: + object_types = list({topic.split(".")[-1] for topic in existing_topics}) + + self.subscription = [] + unknown_types = [] + for object_type in object_types: + topics = (f"{privileged_prefix}.{object_type}", f"{prefix}.{object_type}") + for topic in topics: + if topic in existing_topics: + self.subscription.append(topic) + break + else: + unknown_types.append(object_type) + if unknown_types: + raise ValueError( + f"Topic(s) for object types {','.join(unknown_types)} " + "are unknown on the kafka broker" + ) logger.debug(f"Upstream topics: {existing_topics}") self.subscribe() self.stop_after_objects = stop_after_objects self.process_timeout = process_timeout self.eof_reached: Set[Tuple[str, str]] = set() self.batch_size = batch_size def subscribe(self): """Subscribe to topics listed in self.subscription This can be overridden if you need, for instance, to manually assign partitions. """ logger.debug(f"Subscribing to: {self.subscription}") self.consumer.subscribe(topics=self.subscription) def process(self, worker_fn): """Polls Kafka for a batch of messages, and calls the worker_fn with these messages. Args: worker_fn Callable[Dict[str, List[dict]]]: Function called with the messages as argument. """ start_time = time.monotonic() total_objects_processed = 0 while True: # timeout for message poll timeout = 1.0 elapsed = time.monotonic() - start_time if self.process_timeout: # +0.01 to prevent busy-waiting on / spamming consumer.poll. # consumer.consume() returns shortly before X expired # (a matter of milliseconds), so after it returns a first # time, it would then be called with a timeout in the order # of milliseconds, therefore returning immediately, then be # called again, etc. 
if elapsed + 0.01 >= self.process_timeout: break timeout = self.process_timeout - elapsed batch_size = self.batch_size if self.stop_after_objects: if total_objects_processed >= self.stop_after_objects: break # clamp batch size to avoid overrunning stop_after_objects batch_size = min( self.stop_after_objects - total_objects_processed, batch_size, ) messages = self.consumer.consume(timeout=timeout, num_messages=batch_size) if not messages: continue batch_processed, at_eof = self.handle_messages(messages, worker_fn) total_objects_processed += batch_processed if at_eof: break return total_objects_processed def handle_messages(self, messages, worker_fn): objects: Dict[str, List[Any]] = defaultdict(list) nb_processed = 0 for message in messages: error = message.error() if error is not None: if error.code() == KafkaError._PARTITION_EOF: self.eof_reached.add((message.topic(), message.partition())) else: _error_cb(error) continue if message.value() is None: # ignore message with no payload, these can be generated in tests continue nb_processed += 1 object_type = message.topic().split(".")[-1] objects[object_type].append(self.deserialize_message(message)) if objects: worker_fn(dict(objects)) self.consumer.commit() at_eof = self.stop_on_eof and all( (tp.topic, tp.partition) in self.eof_reached for tp in self.consumer.assignment() ) return nb_processed, at_eof def deserialize_message(self, message): return self.value_deserializer(message.value()) def close(self): self.consumer.close() diff --git a/swh/journal/pytest_plugin.py b/swh/journal/pytest_plugin.py index 1b4efbb..6a5f5f5 100644 --- a/swh/journal/pytest_plugin.py +++ b/swh/journal/pytest_plugin.py @@ -1,196 +1,236 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import random import string -from typing import Dict, Iterator +from typing import Collection, Dict, Iterator, Optional from collections import defaultdict import pytest from confluent_kafka import Consumer, KafkaException, Producer from confluent_kafka.admin import AdminClient +from swh.model.hashutil import hash_to_hex from swh.journal.serializers import object_key, kafka_to_key, kafka_to_value from swh.journal.tests.journal_data import TEST_OBJECTS, TEST_OBJECT_DICTS def consume_messages(consumer, kafka_prefix, expected_messages): """Consume expected_messages from the consumer; Sort them all into a consumed_objects dict""" consumed_messages = defaultdict(list) fetched_messages = 0 retries_left = 1000 while fetched_messages < expected_messages: if retries_left == 0: raise ValueError( "Timed out fetching messages from kafka. " f"Only {fetched_messages}/{expected_messages} fetched" ) msg = consumer.poll(timeout=0.01) if not msg: retries_left -= 1 continue error = msg.error() if error is not None: if error.fatal(): raise KafkaException(error) retries_left -= 1 continue fetched_messages += 1 topic = msg.topic() - assert topic.startswith(kafka_prefix + "."), "Unexpected topic" + assert topic.startswith(f"{kafka_prefix}.") or topic.startswith( + f"{kafka_prefix}_privileged." 
+ ), "Unexpected topic" object_type = topic[len(kafka_prefix + ".") :] consumed_messages[object_type].append( (kafka_to_key(msg.key()), kafka_to_value(msg.value())) ) return consumed_messages -def assert_all_objects_consumed(consumed_messages): - """Check whether all objects from TEST_OBJECT_DICTS have been consumed""" +def assert_all_objects_consumed( + consumed_messages: Dict, exclude: Optional[Collection] = None +): + """Check whether all objects from TEST_OBJECT_DICTS have been consumed + + `exclude` can be a list of object types for which we do not want to compare the + values (eg. for anonymized object). + + """ for object_type, known_values in TEST_OBJECT_DICTS.items(): known_keys = [object_key(object_type, obj) for obj in TEST_OBJECTS[object_type]] if not consumed_messages[object_type]: return (received_keys, received_values) = zip(*consumed_messages[object_type]) - if object_type == "origin_visit": - for value in received_values: - del value["visit"] - elif object_type == "content": + if object_type in ("content", "skipped_content"): for value in received_values: del value["ctime"] for key in known_keys: - assert key in received_keys + assert key in received_keys, ( + f"expected {object_type} key {hash_to_hex(key)} " + "absent from consumed messages" + ) + + if exclude and object_type in exclude: + continue for value in known_values: - assert value in received_values + assert value in received_values, ( + f"expected {object_type} value {value!r} is " + "absent from consumed messages" + ) @pytest.fixture(scope="function") def kafka_prefix(): """Pick a random prefix for kafka topics on each call""" return "".join(random.choice(string.ascii_lowercase) for _ in range(10)) @pytest.fixture(scope="function") def kafka_consumer_group(kafka_prefix: str): """Pick a random consumer group for kafka consumers on each call""" return "test-consumer-%s" % kafka_prefix @pytest.fixture(scope="function") def object_types(): """Set of object types to precreate topics for.""" return set(TEST_OBJECT_DICTS.keys()) +@pytest.fixture(scope="function") +def privileged_object_types(): + """Set of object types to precreate privileged topics for.""" + return {"revision", "release"} + + @pytest.fixture(scope="function") def kafka_server( - kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str] + kafka_server_base: str, + kafka_prefix: str, + object_types: Iterator[str], + privileged_object_types: Iterator[str], ) -> str: """A kafka server with existing topics - topics are built from the ``kafka_prefix`` and the ``object_types`` list""" - topics = [f"{kafka_prefix}.{obj}" for obj in object_types] + Unprivileged topics are built as ``{kafka_prefix}.{object_type}`` with object_type + from the ``object_types`` list. + + Privileged topics are built as ``{kafka_prefix}_privileged.{object_type}`` with + object_type from the ``privileged_object_types`` list. + + """ + topics = [f"{kafka_prefix}.{obj}" for obj in object_types] + [ + f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types + ] # unfortunately, the Mock broker does not support the CreatTopic admin API, so we # have to create topics using a Producer. 
producer = Producer( { "bootstrap.servers": kafka_server_base, "client.id": "bootstrap producer", "acks": "all", } ) for topic in topics: producer.produce(topic=topic, value=None) for i in range(10): if producer.flush(0.1) == 0: break return kafka_server_base @pytest.fixture(scope="session") def kafka_server_base() -> Iterator[str]: """Create a mock kafka cluster suitable for tests. Yield a connection string. Note: this is a generator to keep the mock broker alive during the whole test session. see https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_mock.h """ admin = AdminClient({"test.mock.num.brokers": "1"}) metadata = admin.list_topics() brokers = [str(broker) for broker in metadata.brokers.values()] assert len(brokers) == 1, "More than one broker found in the kafka cluster?!" broker_connstr, broker_id = brokers[0].split("/") yield broker_connstr TEST_CONFIG = { "consumer_id": "swh.journal.consumer", "stop_after_objects": 1, # will read 1 object and stop "storage": {"cls": "memory", "args": {}}, } @pytest.fixture -def test_config(kafka_server_base: str, kafka_prefix: str, object_types: Iterator[str]): +def test_config( + kafka_server_base: str, + kafka_prefix: str, + object_types: Iterator[str], + privileged_object_types: Iterator[str], +): """Test configuration needed for producer/consumer """ return { **TEST_CONFIG, "object_types": object_types, + "privileged_object_types": privileged_object_types, "brokers": [kafka_server_base], "prefix": kafka_prefix, } @pytest.fixture def consumer( - kafka_server: str, test_config: Dict, kafka_consumer_group: str, + kafka_server: str, test_config: Dict, kafka_consumer_group: str ) -> Consumer: """Get a connected Kafka consumer. """ consumer = Consumer( { "bootstrap.servers": kafka_server, "auto.offset.reset": "earliest", "enable.auto.commit": True, "group.id": kafka_consumer_group, } ) - + prefix = test_config["prefix"] kafka_topics = [ - "%s.%s" % (test_config["prefix"], object_type) - for object_type in test_config["object_types"] + f"{prefix}.{object_type}" for object_type in test_config["object_types"] + ] + [ + f"{prefix}_privileged.{object_type}" + for object_type in test_config["privileged_object_types"] ] - consumer.subscribe(kafka_topics) yield consumer consumer.close() diff --git a/swh/journal/serializers.py b/swh/journal/serializers.py index 986eeb3..30a5796 100644 --- a/swh/journal/serializers.py +++ b/swh/journal/serializers.py @@ -1,112 +1,121 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict, Union, overload import msgpack from swh.core.api.serializers import msgpack_dumps, msgpack_loads from swh.model.hashutil import DEFAULT_ALGORITHMS from swh.model.model import ( Content, Directory, Origin, OriginVisit, Release, Revision, SkippedContent, Snapshot, ) ModelObject = Union[ Content, Directory, Origin, OriginVisit, Release, Revision, SkippedContent, Snapshot ] KeyType = Union[Dict[str, str], Dict[str, bytes], bytes] # these @overload'ed versions of the object_key method aim at helping mypy figuring # the correct type-ing. @overload def object_key( object_type: str, object_: Union[Content, Directory, Revision, Release, Snapshot] ) -> bytes: ... @overload def object_key( object_type: str, object_: Union[Origin, SkippedContent] ) -> Dict[str, bytes]: ... 
@overload def object_key(object_type: str, object_: OriginVisit) -> Dict[str, str]: ... def object_key(object_type: str, object_) -> KeyType: if object_type in ("revision", "release", "directory", "snapshot"): return object_.id elif object_type == "content": return object_.sha1 # TODO: use a dict of hashes elif object_type == "skipped_content": return {hash: getattr(object_, hash) for hash in DEFAULT_ALGORITHMS} elif object_type == "origin": return {"url": object_.url} elif object_type == "origin_visit": return { "origin": object_.origin, "date": str(object_.date), } else: raise ValueError("Unknown object type: %s." % object_type) def stringify_key_item(k: str, v: Union[str, bytes]) -> str: """Turn the item of a dict key into a string""" if isinstance(v, str): return v if k == "url": return v.decode("utf-8") return v.hex() def pprint_key(key: KeyType) -> str: """Pretty-print a kafka key""" if isinstance(key, dict): return "{%s}" % ", ".join( f"{k}: {stringify_key_item(k, v)}" for k, v in key.items() ) elif isinstance(key, bytes): return key.hex() else: return key def key_to_kafka(key: KeyType) -> bytes: """Serialize a key, possibly a dict, in a predictable way""" p = msgpack.Packer(use_bin_type=True) if isinstance(key, dict): return p.pack_map_pairs(sorted(key.items())) else: return p.pack(key) def kafka_to_key(kafka_key: bytes) -> KeyType: """Deserialize a key""" return msgpack.loads(kafka_key, raw=False) def value_to_kafka(value: Any) -> bytes: """Serialize some data for storage in kafka""" return msgpack_dumps(value) def kafka_to_value(kafka_value: bytes) -> Any: """Deserialize some data stored in kafka""" - return msgpack_loads(kafka_value) + value = msgpack_loads(kafka_value) + if isinstance(value, list): + return tuple(value) + if isinstance(value, dict): + return ensure_tuples(value) + return value + + +def ensure_tuples(value: Dict) -> Dict: + return {k: tuple(v) if isinstance(v, list) else v for k, v in value.items()} diff --git a/swh/journal/tests/journal_data.py b/swh/journal/tests/journal_data.py index 02bfd0e..ebd07f7 100644 --- a/swh/journal/tests/journal_data.py +++ b/swh/journal/tests/journal_data.py @@ -1,153 +1,276 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime -from typing import Any, Dict, List +from typing import Any, Dict, List, Type from swh.model.hashutil import MultiHash, hash_to_bytes from swh.journal.serializers import ModelObject -from swh.journal.writer.kafka import OBJECT_TYPES +from swh.model.model import ( + BaseModel, + Content, + Directory, + Origin, + OriginVisit, + Release, + Revision, + SkippedContent, + Snapshot, +) + + +OBJECT_TYPES: Dict[Type[BaseModel], str] = { + Content: "content", + Directory: "directory", + Origin: "origin", + OriginVisit: "origin_visit", + Release: "release", + Revision: "revision", + SkippedContent: "skipped_content", + Snapshot: "snapshot", +} + +UTC = datetime.timezone.utc CONTENTS = [ - {**MultiHash.from_data(b"foo").digest(), "length": 3, "status": "visible",}, + { + **MultiHash.from_data(f"foo{i}".encode()).digest(), + "length": 4, + "status": "visible", + } + for i in range(10) +] +SKIPPED_CONTENTS = [ + { + **MultiHash.from_data(f"bar{i}".encode()).digest(), + "length": 4, + "status": "absent", + "reason": f"because chr({i}) != '*'", + } + for i in range(2) ] duplicate_content1 = { "length": 4, "sha1": 
hash_to_bytes("44973274ccef6ab4dfaaf86599792fa9c3fe4689"), "sha1_git": b"another-foo", "blake2s256": b"another-bar", "sha256": b"another-baz", "status": "visible", } # Craft a sha1 collision duplicate_content2 = duplicate_content1.copy() sha1_array = bytearray(duplicate_content1["sha1_git"]) sha1_array[0] += 1 duplicate_content2["sha1_git"] = bytes(sha1_array) DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2] COMMITTERS = [ {"fullname": b"foo", "name": b"foo", "email": b"",}, {"fullname": b"bar", "name": b"bar", "email": b"",}, ] DATES = [ { "timestamp": {"seconds": 1234567891, "microseconds": 0,}, "offset": 120, "negative_utc": False, }, { "timestamp": {"seconds": 1234567892, "microseconds": 0,}, "offset": 120, "negative_utc": False, }, ] REVISIONS = [ { "id": hash_to_bytes("7026b7c1a2af56521e951c01ed20f255fa054238"), "message": b"hello", "date": DATES[0], "committer": COMMITTERS[0], "author": COMMITTERS[0], "committer_date": DATES[0], "type": "git", "directory": b"\x01" * 20, "synthetic": False, "metadata": None, - "parents": [], + "parents": (), }, { "id": hash_to_bytes("368a48fe15b7db2383775f97c6b247011b3f14f4"), "message": b"hello again", "date": DATES[1], "committer": COMMITTERS[1], "author": COMMITTERS[1], "committer_date": DATES[1], "type": "hg", "directory": b"\x02" * 20, "synthetic": False, "metadata": None, - "parents": [], + "parents": (), }, ] RELEASES = [ { "id": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), "name": b"v0.0.1", "date": { "timestamp": {"seconds": 1234567890, "microseconds": 0,}, "offset": 120, "negative_utc": False, }, "author": COMMITTERS[0], "target_type": "revision", "target": b"\x04" * 20, "message": b"foo", "synthetic": False, }, ] ORIGINS = [ {"url": "https://somewhere.org/den/fox",}, {"url": "https://overtherainbow.org/fox/den",}, ] ORIGIN_VISITS = [ { "origin": ORIGINS[0]["url"], "date": "2013-05-07 04:20:39.369271+00:00", - "snapshot": None, # TODO - "status": "ongoing", # TODO + "snapshot": None, + "status": "ongoing", "metadata": {"foo": "bar"}, "type": "git", + "visit": 1, + }, + { + "origin": ORIGINS[1]["url"], + "date": "2014-11-27 17:20:39+00:00", + "snapshot": None, + "status": "ongoing", + "metadata": {"baz": "qux"}, + "type": "hg", + "visit": 1, + }, + { + "origin": ORIGINS[0]["url"], + "date": "2018-11-27 17:20:39+00:00", + "snapshot": None, + "status": "ongoing", + "metadata": {"baz": "qux"}, + "type": "git", + "visit": 2, }, { "origin": ORIGINS[0]["url"], "date": "2018-11-27 17:20:39+00:00", - "snapshot": None, # TODO - "status": "ongoing", # TODO + "snapshot": hash_to_bytes("742cdc6be7bf6e895b055227c2300070f056e07b"), + "status": "full", "metadata": {"baz": "qux"}, "type": "git", + "visit": 3, + }, + { + "origin": ORIGINS[1]["url"], + "date": "2015-11-27 17:20:39+00:00", + "snapshot": hash_to_bytes("ecee48397a92b0d034e9752a17459f3691a73ef9"), + "status": "partial", + "metadata": {"something": "wrong occurred"}, + "type": "hg", + "visit": 2, }, ] + +DIRECTORIES = [ + {"id": hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"), "entries": ()}, + { + "id": hash_to_bytes("cc13247a0d6584f297ca37b5868d2cbd242aea03"), + "entries": ( + { + "name": b"file1.ext", + "perms": 0o644, + "type": "file", + "target": CONTENTS[0]["sha1_git"], + }, + { + "name": b"dir1", + "perms": 0o755, + "type": "dir", + "target": hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"), + }, + { + "name": b"subprepo1", + "perms": 0o160000, + "type": "rev", + "target": REVISIONS[1]["id"], + }, + ), + }, +] + + +SNAPSHOTS = [ + { + "id": 
hash_to_bytes("742cdc6be7bf6e895b055227c2300070f056e07b"), + "branches": { + b"master": {"target_type": "revision", "target": REVISIONS[0]["id"]} + }, + }, + { + "id": hash_to_bytes("ecee48397a92b0d034e9752a17459f3691a73ef9"), + "branches": { + b"target/revision": { + "target_type": "revision", + "target": REVISIONS[0]["id"], + }, + b"target/alias": {"target_type": "alias", "target": b"target/revision"}, + b"target/directory": { + "target_type": "directory", + "target": DIRECTORIES[0]["id"], + }, + b"target/release": {"target_type": "release", "target": RELEASES[0]["id"]}, + b"target/snapshot": { + "target_type": "snapshot", + "target": hash_to_bytes("742cdc6be7bf6e895b055227c2300070f056e07b"), + }, + }, + }, +] + + TEST_OBJECT_DICTS: Dict[str, List[Dict[str, Any]]] = { "content": CONTENTS, - "directory": [], + "directory": DIRECTORIES, "origin": ORIGINS, "origin_visit": ORIGIN_VISITS, "release": RELEASES, "revision": REVISIONS, - "snapshot": [], - "skipped_content": [], + "snapshot": SNAPSHOTS, + "skipped_content": SKIPPED_CONTENTS, } MODEL_OBJECTS = {v: k for (k, v) in OBJECT_TYPES.items()} TEST_OBJECTS: Dict[str, List[ModelObject]] = {} for object_type, objects in TEST_OBJECT_DICTS.items(): converted_objects: List[ModelObject] = [] model = MODEL_OBJECTS[object_type] for (num, obj_d) in enumerate(objects): - if object_type == "origin_visit": - obj_d = {**obj_d, "visit": num} - elif object_type == "content": - obj_d = {**obj_d, "data": b"", "ctime": datetime.datetime.now()} + if object_type == "content": + obj_d = {**obj_d, "data": b"", "ctime": datetime.datetime.now(tz=UTC)} converted_objects.append(model.from_dict(obj_d)) TEST_OBJECTS[object_type] = converted_objects diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py index bf795ee..66278b8 100644 --- a/swh/journal/tests/test_client.py +++ b/swh/journal/tests/test_client.py @@ -1,243 +1,330 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Dict, List from unittest.mock import MagicMock from confluent_kafka import Producer import pytest from swh.model.model import Content from swh.journal.client import JournalClient from swh.journal.serializers import key_to_kafka, value_to_kafka REV = { "message": b"something cool", "author": {"fullname": b"Peter", "name": None, "email": b"peter@ouiche.lo"}, "committer": {"fullname": b"Stephen", "name": b"From Outer Space", "email": None}, "date": { "timestamp": {"seconds": 123456789, "microseconds": 123}, "offset": 120, "negative_utc": False, }, "committer_date": { "timestamp": {"seconds": 123123456, "microseconds": 0}, "offset": 0, "negative_utc": False, }, "type": "git", "directory": ( b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" b"\x01\x02\x03\x04\x05" ), "synthetic": False, "metadata": None, - "parents": [], + "parents": (), "id": b"\x8b\xeb\xd1\x9d\x07\xe2\x1e0\xe2 \x91X\x8d\xbd\x1c\xa8\x86\xdeB\x0c", } def test_client(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str): producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test producer", "acks": "all", } ) # Fill Kafka producer.produce( topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV), ) producer.flush() client = JournalClient( brokers=[kafka_server], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=1, ) 
worker_fn = MagicMock() client.process(worker_fn) worker_fn.assert_called_once_with({"revision": [REV]}) def test_client_eof(kafka_prefix: str, kafka_consumer_group: str, kafka_server: str): producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test producer", "acks": "all", } ) # Fill Kafka producer.produce( topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV), ) producer.flush() client = JournalClient( brokers=[kafka_server], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=None, stop_on_eof=True, ) worker_fn = MagicMock() client.process(worker_fn) worker_fn.assert_called_once_with({"revision": [REV]}) @pytest.mark.parametrize("batch_size", [1, 5, 100]) def test_client_batch_size( kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, batch_size: int, ): num_objects = 2 * batch_size + 1 assert num_objects < 256, "Too many objects, generation will fail" producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test producer", "acks": "all", } ) contents = [Content.from_data(bytes([i])) for i in range(num_objects)] # Fill Kafka for content in contents: producer.produce( topic=kafka_prefix + ".content", key=key_to_kafka(content.sha1), value=value_to_kafka(content.to_dict()), ) producer.flush() client = JournalClient( brokers=[kafka_server], group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=num_objects, batch_size=batch_size, ) collected_output: List[Dict] = [] def worker_fn(objects): received = objects["content"] assert len(received) <= batch_size collected_output.extend(received) client.process(worker_fn) expected_output = [content.to_dict() for content in contents] assert len(collected_output) == len(expected_output) for output in collected_output: assert output in expected_output @pytest.fixture() def kafka_producer(kafka_prefix: str, kafka_server_base: str): producer = Producer( { "bootstrap.servers": kafka_server_base, "client.id": "test producer", "acks": "all", } ) # Fill Kafka producer.produce( topic=kafka_prefix + ".something", key=key_to_kafka(b"key1"), value=value_to_kafka("value1"), ) producer.produce( topic=kafka_prefix + ".else", key=key_to_kafka(b"key1"), value=value_to_kafka("value2"), ) producer.flush() return producer def test_client_subscribe_all( kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str ): client = JournalClient( brokers=[kafka_server_base], group_id="whatever", prefix=kafka_prefix, stop_after_objects=2, ) assert set(client.subscription) == { f"{kafka_prefix}.something", f"{kafka_prefix}.else", } worker_fn = MagicMock() client.process(worker_fn) worker_fn.assert_called_once_with( {"something": ["value1"], "else": ["value2"],} ) def test_client_subscribe_one_topic( kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str ): client = JournalClient( brokers=[kafka_server_base], group_id="whatever", prefix=kafka_prefix, stop_after_objects=1, object_types=["else"], ) assert client.subscription == [f"{kafka_prefix}.else"] worker_fn = MagicMock() client.process(worker_fn) worker_fn.assert_called_once_with({"else": ["value2"]}) def test_client_subscribe_absent_topic( kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str ): with pytest.raises(ValueError): JournalClient( brokers=[kafka_server_base], group_id="whatever", prefix=kafka_prefix, stop_after_objects=1, object_types=["really"], ) def test_client_subscribe_absent_prefix( kafka_producer: Producer, kafka_prefix: str, kafka_server_base: str ): with 
pytest.raises(ValueError): JournalClient( brokers=[kafka_server_base], group_id="whatever", prefix="wrong.prefix", stop_after_objects=1, ) with pytest.raises(ValueError): JournalClient( brokers=[kafka_server_base], group_id="whatever", prefix="wrong.prefix", stop_after_objects=1, object_types=["else"], ) + + +def test_client_subscriptions_with_anonymized_topics( + kafka_prefix: str, kafka_consumer_group: str, kafka_server_base: str +): + producer = Producer( + { + "bootstrap.servers": kafka_server_base, + "client.id": "test producer", + "acks": "all", + } + ) + + # Fill Kafka with revision object on both the regular prefix (normally for + # anonymized objects in this case) and privileged one + producer.produce( + topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV), + ) + producer.produce( + topic=kafka_prefix + "_privileged.revision", + key=REV["id"], + value=value_to_kafka(REV), + ) + producer.flush() + + # without privileged "channels" activated on the client side + client = JournalClient( + brokers=[kafka_server_base], + group_id=kafka_consumer_group, + prefix=kafka_prefix, + stop_after_objects=1, + privileged=False, + ) + # we only subscribed to "standard" topics + assert client.subscription == [kafka_prefix + ".revision"] + + # with privileged "channels" activated on the client side + client = JournalClient( + brokers=[kafka_server_base], + group_id=kafka_consumer_group, + prefix=kafka_prefix, + stop_after_objects=1, + privileged=True, + ) + # we only subscribed to "privileged" topics + assert client.subscription == [kafka_prefix + "_privileged.revision"] + + +def test_client_subscriptions_without_anonymized_topics( + kafka_prefix: str, kafka_consumer_group: str, kafka_server_base: str +): + producer = Producer( + { + "bootstrap.servers": kafka_server_base, + "client.id": "test producer", + "acks": "all", + } + ) + + # Fill Kafka with revision objects only on the standard prefix + producer.produce( + topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV), + ) + producer.flush() + + # without privileged channel activated on the client side + client = JournalClient( + brokers=[kafka_server_base], + group_id=kafka_consumer_group, + prefix=kafka_prefix, + stop_after_objects=1, + privileged=False, + ) + # we only subscribed to the standard prefix + assert client.subscription == [kafka_prefix + ".revision"] + + # with privileged channel activated on the client side + client = JournalClient( + brokers=[kafka_server_base], + group_id=kafka_consumer_group, + prefix=kafka_prefix, + stop_after_objects=1, + privileged=True, + ) + # we also only subscribed to the standard prefix, since there is no priviled prefix + # on the kafka broker + assert client.subscription == [kafka_prefix + ".revision"] diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py index b0ceba8..1fe02cc 100644 --- a/swh/journal/tests/test_kafka_writer.py +++ b/swh/journal/tests/test_kafka_writer.py @@ -1,97 +1,166 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from typing import Iterable + import pytest from confluent_kafka import Consumer, Producer -from swh.model.model import Directory +from swh.model.model import Directory, Revision, Release from swh.journal.tests.journal_data import TEST_OBJECTS from swh.journal.pytest_plugin import 
consume_messages, assert_all_objects_consumed from swh.journal.writer.kafka import KafkaJournalWriter, KafkaDeliveryError -def test_kafka_writer(kafka_prefix: str, kafka_server: str, consumer: Consumer): +def test_kafka_writer( + kafka_prefix: str, + kafka_server: str, + consumer: Consumer, + privileged_object_types: Iterable[str], +): writer = KafkaJournalWriter( - brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, + brokers=[kafka_server], + client_id="kafka_writer", + prefix=kafka_prefix, + anonymize=False, ) expected_messages = 0 for object_type, objects in TEST_OBJECTS.items(): writer.write_additions(object_type, objects) expected_messages += len(objects) consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) assert_all_objects_consumed(consumed_messages) + for key, obj_dict in consumed_messages["revision"]: + obj = Revision.from_dict(obj_dict) + for person in (obj.author, obj.committer): + assert not ( + len(person.fullname) == 32 + and person.name is None + and person.email is None + ) + for key, obj_dict in consumed_messages["release"]: + obj = Release.from_dict(obj_dict) + for person in (obj.author,): + assert not ( + len(person.fullname) == 32 + and person.name is None + and person.email is None + ) + + +def test_kafka_writer_anonymized( + kafka_prefix: str, + kafka_server: str, + consumer: Consumer, + privileged_object_types: Iterable[str], +): + writer = KafkaJournalWriter( + brokers=[kafka_server], + client_id="kafka_writer", + prefix=kafka_prefix, + anonymize=True, + ) + + expected_messages = 0 + + for object_type, objects in TEST_OBJECTS.items(): + writer.write_additions(object_type, objects) + expected_messages += len(objects) + if object_type in privileged_object_types: + expected_messages += len(objects) + + consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) + assert_all_objects_consumed(consumed_messages, exclude=["revision", "release"]) + + for key, obj_dict in consumed_messages["revision"]: + obj = Revision.from_dict(obj_dict) + for person in (obj.author, obj.committer): + assert ( + len(person.fullname) == 32 + and person.name is None + and person.email is None + ) + for key, obj_dict in consumed_messages["release"]: + obj = Release.from_dict(obj_dict) + for person in (obj.author,): + assert ( + len(person.fullname) == 32 + and person.name is None + and person.email is None + ) + def test_write_delivery_failure( kafka_prefix: str, kafka_server: str, consumer: Consumer ): class MockKafkaError: """A mocked kafka error""" def str(self): return "Mocked Kafka Error" def name(self): return "SWH_MOCK_ERROR" class KafkaJournalWriterFailDelivery(KafkaJournalWriter): """A journal writer which always fails delivering messages""" def _on_delivery(self, error, message): """Replace the inbound error with a fake delivery error""" super()._on_delivery(MockKafkaError(), message) kafka_prefix += ".swh.journal.objects" writer = KafkaJournalWriterFailDelivery( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, ) - empty_dir = Directory(entries=[]) + empty_dir = Directory(entries=()) with pytest.raises(KafkaDeliveryError) as exc: writer.write_addition("directory", empty_dir) assert "Failed deliveries" in exc.value.message assert len(exc.value.delivery_failures) == 1 delivery_failure = exc.value.delivery_failures[0] assert delivery_failure.key == empty_dir.id assert delivery_failure.code == "SWH_MOCK_ERROR" def test_write_delivery_timeout( kafka_prefix: str, kafka_server: str, consumer: Consumer 
): produced = [] class MockProducer(Producer): """A kafka producer which pretends to produce messages, but never sends any delivery acknowledgements""" def produce(self, **kwargs): produced.append(kwargs) writer = KafkaJournalWriter( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, flush_timeout=1, producer_class=MockProducer, ) - empty_dir = Directory(entries=[]) + empty_dir = Directory(entries=()) with pytest.raises(KafkaDeliveryError) as exc: writer.write_addition("directory", empty_dir) assert len(produced) == 1 assert "timeout" in exc.value.message assert len(exc.value.delivery_failures) == 1 delivery_failure = exc.value.delivery_failures[0] assert delivery_failure.key == empty_dir.id assert delivery_failure.code == "SWH_FLUSH_TIMEOUT" diff --git a/swh/journal/tests/test_pytest_plugin.py b/swh/journal/tests/test_pytest_plugin.py index f220b3c..69152a3 100644 --- a/swh/journal/tests/test_pytest_plugin.py +++ b/swh/journal/tests/test_pytest_plugin.py @@ -1,51 +1,67 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Iterator from confluent_kafka.admin import AdminClient def test_kafka_server(kafka_server_base: str): ip, port_str = kafka_server_base.split(":") assert ip == "127.0.0.1" assert int(port_str) admin = AdminClient({"bootstrap.servers": kafka_server_base}) topics = admin.list_topics() assert len(topics.brokers) == 1 def test_kafka_server_with_topics( - kafka_server: str, kafka_prefix: str, object_types: Iterator[str] + kafka_server: str, + kafka_prefix: str, + object_types: Iterator[str], + privileged_object_types: Iterator[str], ): admin = AdminClient({"bootstrap.servers": kafka_server}) + + # check unprivileged topics are present topics = { topic for topic in admin.list_topics().topics if topic.startswith(f"{kafka_prefix}.") } assert topics == {f"{kafka_prefix}.{obj}" for obj in object_types} + # check privileged topics are present + topics = { + topic + for topic in admin.list_topics().topics + if topic.startswith(f"{kafka_prefix}_privileged.") + } + assert topics == { + f"{kafka_prefix}_privileged.{obj}" for obj in privileged_object_types + } + def test_test_config(test_config: dict, kafka_prefix: str, kafka_server_base: str): assert test_config == { "consumer_id": "swh.journal.consumer", "stop_after_objects": 1, "storage": {"cls": "memory", "args": {}}, "object_types": { "content", "directory", "origin", "origin_visit", "release", "revision", "snapshot", "skipped_content", }, + "privileged_object_types": {"release", "revision",}, "brokers": [kafka_server_base], "prefix": kafka_prefix, } diff --git a/swh/journal/tests/utils.py b/swh/journal/tests/utils.py index 01bbb51..0c08958 100644 --- a/swh/journal/tests/utils.py +++ b/swh/journal/tests/utils.py @@ -1,79 +1,80 @@ from swh.journal.client import JournalClient from swh.journal.writer.kafka import KafkaJournalWriter from swh.journal.serializers import kafka_to_value, key_to_kafka, value_to_kafka class FakeKafkaMessage: def __init__(self, topic, key, value): self._topic = topic self._key = key_to_kafka(key) self._value = value_to_kafka(value) def topic(self): return self._topic def value(self): return self._value def key(self): return self._key def error(self): return None class MockedKafkaWriter(KafkaJournalWriter): - def __init__(self, queue): + def __init__(self, queue, anonymize: bool = 
False): self._prefix = "prefix" self.queue = queue + self.anonymize = anonymize def send(self, topic, key, value): msg = FakeKafkaMessage(topic=topic, key=key, value=value) self.queue.append(msg) def flush(self): pass class MockedKafkaConsumer: """Mimic the confluent_kafka.Consumer API, producing the messages stored in `queue`. You're only allowed to subscribe to topics in which the queue has messages. """ def __init__(self, queue): self.queue = queue self.committed = False def consume(self, num_messages, timeout=None): L = self.queue[0:num_messages] self.queue[0:num_messages] = [] return L def commit(self): if self.queue == []: self.committed = True def list_topics(self, timeout=None): return set(message.topic() for message in self.queue) def subscribe(self, topics): unknown_topics = set(topics) - self.list_topics() if unknown_topics: raise ValueError("Unknown topics %s" % ", ".join(unknown_topics)) def close(self): pass class MockedJournalClient(JournalClient): def __init__(self, queue, object_types=None): self._object_types = object_types self.consumer = MockedKafkaConsumer(queue) self.process_timeout = None self.stop_after_objects = None self.value_deserializer = kafka_to_value self.stop_on_eof = False self.batch_size = 200 diff --git a/swh/journal/writer/inmemory.py b/swh/journal/writer/inmemory.py index 297e0c2..61a9269 100644 --- a/swh/journal/writer/inmemory.py +++ b/swh/journal/writer/inmemory.py @@ -1,32 +1,40 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from multiprocessing import Manager from typing import List from swh.model.model import BaseModel from .kafka import ModelObject logger = logging.getLogger(__name__) class InMemoryJournalWriter: def __init__(self): # Share the list of objects across processes, for RemoteAPI tests. 
self.manager = Manager() self.objects = self.manager.list() + self.privileged_objects = self.manager.list() - def write_addition(self, object_type: str, object_: ModelObject) -> None: + def write_addition( + self, object_type: str, object_: ModelObject, privileged: bool = False + ) -> None: assert isinstance(object_, BaseModel) - self.objects.append((object_type, object_)) + if privileged: + self.privileged_objects.append((object_type, object_)) + else: + self.objects.append((object_type, object_)) write_update = write_addition - def write_additions(self, object_type: str, objects: List[ModelObject]) -> None: + def write_additions( + self, object_type: str, objects: List[ModelObject], privileged: bool = False + ) -> None: for object_ in objects: - self.write_addition(object_type, object_) + self.write_addition(object_type, object_, privileged) diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py index 2a26e53..1d24f6e 100644 --- a/swh/journal/writer/kafka.py +++ b/swh/journal/writer/kafka.py @@ -1,234 +1,242 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import time from typing import Dict, Iterable, List, NamedTuple, Optional, Type from confluent_kafka import Producer, KafkaException -from swh.model.model import ( - BaseModel, - Content, - Directory, - Origin, - OriginVisit, - Release, - Revision, - SkippedContent, - Snapshot, -) - from swh.journal.serializers import ( KeyType, ModelObject, object_key, pprint_key, key_to_kafka, value_to_kafka, ) logger = logging.getLogger(__name__) -OBJECT_TYPES: Dict[Type[BaseModel], str] = { - Content: "content", - Directory: "directory", - Origin: "origin", - OriginVisit: "origin_visit", - Release: "release", - Revision: "revision", - SkippedContent: "skipped_content", - Snapshot: "snapshot", -} - class DeliveryTag(NamedTuple): """Unique tag allowing us to check for a message delivery""" topic: str kafka_key: bytes class DeliveryFailureInfo(NamedTuple): """Verbose information for failed deliveries""" object_type: str key: KeyType message: str code: str def get_object_type(topic: str) -> str: """Get the object type from a topic string""" return topic.rsplit(".", 1)[-1] class KafkaDeliveryError(Exception): """Delivery failed on some kafka messages.""" def __init__(self, message: str, delivery_failures: Iterable[DeliveryFailureInfo]): self.message = message self.delivery_failures = list(delivery_failures) def pretty_failures(self) -> str: return ", ".join( f"{f.object_type} {pprint_key(f.key)} ({f.message})" for f in self.delivery_failures ) def __str__(self): return f"KafkaDeliveryError({self.message}, [{self.pretty_failures()}])" class KafkaJournalWriter: - """This class is instantiated and used by swh-storage to write incoming - new objects to Kafka before adding them to the storage backend - (eg. postgresql) itself. + """This class is used to write serialized versions of swh.model.model objects to a + series of Kafka topics. + + Topics used to send objects representations are built from a ``prefix`` plus the + type of the object: + + ``{prefix}.{object_type}`` + + Objects can be sent as is, or can be anonymized. 
The anonymization feature, when + activated, will write anonymized versions of model objects in the main topic, and + stock (non-anonymized) objects will be sent to a dedicated (privileged) set of + topics: + + ``{prefix}_privileged.{object_type}`` + + The anonymization of a swh.model object is the result of calling its + ``BaseModel.anonymize()`` method. An object is considered anonymizable if this + method returns a (non-None) value. Args: - brokers: list of broker addresses and ports - prefix: the prefix used to build the topic names for objects - client_id: the id of the writer sent to kafka - producer_config: extra configuration keys passed to the `Producer` + brokers: list of broker addresses and ports. + prefix: the prefix used to build the topic names for objects. + client_id: the id of the writer sent to kafka. + producer_config: extra configuration keys passed to the `Producer`. flush_timeout: timeout, in seconds, after which the `flush` operation will fail if some message deliveries are still pending. - producer_class: override for the kafka producer class + producer_class: override for the kafka producer class. + anonymize: if True, activate the anonymization feature. """ def __init__( self, brokers: Iterable[str], prefix: str, client_id: str, producer_config: Optional[Dict] = None, flush_timeout: float = 120, producer_class: Type[Producer] = Producer, + anonymize: bool = False, ): self._prefix = prefix + self._prefix_privileged = f"{self._prefix}_privileged" + self.anonymize = anonymize if not producer_config: producer_config = {} if "message.max.bytes" not in producer_config: producer_config = { "message.max.bytes": 100 * 1024 * 1024, **producer_config, } self.producer = producer_class( { "bootstrap.servers": ",".join(brokers), "client.id": client_id, "on_delivery": self._on_delivery, "error_cb": self._error_cb, "logger": logger, "acks": "all", **producer_config, } ) # Delivery management self.flush_timeout = flush_timeout # delivery tag -> original object "key" mapping self.deliveries_pending: Dict[DeliveryTag, KeyType] = {} # List of (object_type, key, error_msg, error_name) for failed deliveries self.delivery_failures: List[DeliveryFailureInfo] = [] def _error_cb(self, error): if error.fatal(): raise KafkaException(error) logger.info("Received non-fatal kafka error: %s", error) def _on_delivery(self, error, message): (topic, key) = delivery_tag = DeliveryTag(message.topic(), message.key()) sent_key = self.deliveries_pending.pop(delivery_tag, None) if error is not None: self.delivery_failures.append( DeliveryFailureInfo( get_object_type(topic), sent_key, error.str(), error.name() ) ) def send(self, topic: str, key: KeyType, value): kafka_key = key_to_kafka(key) self.producer.produce( topic=topic, key=kafka_key, value=value_to_kafka(value), ) self.deliveries_pending[DeliveryTag(topic, kafka_key)] = key def delivery_error(self, message) -> KafkaDeliveryError: """Get all failed deliveries, and clear them""" ret = self.delivery_failures self.delivery_failures = [] while self.deliveries_pending: delivery_tag, orig_key = self.deliveries_pending.popitem() (topic, kafka_key) = delivery_tag ret.append( DeliveryFailureInfo( get_object_type(topic), orig_key, "No delivery before flush() timeout", "SWH_FLUSH_TIMEOUT", ) ) return KafkaDeliveryError(message, ret) def flush(self): start = time.monotonic() self.producer.flush(self.flush_timeout) while self.deliveries_pending: if time.monotonic() - start > self.flush_timeout: break self.producer.poll(0.1) if self.deliveries_pending: # 
Delivery timeout raise self.delivery_error( "flush() exceeded timeout (%ss)" % self.flush_timeout, ) elif self.delivery_failures: raise self.delivery_error("Failed deliveries after flush()") def _sanitize_object( self, object_type: str, object_: ModelObject ) -> Dict[str, str]: dict_ = object_.to_dict() if object_type == "origin_visit": # :( dict_["date"] = str(dict_["date"]) if object_type == "content": dict_.pop("data", None) return dict_ def _write_addition(self, object_type: str, object_: ModelObject) -> None: """Write a single object to the journal""" - topic = f"{self._prefix}.{object_type}" key = object_key(object_type, object_) + + if self.anonymize: + anon_object_ = object_.anonymize() + if anon_object_: # can be either None, or an anonymized object + # if the object is anonymizable, send the non-anonymized version in the + # privileged channel + topic = f"{self._prefix_privileged}.{object_type}" + dict_ = self._sanitize_object(object_type, object_) + logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) + self.send(topic, key=key, value=dict_) + object_ = anon_object_ + + topic = f"{self._prefix}.{object_type}" dict_ = self._sanitize_object(object_type, object_) logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) self.send(topic, key=key, value=dict_) def write_addition(self, object_type: str, object_: ModelObject) -> None: """Write a single object to the journal""" self._write_addition(object_type, object_) self.flush() write_update = write_addition def write_additions(self, object_type: str, objects: Iterable[ModelObject]) -> None: """Write a set of objects to the journal""" for object_ in objects: self._write_addition(object_type, object_) self.flush() diff --git a/version.txt b/version.txt index 495d421..bcead13 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.1.0-0-g2533988 \ No newline at end of file +v0.2.0-0-g64cd01f \ No newline at end of file
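
Taken together, the writer and client changes in this patch split the journal into anonymized and privileged topics: a `KafkaJournalWriter` created with `anonymize=True` publishes anonymized `revision`/`release` objects under `{prefix}.{object_type}` and the full versions under `{prefix}_privileged.{object_type}`, while `JournalClient(privileged=...)` chooses which set of topics to subscribe to. The sketch below is only an illustration built from the signatures shown in this diff; the broker address, topic prefix, and consumer group are placeholders, and it assumes a reachable Kafka broker.

```
from swh.journal.client import JournalClient
from swh.journal.writer.kafka import KafkaJournalWriter
from swh.journal.tests.journal_data import TEST_OBJECTS

# Write the test objects; with anonymize=True, anonymizable objects (revisions,
# releases) are sent in anonymized form to "<prefix>.<object_type>", and their
# full versions go to "<prefix>_privileged.<object_type>".
writer = KafkaJournalWriter(
    brokers=["localhost:9092"],         # placeholder broker address
    client_id="example-writer",
    prefix="swh.journal.objects",       # placeholder topic prefix
    anonymize=True,
)
for object_type, objects in TEST_OBJECTS.items():
    writer.write_additions(object_type, objects)

# A client with privileged=False subscribes only to the standard (anonymized)
# topics; privileged=True prefers the "_privileged" topics when they exist on
# the broker, falling back to the standard ones otherwise.
client = JournalClient(
    brokers=["localhost:9092"],
    group_id="example-consumer-group",  # placeholder consumer group
    prefix="swh.journal.objects",
    object_types=["revision", "release"],
    privileged=False,
    stop_after_objects=10,
)

def worker_fn(all_objects):
    # all_objects maps object_type -> list of deserialized message dicts
    for object_type, objects in all_objects.items():
        print(object_type, len(objects))

client.process(worker_fn)
client.close()
```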