Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/writer/kafka.py
Show First 20 Lines • Show All 194 Lines • ▼ Show 20 Lines | def flush(self): | ||||
) | ) | ||||
elif self.delivery_failures: | elif self.delivery_failures: | ||||
raise self.delivery_error("Failed deliveries after flush()") | raise self.delivery_error("Failed deliveries after flush()") | ||||
def _sanitize_object( | def _sanitize_object( | ||||
self, object_type: str, object_: ModelObject | self, object_type: str, object_: ModelObject | ||||
) -> Dict[str, str]: | ) -> Dict[str, str]: | ||||
dict_ = object_.to_dict() | dict_ = object_.to_dict() | ||||
if object_type == "origin_visit": | |||||
# :( | |||||
dict_["date"] = str(dict_["date"]) | |||||
if object_type == "content": | if object_type == "content": | ||||
dict_.pop("data", None) | dict_.pop("data", None) | ||||
return dict_ | return dict_ | ||||
def _write_addition(self, object_type: str, object_: ModelObject) -> None: | def _write_addition(self, object_type: str, object_: ModelObject) -> None: | ||||
"""Write a single object to the journal""" | """Write a single object to the journal""" | ||||
key = object_key(object_type, object_) | key = object_key(object_type, object_) | ||||
Show All 29 Lines |