diff --git a/swh/journal/writer/inmemory.py b/swh/journal/writer/inmemory.py
--- a/swh/journal/writer/inmemory.py
+++ b/swh/journal/writer/inmemory.py
@@ -5,16 +5,16 @@
 
 import logging
 from multiprocessing import Manager
-from typing import Any, Callable, Dict, Iterable, List, Tuple
+from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple
 
-from .interface import ValueProtocol
+from .interface import KeyType, ValueProtocol
 
 logger = logging.getLogger(__name__)
 
 
 class InMemoryJournalWriter:
-    objects: List[Tuple[str, ValueProtocol]]
-    privileged_objects: List[Tuple[str, ValueProtocol]]
+    objects: List[Tuple[str, KeyType, Optional[ValueProtocol]]]
+    privileged_objects: List[Tuple[str, KeyType, Optional[ValueProtocol]]]
 
     def __init__(
         self,
@@ -28,15 +28,15 @@
         self.anonymize = anonymize
 
     def write_addition(self, object_type: str, object_: ValueProtocol) -> None:
-        object_.unique_key()  # Check this does not error, to mimic the kafka writer
+        key = object_.unique_key()
         anon_object_ = None
         if self.anonymize:
             anon_object_ = object_.anonymize()
         if anon_object_ is not None:
-            self.privileged_objects.append((object_type, object_))
-            self.objects.append((object_type, anon_object_))
+            self.privileged_objects.append((object_type, key, object_))
+            self.objects.append((object_type, key, anon_object_))
         else:
-            self.objects.append((object_type, object_))
+            self.objects.append((object_type, key, object_))
 
     def write_additions(
         self, object_type: str, objects: Iterable[ValueProtocol]
@@ -46,3 +46,8 @@
 
     def flush(self) -> None:
         pass
+
+    def delete(self, object_type: str, object_keys: Iterable[KeyType]) -> None:
+        """Record a ``None``-valued tombstone in ``objects`` for each key."""
+        for key in object_keys:
+            self.objects.append((object_type, key, None))
diff --git a/swh/journal/writer/interface.py b/swh/journal/writer/interface.py
--- a/swh/journal/writer/interface.py
+++ b/swh/journal/writer/interface.py
@@ -38,3 +38,7 @@
     def flush(self) -> None:
         """Flush the pending object additions in the backend, if any."""
         ...
+
+    def delete(self, object_type: str, object_keys: Iterable[KeyType]) -> None:
+        """Delete the objects of type `object_type` identified by `object_keys`."""
+        ...
diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py
--- a/swh/journal/writer/kafka.py
+++ b/swh/journal/writer/kafka.py
@@ -160,7 +160,7 @@
                 )
             )
 
-    def send(self, topic: str, key: KeyType, value):
+    def reliable_produce(self, topic: str, key: KeyType, kafka_value: Optional[bytes]):
         kafka_key = key_to_kafka(key)
         max_attempts = 5
         last_exception: Optional[Exception] = None
@@ -169,7 +169,7 @@
                 self.producer.produce(
                     topic=topic,
                     key=kafka_key,
-                    value=value_to_kafka(value),
+                    value=kafka_value,
                 )
             except BufferError as e:
                 last_exception = e
@@ -194,6 +194,10 @@
             )
         )
 
+    def send(self, topic: str, key: KeyType, value):
+        kafka_value = value_to_kafka(value)
+        return self.reliable_produce(topic, key, kafka_value)
+
     def delivery_error(self, message) -> KafkaDeliveryError:
         """Get all failed deliveries, and clear them"""
         ret = self.delivery_failures
@@ -266,3 +270,12 @@
 
         if self.auto_flush:
             self.flush()
+
+    def delete(self, object_type: str, object_keys: Iterable[KeyType]) -> None:
+        """Produce a tombstone (``None`` value) for each key in ``object_keys``."""
+        topic = f"{self._prefix}.{object_type}"
+        for key in object_keys:
+            self.reliable_produce(topic, key, None)
+
+        if self.auto_flush:
+            self.flush()
diff --git a/swh/journal/writer/stream.py b/swh/journal/writer/stream.py
--- a/swh/journal/writer/stream.py
+++ b/swh/journal/writer/stream.py
@@ -8,7 +8,7 @@
 
 from swh.journal.serializers import value_to_kafka
 
-from .interface import ValueProtocol
+from .interface import KeyType, ValueProtocol
 
 logger = logging.getLogger(__name__)
 
@@ -39,5 +39,8 @@
         for object_ in objects:
             self.write_addition(object_type, object_)
 
+    def delete(self, object_type: str, object_keys: Iterable[KeyType]) -> None:
+        """Ignore deletions: this writer does not record them in its output."""
+
     def flush(self) -> None:
         self.output.flush()