Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/tests/utils.py
Show First 20 Lines • Show All 579 Lines • ▼ Show 20 Lines | for (obj_id, content) in OBJ_STORAGE_DATA.items(): | ||||
sha256=content_hashes["sha256"], | sha256=content_hashes["sha256"], | ||||
blake2s256=content_hashes["blake2s256"], | blake2s256=content_hashes["blake2s256"], | ||||
) | ) | ||||
) | ) | ||||
storage.content_add(contents) | storage.content_add(contents) | ||||
class CommonContentIndexerTest(metaclass=abc.ABCMeta): | class CommonContentIndexerTest(metaclass=abc.ABCMeta): | ||||
legacy_get_format = False | |||||
"""True if and only if the tested indexer uses the legacy format. | |||||
see: https://forge.softwareheritage.org/T1433 | |||||
""" | |||||
def get_indexer_results(self, ids): | def get_indexer_results(self, ids): | ||||
"""Override this for indexers that don't have a mock storage.""" | """Override this for indexers that don't have a mock storage.""" | ||||
return self.indexer.idx_storage.state | return self.indexer.idx_storage.state | ||||
def assert_legacy_results_ok(self, sha1s, expected_results=None): | |||||
# XXX old format, remove this when all endpoints are | |||||
# updated to the new one | |||||
# see: https://forge.softwareheritage.org/T1433 | |||||
sha1s = [ | |||||
sha1 if isinstance(sha1, bytes) else hash_to_bytes(sha1) for sha1 in sha1s | |||||
] | |||||
actual_results = list(self.get_indexer_results(sha1s)) | |||||
if expected_results is None: | |||||
expected_results = self.expected_results | |||||
self.assertEqual( | |||||
len(expected_results), | |||||
len(actual_results), | |||||
(expected_results, actual_results), | |||||
) | |||||
for indexed_data in actual_results: | |||||
_id = indexed_data["id"] | |||||
expected_data = expected_results[hashutil.hash_to_hex(_id)].copy() | |||||
expected_data["id"] = _id | |||||
self.assertEqual(indexed_data, expected_data) | |||||
def assert_results_ok(self, sha1s, expected_results=None): | def assert_results_ok(self, sha1s, expected_results=None): | ||||
if self.legacy_get_format: | |||||
self.assert_legacy_results_ok(sha1s, expected_results) | |||||
return | |||||
sha1s = [ | sha1s = [ | ||||
sha1 if isinstance(sha1, bytes) else hash_to_bytes(sha1) for sha1 in sha1s | sha1 if isinstance(sha1, bytes) else hash_to_bytes(sha1) for sha1 in sha1s | ||||
] | ] | ||||
actual_results = list(self.get_indexer_results(sha1s)) | actual_results = list(self.get_indexer_results(sha1s)) | ||||
if expected_results is None: | if expected_results is None: | ||||
expected_results = self.expected_results | expected_results = self.expected_results | ||||
self.assertEqual( | self.assertEqual(expected_results, actual_results) | ||||
sum(res is not None for res in expected_results.values()), | |||||
sum(sum(map(len, res.values())) for res in actual_results), | |||||
(expected_results, actual_results), | |||||
) | |||||
for indexed_data in actual_results: | |||||
(_id, indexed_data) = list(indexed_data.items())[0] | |||||
if expected_results.get(hashutil.hash_to_hex(_id)) is None: | |||||
self.assertEqual(indexed_data, []) | |||||
else: | |||||
expected_data = expected_results[hashutil.hash_to_hex(_id)].copy() | |||||
expected_data = [expected_data] | |||||
self.assertEqual(indexed_data, expected_data) | |||||
def test_index(self): | def test_index(self): | ||||
"""Known sha1 have their data indexed | """Known sha1 have their data indexed | ||||
""" | """ | ||||
sha1s = [self.id0, self.id1, self.id2] | sha1s = [self.id0, self.id1, self.id2] | ||||
# when | # when | ||||
Show All 13 Lines | def test_index_one_unknown_sha1(self): | ||||
"799a5ef812c53907562fe379d4b3851e69c7cb15", # unknown | "799a5ef812c53907562fe379d4b3851e69c7cb15", # unknown | ||||
"800a5ef812c53907562fe379d4b3851e69c7cb15", | "800a5ef812c53907562fe379d4b3851e69c7cb15", | ||||
] # unknown | ] # unknown | ||||
# when | # when | ||||
self.indexer.run(sha1s, policy_update="update-dups") | self.indexer.run(sha1s, policy_update="update-dups") | ||||
# then | # then | ||||
expected_results = { | # TODO: unconditionally use res.id when all endpoints moved away from dicts | ||||
k: v for k, v in self.expected_results.items() if k in sha1s | expected_results = [ | ||||
} | res | ||||
for res in self.expected_results | |||||
if hashutil.hash_to_hex(getattr(res, "id", None) or res["id"]) in sha1s | |||||
] | |||||
self.assert_results_ok(sha1s, expected_results) | self.assert_results_ok(sha1s, expected_results) | ||||
class CommonContentIndexerPartitionTest: | class CommonContentIndexerPartitionTest: | ||||
"""Allows to factorize tests on range indexer. | """Allows to factorize tests on range indexer. | ||||
""" | """ | ||||
▲ Show 20 Lines • Show All 82 Lines • Show Last 20 Lines |