Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_direct_writer.py
Show First 20 Lines • Show All 42 Lines • ▼ Show 20 Lines | for (object_type, (key_name, objects)) in OBJECT_TYPE_KEYS.items(): | ||||
for object_ in objects: | for object_ in objects: | ||||
assert kafka_to_value(value_to_kafka(object_)) in values | assert kafka_to_value(value_to_kafka(object_)) in values | ||||
def test_direct_writer( | def test_direct_writer( | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], | ||||
consumer_from_publisher: KafkaConsumer): | consumer: KafkaConsumer): | ||||
kafka_prefix += '.swh.journal.objects' | kafka_prefix += '.swh.journal.objects' | ||||
config = { | config = { | ||||
'brokers': 'localhost:%d' % kafka_server[1], | 'brokers': 'localhost:%d' % kafka_server[1], | ||||
'client_id': 'direct_writer', | 'client_id': 'direct_writer', | ||||
'prefix': kafka_prefix, | 'prefix': kafka_prefix, | ||||
} | } | ||||
writer = DirectKafkaWriter(**config) | writer = DirectKafkaWriter(**config) | ||||
for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): | for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): | ||||
for (num, object_) in enumerate(objects): | for (num, object_) in enumerate(objects): | ||||
if object_type == 'origin_visit': | if object_type == 'origin_visit': | ||||
object_ = {**object_, 'visit': num} | object_ = {**object_, 'visit': num} | ||||
if object_type == 'content': | if object_type == 'content': | ||||
object_ = {**object_, 'ctime': datetime.datetime.now()} | object_ = {**object_, 'ctime': datetime.datetime.now()} | ||||
writer.write_addition(object_type, object_) | writer.write_addition(object_type, object_) | ||||
assert_written(consumer_from_publisher, kafka_prefix) | assert_written(consumer, kafka_prefix) | ||||
def test_storage_direct_writer( | def test_storage_direct_writer( | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_server: Tuple[Popen, int], | kafka_server: Tuple[Popen, int], | ||||
consumer_from_publisher: KafkaConsumer): | consumer: KafkaConsumer): | ||||
kafka_prefix += '.swh.journal.objects' | kafka_prefix += '.swh.journal.objects' | ||||
config = { | config = { | ||||
'brokers': 'localhost:%d' % kafka_server[1], | 'brokers': 'localhost:%d' % kafka_server[1], | ||||
'client_id': 'direct_writer', | 'client_id': 'direct_writer', | ||||
'prefix': kafka_prefix, | 'prefix': kafka_prefix, | ||||
} | } | ||||
Show All 12 Lines | for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): | ||||
object_ = object_.copy() | object_ = object_.copy() | ||||
origin_id = storage.origin_add_one(object_.pop('origin')) | origin_id = storage.origin_add_one(object_.pop('origin')) | ||||
visit = method(origin=origin_id, date=object_.pop('date')) | visit = method(origin=origin_id, date=object_.pop('date')) | ||||
visit_id = visit['visit'] | visit_id = visit['visit'] | ||||
storage.origin_visit_update(origin_id, visit_id, **object_) | storage.origin_visit_update(origin_id, visit_id, **object_) | ||||
else: | else: | ||||
assert False, object_type | assert False, object_type | ||||
assert_written(consumer_from_publisher, kafka_prefix) | assert_written(consumer, kafka_prefix) |