Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_kafka_writer.py
# Copyright (C) 2018-2019 The Software Heritage developers | # Copyright (C) 2018-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import defaultdict | from collections import defaultdict | ||||
import datetime | import datetime | ||||
from confluent_kafka import Consumer, KafkaException | from confluent_kafka import Consumer, KafkaException | ||||
from subprocess import Popen | from subprocess import Popen | ||||
from typing import Tuple | from typing import List, Tuple | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.journal.writer.kafka import KafkaJournalWriter | from swh.journal.replay import object_converter_fn | ||||
from swh.journal.serializers import ( | from swh.journal.serializers import ( | ||||
kafka_to_key, kafka_to_value | kafka_to_key, kafka_to_value | ||||
) | ) | ||||
from swh.journal.writer.kafka import KafkaJournalWriter | |||||
from swh.model.model import Content, Origin, BaseModel | |||||
from .conftest import OBJECT_TYPE_KEYS | from .conftest import OBJECT_TYPE_KEYS | ||||
def assert_written(consumer, kafka_prefix, expected_messages): | def assert_written(consumer, kafka_prefix, expected_messages): | ||||
consumed_objects = defaultdict(list) | consumed_objects = defaultdict(list) | ||||
fetched_messages = 0 | fetched_messages = 0 | ||||
▲ Show 20 Lines • Show All 84 Lines • ▼ Show 20 Lines | writer_config = { | ||||
'prefix': kafka_prefix, | 'prefix': kafka_prefix, | ||||
'producer_config': { | 'producer_config': { | ||||
'message.max.bytes': 100000000, | 'message.max.bytes': 100000000, | ||||
} | } | ||||
} | } | ||||
storage_config = { | storage_config = { | ||||
'cls': 'pipeline', | 'cls': 'pipeline', | ||||
'steps': [ | 'steps': [ | ||||
{'cls': 'validate'}, | |||||
{'cls': 'memory', 'journal_writer': writer_config}, | {'cls': 'memory', 'journal_writer': writer_config}, | ||||
] | ] | ||||
} | } | ||||
storage = get_storage(**storage_config) | storage = get_storage(**storage_config) | ||||
expected_messages = 0 | expected_messages = 0 | ||||
for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): | for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): | ||||
method = getattr(storage, object_type + '_add') | method = getattr(storage, object_type + '_add') | ||||
if object_type in ('content', 'directory', 'revision', 'release', | if object_type in ('content', 'directory', 'revision', 'release', | ||||
'snapshot', 'origin'): | 'snapshot', 'origin'): | ||||
objects_: List[BaseModel] | |||||
if object_type == 'content': | if object_type == 'content': | ||||
objects = [{**obj, 'data': b''} for obj in objects] | objects_ = [ | ||||
method(objects) | Content.from_dict({ | ||||
**obj, 'data': b''}) | |||||
for obj in objects | |||||
] | |||||
else: | |||||
objects_ = [ | |||||
object_converter_fn[object_type](obj) | |||||
for obj in objects | |||||
] | |||||
method(objects_) | |||||
expected_messages += len(objects) | expected_messages += len(objects) | ||||
elif object_type in ('origin_visit',): | elif object_type in ('origin_visit',): | ||||
for object_ in objects: | for object_ in objects: | ||||
object_ = object_.copy() | object_ = object_.copy() | ||||
origin_url = object_.pop('origin') | origin_url = object_.pop('origin') | ||||
storage.origin_add_one({'url': origin_url}) | storage.origin_add_one(Origin(url=origin_url)) | ||||
visit = method(origin=origin_url, date=object_.pop('date'), | visit = method(origin=origin_url, date=object_.pop('date'), | ||||
type=object_.pop('type')) | type=object_.pop('type')) | ||||
expected_messages += 1 | expected_messages += 1 | ||||
visit_id = visit['visit'] | visit_id = visit['visit'] | ||||
storage.origin_visit_update(origin_url, visit_id, **object_) | storage.origin_visit_update(origin_url, visit_id, **object_) | ||||
expected_messages += 1 | expected_messages += 1 | ||||
else: | else: | ||||
assert False, object_type | assert False, object_type | ||||
assert_written(consumer, kafka_prefix, expected_messages) | assert_written(consumer, kafka_prefix, expected_messages) |