kafka_prefix = 'xxksuzbwwe', kafka_server = '127.0.0.1:41265'
consumer = <cimpl.Consumer object at 0x7f0f1768e510>
privileged_object_types = {'release', 'revision'}
def test_kafka_writer(
kafka_prefix: str,
kafka_server: str,
consumer: Consumer,
privileged_object_types: Iterable[str],
):
writer = KafkaJournalWriter(
brokers=[kafka_server],
client_id="kafka_writer",
prefix=kafka_prefix,
anonymize=False,
)
expected_messages = 0
for object_type, objects in TEST_OBJECTS.items():
writer.write_additions(object_type, objects)
expected_messages += len(objects)
consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
> assert_all_objects_consumed(consumed_messages)
.tox/py3/lib/python3.7/site-packages/swh/journal/tests/test_kafka_writer.py:38:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
consumed_messages = defaultdict(<class 'list'>, {'content': [(b"\xaa\xad\xd9Iw\xb8\xfb\xf3\xf6\xfb\t\xfc;\xbb\xc9\xed\xbd\xfa\x84'", {'sha...2\x82\x08\x89\xb3\xad\xc4\x8cm', 'length': 4, 'status': 'absent', 'reason': "because chr(1) != '*'", 'ctime': None})]})
exclude = None
def assert_all_objects_consumed(
consumed_messages: Dict, exclude: Optional[Collection] = None
):
"""Check whether all objects from TEST_OBJECTS have been consumed
`exclude` can be a list of object types for which we do not want to compare the
values (eg. for anonymized object).
"""
for object_type, known_objects in TEST_OBJECTS.items():
known_keys = [object_key(object_type, obj) for obj in known_objects]
if not consumed_messages[object_type]:
return
(received_keys, received_values) = zip(*consumed_messages[object_type])
if object_type in ("content", "skipped_content"):
for value in received_values:
del value["ctime"]
if object_type == "content":
known_objects = [attr.evolve(o, data=None) for o in known_objects]
for key in known_keys:
assert key in received_keys, (
f"expected {object_type} key {pprint_key(key)} "
"absent from consumed messages"
)
if exclude and object_type in exclude:
continue
for value in known_objects:
expected_value = value.to_dict()
> if value.object_type in ("content", "skipped_content"):
E AttributeError: 'MetadataAuthority' object has no attribute 'object_type'
.tox/py3/lib/python3.7/site-packages/swh/journal/pytest_plugin.py:98: AttributeError
TEST RESULT
TEST RESULT
- Run At
- Jul 30 2020, 5:39 PM