Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_kafka_writer.py
Show All 39 Lines | for obj_type, objs in TEST_OBJECTS.items(): | ||||
if obj_type in ( | if obj_type in ( | ||||
"content", | "content", | ||||
"skipped_content", | "skipped_content", | ||||
"directory", | "directory", | ||||
"revision", | "revision", | ||||
"release", | "release", | ||||
"snapshot", | "snapshot", | ||||
"origin", | "origin", | ||||
"origin_visit_status", | |||||
): | ): | ||||
method(objs) | method(objs) | ||||
expected_messages += len(objs) | expected_messages += len(objs) | ||||
elif obj_type in ("origin_visit",): | elif obj_type in ("origin_visit",): | ||||
for obj in objs: | for obj in objs: | ||||
assert isinstance(obj, OriginVisit) | assert isinstance(obj, OriginVisit) | ||||
storage.origin_add_one(Origin(url=obj.origin)) | storage.origin_add_one(Origin(url=obj.origin)) | ||||
visit = method(obj.origin, date=obj.date, type=obj.type) | visit = method(obj.origin, date=obj.date, type=obj.type) | ||||
expected_messages += 1 | expected_messages += 1 + 1 # 1 visit + 1 visit status | ||||
obj_d = obj.to_dict() | obj_d = obj.to_dict() | ||||
for k in ("visit", "origin", "date", "type"): | for k in ("visit", "origin", "date", "type"): | ||||
del obj_d[k] | del obj_d[k] | ||||
storage.origin_visit_update(obj.origin, visit.visit, **obj_d) | storage.origin_visit_update(obj.origin, visit.visit, **obj_d) | ||||
expected_messages += 1 | expected_messages += 1 | ||||
else: | else: | ||||
assert False, obj_type | assert False, obj_type | ||||
existing_topics = set( | existing_topics = set( | ||||
topic | topic | ||||
for topic in consumer.list_topics(timeout=10).topics.keys() | for topic in consumer.list_topics(timeout=10).topics.keys() | ||||
if topic.startswith(f"{kafka_prefix}.") # final . to exclude privileged topics | if topic.startswith(f"{kafka_prefix}.") # final . to exclude privileged topics | ||||
) | ) | ||||
assert existing_topics == { | assert existing_topics == { | ||||
f"{kafka_prefix}.{obj_type}" | f"{kafka_prefix}.{obj_type}" | ||||
for obj_type in ( | for obj_type in ( | ||||
"content", | "content", | ||||
"directory", | "directory", | ||||
"origin", | "origin", | ||||
"origin_visit", | "origin_visit", | ||||
"origin_visit_status", | |||||
"release", | "release", | ||||
"revision", | "revision", | ||||
"snapshot", | "snapshot", | ||||
"skipped_content", | "skipped_content", | ||||
) | ) | ||||
} | } | ||||
consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) | consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) | ||||
Show All 36 Lines | ): | ||||
) | ) | ||||
assert existing_topics == { | assert existing_topics == { | ||||
f"{kafka_prefix}.{obj_type}" | f"{kafka_prefix}.{obj_type}" | ||||
for obj_type in ( | for obj_type in ( | ||||
"content", | "content", | ||||
"directory", | "directory", | ||||
"origin", | "origin", | ||||
"origin_visit", | "origin_visit", | ||||
"origin_visit_status", | |||||
"release", | "release", | ||||
"revision", | "revision", | ||||
"snapshot", | "snapshot", | ||||
"skipped_content", | "skipped_content", | ||||
) | ) | ||||
} | { | } | { | ||||
f"{kafka_prefix}_privileged.{obj_type}" for obj_type in ("release", "revision",) | f"{kafka_prefix}_privileged.{obj_type}" for obj_type in ("release", "revision",) | ||||
} | } | ||||
Show All 17 Lines |