diff --git a/swh/journal/serializers.py b/swh/journal/serializers.py --- a/swh/journal/serializers.py +++ b/swh/journal/serializers.py @@ -8,36 +8,7 @@ import msgpack from swh.core.api.serializers import msgpack_dumps, msgpack_loads -from swh.model.model import ( - Content, - Directory, - KeyType, - MetadataAuthority, - MetadataFetcher, - Origin, - OriginVisit, - OriginVisitStatus, - RawExtrinsicMetadata, - Release, - Revision, - SkippedContent, - Snapshot, -) - -ModelObject = Union[ - Content, - Directory, - MetadataAuthority, - MetadataFetcher, - Origin, - OriginVisit, - OriginVisitStatus, - RawExtrinsicMetadata, - Release, - Revision, - SkippedContent, - Snapshot, -] +from swh.model.model import KeyType def stringify_key_item(k: str, v: Union[str, bytes]) -> str: diff --git a/swh/journal/tests/journal_data.py b/swh/journal/tests/journal_data.py --- a/swh/journal/tests/journal_data.py +++ b/swh/journal/tests/journal_data.py @@ -8,10 +8,10 @@ import attr -from swh.journal.serializers import ModelObject from swh.model.hashutil import MultiHash, hash_to_bytes, hash_to_hex from swh.model.identifiers import SWHID from swh.model.model import ( + BaseModel, Content, Directory, DirectoryEntry, @@ -327,7 +327,7 @@ ] -TEST_OBJECTS: Dict[str, Sequence[ModelObject]] = { +TEST_OBJECTS: Dict[str, Sequence[BaseModel]] = { "content": CONTENTS, "directory": DIRECTORIES, "metadata_authority": METADATA_AUTHORITIES, diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py --- a/swh/journal/tests/test_kafka_writer.py +++ b/swh/journal/tests/test_kafka_writer.py @@ -12,7 +12,7 @@ from swh.journal.tests.journal_data import TEST_OBJECTS from swh.journal.writer import model_object_dict_sanitizer from swh.journal.writer.kafka import KafkaDeliveryError, KafkaJournalWriter -from swh.model.model import Directory, Release, Revision +from swh.model.model import BaseModel, Directory, Release, Revision def test_kafka_writer( @@ -21,7 +21,7 @@ consumer: 
Consumer, privileged_object_types: Iterable[str], ): - writer = KafkaJournalWriter( + writer = KafkaJournalWriter[BaseModel]( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, @@ -62,7 +62,7 @@ consumer: Consumer, privileged_object_types: Iterable[str], ): - writer = KafkaJournalWriter( + writer = KafkaJournalWriter[BaseModel]( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, @@ -150,7 +150,7 @@ def produce(self, **kwargs): produced.append(kwargs) - writer = KafkaJournalWriter( + writer = KafkaJournalWriter[BaseModel]( brokers=[kafka_server], client_id="kafka_writer", prefix=kafka_prefix, diff --git a/swh/journal/writer/__init__.py b/swh/journal/writer/__init__.py --- a/swh/journal/writer/__init__.py +++ b/swh/journal/writer/__init__.py @@ -3,9 +3,26 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Any, Dict +from typing import Any, Dict, Optional, TypeVar import warnings +from typing_extensions import Protocol + +from swh.model.model import KeyType + +TSelf = TypeVar("TSelf") + + +class ValueProtocol(Protocol): + def anonymize(self: TSelf) -> Optional[TSelf]: + ... + + def unique_key(self) -> KeyType: + ... + + def to_dict(self) -> Dict[str, Any]: + ... + def model_object_dict_sanitizer( object_type: str, object_dict: Dict[str, Any] diff --git a/swh/journal/writer/inmemory.py b/swh/journal/writer/inmemory.py --- a/swh/journal/writer/inmemory.py +++ b/swh/journal/writer/inmemory.py @@ -5,17 +5,19 @@ import logging from multiprocessing import Manager -from typing import Any, List, Tuple +from typing import Any, Generic, List, Tuple, TypeVar -from swh.journal.serializers import ModelObject -from swh.model.model import BaseModel +from . 
import ValueProtocol logger = logging.getLogger(__name__) -class InMemoryJournalWriter: - objects: List[Tuple[str, ModelObject]] - privileged_objects: List[Tuple[str, ModelObject]] +TValue = TypeVar("TValue", bound=ValueProtocol) + + +class InMemoryJournalWriter(Generic[TValue]): + objects: List[Tuple[str, TValue]] + privileged_objects: List[Tuple[str, TValue]] def __init__(self, value_sanitizer: Any): # Share the list of objects across processes, for RemoteAPI tests. @@ -24,9 +26,8 @@ self.privileged_objects = self.manager.list() def write_addition( - self, object_type: str, object_: ModelObject, privileged: bool = False + self, object_type: str, object_: TValue, privileged: bool = False ) -> None: - assert isinstance(object_, BaseModel) if privileged: self.privileged_objects.append((object_type, object_)) else: @@ -35,7 +36,7 @@ write_update = write_addition def write_additions( - self, object_type: str, objects: List[ModelObject], privileged: bool = False + self, object_type: str, objects: List[TValue], privileged: bool = False ) -> None: for object_ in objects: self.write_addition(object_type, object_, privileged) diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py --- a/swh/journal/writer/kafka.py +++ b/swh/journal/writer/kafka.py @@ -5,17 +5,24 @@ import logging import time -from typing import Any, Callable, Dict, Iterable, List, NamedTuple, Optional, Type +from typing import ( + Any, + Callable, + Dict, + Generic, + Iterable, + List, + NamedTuple, + Optional, + Type, + TypeVar, +) from confluent_kafka import KafkaException, Producer -from swh.journal.serializers import ( - KeyType, - ModelObject, - key_to_kafka, - pprint_key, - value_to_kafka, -) +from swh.journal.serializers import KeyType, key_to_kafka, pprint_key, value_to_kafka + +from . 
import ValueProtocol logger = logging.getLogger(__name__) @@ -58,9 +65,14 @@ return f"KafkaDeliveryError({self.message}, [{self.pretty_failures()}])" -class KafkaJournalWriter: - """This class is used to write serialized versions of swh.model.model objects to a - series of Kafka topics. +TValue = TypeVar("TValue", bound=ValueProtocol) + + +class KafkaJournalWriter(Generic[TValue]): + """This class is used to write serialized versions of value objects to a + series of Kafka topics. The type parameter `TValue`, which must implement the + `ValueProtocol`, is the type of values this writer will write. + Typically, `TValue` will be `swh.model.model.BaseModel`. Topics used to send objects representations are built from a ``prefix`` plus the type of the object: @@ -68,20 +80,24 @@ ``{prefix}.{object_type}`` Objects can be sent as is, or can be anonymized. The anonymization feature, when - activated, will write anonymized versions of model objects in the main topic, and + activated, will write anonymized versions of value objects in the main topic, and stock (non-anonymized) objects will be sent to a dedicated (privileged) set of topics: ``{prefix}_privileged.{object_type}`` - The anonymization of a swh.model object is the result of calling its - ``BaseModel.anonymize()`` method. An object is considered anonymizable if this + The anonymization of a value object is the result of calling its + ``anonymize()`` method. An object is considered anonymizable if this method returns a (non-None) value. Args: brokers: list of broker addresses and ports. prefix: the prefix used to build the topic names for objects. client_id: the id of the writer sent to kafka. + value_sanitizer: a function that takes the object type and the dict + representation of an object as arguments, and returns another dict + that should actually be stored in the journal (e.g. removing keys + that do not belong there) producer_config: extra configuration keys passed to the `Producer`.
flush_timeout: timeout, in seconds, after which the `flush` operation will fail if some message deliveries are still pending. @@ -198,7 +214,7 @@ elif self.delivery_failures: raise self.delivery_error("Failed deliveries after flush()") - def _write_addition(self, object_type: str, object_: ModelObject) -> None: + def _write_addition(self, object_type: str, object_: TValue) -> None: """Write a single object to the journal""" key = object_.unique_key() @@ -218,14 +234,14 @@ logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) self.send(topic, key=key, value=dict_) - def write_addition(self, object_type: str, object_: ModelObject) -> None: + def write_addition(self, object_type: str, object_: TValue) -> None: """Write a single object to the journal""" self._write_addition(object_type, object_) self.flush() write_update = write_addition - def write_additions(self, object_type: str, objects: Iterable[ModelObject]) -> None: + def write_additions(self, object_type: str, objects: Iterable[TValue]) -> None: """Write a set of objects to the journal""" for object_ in objects: self._write_addition(object_type, object_)