Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_kafka_writer.py
Show All 16 Lines | |||||
def test_kafka_writer( | def test_kafka_writer( | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_server: str, | kafka_server: str, | ||||
consumer: Consumer, | consumer: Consumer, | ||||
privileged_object_types: Iterable[str], | privileged_object_types: Iterable[str], | ||||
): | ): | ||||
writer = KafkaJournalWriter[BaseModel]( | writer = KafkaJournalWriter( | ||||
brokers=[kafka_server], | brokers=[kafka_server], | ||||
client_id="kafka_writer", | client_id="kafka_writer", | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
value_sanitizer=model_object_dict_sanitizer, | value_sanitizer=model_object_dict_sanitizer, | ||||
anonymize=False, | anonymize=False, | ||||
) | ) | ||||
expected_messages = 0 | expected_messages = 0 | ||||
Show All 27 Lines | |||||
def test_kafka_writer_anonymized( | def test_kafka_writer_anonymized( | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_server: str, | kafka_server: str, | ||||
consumer: Consumer, | consumer: Consumer, | ||||
privileged_object_types: Iterable[str], | privileged_object_types: Iterable[str], | ||||
): | ): | ||||
writer = KafkaJournalWriter[BaseModel]( | writer = KafkaJournalWriter( | ||||
brokers=[kafka_server], | brokers=[kafka_server], | ||||
client_id="kafka_writer", | client_id="kafka_writer", | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
value_sanitizer=model_object_dict_sanitizer, | value_sanitizer=model_object_dict_sanitizer, | ||||
anonymize=True, | anonymize=True, | ||||
) | ) | ||||
expected_messages = 0 | expected_messages = 0 | ||||
▲ Show 20 Lines • Show All 70 Lines • ▼ Show 20 Lines | def test_write_delivery_timeout(kafka_prefix: str, kafka_server: str): | ||||
class MockProducer(Producer): | class MockProducer(Producer): | ||||
"""A kafka producer which pretends to produce messages, but never sends any | """A kafka producer which pretends to produce messages, but never sends any | ||||
delivery acknowledgements""" | delivery acknowledgements""" | ||||
def produce(self, **kwargs): | def produce(self, **kwargs): | ||||
produced.append(kwargs) | produced.append(kwargs) | ||||
writer = KafkaJournalWriter[BaseModel]( | writer = KafkaJournalWriter( | ||||
brokers=[kafka_server], | brokers=[kafka_server], | ||||
client_id="kafka_writer", | client_id="kafka_writer", | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
value_sanitizer=model_object_dict_sanitizer, | value_sanitizer=model_object_dict_sanitizer, | ||||
flush_timeout=1, | flush_timeout=1, | ||||
producer_class=MockProducer, | producer_class=MockProducer, | ||||
) | ) | ||||
Show All 25 Lines | def produce(self, **kwargs): | ||||
if self.produce_calls <= self.n_buffererrors: | if self.produce_calls <= self.n_buffererrors: | ||||
raise BufferError("Local: Queue full") | raise BufferError("Local: Queue full") | ||||
self.produce_calls = 0 | self.produce_calls = 0 | ||||
return super().produce(**kwargs) | return super().produce(**kwargs) | ||||
def test_write_BufferError_retry(kafka_prefix: str, kafka_server: str, caplog): | def test_write_BufferError_retry(kafka_prefix: str, kafka_server: str, caplog): | ||||
writer = KafkaJournalWriter[BaseModel]( | writer = KafkaJournalWriter( | ||||
brokers=[kafka_server], | brokers=[kafka_server], | ||||
client_id="kafka_writer", | client_id="kafka_writer", | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
value_sanitizer=model_object_dict_sanitizer, | value_sanitizer=model_object_dict_sanitizer, | ||||
flush_timeout=1, | flush_timeout=1, | ||||
producer_class=MockBufferErrorProducer, | producer_class=MockBufferErrorProducer, | ||||
) | ) | ||||
writer.producer.n_buffererrors = 4 | writer.producer.n_buffererrors = 4 | ||||
empty_dir = Directory(entries=()) | empty_dir = Directory(entries=()) | ||||
caplog.set_level(logging.DEBUG, "swh.journal.writer.kafka") | caplog.set_level(logging.DEBUG, "swh.journal.writer.kafka") | ||||
writer.write_addition("directory", empty_dir) | writer.write_addition("directory", empty_dir) | ||||
records = [] | records = [] | ||||
for record in caplog.records: | for record in caplog.records: | ||||
if "BufferError" in record.getMessage(): | if "BufferError" in record.getMessage(): | ||||
records.append(record) | records.append(record) | ||||
assert len(records) == writer.producer.n_buffererrors | assert len(records) == writer.producer.n_buffererrors | ||||
def test_write_BufferError_give_up(kafka_prefix: str, kafka_server: str, caplog): | def test_write_BufferError_give_up(kafka_prefix: str, kafka_server: str, caplog): | ||||
writer = KafkaJournalWriter[BaseModel]( | writer = KafkaJournalWriter( | ||||
brokers=[kafka_server], | brokers=[kafka_server], | ||||
client_id="kafka_writer", | client_id="kafka_writer", | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
value_sanitizer=model_object_dict_sanitizer, | value_sanitizer=model_object_dict_sanitizer, | ||||
flush_timeout=1, | flush_timeout=1, | ||||
producer_class=MockBufferErrorProducer, | producer_class=MockBufferErrorProducer, | ||||
) | ) | ||||
writer.producer.n_buffererrors = 5 | writer.producer.n_buffererrors = 5 | ||||
empty_dir = Directory(entries=()) | empty_dir = Directory(entries=()) | ||||
with pytest.raises(KafkaDeliveryError): | with pytest.raises(KafkaDeliveryError): | ||||
writer.write_addition("directory", empty_dir) | writer.write_addition("directory", empty_dir) | ||||
def test_write_addition_errors_without_unique_key(kafka_prefix: str, kafka_server: str): | def test_write_addition_errors_without_unique_key(kafka_prefix: str, kafka_server: str): | ||||
writer = KafkaJournalWriter[BaseModel]( | writer = KafkaJournalWriter( | ||||
brokers=[kafka_server], | brokers=[kafka_server], | ||||
client_id="kafka_writer", | client_id="kafka_writer", | ||||
prefix=kafka_prefix, | prefix=kafka_prefix, | ||||
value_sanitizer=model_object_dict_sanitizer, | value_sanitizer=model_object_dict_sanitizer, | ||||
) | ) | ||||
with pytest.raises(NotImplementedError): | with pytest.raises(NotImplementedError): | ||||
writer.write_addition("BaseModel", BaseModel()) | writer.write_addition("BaseModel", BaseModel()) |