Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_stream.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import io | import io | ||||
from typing import Dict, List, Tuple | |||||
import msgpack | from swh.journal.serializers import kafka_stream_to_value | ||||
from swh.journal.serializers import msgpack_ext_hook | |||||
from swh.journal.writer import get_journal_writer, model_object_dict_sanitizer | from swh.journal.writer import get_journal_writer, model_object_dict_sanitizer | ||||
from swh.journal.writer.interface import JournalWriterInterface | |||||
from swh.model.tests.swh_model_data import TEST_OBJECTS | from swh.model.tests.swh_model_data import TEST_OBJECTS | ||||
def test_write_additions_with_test_objects(): | def fill_writer(writer: JournalWriterInterface) -> List[Tuple[str, Dict]]: | ||||
ardumont: Can't this be typed a bit? | |||||
Done Inline Actionsnot sure it's worth it, but I'll try douardda: not sure it's worth it, but I'll try | |||||
Done Inline ActionsSo D7952 it is then. douardda: So D7952 it is then. | |||||
outs = io.BytesIO() | |||||
writer = get_journal_writer( | |||||
cls="stream", | |||||
value_sanitizer=model_object_dict_sanitizer, | |||||
output_stream=outs, | |||||
) | |||||
expected = [] | expected = [] | ||||
n = 0 | |||||
for object_type, objects in TEST_OBJECTS.items(): | for object_type, objects in TEST_OBJECTS.items(): | ||||
writer.write_additions(object_type, objects) | writer.write_additions(object_type, objects) | ||||
for object in objects: | for object in objects: | ||||
objd = object.to_dict() | objd = object.to_dict() | ||||
if object_type == "content": | if object_type == "content": | ||||
objd.pop("data") | objd.pop("data") | ||||
expected.append((object_type, objd)) | expected.append((object_type, objd)) | ||||
n += len(objects) | return expected | ||||
outs.seek(0, 0) | |||||
unpacker = msgpack.Unpacker( | def test_stream_journal_writer_stream(): | ||||
outs, | outs = io.BytesIO() | ||||
raw=False, | |||||
ext_hook=msgpack_ext_hook, | writer = get_journal_writer( | ||||
strict_map_key=False, | cls="stream", | ||||
use_list=False, | value_sanitizer=model_object_dict_sanitizer, | ||||
timestamp=3, # convert Timestamp in datetime objects (tz UTC) | output_stream=outs, | ||||
) | ) | ||||
expected = fill_writer(writer) | |||||
outs.seek(0, 0) | |||||
unpacker = kafka_stream_to_value(outs) | |||||
for i, (objtype, objd) in enumerate(unpacker, start=1): | for i, (objtype, objd) in enumerate(unpacker, start=1): | ||||
assert (objtype, objd) in expected | assert (objtype, objd) in expected | ||||
assert len(expected) == i | assert len(expected) == i | ||||
Not Done Inline Actionsi've no idea what that is so til [1] [1] https://docs.pytest.org/en/6.2.x/capture.html#accessing-captured-output-from-a-test-function ardumont: i've no idea what that is so til [1]
[1] https://docs.pytest.org/en/6.2.x/capture. | |||||
Done Inline Actionsdid not know either before this :-) douardda: did not know either before this :-) |
Can't this be typed a bit?