diff --git a/swh/journal/serializers.py b/swh/journal/serializers.py --- a/swh/journal/serializers.py +++ b/swh/journal/serializers.py @@ -1,11 +1,11 @@ -# Copyright (C) 2016-2021 The Software Heritage developers +# Copyright (C) 2016-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from enum import Enum -from typing import Any, Union +from typing import Any, BinaryIO, Union import msgpack @@ -111,3 +111,16 @@ strict_map_key=False, timestamp=3, # convert Timestamp in datetime objects (tz UTC) ) + + +def kafka_stream_to_value(file_like: BinaryIO) -> msgpack.Unpacker: + """Return a deserializer for data stored in kafka""" + return msgpack.Unpacker( + file_like, + raw=False, + object_hook=decode_types_bw, + ext_hook=msgpack_ext_hook, + strict_map_key=False, + use_list=False, + timestamp=3, # convert Timestamp in datetime objects (tz UTC) + ) diff --git a/swh/journal/tests/test_stream.py b/swh/journal/tests/test_stream.py --- a/swh/journal/tests/test_stream.py +++ b/swh/journal/tests/test_stream.py @@ -1,28 +1,19 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import io +from typing import Dict, List, Tuple -import msgpack - -from swh.journal.serializers import msgpack_ext_hook +from swh.journal.serializers import kafka_stream_to_value from swh.journal.writer import get_journal_writer, model_object_dict_sanitizer +from swh.journal.writer.interface import JournalWriterInterface from swh.model.tests.swh_model_data import TEST_OBJECTS -def test_write_additions_with_test_objects(): - outs = io.BytesIO() - - writer = get_journal_writer( - cls="stream", - value_sanitizer=model_object_dict_sanitizer, - output_stream=outs, - ) +def fill_writer(writer: JournalWriterInterface) -> List[Tuple[str, Dict]]: expected = [] - - n = 0 for object_type, objects in TEST_OBJECTS.items(): writer.write_additions(object_type, objects) @@ -32,18 +23,21 @@ objd.pop("data") expected.append((object_type, objd)) - n += len(objects) + return expected - outs.seek(0, 0) - unpacker = msgpack.Unpacker( - outs, - raw=False, - ext_hook=msgpack_ext_hook, - strict_map_key=False, - use_list=False, - timestamp=3, # convert Timestamp in datetime objects (tz UTC) + +def test_stream_journal_writer_stream(): + outs = io.BytesIO() + + writer = get_journal_writer( + cls="stream", + value_sanitizer=model_object_dict_sanitizer, + output_stream=outs, ) + expected = fill_writer(writer) + outs.seek(0, 0) + unpacker = kafka_stream_to_value(outs) for i, (objtype, objd) in enumerate(unpacker, start=1): assert (objtype, objd) in expected assert len(expected) == i