Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/writer/kafka.py
Show First 20 Lines • Show All 147 Lines • ▼ Show 20 Lines | class KafkaJournalWriter: | ||||
def _sanitize_object( | def _sanitize_object( | ||||
self, object_type: str, object_: ModelObject | self, object_type: str, object_: ModelObject | ||||
) -> Dict[str, str]: | ) -> Dict[str, str]: | ||||
dict_ = object_.to_dict() | dict_ = object_.to_dict() | ||||
if object_type == "origin_visit": | if object_type == "origin_visit": | ||||
# :( | # :( | ||||
dict_["date"] = str(dict_["date"]) | dict_["date"] = str(dict_["date"]) | ||||
if object_type == "content": | |||||
dict_.pop("data", None) | |||||
return dict_ | return dict_ | ||||
def _write_addition(self, object_type: str, object_: ModelObject) -> None: | def _write_addition(self, object_type: str, object_: ModelObject) -> None: | ||||
"""Write a single object to the journal""" | """Write a single object to the journal""" | ||||
topic = f"{self._prefix}.{object_type}" | topic = f"{self._prefix}.{object_type}" | ||||
key = self._get_key(object_type, object_) | key = self._get_key(object_type, object_) | ||||
dict_ = self._sanitize_object(object_type, object_) | dict_ = self._sanitize_object(object_type, object_) | ||||
logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) | logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_) | ||||
Show All 15 Lines |