Changeset View
Standalone View
swh/journal/writer/kafka.py
Show First 20 Lines • Show All 54 Lines • ▼ Show 20 Lines | def pretty_failures(self) -> str: | ||||
for f in self.delivery_failures | for f in self.delivery_failures | ||||
) | ) | ||||
def __str__(self): | def __str__(self): | ||||
return f"KafkaDeliveryError({self.message}, [{self.pretty_failures()}])" | return f"KafkaDeliveryError({self.message}, [{self.pretty_failures()}])" | ||||
class KafkaJournalWriter: | class KafkaJournalWriter: | ||||
"""This class is instantiated and used by swh-storage to write incoming | """This class is used to write serialized versions of swh.model.model objects to a | ||||
new objects to Kafka before adding them to the storage backend | series of Kafka topics. | ||||
(eg. postgresql) itself. | |||||
Topics used to send objects representations are built from a ``prefix`` plus the | |||||
type of the object: | |||||
``{prefix}.{object_type}`` | |||||
Objects can be sent as is, or can be anonymized. The anonymization feature, when | |||||
activated, will write anonymized versions of model objects in the main topic, and | |||||
stock (non-anonymized) objects will be sent to a dedicated (privileged) set of | |||||
topics: | |||||
``{prefix}_privileged.{object_type}`` | |||||
The anonymization of a swh.model object is the result of calling its | |||||
``BaseModel.anonymize()`` method. An object is considered anonymizable if this | |||||
method returns a (non-None) value. | |||||
Args: | Args: | ||||
brokers: list of broker addresses and ports | brokers: list of broker addresses and ports. | ||||
prefix: the prefix used to build the topic names for objects | prefix: the prefix used to build the topic names for objects. | ||||
client_id: the id of the writer sent to kafka | client_id: the id of the writer sent to kafka. | ||||
producer_config: extra configuration keys passed to the `Producer` | producer_config: extra configuration keys passed to the `Producer`. | ||||
flush_timeout: timeout, in seconds, after which the `flush` operation | flush_timeout: timeout, in seconds, after which the `flush` operation | ||||
will fail if some message deliveries are still pending. | will fail if some message deliveries are still pending. | ||||
producer_class: override for the kafka producer class | producer_class: override for the kafka producer class. | ||||
anonymize: if True, activate the anonymization feature. | |||||
""" | """ | ||||
def __init__( | def __init__( | ||||
self, | self, | ||||
brokers: Iterable[str], | brokers: Iterable[str], | ||||
prefix: str, | prefix: str, | ||||
client_id: str, | client_id: str, | ||||
producer_config: Optional[Dict] = None, | producer_config: Optional[Dict] = None, | ||||
flush_timeout: float = 120, | flush_timeout: float = 120, | ||||
producer_class: Type[Producer] = Producer, | producer_class: Type[Producer] = Producer, | ||||
anonymize: bool = False, | |||||
): | ): | ||||
self._prefix = prefix | self._prefix = prefix | ||||
self._prefix_privileged = f"{self._prefix}_privileged" | |||||
self.anonymize = anonymize | |||||
if not producer_config: | if not producer_config: | ||||
producer_config = {} | producer_config = {} | ||||
if "message.max.bytes" not in producer_config: | if "message.max.bytes" not in producer_config: | ||||
producer_config = { | producer_config = { | ||||
"message.max.bytes": 100 * 1024 * 1024, | "message.max.bytes": 100 * 1024 * 1024, | ||||
**producer_config, | **producer_config, | ||||
▲ Show 20 Lines • Show All 89 Lines • ▼ Show 20 Lines | ) -> Dict[str, str]: | ||||
# :( | # :( | ||||
dict_["date"] = str(dict_["date"]) | dict_["date"] = str(dict_["date"]) | ||||
if object_type == "content": | if object_type == "content": | ||||
dict_.pop("data", None) | dict_.pop("data", None) | ||||
return dict_ | return dict_ | ||||
def _write_addition(self, object_type: str, object_: ModelObject) -> None: | def _write_addition(self, object_type: str, object_: ModelObject) -> None: | ||||
"""Write a single object to the journal""" | """Write a single object to the journal""" | ||||
topic = f"{self._prefix}.{object_type}" | |||||
key = object_key(object_type, object_) | key = object_key(object_type, object_) | ||||
if self.anonymize: | |||||
anon_object_ = object_.anonymize() | |||||
if anon_object_: # can be either None, or an anonymized object | |||||
# if the object is anonymizable, send the non-anonymized version in the | |||||
# privileged channel | |||||
topic = f"{self._prefix_privileged}.{object_type}" | |||||
dict_ = self._sanitize_object(object_type, object_) | |||||
logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) | |||||
ardumont: mmm if you keep that reference, here, i fear you are sending the anonymized object to the… | |||||
Done Inline ActionsI don't understand your point here. douardda: I don't understand your point here.
| |||||
Not Done Inline ActionsI meant instruction not sentence sorry. my understanding is that:
That last instruction overrode the stock object reference to be the anonymized dict_ = self._sanitize_object(object_type, object_) sends the anonymized object to the priviledged topic instead of the stock Or am i misunderstanding something? [1] I do read the descriptions when reviewing, it just takes some time to sink ardumont: I meant `instruction` not `sentence` sorry.
Explicitely, i meant, `object_ = anon_object_`… | |||||
Done Inline Actions
no kidding :-) douardda: > [1] I do read the descriptions when reviewing, it just takes some time to sink in sometimes… | |||||
Done Inline Actions
Correct, with "anonymized topic" being the "standard one". So in this code, the non-anonymized version of say a Release will be first sent to the privileged topic. Then, thanks to this object_ = anon_object_, the remaining of the code, which is executed unconditionally, will send the anonymized object to the main channel. So this anonymized version will be sent to the main channel if and only if the privileged one has been used for the stock version of the object.
Nope, as explained above, it will send the anonymized object to the main topic, not the privileged one (this has already been dealt in the if block.) douardda: > my understanding is that:
>
> when anon_objects exists, we send this to the anonymized… | |||||
Not Done Inline Actionsoh yeah, totally. I misread the topic = f"{self._prefix_privileged}.{object_type}" at the start of the conditional! sorry for the noise then. ardumont: oh yeah, totally.
I misread the `topic = f"{self._prefix_privileged}.{object_type}"` at the… | |||||
self.send(topic, key=key, value=dict_) | |||||
object_ = anon_object_ | |||||
topic = f"{self._prefix}.{object_type}" | |||||
dict_ = self._sanitize_object(object_type, object_) | dict_ = self._sanitize_object(object_type, object_) | ||||
logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) | logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) | ||||
self.send(topic, key=key, value=dict_) | self.send(topic, key=key, value=dict_) | ||||
def write_addition(self, object_type: str, object_: ModelObject) -> None: | def write_addition(self, object_type: str, object_: ModelObject) -> None: | ||||
"""Write a single object to the journal""" | """Write a single object to the journal""" | ||||
self._write_addition(object_type, object_) | self._write_addition(object_type, object_) | ||||
self.flush() | self.flush() | ||||
Show All 9 Lines |
mmm if you keep that reference, here, i fear you are sending the anonymized object to the priviledged topic...
you should remove that last sentence.