Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_direct_writer.py
# Copyright (C) 2018-2019 The Software Heritage developers | # Copyright (C) 2018-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import defaultdict | from collections import defaultdict | ||||
import datetime | |||||
import time | import time | ||||
from kafka import KafkaConsumer | from kafka import KafkaConsumer | ||||
from subprocess import Popen | from subprocess import Popen | ||||
from typing import Tuple | from typing import Tuple | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
Show All 16 Lines | for (object_type, (key_name, objects)) in OBJECT_TYPE_KEYS.items(): | ||||
if key_name: | if key_name: | ||||
assert list(keys) == [object_[key_name] for object_ in objects] | assert list(keys) == [object_[key_name] for object_ in objects] | ||||
else: | else: | ||||
pass # TODO | pass # TODO | ||||
if object_type == 'origin_visit': | if object_type == 'origin_visit': | ||||
for value in values: | for value in values: | ||||
del value['visit'] | del value['visit'] | ||||
elif object_type == 'content': | |||||
for value in values: | |||||
del value['ctime'] | |||||
for object_ in objects: | for object_ in objects: | ||||
assert kafka_to_value(value_to_kafka(object_)) in values | assert kafka_to_value(value_to_kafka(object_)) in values | ||||
def test_direct_writer( | def test_direct_writer( | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], | ||||
consumer_from_publisher: KafkaConsumer): | consumer_from_publisher: KafkaConsumer): | ||||
kafka_prefix += '.swh.journal.objects' | kafka_prefix += '.swh.journal.objects' | ||||
config = { | config = { | ||||
'brokers': 'localhost:%d' % kafka_server[1], | 'brokers': 'localhost:%d' % kafka_server[1], | ||||
'client_id': 'direct_writer', | 'client_id': 'direct_writer', | ||||
'prefix': kafka_prefix, | 'prefix': kafka_prefix, | ||||
} | } | ||||
writer = DirectKafkaWriter(**config) | writer = DirectKafkaWriter(**config) | ||||
for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): | for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): | ||||
for (num, object_) in enumerate(objects): | for (num, object_) in enumerate(objects): | ||||
if object_type == 'origin_visit': | if object_type == 'origin_visit': | ||||
object_ = {**object_, 'visit': num} | object_ = {**object_, 'visit': num} | ||||
if object_type == 'content': | |||||
object_ = {**object_, 'ctime': datetime.datetime.now()} | |||||
writer.write_addition(object_type, object_) | writer.write_addition(object_type, object_) | ||||
assert_written(consumer_from_publisher, kafka_prefix) | assert_written(consumer_from_publisher, kafka_prefix) | ||||
def test_storage_direct_writer( | def test_storage_direct_writer( | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], | ||||
Show All 30 Lines |