Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_replay.py
Show First 20 Lines • Show All 79 Lines • ▼ Show 20 Lines | def test_storage_replayer(replayer_storage_and_client, caplog): | ||||
caplog.set_level(logging.ERROR, "swh.journal.replay") | caplog.set_level(logging.ERROR, "swh.journal.replay") | ||||
# Fill the destination storage from Kafka | # Fill the destination storage from Kafka | ||||
dst = get_storage(cls="memory") | dst = get_storage(cls="memory") | ||||
worker_fn = functools.partial(process_replay_objects, storage=dst) | worker_fn = functools.partial(process_replay_objects, storage=dst) | ||||
nb_inserted = replayer.process(worker_fn) | nb_inserted = replayer.process(worker_fn) | ||||
assert nb_sent == nb_inserted | assert nb_sent == nb_inserted | ||||
_check_replayed(src, dst) | assert isinstance(src, InMemoryStorage) # needed to help mypy | ||||
assert isinstance(dst, InMemoryStorage) | |||||
check_replayed(src, dst) | |||||
collision = 0 | collision = 0 | ||||
for record in caplog.records: | for record in caplog.records: | ||||
logtext = record.getMessage() | logtext = record.getMessage() | ||||
if "Colliding contents:" in logtext: | if "Colliding contents:" in logtext: | ||||
collision += 1 | collision += 1 | ||||
assert collision == 0, "No collision should be detected" | assert collision == 0, "No collision should be detected" | ||||
▲ Show 20 Lines • Show All 63 Lines • ▼ Show 20 Lines | def test_storage_play_with_collision(replayer_storage_and_client, caplog): | ||||
assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS) | assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS) | ||||
for content in DUPLICATE_CONTENTS: | for content in DUPLICATE_CONTENTS: | ||||
expected_content_hashes = { | expected_content_hashes = { | ||||
k: hash_to_hex(v) for k, v in content.hashes().items() | k: hash_to_hex(v) for k, v in content.hashes().items() | ||||
} | } | ||||
assert expected_content_hashes in actual_colliding_hashes | assert expected_content_hashes in actual_colliding_hashes | ||||
# all objects from the src should exist in the dst storage | # all objects from the src should exist in the dst storage | ||||
_check_replayed(src, dst, exclude=["contents"]) | assert isinstance(src, InMemoryStorage) # needed to help mypy | ||||
assert isinstance(dst, InMemoryStorage) # needed to help mypy | |||||
check_replayed(src, dst, exclude=["contents"]) | |||||
# but the dst has one content more (one of the 2 colliding ones) | # but the dst has one content more (one of the 2 colliding ones) | ||||
assert ( | assert ( | ||||
len(list(src._cql_runner._contents.iter_all())) | len(list(src._cql_runner._contents.iter_all())) | ||||
== len(list(dst._cql_runner._contents.iter_all())) - 1 | == len(list(dst._cql_runner._contents.iter_all())) - 1 | ||||
) | ) | ||||
def test_replay_skipped_content(replayer_storage_and_client): | def test_replay_skipped_content(replayer_storage_and_client): | ||||
"""Test the 'skipped_content' topic is properly replayed.""" | """Test the 'skipped_content' topic is properly replayed.""" | ||||
src, replayer = replayer_storage_and_client | src, replayer = replayer_storage_and_client | ||||
_check_replay_skipped_content(src, replayer, "skipped_content") | _check_replay_skipped_content(src, replayer, "skipped_content") | ||||
def test_replay_skipped_content_bwcompat(replayer_storage_and_client): | def test_replay_skipped_content_bwcompat(replayer_storage_and_client): | ||||
"""Test the 'content' topic can be used to replay SkippedContent objects.""" | """Test the 'content' topic can be used to replay SkippedContent objects.""" | ||||
src, replayer = replayer_storage_and_client | src, replayer = replayer_storage_and_client | ||||
_check_replay_skipped_content(src, replayer, "content") | _check_replay_skipped_content(src, replayer, "content") | ||||
# utility functions | # utility functions | ||||
def _check_replayed( | def check_replayed( | ||||
src: InMemoryStorage, dst: InMemoryStorage, exclude: Optional[Container] = None | src: InMemoryStorage, | ||||
dst: InMemoryStorage, | |||||
exclude: Optional[Container] = None, | |||||
expected_anonymized=False, | |||||
): | ): | ||||
"""Simple utility function to compare the content of 2 in_memory storages | """Simple utility function to compare the content of 2 in_memory storages""" | ||||
def fix_expected(attr, row): | |||||
if expected_anonymized: | |||||
if attr == "releases": | |||||
row = dataclasses.replace( | |||||
row, author=row.author and row.author.anonymize() | |||||
) | |||||
elif attr == "revisions": | |||||
row = dataclasses.replace( | |||||
row, | |||||
author=row.author.anonymize(), | |||||
committer=row.committer.anonymize(), | |||||
) | |||||
return row | |||||
""" | |||||
for attr_ in ( | for attr_ in ( | ||||
"contents", | "contents", | ||||
"skipped_contents", | "skipped_contents", | ||||
"directories", | "directories", | ||||
"extid", | "extid", | ||||
"revisions", | "revisions", | ||||
"releases", | "releases", | ||||
"snapshots", | "snapshots", | ||||
"origins", | "origins", | ||||
"origin_visits", | "origin_visits", | ||||
"origin_visit_statuses", | "origin_visit_statuses", | ||||
"raw_extrinsic_metadata", | "raw_extrinsic_metadata", | ||||
): | ): | ||||
if exclude and attr_ in exclude: | if exclude and attr_ in exclude: | ||||
continue | continue | ||||
expected_objects = [ | expected_objects = [ | ||||
(id, nullify_ctime(obj)) | (id, nullify_ctime(fix_expected(attr_, obj))) | ||||
for id, obj in sorted(getattr(src._cql_runner, f"_{attr_}").iter_all()) | for id, obj in sorted(getattr(src._cql_runner, f"_{attr_}").iter_all()) | ||||
] | ] | ||||
got_objects = [ | got_objects = [ | ||||
(id, nullify_ctime(obj)) | (id, nullify_ctime(obj)) | ||||
for id, obj in sorted(getattr(dst._cql_runner, f"_{attr_}").iter_all()) | for id, obj in sorted(getattr(dst._cql_runner, f"_{attr_}").iter_all()) | ||||
] | ] | ||||
assert got_objects == expected_objects, f"Mismatch object list for {attr_}" | assert got_objects == expected_objects, f"Mismatch object list for {attr_}" | ||||
▲ Show 20 Lines • Show All 94 Lines • ▼ Show 20 Lines | ): | ||||
worker_fn = functools.partial(process_replay_objects, storage=dst_storage) | worker_fn = functools.partial(process_replay_objects, storage=dst_storage) | ||||
nb_inserted = replayer.process(worker_fn) | nb_inserted = replayer.process(worker_fn) | ||||
replayer.consumer.commit() | replayer.consumer.commit() | ||||
assert nb_sent == nb_inserted | assert nb_sent == nb_inserted | ||||
# Check the contents of the destination storage, and whether the anonymization was | # Check the contents of the destination storage, and whether the anonymization was | ||||
# properly used | # properly used | ||||
assert isinstance(storage, InMemoryStorage) # needed to help mypy | |||||
assert isinstance(dst_storage, InMemoryStorage) | |||||
check_replayed(storage, dst_storage, expected_anonymized=not privileged) | check_replayed(storage, dst_storage, expected_anonymized=not privileged) | ||||
def check_replayed(src, dst, expected_anonymized=False): | |||||
"""Simple utility function to compare the content of 2 in_memory storages | |||||
If expected_anonymized is True, objects from the source storage are anonymized | |||||
before comparing with the destination storage. | |||||
""" | |||||
def maybe_anonymize(attr_, row): | |||||
if expected_anonymized: | |||||
if attr_ == "releases": | |||||
row = dataclasses.replace(row, author=row.author.anonymize()) | |||||
elif attr_ == "revisions": | |||||
row = dataclasses.replace( | |||||
row, | |||||
author=row.author.anonymize(), | |||||
committer=row.committer.anonymize(), | |||||
) | |||||
return row | |||||
for attr_ in ( | |||||
"contents", | |||||
"skipped_contents", | |||||
"directories", | |||||
"revisions", | |||||
"releases", | |||||
"snapshots", | |||||
"origins", | |||||
"origin_visit_statuses", | |||||
"raw_extrinsic_metadata", | |||||
): | |||||
expected_objects = [ | |||||
(id, nullify_ctime(maybe_anonymize(attr_, obj))) | |||||
for id, obj in sorted(getattr(src._cql_runner, f"_{attr_}").iter_all()) | |||||
] | |||||
got_objects = [ | |||||
(id, nullify_ctime(obj)) | |||||
for id, obj in sorted(getattr(dst._cql_runner, f"_{attr_}").iter_all()) | |||||
] | |||||
assert got_objects == expected_objects, f"Mismatch object list for {attr_}" |