diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py --- a/swh/storage/tests/test_storage.py +++ b/swh/storage/tests/test_storage.py @@ -3007,215 +3007,216 @@ if "person" in counters: assert counters["person"] == 3 - def test_content_find_ctime(self, swh_storage): - cont = data.cont.copy() - del cont["data"] - ctime = now() - cont["ctime"] = ctime - swh_storage.content_add_metadata([cont]) - - actually_present = swh_storage.content_find({"sha1": cont["sha1"]}) - - # check ctime up to one second - dt = actually_present[0]["ctime"] - ctime - assert abs(dt.total_seconds()) <= 1 - del actually_present[0]["ctime"] - - assert actually_present[0] == { - "sha1": cont["sha1"], - "sha256": cont["sha256"], - "sha1_git": cont["sha1_git"], - "blake2s256": cont["blake2s256"], - "length": cont["length"], - "status": "visible", - } + def test_content_find_ctime(self, swh_storage, sample_data_model): + origin_content = sample_data_model["content"][0] + ctime = round_to_milliseconds(now()) + content = attr.evolve(origin_content, data=None, ctime=ctime) + swh_storage.content_add_metadata([content]) + + actually_present = swh_storage.content_find({"sha1": content.sha1}) + assert actually_present[0] == content.to_dict() + + def test_content_find_with_present_content(self, swh_storage, sample_data_model): + content = sample_data_model["content"][0] + expected_content = content.to_dict() + del expected_content["data"] + del expected_content["ctime"] - def test_content_find_with_present_content(self, swh_storage): # 1. with something to find - cont = data.cont - swh_storage.content_add([cont, data.cont2]) + swh_storage.content_add([content]) - actually_present = swh_storage.content_find({"sha1": cont["sha1"]}) + actually_present = swh_storage.content_find({"sha1": content.sha1}) assert 1 == len(actually_present) actually_present[0].pop("ctime") - - assert actually_present[0] == { - "sha1": cont["sha1"], - "sha256": cont["sha256"], - "sha1_git": cont["sha1_git"], - "blake2s256": cont["blake2s256"], - "length": cont["length"], - "status": "visible", - } + assert actually_present[0] == expected_content # 2. with something to find - actually_present = swh_storage.content_find({"sha1_git": cont["sha1_git"]}) + actually_present = swh_storage.content_find({"sha1_git": content.sha1_git}) assert 1 == len(actually_present) - actually_present[0].pop("ctime") - assert actually_present[0] == { - "sha1": cont["sha1"], - "sha256": cont["sha256"], - "sha1_git": cont["sha1_git"], - "blake2s256": cont["blake2s256"], - "length": cont["length"], - "status": "visible", - } + assert actually_present[0] == expected_content # 3. with something to find - actually_present = swh_storage.content_find({"sha256": cont["sha256"]}) + actually_present = swh_storage.content_find({"sha256": content.sha256}) assert 1 == len(actually_present) - actually_present[0].pop("ctime") - assert actually_present[0] == { - "sha1": cont["sha1"], - "sha256": cont["sha256"], - "sha1_git": cont["sha1_git"], - "blake2s256": cont["blake2s256"], - "length": cont["length"], - "status": "visible", - } + assert actually_present[0] == expected_content # 4. with something to find - actually_present = swh_storage.content_find( - { - "sha1": cont["sha1"], - "sha1_git": cont["sha1_git"], - "sha256": cont["sha256"], - "blake2s256": cont["blake2s256"], - } - ) + actually_present = swh_storage.content_find(content.hashes()) assert 1 == len(actually_present) - actually_present[0].pop("ctime") - assert actually_present[0] == { - "sha1": cont["sha1"], - "sha256": cont["sha256"], - "sha1_git": cont["sha1_git"], - "blake2s256": cont["blake2s256"], - "length": cont["length"], - "status": "visible", - } + assert actually_present[0] == expected_content - def test_content_find_with_non_present_content(self, swh_storage): + def test_content_find_with_non_present_content( + self, swh_storage, sample_data_model + ): + missing_content = sample_data_model["content_metadata"][0] # 1. with something that does not exist - missing_cont = data.missing_cont - - actually_present = swh_storage.content_find({"sha1": missing_cont["sha1"]}) + actually_present = swh_storage.content_find({"sha1": missing_content.sha1}) assert actually_present == [] # 2. with something that does not exist actually_present = swh_storage.content_find( - {"sha1_git": missing_cont["sha1_git"]} + {"sha1_git": missing_content.sha1_git} ) - assert actually_present == [] # 3. with something that does not exist - actually_present = swh_storage.content_find({"sha256": missing_cont["sha256"]}) - + actually_present = swh_storage.content_find({"sha256": missing_content.sha256}) assert actually_present == [] - def test_content_find_with_duplicate_input(self, swh_storage): - cont1 = data.cont - duplicate_cont = cont1.copy() + def test_content_find_with_duplicate_input(self, swh_storage, sample_data_model): + content = sample_data_model["content"][0] # Create fake data with colliding sha256 and blake2s256 - sha1_array = bytearray(duplicate_cont["sha1"]) + sha1_array = bytearray(content.sha1) sha1_array[0] += 1 - duplicate_cont["sha1"] = bytes(sha1_array) - sha1git_array = bytearray(duplicate_cont["sha1_git"]) + sha1git_array = bytearray(content.sha1_git) sha1git_array[0] += 1 - duplicate_cont["sha1_git"] = bytes(sha1git_array) + duplicated_content = attr.evolve( + content, sha1=bytes(sha1_array), sha1_git=bytes(sha1git_array) + ) + # Inject the data - swh_storage.content_add([cont1, duplicate_cont]) - finder = { - "blake2s256": duplicate_cont["blake2s256"], - "sha256": duplicate_cont["sha256"], - } - actual_result = list(swh_storage.content_find(finder)) + swh_storage.content_add([content, duplicated_content]) + + actual_result = list( + swh_storage.content_find( + { + "blake2s256": duplicated_content.blake2s256, + "sha256": duplicated_content.sha256, + } + ) + ) - cont1.pop("data") - duplicate_cont.pop("data") - actual_result[0].pop("ctime") - actual_result[1].pop("ctime") + expected_content = content.to_dict() + expected_duplicated_content = duplicated_content.to_dict() - expected_result = [cont1, duplicate_cont] + for key in ["data", "ctime"]: # so we can compare + for dict_ in [ + expected_content, + expected_duplicated_content, + actual_result[0], + actual_result[1], + ]: + dict_.pop(key, None) + + expected_result = [expected_content, expected_duplicated_content] for result in expected_result: assert result in actual_result - def test_content_find_with_duplicate_sha256(self, swh_storage): - cont1 = data.cont - duplicate_cont = cont1.copy() + def test_content_find_with_duplicate_sha256(self, swh_storage, sample_data_model): + content = sample_data_model["content"][0] + hashes = {} # Create fake data with colliding sha256 for hashalgo in ("sha1", "sha1_git", "blake2s256"): - value = bytearray(duplicate_cont[hashalgo]) + value = bytearray(getattr(content, hashalgo)) value[0] += 1 - duplicate_cont[hashalgo] = bytes(value) - swh_storage.content_add([cont1, duplicate_cont]) + hashes[hashalgo] = bytes(value) + + duplicated_content = attr.evolve( + content, + sha1=hashes["sha1"], + sha1_git=hashes["sha1_git"], + blake2s256=hashes["blake2s256"], + ) + swh_storage.content_add([content, duplicated_content]) + + actual_result = list( + swh_storage.content_find({"sha256": duplicated_content.sha256}) + ) - finder = {"sha256": duplicate_cont["sha256"]} - actual_result = list(swh_storage.content_find(finder)) assert len(actual_result) == 2 - cont1.pop("data") - duplicate_cont.pop("data") - actual_result[0].pop("ctime") - actual_result[1].pop("ctime") - expected_result = [cont1, duplicate_cont] - assert expected_result == sorted(actual_result, key=lambda x: x["sha1"]) + expected_content = content.to_dict() + expected_duplicated_content = duplicated_content.to_dict() + + for key in ["data", "ctime"]: # so we can compare + for dict_ in [ + expected_content, + expected_duplicated_content, + actual_result[0], + actual_result[1], + ]: + dict_.pop(key, None) + + assert sorted(actual_result, key=lambda x: x["sha1"]) == [ + expected_content, + expected_duplicated_content, + ] # Find with both sha256 and blake2s256 - finder = { - "sha256": duplicate_cont["sha256"], - "blake2s256": duplicate_cont["blake2s256"], - } - actual_result = list(swh_storage.content_find(finder)) + actual_result = list( + swh_storage.content_find( + { + "sha256": duplicated_content.sha256, + "blake2s256": duplicated_content.blake2s256, + } + ) + ) + assert len(actual_result) == 1 actual_result[0].pop("ctime") - expected_result = [duplicate_cont] - assert actual_result[0] == duplicate_cont + assert actual_result == [expected_duplicated_content] - def test_content_find_with_duplicate_blake2s256(self, swh_storage): - cont1 = data.cont - duplicate_cont = cont1.copy() + def test_content_find_with_duplicate_blake2s256( + self, swh_storage, sample_data_model + ): + content = sample_data_model["content"][0] # Create fake data with colliding sha256 and blake2s256 - sha1_array = bytearray(duplicate_cont["sha1"]) + sha1_array = bytearray(content.sha1) sha1_array[0] += 1 - duplicate_cont["sha1"] = bytes(sha1_array) - sha1git_array = bytearray(duplicate_cont["sha1_git"]) + sha1git_array = bytearray(content.sha1_git) sha1git_array[0] += 1 - duplicate_cont["sha1_git"] = bytes(sha1git_array) - sha256_array = bytearray(duplicate_cont["sha256"]) + sha256_array = bytearray(content.sha256) sha256_array[0] += 1 - duplicate_cont["sha256"] = bytes(sha256_array) - swh_storage.content_add([cont1, duplicate_cont]) - finder = {"blake2s256": duplicate_cont["blake2s256"]} - actual_result = list(swh_storage.content_find(finder)) - cont1.pop("data") - duplicate_cont.pop("data") - actual_result[0].pop("ctime") - actual_result[1].pop("ctime") - expected_result = [cont1, duplicate_cont] + duplicated_content = attr.evolve( + content, + sha1=bytes(sha1_array), + sha1_git=bytes(sha1git_array), + sha256=bytes(sha256_array), + ) + + swh_storage.content_add([content, duplicated_content]) + + actual_result = list( + swh_storage.content_find({"blake2s256": duplicated_content.blake2s256}) + ) + + expected_content = content.to_dict() + expected_duplicated_content = duplicated_content.to_dict() + + for key in ["data", "ctime"]: # so we can compare + for dict_ in [ + expected_content, + expected_duplicated_content, + actual_result[0], + actual_result[1], + ]: + dict_.pop(key, None) + + expected_result = [expected_content, expected_duplicated_content] for result in expected_result: assert result in actual_result # Find with both sha256 and blake2s256 - finder = { - "sha256": duplicate_cont["sha256"], - "blake2s256": duplicate_cont["blake2s256"], - } - actual_result = list(swh_storage.content_find(finder)) + actual_result = list( + swh_storage.content_find( + { + "sha256": duplicated_content.sha256, + "blake2s256": duplicated_content.blake2s256, + } + ) + ) actual_result[0].pop("ctime") - - expected_result = [duplicate_cont] - assert expected_result == actual_result + assert actual_result == [expected_duplicated_content] def test_content_find_bad_input(self, swh_storage): # 1. with bad input