diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py --- a/swh/journal/writer/kafka.py +++ b/swh/journal/writer/kafka.py @@ -89,6 +89,9 @@ will fail if some message deliveries are still pending. producer_class: override for the kafka producer class. anonymize: if True, activate the anonymization feature. + auto_flush: if True (default), flush the kafka producer in + ``write_addition()`` and ``write_additions()``. This should be set + to False ONLY for testing purposes. DO NOT USE IN A PRODUCTION ENVIRONMENT. """ @@ -102,10 +105,12 @@ flush_timeout: float = 120, producer_class: Type[Producer] = Producer, anonymize: bool = False, + auto_flush: bool = True, ): self._prefix = prefix self._prefix_privileged = f"{self._prefix}_privileged" self.anonymize = anonymize + self.auto_flush = auto_flush if not producer_config: producer_config = {} @@ -249,7 +254,8 @@ def write_addition(self, object_type: str, object_: ValueProtocol) -> None: """Write a single object to the journal""" self._write_addition(object_type, object_) - self.flush() + if self.auto_flush: + self.flush() def write_additions( self, object_type: str, objects: Iterable[ValueProtocol] @@ -258,4 +264,5 @@ for object_ in objects: self._write_addition(object_type, object_) - self.flush() + if self.auto_flush: + self.flush()