Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_kafka_writer.py
# Copyright (C) 2018-2019 The Software Heritage developers | # Copyright (C) 2018-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import defaultdict | from collections import defaultdict | ||||
import datetime | import datetime | ||||
from confluent_kafka import Consumer, KafkaException | from confluent_kafka import Consumer, KafkaException | ||||
from subprocess import Popen | from subprocess import Popen | ||||
from typing import Tuple | from typing import Tuple | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.journal.writer.kafka import KafkaJournalWriter | from swh.journal.writer.kafka import KafkaJournalWriter | ||||
from swh.journal.serializers import ( | from swh.journal.serializers import kafka_to_value, key_to_kafka | ||||
kafka_to_key, kafka_to_value | |||||
) | |||||
from .conftest import OBJECT_TYPE_KEYS | from .conftest import OBJECT_TYPE_KEYS | ||||
def assert_written(consumer, kafka_prefix, expected_messages): | def assert_written(consumer, kafka_prefix, expected_messages): | ||||
consumed_objects = defaultdict(list) | consumed_objects = defaultdict(list) | ||||
fetched_messages = 0 | fetched_messages = 0 | ||||
Show All 13 Lines | while fetched_messages < expected_messages: | ||||
if error is not None: | if error is not None: | ||||
if error.fatal(): | if error.fatal(): | ||||
raise KafkaException(error) | raise KafkaException(error) | ||||
retries_left -= 1 | retries_left -= 1 | ||||
continue | continue | ||||
fetched_messages += 1 | fetched_messages += 1 | ||||
consumed_objects[msg.topic()].append( | consumed_objects[msg.topic()].append( | ||||
(kafka_to_key(msg.key()), kafka_to_value(msg.value())) | (msg.key(), kafka_to_value(msg.value())) | ||||
) | ) | ||||
for (object_type, (key_name, objects)) in OBJECT_TYPE_KEYS.items(): | for (object_type, (key_name, objects)) in OBJECT_TYPE_KEYS.items(): | ||||
topic = kafka_prefix + '.' + object_type | topic = kafka_prefix + '.' + object_type | ||||
(keys, values) = zip(*consumed_objects[topic]) | (keys, values) = zip(*consumed_objects[topic]) | ||||
if key_name: | if key_name: | ||||
assert list(keys) == [object_[key_name] for object_ in objects] | assert list(keys) == [key_to_kafka(object_[key_name]) | ||||
for object_ in objects] | |||||
else: | else: | ||||
pass # TODO | pass # TODO | ||||
if object_type == 'origin_visit': | if object_type == 'origin_visit': | ||||
for value in values: | for value in values: | ||||
del value['visit'] | del value['visit'] | ||||
elif object_type == 'content': | elif object_type == 'content': | ||||
for value in values: | for value in values: | ||||
▲ Show 20 Lines • Show All 73 Lines • Show Last 20 Lines |