kafka_prefix = 'rrnxafkqlg', kafka_server = '127.0.0.1:49293'
consumer = <cimpl.Consumer object at 0x7f8972852400>
def test_storage_direct_writer_anonymized(
kafka_prefix: str, kafka_server, consumer: Consumer
):
writer_config = {
"cls": "kafka",
"brokers": [kafka_server],
"client_id": "kafka_writer",
"prefix": kafka_prefix,
"anonymize": True,
}
storage_config = {
"cls": "pipeline",
"steps": [{"cls": "memory", "journal_writer": writer_config},],
}
storage = get_storage(**storage_config)
expected_messages = 0
for obj_type, objs in TEST_OBJECTS.items():
if obj_type == "origin_visit":
# these have non-consistent API and are unrelated with what we
# want to test here
continue
method = getattr(storage, obj_type + "_add")
> method(objs)
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_kafka_writer.py:112:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/py3/lib/python3.7/site-packages/swh/storage/in_memory.py:503: in release_add
self.journal_writer.release_add(releases)
.tox/py3/lib/python3.7/site-packages/swh/storage/writer.py:93: in release_add
self.write_additions("release", releases)
.tox/py3/lib/python3.7/site-packages/swh/storage/writer.py:66: in write_additions
f"{obj_type}:anonymized", [anonymize(value) for value in values]
.tox/py3/lib/python3.7/site-packages/swh/journal/writer/kafka.py:232: in write_additions
self._write_addition(object_type, object_)
.tox/py3/lib/python3.7/site-packages/swh/journal/writer/kafka.py:217: in _write_addition
key = object_key(object_type, object_)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
object_type = 'release:anonymized'
object_ = Release(name=b'v0.0.1', message=b'foo', target=b'\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\x04\... offset=120, negative_utc=False), metadata=None, id=b'\xd8\x1c\xc0q\x0e\xb6\xcf\x9e\xfd[\x92\n\x84S\xe1\xe0qW\xb6\xcd')
def object_key(object_type: str, object_) -> KeyType:
if object_type in ("revision", "release", "directory", "snapshot"):
return object_.id
elif object_type == "content":
return object_.sha1 # TODO: use a dict of hashes
elif object_type == "skipped_content":
return {hash: getattr(object_, hash) for hash in DEFAULT_ALGORITHMS}
elif object_type == "origin":
return {"url": object_.url}
elif object_type == "origin_visit":
return {
"origin": object_.origin,
"date": str(object_.date),
}
else:
> raise ValueError("Unknown object type: %s." % object_type)
E ValueError: Unknown object type: release:anonymized.
.tox/py3/lib/python3.7/site-packages/swh/journal/serializers.py:66: ValueError
TEST RESULT
TEST RESULT
- Run At: May 13 2020, 5:40 PM