Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/writer/kafka.py
Show First 20 Lines • Show All 79 Lines • ▼ Show 20 Lines | def __init__( | ||||
brokers: Iterable[str], | brokers: Iterable[str], | ||||
prefix: str, | prefix: str, | ||||
client_id: str, | client_id: str, | ||||
producer_config: Optional[Dict] = None, | producer_config: Optional[Dict] = None, | ||||
flush_timeout: float = 120, | flush_timeout: float = 120, | ||||
producer_class: Type[Producer] = Producer, | producer_class: Type[Producer] = Producer, | ||||
): | ): | ||||
self._prefix = prefix | self._prefix = prefix | ||||
self._prefix_privileged = f"{self._prefix}_privileged" | |||||
if not producer_config: | if not producer_config: | ||||
producer_config = {} | producer_config = {} | ||||
if "message.max.bytes" not in producer_config: | if "message.max.bytes" not in producer_config: | ||||
producer_config = { | producer_config = { | ||||
"message.max.bytes": 100 * 1024 * 1024, | "message.max.bytes": 100 * 1024 * 1024, | ||||
**producer_config, | **producer_config, | ||||
▲ Show 20 Lines • Show All 87 Lines • ▼ Show 20 Lines | ) -> Dict[str, str]: | ||||
dict_ = object_.to_dict() | dict_ = object_.to_dict() | ||||
if object_type == "origin_visit": | if object_type == "origin_visit": | ||||
# :( | # :( | ||||
dict_["date"] = str(dict_["date"]) | dict_["date"] = str(dict_["date"]) | ||||
if object_type == "content": | if object_type == "content": | ||||
dict_.pop("data", None) | dict_.pop("data", None) | ||||
return dict_ | return dict_ | ||||
def _write_addition(
    self, object_type: str, object_: ModelObject, privileged: bool
) -> None:
    """Send a single object to its journal topic; does not call flush().

    Args:
        object_type: name of the object type; used as the topic suffix and
            passed to ``object_key`` and ``_sanitize_object``.
        object_: the object to write.
        privileged: when true, the topic is built from
            ``self._prefix_privileged``; otherwise from ``self._prefix``.
    """
    # Pick the topic namespace first; everything below is identical for
    # both the regular and the privileged topic.
    if privileged:
        topic = f"{self._prefix_privileged}.{object_type}"
    else:
        topic = f"{self._prefix}.{object_type}"
    key = object_key(object_type, object_)
    dict_ = self._sanitize_object(object_type, object_)
    logger.debug("topic: %s, key: %s, value: %s", topic, key, dict_)
    self.send(topic, key=key, value=dict_)
def write_addition(
    self, object_type: str, object_: ModelObject, privileged: bool = False
) -> None:
    """Write a single object to the journal, then flush.

    Args:
        object_type: name of the object type; forwarded to
            ``_write_addition``.
        object_: the object to write.
        privileged: forwarded to ``_write_addition`` to select the
            privileged topic; defaults to ``False``.
    """
    self._write_addition(object_type, object_, privileged)
    self.flush()

# Updates go through exactly the same code path as additions.
write_update = write_addition
def write_additions(
    self, object_type: str, objects: Iterable[ModelObject], privileged: bool = False
) -> None:
    """Write a set of objects to the journal, flushing once at the end.

    Args:
        object_type: name of the object type shared by all ``objects``.
        objects: the objects to write; iterated exactly once.
        privileged: forwarded to ``_write_addition`` for every object to
            select the privileged topic; defaults to ``False``.
    """
    for object_ in objects:
        self._write_addition(object_type, object_, privileged)
    # Single flush after the whole batch has been handed to send().
    self.flush()