Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7123870
D3556.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
10 KB
Subscribers
None
D3556.diff
View Options
diff --git a/swh/storage/tests/test_api_client.py b/swh/storage/tests/test_api_client.py
--- a/swh/storage/tests/test_api_client.py
+++ b/swh/storage/tests/test_api_client.py
@@ -60,8 +60,8 @@
class TestStorage(_TestStorage):
- def test_content_update(self, swh_storage, app_server):
+ def test_content_update(self, swh_storage, app_server, sample_data_model):
# TODO, journal_writer not supported
swh_storage.journal_writer.journal = None
with patch.object(server.storage.journal_writer, "journal", None):
- super().test_content_update(swh_storage)
+ super().test_content_update(swh_storage, sample_data_model)
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -38,7 +38,6 @@
Snapshot,
)
from swh.model.hypothesis_strategies import objects
-from swh.model.hashutil import hash_to_hex
from swh.storage import get_storage
from swh.storage.converters import origin_url_to_sha1 as sha1
from swh.storage.exc import HashCollision, StorageArgumentException
@@ -282,60 +281,60 @@
if type(cm.value) == psycopg2.IntegrityError:
assert cm.exception.pgcode == psycopg2.errorcodes.NOT_NULL_VIOLATION
- def test_content_get_missing(self, swh_storage):
- cont = data.cont
+ def test_content_get_missing(self, swh_storage, sample_data_model):
+ cont, cont2 = sample_data_model["content"][:2]
swh_storage.content_add([cont])
# Query a single missing content
- results = list(swh_storage.content_get([data.cont2["sha1"]]))
+ results = list(swh_storage.content_get([cont2.sha1]))
assert results == [None]
# Check content_get does not abort after finding a missing content
- results = list(swh_storage.content_get([data.cont["sha1"], data.cont2["sha1"]]))
- assert results == [{"sha1": cont["sha1"], "data": cont["data"]}, None]
+ results = list(swh_storage.content_get([cont.sha1, cont2.sha1]))
+ assert results == [{"sha1": cont.sha1, "data": cont.data}, None]
# Check content_get does not discard found countent when it finds
# a missing content.
- results = list(swh_storage.content_get([data.cont2["sha1"], data.cont["sha1"]]))
- assert results == [None, {"sha1": cont["sha1"], "data": cont["data"]}]
+ results = list(swh_storage.content_get([cont2.sha1, cont.sha1]))
+ assert results == [None, {"sha1": cont.sha1, "data": cont.data}]
- def test_content_add_different_input(self, swh_storage):
- cont = data.cont
- cont2 = data.cont2
+ def test_content_add_different_input(self, swh_storage, sample_data_model):
+ cont, cont2 = sample_data_model["content"][:2]
actual_result = swh_storage.content_add([cont, cont2])
assert actual_result == {
"content:add": 2,
- "content:add:bytes": cont["length"] + cont2["length"],
+ "content:add:bytes": cont.length + cont2.length,
}
- def test_content_add_twice(self, swh_storage):
- actual_result = swh_storage.content_add([data.cont])
+ def test_content_add_twice(self, swh_storage, sample_data_model):
+ cont, cont2 = sample_data_model["content"][:2]
+
+ actual_result = swh_storage.content_add([cont])
assert actual_result == {
"content:add": 1,
- "content:add:bytes": data.cont["length"],
+ "content:add:bytes": cont.length,
}
assert len(swh_storage.journal_writer.journal.objects) == 1
- actual_result = swh_storage.content_add([data.cont, data.cont2])
+ actual_result = swh_storage.content_add([cont, cont2])
assert actual_result == {
"content:add": 1,
- "content:add:bytes": data.cont2["length"],
+ "content:add:bytes": cont2.length,
}
assert 2 <= len(swh_storage.journal_writer.journal.objects) <= 3
- assert len(swh_storage.content_find(data.cont)) == 1
- assert len(swh_storage.content_find(data.cont2)) == 1
+ assert len(swh_storage.content_find(cont.to_dict())) == 1
+ assert len(swh_storage.content_find(cont2.to_dict())) == 1
- def test_content_add_collision(self, swh_storage):
- cont1 = data.cont
+ def test_content_add_collision(self, swh_storage, sample_data_model):
+ cont1 = sample_data_model["content"][0]
# create (corrupted) content with same sha1{,_git} but != sha256
- cont1b = cont1.copy()
- sha256_array = bytearray(cont1b["sha256"])
+ sha256_array = bytearray(cont1.sha256)
sha256_array[0] += 1
- cont1b["sha256"] = bytes(sha256_array)
+ cont1b = attr.evolve(cont1, sha256=bytes(sha256_array))
with pytest.raises(HashCollision) as cm:
swh_storage.content_add([cont1, cont1b])
@@ -344,56 +343,61 @@
actual_algo = exc.algo
assert actual_algo in ["sha1", "sha1_git", "blake2s256"]
actual_id = exc.hash_id
- assert actual_id == hash_to_hex(cont1[actual_algo])
+ assert actual_id == getattr(cont1, actual_algo).hex()
collisions = exc.args[2]
assert len(collisions) == 2
assert collisions == [
- content_hex_hashes(Content.from_dict(cont1).hashes()),
- content_hex_hashes(Content.from_dict(cont1b).hashes()),
+ content_hex_hashes(cont1.hashes()),
+ content_hex_hashes(cont1b.hashes()),
]
assert exc.colliding_content_hashes() == [
- Content.from_dict(cont1).hashes(),
- Content.from_dict(cont1b).hashes(),
+ cont1.hashes(),
+ cont1b.hashes(),
]
- def test_content_add_duplicate(self, swh_storage):
- swh_storage.content_add([data.cont, data.cont])
+ def test_content_add_duplicate(self, swh_storage, sample_data_model):
+ cont = sample_data_model["content"][0]
+ swh_storage.content_add([cont, cont])
- assert list(swh_storage.content_get([data.cont["sha1"]])) == [
- {"sha1": data.cont["sha1"], "data": data.cont["data"]}
+ assert list(swh_storage.content_get([cont.sha1])) == [
+ {"sha1": cont.sha1, "data": cont.data}
]
- def test_content_update(self, swh_storage):
+ def test_content_update(self, swh_storage, sample_data_model):
+ cont1 = sample_data_model["content"][0]
+
if hasattr(swh_storage, "storage"):
swh_storage.journal_writer.journal = None # TODO, not supported
- cont = copy.deepcopy(data.cont)
+ swh_storage.content_add([cont1])
- swh_storage.content_add([cont])
# alter the sha1_git for example
- cont["sha1_git"] = hash_to_bytes("3a60a5275d0333bf13468e8b3dcab90f4046e654")
+ cont1b = attr.evolve(
+ cont1, sha1_git=hash_to_bytes("3a60a5275d0333bf13468e8b3dcab90f4046e654")
+ )
- swh_storage.content_update([cont], keys=["sha1_git"])
+ swh_storage.content_update([cont1b.to_dict()], keys=["sha1_git"])
- results = swh_storage.content_get_metadata([cont["sha1"]])
- del cont["data"]
- assert tuple(results[cont["sha1"]]) == (cont,)
+ results = swh_storage.content_get_metadata([cont1.sha1])
- def test_content_add_metadata(self, swh_storage):
- cont = data.cont
- del cont["data"]
- cont["ctime"] = now()
+ expected_content = attr.evolve(cont1b, data=None).to_dict()
+ del expected_content["ctime"]
+ assert tuple(results[cont1.sha1]) == (expected_content,)
+
+ def test_content_add_metadata(self, swh_storage, sample_data_model):
+ cont = attr.evolve(sample_data_model["content"][0], data=None, ctime=now())
actual_result = swh_storage.content_add_metadata([cont])
assert actual_result == {
"content:add": 1,
}
- expected_cont = cont.copy()
+ expected_cont = cont.to_dict()
del expected_cont["ctime"]
- assert tuple(
- swh_storage.content_get_metadata([cont["sha1"]])[cont["sha1"]]
- ) == (expected_cont,)
+
+ assert tuple(swh_storage.content_get_metadata([cont.sha1])[cont.sha1]) == (
+ expected_cont,
+ )
contents = [
obj
for (obj_type, obj) in swh_storage.journal_writer.journal.objects
@@ -401,33 +405,26 @@
]
assert len(contents) == 1
for obj in contents:
- obj_d = obj.to_dict()
- del obj_d["ctime"]
- assert obj_d == expected_cont
+ obj = attr.evolve(obj, ctime=None)
+ assert obj == cont
- def test_content_add_metadata_different_input(self, swh_storage):
- cont = data.cont
- del cont["data"]
- cont["ctime"] = now()
- cont2 = data.cont2
- del cont2["data"]
- cont2["ctime"] = now()
+ def test_content_add_metadata_different_input(self, swh_storage, sample_data_model):
+ contents = sample_data_model["content"][:2]
+ cont = attr.evolve(contents[0], data=None, ctime=now())
+ cont2 = attr.evolve(contents[1], data=None, ctime=now())
actual_result = swh_storage.content_add_metadata([cont, cont2])
assert actual_result == {
"content:add": 2,
}
- def test_content_add_metadata_collision(self, swh_storage):
- cont1 = data.cont
- del cont1["data"]
- cont1["ctime"] = now()
+ def test_content_add_metadata_collision(self, swh_storage, sample_data_model):
+ cont1 = attr.evolve(sample_data_model["content"][0], data=None, ctime=now())
# create (corrupted) content with same sha1{,_git} but != sha256
- cont1b = cont1.copy()
- sha1_git_array = bytearray(cont1b["sha256"])
+ sha1_git_array = bytearray(cont1.sha256)
sha1_git_array[0] += 1
- cont1b["sha256"] = bytes(sha1_git_array)
+ cont1b = attr.evolve(cont1, sha256=bytes(sha1_git_array))
with pytest.raises(HashCollision) as cm:
swh_storage.content_add_metadata([cont1, cont1b])
@@ -436,16 +433,16 @@
actual_algo = exc.algo
assert actual_algo in ["sha1", "sha1_git", "blake2s256"]
actual_id = exc.hash_id
- assert actual_id == hash_to_hex(cont1[actual_algo])
+ assert actual_id == getattr(cont1, actual_algo).hex()
collisions = exc.args[2]
assert len(collisions) == 2
assert collisions == [
- content_hex_hashes(Content.from_dict(cont1).hashes()),
- content_hex_hashes(Content.from_dict(cont1b).hashes()),
+ content_hex_hashes(cont1.hashes()),
+ content_hex_hashes(cont1b.hashes()),
]
assert exc.colliding_content_hashes() == [
- Content.from_dict(cont1).hashes(),
- Content.from_dict(cont1b).hashes(),
+ cont1.hashes(),
+ cont1b.hashes(),
]
def test_skipped_content_add(self, swh_storage):
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Dec 20 2024, 5:13 AM (11 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3216946
Attached To
D3556: test.storage: content_add_metadata: Use data model object
Event Timeline
Log In to Comment