kafka_prefix = 'rtrdiywbnt', kafka_consumer_group = 'test-consumer-rtrdiywbnt'
kafka_server = '127.0.0.1:37923'
def test_storage_play_anonymized(
kafka_prefix: str, kafka_consumer_group: str, kafka_server: str
):
"""Optimal replayer scenario.
This:
- writes objects to the topic
- replayer consumes objects from the topic and replays them
"""
writer_config = {
"cls": "kafka",
"brokers": [kafka_server],
"client_id": "kafka_writer",
"prefix": kafka_prefix,
"anonymize": True,
}
src_config: Dict[str, Any] = {"cls": "memory", "journal_writer": writer_config}
storage = get_storage(**src_config)
# Fill the src storage
nb_sent = 0
for obj_type, objs in TEST_OBJECTS.items():
if obj_type in ("origin_visit", "origin_visit_status"):
# these are unrelated to what we want to test here
continue
method = getattr(storage, obj_type + "_add")
method(objs)
nb_sent += len(objs)
# Fill a destination storage from Kafka **using anonymized topics**
dst_storage = get_storage(cls="memory")
replayer = JournalClient(
brokers=kafka_server,
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=nb_sent,
privileged=False,
)
worker_fn = functools.partial(process_replay_objects, storage=dst_storage)
nb_inserted = replayer.process(worker_fn)
assert nb_sent == nb_inserted
> check_replayed(storage, dst_storage, expected_anonymized=True)
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_replay.py:306:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
src = <swh.storage.in_memory.InMemoryStorage object at 0x7f89ac3a6940>
dst = <swh.storage.in_memory.InMemoryStorage object at 0x7f89abb5ae10>
expected_anonymized = True
def check_replayed(src, dst, expected_anonymized=False):
"""Simple utility function to compare the content of 2 in_memory storages.
If expected_anonymized is True, objects from the source storage are anonymized
before comparing with the destination storage.
"""
def maybe_anonymize(obj):
if expected_anonymized:
return obj.anonymize() or obj
return obj
expected_persons = {maybe_anonymize(person) for person in src._persons.values()}
got_persons = set(dst._persons.values())
assert got_persons == expected_persons
for attr in (
"contents",
"skipped_contents",
"directories",
"revisions",
"releases",
"snapshots",
"origins",
"origin_visit_statuses",
):
expected_objects = [
(id, maybe_anonymize(obj))
> for id, obj in sorted(getattr(src, f"_{attr}").items())
]
E AttributeError: 'InMemoryStorage' object has no attribute '_contents'
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_replay.py:353: AttributeError
TEST RESULT
TEST RESULT
- Run At
- Aug 12 2020, 7:43 PM