Page Menu | Home | Software Heritage

Jenkins > .tox.py3.lib.python3.7.site-packages.swh.storage.tests.test_kafka_writer::test_storage_direct_writer
Failed

TEST RESULT

Run At
Aug 6 2020, 6:54 PM
Details
kafka_prefix = 'gqflxztznl', kafka_server = '127.0.0.1:35129' consumer = <cimpl.Consumer object at 0x7f35536f0400> def test_storage_direct_writer(kafka_prefix: str, kafka_server, consumer: Consumer): writer_config = { "cls": "kafka", "brokers": [kafka_server], "client_id": "kafka_writer", "prefix": kafka_prefix, "anonymize": False, } storage_config: Dict[str, Any] = { "cls": "pipeline", "steps": [{"cls": "memory", "journal_writer": writer_config},], } storage = get_storage(**storage_config) expected_messages = 0 for obj_type, objs in TEST_OBJECTS.items(): method = getattr(storage, obj_type + "_add") if obj_type in ( "content", "skipped_content", "directory", "metadata_authority", "metadata_fetcher", "revision", "release", "snapshot", "origin", "origin_visit_status", "raw_extrinsic_metadata", ): method(objs) expected_messages += len(objs) elif obj_type in ("origin_visit",): for obj in objs: assert isinstance(obj, OriginVisit) storage.origin_add([Origin(url=obj.origin)]) method([obj]) expected_messages += 1 + 1 # 1 visit + 1 visit status else: assert False, obj_type existing_topics = set( topic for topic in consumer.list_topics(timeout=10).topics.keys() if topic.startswith(f"{kafka_prefix}.") # final . 
to exclude privileged topics ) assert existing_topics == { f"{kafka_prefix}.{obj_type}" for obj_type in ( "content", "directory", "metadata_authority", "metadata_fetcher", "origin", "origin_visit", "origin_visit_status", "raw_extrinsic_metadata", "release", "revision", "snapshot", "skipped_content", ) } consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) > assert_all_objects_consumed(consumed_messages) .tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_kafka_writer.py:90: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ consumed_messages = defaultdict(<class 'list'>, {'content': [(b':\xf8E\xc4\xffT\xc1\x11/c>=l\xe3k\xf98\xb1&\xf4', {'sha1': b':\xf8E\xc4\xf...cher': [({'name': 'test-fetcher', 'version': '1.0.0'}, {'name': 'test-fetcher', 'version': '1.0.0', 'metadata': {}})]}) exclude = None def assert_all_objects_consumed( consumed_messages: Dict, exclude: Optional[Collection] = None ): """Check whether all objects from TEST_OBJECTS have been consumed `exclude` can be a list of object types for which we do not want to compare the values (eg. for anonymized object). 
""" for object_type, known_objects in TEST_OBJECTS.items(): known_keys = [object_key(object_type, obj) for obj in known_objects] if not consumed_messages[object_type]: return (received_keys, received_values) = zip(*consumed_messages[object_type]) if object_type in ("content", "skipped_content"): for value in received_values: del value["ctime"] if object_type == "content": known_objects = [attr.evolve(o, data=None) for o in known_objects] for key in known_keys: assert key in received_keys, ( f"expected {object_type} key {pprint_key(key)} " "absent from consumed messages" ) if exclude and object_type in exclude: continue for value in known_objects: expected_value = value.to_dict() > if value.object_type in ("content", "skipped_content"): E AttributeError: 'MetadataAuthority' object has no attribute 'object_type' .tox/py3/lib/python3.7/site-packages/swh/journal/pytest_plugin.py:98: AttributeError