Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_kafka_writer.py
Show First 20 Lines • Show All 117 Lines • ▼ Show 20 Lines | for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): | ||||
'snapshot', 'origin'): | 'snapshot', 'origin'): | ||||
if object_type == 'content': | if object_type == 'content': | ||||
objects = [{**obj, 'data': b''} for obj in objects] | objects = [{**obj, 'data': b''} for obj in objects] | ||||
method(objects) | method(objects) | ||||
expected_messages += len(objects) | expected_messages += len(objects) | ||||
elif object_type in ('origin_visit',): | elif object_type in ('origin_visit',): | ||||
for object_ in objects: | for object_ in objects: | ||||
object_ = object_.copy() | object_ = object_.copy() | ||||
origin_id = storage.origin_add_one(object_.pop('origin')) | origin_url = object_.pop('origin') | ||||
visit = method(origin=origin_id, date=object_.pop('date'), | storage.origin_add_one({'url': origin_url}) | ||||
visit = method(origin=origin_url, date=object_.pop('date'), | |||||
type=object_.pop('type')) | type=object_.pop('type')) | ||||
expected_messages += 1 | expected_messages += 1 | ||||
visit_id = visit['visit'] | visit_id = visit['visit'] | ||||
storage.origin_visit_update(origin_id, visit_id, **object_) | storage.origin_visit_update(origin_url, visit_id, **object_) | ||||
expected_messages += 1 | expected_messages += 1 | ||||
else: | else: | ||||
assert False, object_type | assert False, object_type | ||||
assert_written(consumer, kafka_prefix, expected_messages) | assert_written(consumer, kafka_prefix, expected_messages) |