kafka_prefix = 'cfsdxccgks', kafka_server = '127.0.0.1:55303'
consumer = <cimpl.Consumer object at 0x7f574f2e6d90>
def test_storage_direct_writer(kafka_prefix: str, kafka_server, consumer: Consumer):
writer_config = {
"cls": "kafka",
"brokers": [kafka_server],
"client_id": "kafka_writer",
"prefix": kafka_prefix,
"anonymize": False,
}
storage_config = {
"cls": "pipeline",
"steps": [{"cls": "memory", "journal_writer": writer_config},],
}
storage = get_storage(**storage_config)
expected_messages = 0
for obj_type, objs in TEST_OBJECTS.items():
method = getattr(storage, obj_type + "_add")
if obj_type in (
"content",
"skipped_content",
"directory",
"revision",
"release",
"snapshot",
"origin",
):
method(objs)
expected_messages += len(objs)
elif obj_type in ("origin_visit",):
for obj in objs:
assert isinstance(obj, OriginVisit)
storage.origin_add_one(Origin(url=obj.origin))
> visit = method(obj.origin, date=obj.date, type=obj.type)
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_kafka_writer.py:55:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/py3/lib/python3.7/site-packages/swh/storage/in_memory.py:821: in origin_visit_add
self._origin_visit_status_add_one(visit_update)
.tox/py3/lib/python3.7/site-packages/swh/storage/in_memory.py:831: in _origin_visit_status_add_one
self.journal_writer.origin_visit_status_add([visit_status])
.tox/py3/lib/python3.7/site-packages/swh/storage/writer.py:91: in origin_visit_status_add
self.write_additions("origin_visit_status", visit_statuses)
.tox/py3/lib/python3.7/site-packages/swh/storage/writer.py:48: in write_additions
self.journal.write_additions(obj_type, values)
.tox/py3/lib/python3.7/site-packages/swh/journal/writer/kafka.py:240: in write_additions
self._write_addition(object_type, object_)
.tox/py3/lib/python3.7/site-packages/swh/journal/writer/kafka.py:212: in _write_addition
key = object_key(object_type, object_)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
object_type = 'origin_visit_status'
object_ = OriginVisitStatus(origin='https://somewhere.org/den/fox', visit=1, date=datetime.datetime(2013, 5, 7, 4, 20, 39, 369271, tzinfo=tzlocal()), status='ongoing', snapshot=None, metadata=None)
def object_key(object_type: str, object_) -> KeyType:
if object_type in ("revision", "release", "directory", "snapshot"):
return object_.id
elif object_type == "content":
return object_.sha1 # TODO: use a dict of hashes
elif object_type == "skipped_content":
return {hash: getattr(object_, hash) for hash in DEFAULT_ALGORITHMS}
elif object_type == "origin":
return {"url": object_.url}
elif object_type == "origin_visit":
return {
"origin": object_.origin,
"date": str(object_.date),
}
else:
> raise ValueError("Unknown object type: %s." % object_type)
E ValueError: Unknown object type: origin_visit_status.
.tox/py3/lib/python3.7/site-packages/swh/journal/serializers.py:66: ValueError
TEST RESULT
TEST RESULT
- Run At
- Jun 8 2020, 4:36 PM