kafka_prefix = 'naubfzivoy', kafka_consumer_group = 'test-consumer-naubfzivoy'
kafka_server = '127.0.0.1:47721', privileged = False
@pytest.mark.parametrize("privileged", [True, False])
def test_storage_play_anonymized(
    kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, privileged: bool,
):
    """Optimal replayer scenario, with an anonymizing journal writer.

    Steps:
    - write objects to the topic through a journal writer configured with
      ``anonymize=True``;
    - let the replayer consume objects from the topic and replay them into a
      second in-memory storage.

    The test runs with both a privileged and a non-privileged replayer; only
    the non-privileged one is expected to end up with anonymized objects.
    """
    skipped_types = ("origin_visit", "origin_visit_status")
    src_config: Dict[str, Any] = {
        "cls": "memory",
        "journal_writer": {
            "cls": "kafka",
            "brokers": [kafka_server],
            "client_id": "kafka_writer",
            "prefix": kafka_prefix,
            "anonymize": True,
        },
    }
    src_storage = get_storage(**src_config)

    # Populate the source storage (the configured journal writer is expected
    # to publish each added object to Kafka).
    sent_count = 0
    for object_type, objects in TEST_OBJECTS.items():
        # origin visits and their statuses are unrelated to this scenario
        if object_type in skipped_types:
            continue
        add_objects = getattr(src_storage, f"{object_type}_add")
        add_objects(objects)
        sent_count += len(objects)

    # Replay from Kafka into a fresh destination storage, passing the
    # ``privileged`` flag through to the journal client.
    dst_storage = get_storage(cls="memory")
    replayer = JournalClient(
        brokers=kafka_server,
        group_id=kafka_consumer_group,
        prefix=kafka_prefix,
        stop_after_objects=sent_count,
        privileged=privileged,
        value_deserializer=ModelObjectDeserializer().convert,
    )
    inserted_count = replayer.process(
        functools.partial(process_replay_objects, storage=dst_storage)
    )
    replayer.consumer.commit()
    assert sent_count == inserted_count

    # Compare both storages, expecting anonymized data only when unprivileged.
    # The isinstance checks exist to narrow the types for mypy.
    assert isinstance(src_storage, InMemoryStorage)
    assert isinstance(dst_storage, InMemoryStorage)
    check_replayed(src_storage, dst_storage, expected_anonymized=not privileged)
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_replay.py:376:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
src = <swh.storage.in_memory.InMemoryStorage object at 0x7f18e235d518>
dst = <swh.storage.in_memory.InMemoryStorage object at 0x7f18e6c1c518>
exclude = None, expected_anonymized = True
def check_replayed(
    src: InMemoryStorage,
    dst: InMemoryStorage,
    exclude: Optional[Container] = None,
    expected_anonymized=False,
):
    """Compare the contents of two in-memory storages, attribute by attribute.

    Each row read from ``src`` is first transformed into the form the replayer
    is expected to have written (see ``fix_expected``), then the sorted rows
    of ``src`` and ``dst`` are compared for equality (after ``nullify_ctime``).

    Args:
        src: the storage the objects were originally added to.
        dst: the storage filled by the replayer.
        exclude: names of the compared attributes (e.g. ``"revisions"``) to
            skip entirely.
        expected_anonymized: if True, release authors and revision
            authors/committers in ``dst`` are expected to be anonymized.

    Raises:
        AssertionError: if the rows of any compared attribute differ.
    """

    def fix_expected(attr, row):
        """Return ``row`` as the replayer is expected to have stored it.

        Never mutates ``row``: when a change is needed a modified copy is made
        with ``dataclasses.replace``, since ``row`` may be the very object held
        by the ``src`` storage.
        """
        if expected_anonymized:
            if attr == "releases":
                row = dataclasses.replace(
                    row, author=row.author and row.author.anonymize()
                )
            elif attr == "revisions":
                # Same None-guard as for releases, so a missing author or
                # committer does not raise AttributeError.
                row = dataclasses.replace(
                    row,
                    author=row.author and row.author.anonymize(),
                    committer=row.committer and row.committer.anonymize(),
                )
        if attr == "revisions":
            # the replayer should now drop the metadata attribute; see
            # swh/storage/replay.py:_insert_objects().
            # Copy instead of assigning ``row.metadata``: without anonymization
            # ``row`` is the object stored in ``src``, and mutating it would
            # silently corrupt the source storage.
            row = dataclasses.replace(row, metadata="null")
        return row

    for attr_ in (
        "contents",
        "skipped_contents",
        "directories",
        "extid",
        "revisions",
        "releases",
        "snapshots",
        "origins",
        "origin_visits",
        "origin_visit_statuses",
        "raw_extrinsic_metadata",
    ):
        if exclude and attr_ in exclude:
            continue
        expected_objects = [
            (obj_id, nullify_ctime(fix_expected(attr_, obj)))
            for obj_id, obj in sorted(getattr(src._cql_runner, f"_{attr_}").iter_all())
        ]
        got_objects = [
            (obj_id, nullify_ctime(obj))
            for obj_id, obj in sorted(getattr(dst._cql_runner, f"_{attr_}").iter_all())
        ]
        assert got_objects == expected_objects, f"Mismatch object list for {attr_}"
E AssertionError: Mismatch object list for revisions
E assert [] == [((b'Q\xd9\xd..., b'bar'),)))]
E Right contains 3 more items, first extra item: ((b'Q\xd9\xd9J\xb0\x8d?uQ.:\x9f\xd1Q2\xe0\xa7\xcay(',), RevisionRow(id=b'Q\xd9\xd9J\xb0\x8d?uQ.:\x9f\xd1Q2\xe0\xa7\xca...eB\x9e\xa0O\xaeU\x11\xb6\x8f\xbf\x8f\xb9', name=None, email=None), synthetic=False, metadata='null', extra_headers=()))
E Full diff:
E [
E + ,
E - ((b'Q\xd9\xd9J\xb0\x8d?uQ.:\x9f\xd1Q2\xe0\xa7\xcay(',),
E - RevisionRow(id=b'Q\xd9\xd9J\xb0\x8d?uQ.:\x9f\xd1Q2\xe0\xa7\xcay(', date=TimestampWithTimezone(timestamp=Timestamp(seconds=1234567892, microseconds=0), offset=120, negative_utc=False), committer_date=TimestampWithTimezone(timestamp=Timesta...
E
E ...Full output truncated (6 lines hidden), use '-vv' to show
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_replay.py:268: AssertionError
TEST RESULT
TEST RESULT
- Run At
- Oct 29 2021, 10:35 AM