Page MenuHomeSoftware Heritage

D3573.diff
No OneTemporary

D3573.diff

diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -3007,215 +3007,216 @@
if "person" in counters:
assert counters["person"] == 3
- def test_content_find_ctime(self, swh_storage):
- cont = data.cont.copy()
- del cont["data"]
- ctime = now()
- cont["ctime"] = ctime
- swh_storage.content_add_metadata([cont])
-
- actually_present = swh_storage.content_find({"sha1": cont["sha1"]})
-
- # check ctime up to one second
- dt = actually_present[0]["ctime"] - ctime
- assert abs(dt.total_seconds()) <= 1
- del actually_present[0]["ctime"]
-
- assert actually_present[0] == {
- "sha1": cont["sha1"],
- "sha256": cont["sha256"],
- "sha1_git": cont["sha1_git"],
- "blake2s256": cont["blake2s256"],
- "length": cont["length"],
- "status": "visible",
- }
+ def test_content_find_ctime(self, swh_storage, sample_data_model):
+ origin_content = sample_data_model["content"][0]
+ ctime = round_to_milliseconds(now())
+ content = attr.evolve(origin_content, data=None, ctime=ctime)
+ swh_storage.content_add_metadata([content])
+
+ actually_present = swh_storage.content_find({"sha1": content.sha1})
+ assert actually_present[0] == content.to_dict()
+
+ def test_content_find_with_present_content(self, swh_storage, sample_data_model):
+ content = sample_data_model["content"][0]
+ expected_content = content.to_dict()
+ del expected_content["data"]
+ del expected_content["ctime"]
- def test_content_find_with_present_content(self, swh_storage):
# 1. with something to find
- cont = data.cont
- swh_storage.content_add([cont, data.cont2])
+ swh_storage.content_add([content])
- actually_present = swh_storage.content_find({"sha1": cont["sha1"]})
+ actually_present = swh_storage.content_find({"sha1": content.sha1})
assert 1 == len(actually_present)
actually_present[0].pop("ctime")
-
- assert actually_present[0] == {
- "sha1": cont["sha1"],
- "sha256": cont["sha256"],
- "sha1_git": cont["sha1_git"],
- "blake2s256": cont["blake2s256"],
- "length": cont["length"],
- "status": "visible",
- }
+ assert actually_present[0] == expected_content
# 2. with something to find
- actually_present = swh_storage.content_find({"sha1_git": cont["sha1_git"]})
+ actually_present = swh_storage.content_find({"sha1_git": content.sha1_git})
assert 1 == len(actually_present)
-
actually_present[0].pop("ctime")
- assert actually_present[0] == {
- "sha1": cont["sha1"],
- "sha256": cont["sha256"],
- "sha1_git": cont["sha1_git"],
- "blake2s256": cont["blake2s256"],
- "length": cont["length"],
- "status": "visible",
- }
+ assert actually_present[0] == expected_content
# 3. with something to find
- actually_present = swh_storage.content_find({"sha256": cont["sha256"]})
+ actually_present = swh_storage.content_find({"sha256": content.sha256})
assert 1 == len(actually_present)
-
actually_present[0].pop("ctime")
- assert actually_present[0] == {
- "sha1": cont["sha1"],
- "sha256": cont["sha256"],
- "sha1_git": cont["sha1_git"],
- "blake2s256": cont["blake2s256"],
- "length": cont["length"],
- "status": "visible",
- }
+ assert actually_present[0] == expected_content
# 4. with something to find
- actually_present = swh_storage.content_find(
- {
- "sha1": cont["sha1"],
- "sha1_git": cont["sha1_git"],
- "sha256": cont["sha256"],
- "blake2s256": cont["blake2s256"],
- }
- )
+ actually_present = swh_storage.content_find(content.hashes())
assert 1 == len(actually_present)
-
actually_present[0].pop("ctime")
- assert actually_present[0] == {
- "sha1": cont["sha1"],
- "sha256": cont["sha256"],
- "sha1_git": cont["sha1_git"],
- "blake2s256": cont["blake2s256"],
- "length": cont["length"],
- "status": "visible",
- }
+ assert actually_present[0] == expected_content
- def test_content_find_with_non_present_content(self, swh_storage):
+ def test_content_find_with_non_present_content(
+ self, swh_storage, sample_data_model
+ ):
+ missing_content = sample_data_model["content_metadata"][0]
# 1. with something that does not exist
- missing_cont = data.missing_cont
-
- actually_present = swh_storage.content_find({"sha1": missing_cont["sha1"]})
+ actually_present = swh_storage.content_find({"sha1": missing_content.sha1})
assert actually_present == []
# 2. with something that does not exist
actually_present = swh_storage.content_find(
- {"sha1_git": missing_cont["sha1_git"]}
+ {"sha1_git": missing_content.sha1_git}
)
-
assert actually_present == []
# 3. with something that does not exist
- actually_present = swh_storage.content_find({"sha256": missing_cont["sha256"]})
-
+ actually_present = swh_storage.content_find({"sha256": missing_content.sha256})
assert actually_present == []
- def test_content_find_with_duplicate_input(self, swh_storage):
- cont1 = data.cont
- duplicate_cont = cont1.copy()
+ def test_content_find_with_duplicate_input(self, swh_storage, sample_data_model):
+ content = sample_data_model["content"][0]
# Create fake data with colliding sha256 and blake2s256
- sha1_array = bytearray(duplicate_cont["sha1"])
+ sha1_array = bytearray(content.sha1)
sha1_array[0] += 1
- duplicate_cont["sha1"] = bytes(sha1_array)
- sha1git_array = bytearray(duplicate_cont["sha1_git"])
+ sha1git_array = bytearray(content.sha1_git)
sha1git_array[0] += 1
- duplicate_cont["sha1_git"] = bytes(sha1git_array)
+ duplicated_content = attr.evolve(
+ content, sha1=bytes(sha1_array), sha1_git=bytes(sha1git_array)
+ )
+
# Inject the data
- swh_storage.content_add([cont1, duplicate_cont])
- finder = {
- "blake2s256": duplicate_cont["blake2s256"],
- "sha256": duplicate_cont["sha256"],
- }
- actual_result = list(swh_storage.content_find(finder))
+ swh_storage.content_add([content, duplicated_content])
+
+ actual_result = list(
+ swh_storage.content_find(
+ {
+ "blake2s256": duplicated_content.blake2s256,
+ "sha256": duplicated_content.sha256,
+ }
+ )
+ )
- cont1.pop("data")
- duplicate_cont.pop("data")
- actual_result[0].pop("ctime")
- actual_result[1].pop("ctime")
+ expected_content = content.to_dict()
+ expected_duplicated_content = duplicated_content.to_dict()
- expected_result = [cont1, duplicate_cont]
+ for key in ["data", "ctime"]: # so we can compare
+ for dict_ in [
+ expected_content,
+ expected_duplicated_content,
+ actual_result[0],
+ actual_result[1],
+ ]:
+ dict_.pop(key, None)
+
+ expected_result = [expected_content, expected_duplicated_content]
for result in expected_result:
assert result in actual_result
- def test_content_find_with_duplicate_sha256(self, swh_storage):
- cont1 = data.cont
- duplicate_cont = cont1.copy()
+ def test_content_find_with_duplicate_sha256(self, swh_storage, sample_data_model):
+ content = sample_data_model["content"][0]
+ hashes = {}
# Create fake data with colliding sha256
for hashalgo in ("sha1", "sha1_git", "blake2s256"):
- value = bytearray(duplicate_cont[hashalgo])
+ value = bytearray(getattr(content, hashalgo))
value[0] += 1
- duplicate_cont[hashalgo] = bytes(value)
- swh_storage.content_add([cont1, duplicate_cont])
+ hashes[hashalgo] = bytes(value)
+
+ duplicated_content = attr.evolve(
+ content,
+ sha1=hashes["sha1"],
+ sha1_git=hashes["sha1_git"],
+ blake2s256=hashes["blake2s256"],
+ )
+ swh_storage.content_add([content, duplicated_content])
+
+ actual_result = list(
+ swh_storage.content_find({"sha256": duplicated_content.sha256})
+ )
- finder = {"sha256": duplicate_cont["sha256"]}
- actual_result = list(swh_storage.content_find(finder))
assert len(actual_result) == 2
- cont1.pop("data")
- duplicate_cont.pop("data")
- actual_result[0].pop("ctime")
- actual_result[1].pop("ctime")
- expected_result = [cont1, duplicate_cont]
- assert expected_result == sorted(actual_result, key=lambda x: x["sha1"])
+ expected_content = content.to_dict()
+ expected_duplicated_content = duplicated_content.to_dict()
+
+ for key in ["data", "ctime"]: # so we can compare
+ for dict_ in [
+ expected_content,
+ expected_duplicated_content,
+ actual_result[0],
+ actual_result[1],
+ ]:
+ dict_.pop(key, None)
+
+ assert sorted(actual_result, key=lambda x: x["sha1"]) == [
+ expected_content,
+ expected_duplicated_content,
+ ]
# Find with both sha256 and blake2s256
- finder = {
- "sha256": duplicate_cont["sha256"],
- "blake2s256": duplicate_cont["blake2s256"],
- }
- actual_result = list(swh_storage.content_find(finder))
+ actual_result = list(
+ swh_storage.content_find(
+ {
+ "sha256": duplicated_content.sha256,
+ "blake2s256": duplicated_content.blake2s256,
+ }
+ )
+ )
+
assert len(actual_result) == 1
actual_result[0].pop("ctime")
- expected_result = [duplicate_cont]
- assert actual_result[0] == duplicate_cont
+ assert actual_result == [expected_duplicated_content]
- def test_content_find_with_duplicate_blake2s256(self, swh_storage):
- cont1 = data.cont
- duplicate_cont = cont1.copy()
+ def test_content_find_with_duplicate_blake2s256(
+ self, swh_storage, sample_data_model
+ ):
+ content = sample_data_model["content"][0]
# Create fake data with colliding blake2s256 only (sha1, sha1_git and sha256 differ)
- sha1_array = bytearray(duplicate_cont["sha1"])
+ sha1_array = bytearray(content.sha1)
sha1_array[0] += 1
- duplicate_cont["sha1"] = bytes(sha1_array)
- sha1git_array = bytearray(duplicate_cont["sha1_git"])
+ sha1git_array = bytearray(content.sha1_git)
sha1git_array[0] += 1
- duplicate_cont["sha1_git"] = bytes(sha1git_array)
- sha256_array = bytearray(duplicate_cont["sha256"])
+ sha256_array = bytearray(content.sha256)
sha256_array[0] += 1
- duplicate_cont["sha256"] = bytes(sha256_array)
- swh_storage.content_add([cont1, duplicate_cont])
- finder = {"blake2s256": duplicate_cont["blake2s256"]}
- actual_result = list(swh_storage.content_find(finder))
- cont1.pop("data")
- duplicate_cont.pop("data")
- actual_result[0].pop("ctime")
- actual_result[1].pop("ctime")
- expected_result = [cont1, duplicate_cont]
+ duplicated_content = attr.evolve(
+ content,
+ sha1=bytes(sha1_array),
+ sha1_git=bytes(sha1git_array),
+ sha256=bytes(sha256_array),
+ )
+
+ swh_storage.content_add([content, duplicated_content])
+
+ actual_result = list(
+ swh_storage.content_find({"blake2s256": duplicated_content.blake2s256})
+ )
+
+ expected_content = content.to_dict()
+ expected_duplicated_content = duplicated_content.to_dict()
+
+ for key in ["data", "ctime"]: # so we can compare
+ for dict_ in [
+ expected_content,
+ expected_duplicated_content,
+ actual_result[0],
+ actual_result[1],
+ ]:
+ dict_.pop(key, None)
+
+ expected_result = [expected_content, expected_duplicated_content]
for result in expected_result:
assert result in actual_result
# Find with both sha256 and blake2s256
- finder = {
- "sha256": duplicate_cont["sha256"],
- "blake2s256": duplicate_cont["blake2s256"],
- }
- actual_result = list(swh_storage.content_find(finder))
+ actual_result = list(
+ swh_storage.content_find(
+ {
+ "sha256": duplicated_content.sha256,
+ "blake2s256": duplicated_content.blake2s256,
+ }
+ )
+ )
actual_result[0].pop("ctime")
-
- expected_result = [duplicate_cont]
- assert expected_result == actual_result
+ assert actual_result == [expected_duplicated_content]
def test_content_find_bad_input(self, swh_storage):
# 1. with bad input

File Metadata

Mime Type
text/plain
Expires
Dec 20 2024, 4:40 AM (11 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3226068

Event Timeline