Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/serializers.py
# Copyright (C) 2016-2021 The Software Heritage developers | # Copyright (C) 2016-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
from enum import Enum | from enum import Enum | ||||
from typing import Any, Union | from typing import Any, BinaryIO, Union | ||||
import msgpack | import msgpack | ||||
from swh.model.model import KeyType | from swh.model.model import KeyType | ||||
class MsgpackExtTypeCodes(Enum): | class MsgpackExtTypeCodes(Enum): | ||||
LONG_INT = 1 | LONG_INT = 1 | ||||
▲ Show 20 Lines • Show All 89 Lines • ▼ Show 20 Lines | def kafka_to_value(kafka_value: bytes) -> Any: | ||||
return msgpack.unpackb( | return msgpack.unpackb( | ||||
kafka_value, | kafka_value, | ||||
raw=False, | raw=False, | ||||
object_hook=decode_types_bw, | object_hook=decode_types_bw, | ||||
ext_hook=msgpack_ext_hook, | ext_hook=msgpack_ext_hook, | ||||
strict_map_key=False, | strict_map_key=False, | ||||
timestamp=3, # convert Timestamp in datetime objects (tz UTC) | timestamp=3, # convert Timestamp in datetime objects (tz UTC) | ||||
) | ) | ||||
def kafka_stream_to_value(file_like: BinaryIO) -> msgpack.Unpacker: | |||||
vlorentz: What is ``stream`` (iterator/file-like object/...)? | |||||
Done Inline Actionsfile-like object (actually something msgpack.Unpacker will eat). Renaming this arg is probably a good idea... douardda: file-like object (actually something msgpack.Unpacker will eat). Renaming this arg is probably… | |||||
Not Done Inline ActionsI think the convention is to call it fd (eg. see the os module) rather than file-like. vlorentz: I think the convention is to call it `fd` (eg. see the `os` module) rather than `file-like`. | |||||
Done Inline Actionsfile_like is the name of the argument in msgpack.Unpacker, so I'll stick with it. douardda: `file_like` is the name of the argument in `msgpack.Unpacker`, so I'll stick with it. | |||||
"""Return a deserializer for data stored in kafka""" | |||||
return msgpack.Unpacker( | |||||
file_like, | |||||
raw=False, | |||||
object_hook=decode_types_bw, | |||||
ext_hook=msgpack_ext_hook, | |||||
strict_map_key=False, | |||||
use_list=False, | |||||
timestamp=3, # convert Timestamp in datetime objects (tz UTC) | |||||
) |
What is `stream` (iterator/file-like object/...)?