diff --git a/swh/journal/tests/test_inmemory.py b/swh/journal/tests/test_inmemory.py
index 009cdb5..8abd2d4 100644
--- a/swh/journal/tests/test_inmemory.py
+++ b/swh/journal/tests/test_inmemory.py
@@ -1,41 +1,37 @@
 # Copyright (C) 2019-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import pytest

 from swh.journal.writer import model_object_dict_sanitizer
 from swh.journal.writer.inmemory import InMemoryJournalWriter
 from swh.model.model import BaseModel
 from swh.model.tests.swh_model_data import TEST_OBJECTS


 def test_write_additions_with_test_objects():
-    writer = InMemoryJournalWriter[BaseModel](
-        value_sanitizer=model_object_dict_sanitizer
-    )
+    writer = InMemoryJournalWriter(value_sanitizer=model_object_dict_sanitizer)
     expected = []
     priv_expected = []

     for object_type, objects in TEST_OBJECTS.items():
         writer.write_additions(object_type, objects)

         for object in objects:
             if object.anonymize():
                 expected.append((object_type, object.anonymize()))
                 priv_expected.append((object_type, object))
             else:
                 expected.append((object_type, object))

     assert set(priv_expected) == set(writer.privileged_objects)
     assert set(expected) == set(writer.objects)


 def test_write_addition_errors_without_unique_key():
-    writer = InMemoryJournalWriter[BaseModel](
-        value_sanitizer=model_object_dict_sanitizer
-    )
+    writer = InMemoryJournalWriter(value_sanitizer=model_object_dict_sanitizer)

     with pytest.raises(NotImplementedError):
         writer.write_addition("BaseModel", BaseModel())
diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
index 241e767..e19b3b6 100644
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -1,248 +1,248 @@
 # Copyright (C) 2018-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import logging
 from typing import Iterable

 from confluent_kafka import Consumer, Producer
 import pytest

 from swh.journal.pytest_plugin import assert_all_objects_consumed, consume_messages
 from swh.journal.writer import model_object_dict_sanitizer
 from swh.journal.writer.kafka import KafkaDeliveryError, KafkaJournalWriter
 from swh.model.model import BaseModel, Directory, Release, Revision
 from swh.model.tests.swh_model_data import TEST_OBJECTS


 def test_kafka_writer(
     kafka_prefix: str,
     kafka_server: str,
     consumer: Consumer,
     privileged_object_types: Iterable[str],
 ):
-    writer = KafkaJournalWriter[BaseModel](
+    writer = KafkaJournalWriter(
         brokers=[kafka_server],
         client_id="kafka_writer",
         prefix=kafka_prefix,
         value_sanitizer=model_object_dict_sanitizer,
         anonymize=False,
     )

     expected_messages = 0

     for object_type, objects in TEST_OBJECTS.items():
         writer.write_additions(object_type, objects)
         expected_messages += len(objects)

     consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
     assert_all_objects_consumed(consumed_messages)

     for key, obj_dict in consumed_messages["revision"]:
         obj = Revision.from_dict(obj_dict)
         for person in (obj.author, obj.committer):
             assert not (
                 len(person.fullname) == 32
                 and person.name is None
                 and person.email is None
             )

     for key, obj_dict in consumed_messages["release"]:
         obj = Release.from_dict(obj_dict)
         # author is optional for release
         if obj.author is None:
             continue
         for person in (obj.author,):
             assert not (
                 len(person.fullname) == 32
                 and person.name is None
                 and person.email is None
             )


 def test_kafka_writer_anonymized(
     kafka_prefix: str,
     kafka_server: str,
     consumer: Consumer,
     privileged_object_types: Iterable[str],
 ):
-    writer = KafkaJournalWriter[BaseModel](
+    writer = KafkaJournalWriter(
         brokers=[kafka_server],
         client_id="kafka_writer",
         prefix=kafka_prefix,
         value_sanitizer=model_object_dict_sanitizer,
         anonymize=True,
     )

     expected_messages = 0

     for object_type, objects in TEST_OBJECTS.items():
         writer.write_additions(object_type, objects)
         expected_messages += len(objects)
         if object_type in privileged_object_types:
             expected_messages += len(objects)

     consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
     assert_all_objects_consumed(consumed_messages, exclude=["revision", "release"])

     for key, obj_dict in consumed_messages["revision"]:
         obj = Revision.from_dict(obj_dict)
         for person in (obj.author, obj.committer):
             assert (
                 len(person.fullname) == 32
                 and person.name is None
                 and person.email is None
             )

     for key, obj_dict in consumed_messages["release"]:
         obj = Release.from_dict(obj_dict)
         # author is optional for release
         if obj.author is None:
             continue
         for person in (obj.author,):
             assert (
                 len(person.fullname) == 32
                 and person.name is None
                 and person.email is None
             )


 def test_write_delivery_failure(kafka_prefix: str, kafka_server: str):
     class MockKafkaError:
         """A mocked kafka error"""

         def str(self):
             return "Mocked Kafka Error"

         def name(self):
             return "SWH_MOCK_ERROR"

     class KafkaJournalWriterFailDelivery(KafkaJournalWriter):
         """A journal writer which always fails delivering messages"""

         def _on_delivery(self, error, message):
             """Replace the inbound error with a fake delivery error"""
             super()._on_delivery(MockKafkaError(), message)

     kafka_prefix += ".swh.journal.objects"
     writer = KafkaJournalWriterFailDelivery(
         brokers=[kafka_server],
         client_id="kafka_writer",
         prefix=kafka_prefix,
         value_sanitizer=model_object_dict_sanitizer,
     )

     empty_dir = Directory(entries=())
     with pytest.raises(KafkaDeliveryError) as exc:
         writer.write_addition("directory", empty_dir)

     assert "Failed deliveries" in exc.value.message
     assert len(exc.value.delivery_failures) == 1
     delivery_failure = exc.value.delivery_failures[0]
     assert delivery_failure.key == empty_dir.id
     assert delivery_failure.code == "SWH_MOCK_ERROR"


 def test_write_delivery_timeout(kafka_prefix: str, kafka_server: str):
     produced = []

     class MockProducer(Producer):
         """A kafka producer which pretends to produce messages, but never sends
         any delivery acknowledgements"""

         def produce(self, **kwargs):
             produced.append(kwargs)

-    writer = KafkaJournalWriter[BaseModel](
+    writer = KafkaJournalWriter(
         brokers=[kafka_server],
         client_id="kafka_writer",
         prefix=kafka_prefix,
         value_sanitizer=model_object_dict_sanitizer,
         flush_timeout=1,
         producer_class=MockProducer,
     )

     empty_dir = Directory(entries=())
     with pytest.raises(KafkaDeliveryError) as exc:
         writer.write_addition("directory", empty_dir)

     assert len(produced) == 1
     assert "timeout" in exc.value.message
     assert len(exc.value.delivery_failures) == 1
     delivery_failure = exc.value.delivery_failures[0]
     assert delivery_failure.key == empty_dir.id
     assert delivery_failure.code == "SWH_FLUSH_TIMEOUT"


 class MockBufferErrorProducer(Producer):
     """A Kafka producer that raises a BufferError on the first
     `n_buffererrors` calls to produce."""

     def __init__(self, *args, **kwargs):
         self.n_buffererrors = kwargs.pop("n_buffererrors", 0)
         self.produce_calls = 0
         super().__init__(*args, **kwargs)

     def produce(self, **kwargs):
         self.produce_calls += 1
         if self.produce_calls <= self.n_buffererrors:
             raise BufferError("Local: Queue full")
         self.produce_calls = 0
         return super().produce(**kwargs)


 def test_write_BufferError_retry(kafka_prefix: str, kafka_server: str, caplog):
-    writer = KafkaJournalWriter[BaseModel](
+    writer = KafkaJournalWriter(
         brokers=[kafka_server],
         client_id="kafka_writer",
         prefix=kafka_prefix,
         value_sanitizer=model_object_dict_sanitizer,
         flush_timeout=1,
         producer_class=MockBufferErrorProducer,
     )
     writer.producer.n_buffererrors = 4

     empty_dir = Directory(entries=())
     caplog.set_level(logging.DEBUG, "swh.journal.writer.kafka")
     writer.write_addition("directory", empty_dir)

     records = []
     for record in caplog.records:
         if "BufferError" in record.getMessage():
             records.append(record)

     assert len(records) == writer.producer.n_buffererrors


 def test_write_BufferError_give_up(kafka_prefix: str, kafka_server: str, caplog):
-    writer = KafkaJournalWriter[BaseModel](
+    writer = KafkaJournalWriter(
         brokers=[kafka_server],
         client_id="kafka_writer",
         prefix=kafka_prefix,
         value_sanitizer=model_object_dict_sanitizer,
         flush_timeout=1,
         producer_class=MockBufferErrorProducer,
     )
     writer.producer.n_buffererrors = 5

     empty_dir = Directory(entries=())
     with pytest.raises(KafkaDeliveryError):
         writer.write_addition("directory", empty_dir)


 def test_write_addition_errors_without_unique_key(kafka_prefix: str, kafka_server: str):
-    writer = KafkaJournalWriter[BaseModel](
+    writer = KafkaJournalWriter(
         brokers=[kafka_server],
         client_id="kafka_writer",
         prefix=kafka_prefix,
         value_sanitizer=model_object_dict_sanitizer,
     )

     with pytest.raises(NotImplementedError):
         writer.write_addition("BaseModel", BaseModel())
diff --git a/swh/journal/writer/__init__.py b/swh/journal/writer/__init__.py
index 662fa80..13d44f6 100644
--- a/swh/journal/writer/__init__.py
+++ b/swh/journal/writer/__init__.py
@@ -1,62 +1,56 @@
-# Copyright (C) 2019-2021 The Software Heritage developers
+# Copyright (C) 2019-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

-from typing import Any, Dict, Optional, TypeVar
+from typing import Any, Dict, Type
 import warnings

-from typing_extensions import Protocol
-
-from swh.model.model import KeyType
-
-TSelf = TypeVar("TSelf")
-
-
-class ValueProtocol(Protocol):
-    def anonymize(self: TSelf) -> Optional[TSelf]:
-        ...
-
-    def unique_key(self) -> KeyType:
-        ...
-
-    def to_dict(self) -> Dict[str, Any]:
-        ...
+from .interface import JournalWriterInterface


 def model_object_dict_sanitizer(
     object_type: str, object_dict: Dict[str, Any]
 ) -> Dict[str, str]:
     object_dict = object_dict.copy()
     if object_type == "content":
         object_dict.pop("data", None)
     return object_dict


-def get_journal_writer(cls, **kwargs):
+def get_journal_writer(cls, **kwargs) -> JournalWriterInterface:
+
     if "args" in kwargs:
         warnings.warn(
             'Explicit "args" key is deprecated, use keys directly instead.',
             DeprecationWarning,
         )
         kwargs = kwargs["args"]

     kwargs.setdefault("value_sanitizer", model_object_dict_sanitizer)

     if cls == "inmemory":  # FIXME: Remove inmemory in due time
         warnings.warn(
             "cls = 'inmemory' is deprecated, use 'memory' instead",
             DeprecationWarning,
         )
         cls = "memory"
+
+    JournalWriter: Type[JournalWriterInterface]
     if cls == "memory":
-        from .inmemory import InMemoryJournalWriter as JournalWriter
+        from .inmemory import InMemoryJournalWriter
+
+        JournalWriter = InMemoryJournalWriter
     elif cls == "kafka":
-        from .kafka import KafkaJournalWriter as JournalWriter
+        from .kafka import KafkaJournalWriter
+
+        JournalWriter = KafkaJournalWriter
     elif cls == "stream":
-        from .stream import StreamJournalWriter as JournalWriter
+        from .stream import StreamJournalWriter
+
+        JournalWriter = StreamJournalWriter
         assert "output_stream" in kwargs
     else:
         raise ValueError("Unknown journal writer class `%s`" % cls)

     return JournalWriter(**kwargs)
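
Usage sketch (not part of the patch): with `get_journal_writer` now annotated as returning `JournalWriterInterface`, call sites can stay backend-agnostic. A minimal example, assuming `swh.journal` and `swh.model` are installed; the empty `Directory` is only an illustrative value:

    from swh.journal.writer import get_journal_writer
    from swh.model.model import Directory

    # value_sanitizer defaults to model_object_dict_sanitizer
    writer = get_journal_writer(cls="memory")
    writer.write_addition("directory", Directory(entries=()))
    writer.flush()  # a no-op for the in-memory backend
    assert len(writer.objects) == 1
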
diff --git a/swh/journal/writer/inmemory.py b/swh/journal/writer/inmemory.py
index 775785a..a2039c9 100644
--- a/swh/journal/writer/inmemory.py
+++ b/swh/journal/writer/inmemory.py
@@ -1,43 +1,43 @@
 # Copyright (C) 2019-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import logging
 from multiprocessing import Manager
-from typing import Any, Callable, Dict, Generic, List, Tuple, TypeVar
+from typing import Any, Callable, Dict, Iterable, List, Tuple

-from . import ValueProtocol
+from .interface import ValueProtocol

 logger = logging.getLogger(__name__)

-TValue = TypeVar("TValue", bound=ValueProtocol)
-
-
-class InMemoryJournalWriter(Generic[TValue]):
-    objects: List[Tuple[str, TValue]]
-    privileged_objects: List[Tuple[str, TValue]]
+class InMemoryJournalWriter:
+    objects: List[Tuple[str, ValueProtocol]]
+    privileged_objects: List[Tuple[str, ValueProtocol]]

     def __init__(
         self, value_sanitizer: Callable[[str, Dict[str, Any]], Dict[str, Any]]
     ):
         # Share the list of objects across processes, for RemoteAPI tests.
         self.manager = Manager()
         self.objects = self.manager.list()
         self.privileged_objects = self.manager.list()

-    def write_addition(self, object_type: str, object_: TValue) -> None:
+    def write_addition(self, object_type: str, object_: ValueProtocol) -> None:
         object_.unique_key()  # Check this does not error, to mimic the kafka writer
         anon_object_ = object_.anonymize()
         if anon_object_ is not None:
             self.privileged_objects.append((object_type, object_))
             self.objects.append((object_type, anon_object_))
         else:
             self.objects.append((object_type, object_))

-    write_update = write_addition
-
-    def write_additions(self, object_type: str, objects: List[TValue]) -> None:
+    def write_additions(
+        self, object_type: str, objects: Iterable[ValueProtocol]
+    ) -> None:
         for object_ in objects:
             self.write_addition(object_type, object_)
+
+    def flush(self) -> None:
+        pass
diff --git a/swh/journal/writer/interface.py b/swh/journal/writer/interface.py
new file mode 100644
index 0000000..c357137
--- /dev/null
+++ b/swh/journal/writer/interface.py
@@ -0,0 +1,40 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Any, Dict, Iterable, Optional, TypeVar
+
+from typing_extensions import Protocol, runtime_checkable
+
+from swh.model.model import KeyType
+
+TSelf = TypeVar("TSelf")
+
+
+class ValueProtocol(Protocol):
+    def anonymize(self: TSelf) -> Optional[TSelf]:
+        ...
+
+    def unique_key(self) -> KeyType:
+        ...
+
+    def to_dict(self) -> Dict[str, Any]:
+        ...
+
+
+@runtime_checkable
+class JournalWriterInterface(Protocol):
+    def write_addition(self, object_type: str, object_: ValueProtocol) -> None:
+        """Add a SWH object of type ``object_type`` to the journal."""
+        ...
+
+    def write_additions(
+        self, object_type: str, objects: Iterable[ValueProtocol]
+    ) -> None:
+        """Add an iterable of SWH objects of type ``object_type`` to the journal."""
+        ...
+
+    def flush(self) -> None:
+        """Flush any pending object additions to the backend."""
+        ...
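
Because `JournalWriterInterface` is `runtime_checkable`, conformance can also be verified with `isinstance` (which checks method presence only, not signatures). A sketch of a third-party writer satisfying the protocol; `NullJournalWriter` is a hypothetical example, not part of this patch:

    from typing import Iterable

    from swh.journal.writer.interface import JournalWriterInterface, ValueProtocol


    class NullJournalWriter:
        """Discard all objects; handy to disable journal writing in tests."""

        def write_addition(self, object_type: str, object_: ValueProtocol) -> None:
            object_.unique_key()  # mirror the error behavior of the real writers

        def write_additions(
            self, object_type: str, objects: Iterable[ValueProtocol]
        ) -> None:
            for object_ in objects:
                self.write_addition(object_type, object_)

        def flush(self) -> None:
            pass


    assert isinstance(NullJournalWriter(), JournalWriterInterface)
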
diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py
index 35f12c5..0fbb3ae 100644
--- a/swh/journal/writer/kafka.py
+++ b/swh/journal/writer/kafka.py
@@ -1,275 +1,261 @@
-# Copyright (C) 2019-2020 The Software Heritage developers
+# Copyright (C) 2019-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import logging
 import time
-from typing import (
-    Any,
-    Callable,
-    Dict,
-    Generic,
-    Iterable,
-    List,
-    NamedTuple,
-    Optional,
-    Type,
-    TypeVar,
-)
+from typing import Any, Callable, Dict, Iterable, List, NamedTuple, Optional, Type

 from confluent_kafka import KafkaException, Producer

 from swh.journal.serializers import KeyType, key_to_kafka, pprint_key, value_to_kafka

-from . import ValueProtocol
+from .interface import ValueProtocol

 logger = logging.getLogger(__name__)


 class DeliveryTag(NamedTuple):
     """Unique tag allowing us to check for a message delivery"""

     topic: str
     kafka_key: bytes


 class DeliveryFailureInfo(NamedTuple):
     """Verbose information for failed deliveries"""

     object_type: str
     key: KeyType
     message: str
     code: str


 def get_object_type(topic: str) -> str:
     """Get the object type from a topic string"""
     return topic.rsplit(".", 1)[-1]


 class KafkaDeliveryError(Exception):
     """Delivery failed on some kafka messages."""

     def __init__(self, message: str, delivery_failures: Iterable[DeliveryFailureInfo]):
         self.message = message
         self.delivery_failures = list(delivery_failures)

     def pretty_failures(self) -> str:
         return ", ".join(
             f"{f.object_type} {pprint_key(f.key)} ({f.message})"
             for f in self.delivery_failures
         )

     def __str__(self):
         return f"KafkaDeliveryError({self.message}, [{self.pretty_failures()}])"


-TValue = TypeVar("TValue", bound=ValueProtocol)
-
-
-class KafkaJournalWriter(Generic[TValue]):
-    """This class is used to write serialized versions of value objects to a
-    series of Kafka topics. The type parameter `TValue`, which must implement the
-    `ValueProtocol`, is the type of values this writer will write.
-    Typically, `TValue` will be `swh.model.model.BaseModel`.
+class KafkaJournalWriter:
+    """This class is used to write serialized versions of value objects to a series
+    of Kafka topics. Value objects must implement the `ValueProtocol`;
+    typically, they are instances of `swh.model.model.BaseModel`.

     Topics used to send objects representations are built from a ``prefix`` plus the
     type of the object:

     ``{prefix}.{object_type}``

     Objects can be sent as is, or can be anonymized. The anonymization feature, when
     activated, will write anonymized versions of value objects in the main topic, and
     stock (non-anonymized) objects will be sent to a dedicated (privileged) set of
     topics:

     ``{prefix}_privileged.{object_type}``

     The anonymization of a value object is the result of calling its ``anonymize()``
     method. An object is considered anonymizable if this method returns a (non-None)
     value.

     Args:
       brokers: list of broker addresses and ports.
       prefix: the prefix used to build the topic names for objects.
       client_id: the id of the writer sent to kafka.
       value_sanitizer: a function that takes the object type and the dict
         representation of an object as argument, and returns another dict that
         should actually be stored in the journal (e.g. removing keys that do not
         belong there).
       producer_config: extra configuration keys passed to the `Producer`.
       flush_timeout: timeout, in seconds, after which the `flush` operation
         will fail if some message deliveries are still pending.
       producer_class: override for the kafka producer class.
       anonymize: if True, activate the anonymization feature.

     """

     def __init__(
         self,
         brokers: Iterable[str],
         prefix: str,
         client_id: str,
         value_sanitizer: Callable[[str, Dict[str, Any]], Dict[str, Any]],
         producer_config: Optional[Dict] = None,
         flush_timeout: float = 120,
         producer_class: Type[Producer] = Producer,
         anonymize: bool = False,
     ):
         self._prefix = prefix
         self._prefix_privileged = f"{self._prefix}_privileged"
         self.anonymize = anonymize

         if not producer_config:
             producer_config = {}

         if "message.max.bytes" not in producer_config:
             producer_config = {
                 "message.max.bytes": 100 * 1024 * 1024,
                 **producer_config,
             }

         self.producer = producer_class(
             {
                 "bootstrap.servers": ",".join(brokers),
                 "client.id": client_id,
                 "on_delivery": self._on_delivery,
                 "error_cb": self._error_cb,
                 "logger": logger,
                 "acks": "all",
                 **producer_config,
             }
         )

         # Delivery management
         self.flush_timeout = flush_timeout

         # delivery tag -> original object "key" mapping
         self.deliveries_pending: Dict[DeliveryTag, KeyType] = {}

         # List of (object_type, key, error_msg, error_name) for failed deliveries
         self.delivery_failures: List[DeliveryFailureInfo] = []

         self.value_sanitizer = value_sanitizer

     def _error_cb(self, error):
         if error.fatal():
             raise KafkaException(error)
         logger.info("Received non-fatal kafka error: %s", error)

     def _on_delivery(self, error, message):
         (topic, key) = delivery_tag = DeliveryTag(message.topic(), message.key())
         sent_key = self.deliveries_pending.pop(delivery_tag, None)

         if error is not None:
             self.delivery_failures.append(
                 DeliveryFailureInfo(
                     get_object_type(topic), sent_key, error.str(), error.name()
                 )
             )

     def send(self, topic: str, key: KeyType, value):
         kafka_key = key_to_kafka(key)
         max_attempts = 5
         last_exception: Optional[Exception] = None
         for attempt in range(max_attempts):
             try:
                 self.producer.produce(
                     topic=topic,
                     key=kafka_key,
                     value=value_to_kafka(value),
                 )
             except BufferError as e:
                 last_exception = e
                 wait = 1 + 3 * attempt

                 if logger.isEnabledFor(logging.DEBUG):  # pprint_key is expensive
                     logger.debug(
                         "BufferError producing %s %s; waiting for %ss",
                         get_object_type(topic),
                         pprint_key(kafka_key),
                         wait,
                     )
                 self.producer.poll(wait)
             else:
                 self.deliveries_pending[DeliveryTag(topic, kafka_key)] = key
                 return

         # We reach this point if all delivery attempts have failed
         self.delivery_failures.append(
             DeliveryFailureInfo(
                 get_object_type(topic), key, str(last_exception), "SWH_BUFFER_ERROR"
             )
         )

     def delivery_error(self, message) -> KafkaDeliveryError:
         """Get all failed deliveries, and clear them"""
         ret = self.delivery_failures
         self.delivery_failures = []

         while self.deliveries_pending:
             delivery_tag, orig_key = self.deliveries_pending.popitem()
             (topic, kafka_key) = delivery_tag
             ret.append(
                 DeliveryFailureInfo(
                     get_object_type(topic),
                     orig_key,
                     "No delivery before flush() timeout",
                     "SWH_FLUSH_TIMEOUT",
                 )
             )

         return KafkaDeliveryError(message, ret)

-    def flush(self):
+    def flush(self) -> None:
         start = time.monotonic()

         self.producer.flush(self.flush_timeout)

         while self.deliveries_pending:
             if time.monotonic() - start > self.flush_timeout:
                 break
             self.producer.poll(0.1)

         if self.deliveries_pending:
             # Delivery timeout
             raise self.delivery_error(
                 "flush() exceeded timeout (%ss)" % self.flush_timeout,
             )
         elif self.delivery_failures:
             raise self.delivery_error("Failed deliveries after flush()")

-    def _write_addition(self, object_type: str, object_: TValue) -> None:
+    def _write_addition(self, object_type: str, object_: ValueProtocol) -> None:
         """Write a single object to the journal"""
         key = object_.unique_key()

         if self.anonymize:
             anon_object_ = object_.anonymize()
             if anon_object_:  # can be either None, or an anonymized object
                 # if the object is anonymizable, send the non-anonymized version in the
                 # privileged channel
                 topic = f"{self._prefix_privileged}.{object_type}"
                 dict_ = self.value_sanitizer(object_type, object_.to_dict())
                 logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_)
                 self.send(topic, key=key, value=dict_)
                 object_ = anon_object_

         topic = f"{self._prefix}.{object_type}"
         dict_ = self.value_sanitizer(object_type, object_.to_dict())
         logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_)
         self.send(topic, key=key, value=dict_)

-    def write_addition(self, object_type: str, object_: TValue) -> None:
+    def write_addition(self, object_type: str, object_: ValueProtocol) -> None:
         """Write a single object to the journal"""
         self._write_addition(object_type, object_)
         self.flush()

-    write_update = write_addition
-
-    def write_additions(self, object_type: str, objects: Iterable[TValue]) -> None:
+    def write_additions(
+        self, object_type: str, objects: Iterable[ValueProtocol]
+    ) -> None:
         """Write a set of objects to the journal"""
         for object_ in objects:
             self._write_addition(object_type, object_)
         self.flush()
diff --git a/swh/journal/writer/stream.py b/swh/journal/writer/stream.py
index 5d868b8..f796295 100644
--- a/swh/journal/writer/stream.py
+++ b/swh/journal/writer/stream.py
@@ -1,43 +1,43 @@
 # Copyright (C) 2021-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import logging
-from typing import Any, BinaryIO, Callable, Dict, Generic, List, TypeVar
+from typing import Any, BinaryIO, Callable, Dict, Iterable

 from swh.journal.serializers import value_to_kafka

-from . import ValueProtocol
+from .interface import ValueProtocol

 logger = logging.getLogger(__name__)

-TValue = TypeVar("TValue", bound=ValueProtocol)
-
-
-class StreamJournalWriter(Generic[TValue]):
+class StreamJournalWriter:
     """A simple JournalWriter which serializes objects to a stream

     Might be used to serialize a storage to a file, to generate a test dataset.
     """

     def __init__(
         self,
         output_stream: BinaryIO,
         value_sanitizer: Callable[[str, Dict[str, Any]], Dict[str, Any]],
     ):
         self.output = output_stream
         self.value_sanitizer = value_sanitizer

-    def write_addition(self, object_type: str, object_: TValue) -> None:
+    def write_addition(self, object_type: str, object_: ValueProtocol) -> None:
         object_.unique_key()  # Check this does not error, to mimic the kafka writer
         dict_ = self.value_sanitizer(object_type, object_.to_dict())
         self.output.write(value_to_kafka((object_type, dict_)))

-    write_update = write_addition
-
-    def write_additions(self, object_type: str, objects: List[TValue]) -> None:
+    def write_additions(
+        self, object_type: str, objects: Iterable[ValueProtocol]
+    ) -> None:
         for object_ in objects:
             self.write_addition(object_type, object_)
+
+    def flush(self) -> None:
+        self.output.flush()
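
The new `flush()` on `StreamJournalWriter` simply delegates to the underlying stream, keeping all three writers interchangeable behind `JournalWriterInterface`. A self-contained sketch writing to an in-memory buffer:

    import io

    from swh.journal.writer import model_object_dict_sanitizer
    from swh.journal.writer.stream import StreamJournalWriter
    from swh.model.model import Directory

    buf = io.BytesIO()
    writer = StreamJournalWriter(
        output_stream=buf,
        value_sanitizer=model_object_dict_sanitizer,
    )
    writer.write_addition("directory", Directory(entries=()))
    writer.flush()  # delegates to buf.flush()
    assert buf.getvalue()  # msgpack-encoded (object_type, dict) pairs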