Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_cassandra.py
Show First 20 Lines • Show All 307 Lines • ▼ Show 20 Lines | def test_content_find_murmur3_collision(self, swh_storage, mocker, sample_data): | ||||
Row(**{col: getattr(cont, col) for col in cols}) | Row(**{col: getattr(cont, col) for col in cols}) | ||||
for cont in [cont, cont2] | for cont in [cont, cont2] | ||||
] | ] | ||||
mocker.patch.object( | mocker.patch.object( | ||||
swh_storage._cql_runner, "content_get_from_token", mock_cgft | swh_storage._cql_runner, "content_get_from_token", mock_cgft | ||||
) | ) | ||||
expected_cont = attr.evolve(cont, data=None).to_dict() | expected_content = attr.evolve(cont, data=None) | ||||
actual_result = swh_storage.content_find({"sha1": cont.sha1}) | actual_result = swh_storage.content_find({"sha1": cont.sha1}) | ||||
assert called == 2 | assert called == 2 | ||||
# but cont2 should be filtered out | # but cont2 should be filtered out | ||||
assert actual_result == [expected_cont] | assert actual_result == [expected_content] | ||||
@pytest.mark.skip("content_update is not yet implemented for Cassandra") | @pytest.mark.skip("content_update is not yet implemented for Cassandra") | ||||
def test_content_update(self): | def test_content_update(self): | ||||
pass | pass | ||||
@pytest.mark.skip( | @pytest.mark.skip( | ||||
'The "person" table of the pgsql is a legacy thing, and not ' | 'The "person" table of the pgsql is a legacy thing, and not ' | ||||
"supported by the cassandra backend." | "supported by the cassandra backend." | ||||
▲ Show 20 Lines • Show All 57 Lines • Show Last 20 Lines |