Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_stream.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import io | import io | ||||
from typing import Dict, List, Tuple | |||||
import msgpack | from swh.journal.serializers import kafka_stream_to_value | ||||
from swh.journal.serializers import msgpack_ext_hook | |||||
from swh.journal.writer import get_journal_writer, model_object_dict_sanitizer | from swh.journal.writer import get_journal_writer, model_object_dict_sanitizer | ||||
from swh.journal.writer.interface import JournalWriterInterface | |||||
from swh.model.tests.swh_model_data import TEST_OBJECTS | from swh.model.tests.swh_model_data import TEST_OBJECTS | ||||
def fill_writer(writer: JournalWriterInterface) -> List[Tuple[str, Dict]]:
    """Write every object of ``TEST_OBJECTS`` to *writer* and flush it.

    Args:
        writer: journal writer receiving the objects through
            ``write_additions``.

    Returns:
        The ``(object_type, object_dict)`` pairs corresponding to what was
        written, in write order. For ``"content"`` objects the ``"data"``
        key is removed from the dict.
    """
    expected: List[Tuple[str, Dict]] = []
    for object_type, objects in TEST_OBJECTS.items():
        writer.write_additions(object_type, objects)

        for obj in objects:
            objd = obj.to_dict()
            if object_type == "content":
                # Presumably the journal does not carry the raw content
                # payload -- TODO confirm against the writer implementation.
                objd.pop("data")

            expected.append((object_type, objd))

    # Flush so that callers can read the output back right away.
    writer.flush()
    return expected
def test_stream_journal_writer_stream():
    """Objects written to an in-memory binary stream can be read back."""
    outs = io.BytesIO()

    writer = get_journal_writer(
        cls="stream",
        value_sanitizer=model_object_dict_sanitizer,
        output_stream=outs,
    )
    expected = fill_writer(writer)

    # Rewind before reading back what the writer produced.
    outs.seek(0, 0)
    received = list(kafka_stream_to_value(outs))

    for objtype, objd in received:
        assert (objtype, objd) in expected
    # Compare lengths rather than relying on a loop counter: with an empty
    # stream the counter would be unbound and raise NameError instead of
    # producing a proper assertion failure.
    assert len(received) == len(expected)
def test_stream_journal_writer_filename(tmp_path):
    """Objects written to a file given by name can be read back."""
    out_fname = str(tmp_path / "journal.msgpack")

    writer = get_journal_writer(
        cls="stream",
        value_sanitizer=model_object_dict_sanitizer,
        output_stream=out_fname,
    )
    expected = fill_writer(writer)

    with open(out_fname, "rb") as outs:
        # Materialise while the file is still open.
        received = list(kafka_stream_to_value(outs))

    for objtype, objd in received:
        assert (objtype, objd) in expected
    # Compare lengths rather than relying on a loop counter: with an empty
    # file the counter would be unbound and raise NameError instead of
    # producing a proper assertion failure.
    assert len(received) == len(expected)
def test_stream_journal_writer_stdout(capfdbinary):
    """Objects written with ``output_stream="-"`` can be read back from stdout."""
    writer = get_journal_writer(
        cls="stream",
        value_sanitizer=model_object_dict_sanitizer,
        output_stream="-",
    )
    expected = fill_writer(writer)

    # capfdbinary is pytest's fixture capturing file descriptors 1 and 2 as
    # bytes; see
    # https://docs.pytest.org/en/6.2.x/capture.html#accessing-captured-output-from-a-test-function
    captured = capfdbinary.readouterr()
    assert captured.err == b""

    outs = io.BytesIO(captured.out)
    received = list(kafka_stream_to_value(outs))

    for objtype, objd in received:
        assert (objtype, objd) in expected
    # Compare lengths rather than relying on a loop counter: with empty
    # captured output the counter would be unbound and raise NameError
    # instead of producing a proper assertion failure.
    assert len(received) == len(expected)
Can't this be typed a bit?