Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7124570
D3571.id12574.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
11 KB
Subscribers
None
D3571.id12574.diff
View Options
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -2346,38 +2346,39 @@
)
assert actual_origin_visit3 == ovs3
- def test_person_fullname_unicity(self, swh_storage):
- # given (person injection through revisions for example)
- revision = data.revision
-
+ def test_person_fullname_unicity(self, swh_storage, sample_data_model):
+ revision, rev2 = sample_data_model["revision"][0:2]
# create a revision with same committer fullname but wo name and email
- revision2 = copy.deepcopy(data.revision2)
- revision2["committer"] = dict(revision["committer"])
- revision2["committer"]["email"] = None
- revision2["committer"]["name"] = None
+ revision2 = attr.evolve(
+ rev2,
+ committer=Person(
+ fullname=revision.committer.fullname, name=None, email=None
+ ),
+ )
- swh_storage.revision_add([revision])
- swh_storage.revision_add([revision2])
+ swh_storage.revision_add([revision, revision2])
# when getting added revisions
- revisions = list(swh_storage.revision_get([revision["id"], revision2["id"]]))
+ revisions = list(swh_storage.revision_get([revision.id, revision2.id]))
- # then
- # check committers are the same
+ # then check committers are the same
assert revisions[0]["committer"] == revisions[1]["committer"]
- def test_snapshot_add_get_empty(self, swh_storage):
- swh_storage.origin_add([data.origin])
- origin_url = data.origin["url"]
+ def test_snapshot_add_get_empty(self, swh_storage, sample_data_model):
+ empty_snapshot = sample_data_model["snapshot"][1]
+ empty_snapshot_dict = empty_snapshot.to_dict()
+
+ origin = sample_data_model["origin"][0]
+ swh_storage.origin_add([origin])
ov1 = swh_storage.origin_visit_add(
[
OriginVisit(
- origin=origin_url, date=data.date_visit1, type=data.type_visit1,
+ origin=origin.url, date=data.date_visit1, type=data.type_visit1,
)
]
)[0]
- actual_result = swh_storage.snapshot_add([data.empty_snapshot])
+ actual_result = swh_storage.snapshot_add([empty_snapshot])
assert actual_result == {"snapshot:add": 1}
date_now = now()
@@ -2385,24 +2386,24 @@
swh_storage.origin_visit_status_add(
[
OriginVisitStatus(
- origin=origin_url,
+ origin=origin.url,
visit=ov1.visit,
date=date_now,
status="full",
- snapshot=data.empty_snapshot["id"],
+ snapshot=empty_snapshot.id,
)
]
)
- by_id = swh_storage.snapshot_get(data.empty_snapshot["id"])
- assert by_id == {**data.empty_snapshot, "next_branch": None}
+ by_id = swh_storage.snapshot_get(empty_snapshot.id)
+ assert by_id == {**empty_snapshot_dict, "next_branch": None}
- by_ov = swh_storage.snapshot_get_by_origin_visit(origin_url, ov1.visit)
- assert by_ov == {**data.empty_snapshot, "next_branch": None}
+ by_ov = swh_storage.snapshot_get_by_origin_visit(origin.url, ov1.visit)
+ assert by_ov == {**empty_snapshot_dict, "next_branch": None}
ovs1 = OriginVisitStatus.from_dict(
{
- "origin": origin_url,
+ "origin": origin.url,
"date": data.date_visit1,
"visit": ov1.visit,
"status": "created",
@@ -2412,77 +2413,82 @@
)
ovs2 = OriginVisitStatus.from_dict(
{
- "origin": origin_url,
+ "origin": origin.url,
"date": date_now,
"visit": ov1.visit,
"status": "full",
"metadata": None,
- "snapshot": data.empty_snapshot["id"],
+ "snapshot": empty_snapshot.id,
}
)
actual_objects = list(swh_storage.journal_writer.journal.objects)
expected_objects = [
- ("origin", Origin.from_dict(data.origin)),
+ ("origin", origin),
("origin_visit", ov1),
("origin_visit_status", ovs1,),
- ("snapshot", Snapshot.from_dict(data.empty_snapshot)),
+ ("snapshot", empty_snapshot),
("origin_visit_status", ovs2,),
]
for obj in expected_objects:
assert obj in actual_objects
- def test_snapshot_add_get_complete(self, swh_storage):
- origin_url = data.origin["url"]
- swh_storage.origin_add([data.origin])
+ def test_snapshot_add_get_complete(self, swh_storage, sample_data_model):
+ complete_snapshot = sample_data_model["snapshot"][2]
+ complete_snapshot_dict = complete_snapshot.to_dict()
+ origin = sample_data_model["origin"][0]
+
+ swh_storage.origin_add([origin])
visit = OriginVisit(
- origin=origin_url, date=data.date_visit1, type=data.type_visit1,
+ origin=origin.url, date=data.date_visit1, type=data.type_visit1,
)
origin_visit1 = swh_storage.origin_visit_add([visit])[0]
visit_id = origin_visit1.visit
- actual_result = swh_storage.snapshot_add([data.complete_snapshot])
+ actual_result = swh_storage.snapshot_add([complete_snapshot])
swh_storage.origin_visit_status_add(
[
OriginVisitStatus(
- origin=origin_url,
+ origin=origin.url,
visit=origin_visit1.visit,
date=now(),
status="ongoing",
- snapshot=data.complete_snapshot["id"],
+ snapshot=complete_snapshot.id,
)
]
)
assert actual_result == {"snapshot:add": 1}
- by_id = swh_storage.snapshot_get(data.complete_snapshot["id"])
- assert by_id == {**data.complete_snapshot, "next_branch": None}
+ by_id = swh_storage.snapshot_get(complete_snapshot.id)
+ assert by_id == {**complete_snapshot_dict, "next_branch": None}
- by_ov = swh_storage.snapshot_get_by_origin_visit(origin_url, visit_id)
- assert by_ov == {**data.complete_snapshot, "next_branch": None}
+ by_ov = swh_storage.snapshot_get_by_origin_visit(origin.url, visit_id)
+ assert by_ov == {**complete_snapshot_dict, "next_branch": None}
- def test_snapshot_add_many(self, swh_storage):
- actual_result = swh_storage.snapshot_add(
- [data.snapshot, data.complete_snapshot]
- )
+ def test_snapshot_add_many(self, swh_storage, sample_data_model):
+ snapshot, _, complete_snapshot = sample_data_model["snapshot"][:3]
+
+ actual_result = swh_storage.snapshot_add([snapshot, complete_snapshot])
assert actual_result == {"snapshot:add": 2}
- assert {
- **data.complete_snapshot,
+ assert swh_storage.snapshot_get(complete_snapshot.id) == {
+ **complete_snapshot.to_dict(),
"next_branch": None,
- } == swh_storage.snapshot_get(data.complete_snapshot["id"])
+ }
- assert {**data.snapshot, "next_branch": None} == swh_storage.snapshot_get(
- data.snapshot["id"]
- )
+ assert swh_storage.snapshot_get(snapshot.id) == {
+ **snapshot.to_dict(),
+ "next_branch": None,
+ }
swh_storage.refresh_stat_counters()
assert swh_storage.stat_counters()["snapshot"] == 2
- def test_snapshot_add_many_from_generator(self, swh_storage):
+ def test_snapshot_add_many_from_generator(self, swh_storage, sample_data_model):
+ snapshot, _, complete_snapshot = sample_data_model["snapshot"][:3]
+
def _snp_gen():
- yield data.snapshot
- yield data.complete_snapshot
+ yield from [snapshot, complete_snapshot]
actual_result = swh_storage.snapshot_add(_snp_gen())
assert actual_result == {"snapshot:add": 2}
@@ -2490,59 +2496,65 @@
swh_storage.refresh_stat_counters()
assert swh_storage.stat_counters()["snapshot"] == 2
- def test_snapshot_add_many_incremental(self, swh_storage):
- actual_result = swh_storage.snapshot_add([data.complete_snapshot])
+ def test_snapshot_add_many_incremental(self, swh_storage, sample_data_model):
+ snapshot, _, complete_snapshot = sample_data_model["snapshot"][:3]
+
+ actual_result = swh_storage.snapshot_add([complete_snapshot])
assert actual_result == {"snapshot:add": 1}
- actual_result2 = swh_storage.snapshot_add(
- [data.snapshot, data.complete_snapshot]
- )
+ actual_result2 = swh_storage.snapshot_add([snapshot, complete_snapshot])
assert actual_result2 == {"snapshot:add": 1}
- assert {
- **data.complete_snapshot,
+ assert swh_storage.snapshot_get(complete_snapshot.id) == {
+ **complete_snapshot.to_dict(),
"next_branch": None,
- } == swh_storage.snapshot_get(data.complete_snapshot["id"])
+ }
- assert {**data.snapshot, "next_branch": None} == swh_storage.snapshot_get(
- data.snapshot["id"]
- )
+ assert swh_storage.snapshot_get(snapshot.id) == {
+ **snapshot.to_dict(),
+ "next_branch": None,
+ }
- def test_snapshot_add_twice(self, swh_storage):
- actual_result = swh_storage.snapshot_add([data.empty_snapshot])
+ def test_snapshot_add_twice(self, swh_storage, sample_data_model):
+ snapshot, empty_snapshot = sample_data_model["snapshot"][:2]
+
+ actual_result = swh_storage.snapshot_add([empty_snapshot])
assert actual_result == {"snapshot:add": 1}
assert list(swh_storage.journal_writer.journal.objects) == [
- ("snapshot", Snapshot.from_dict(data.empty_snapshot))
+ ("snapshot", empty_snapshot)
]
- actual_result = swh_storage.snapshot_add([data.snapshot])
+ actual_result = swh_storage.snapshot_add([snapshot])
assert actual_result == {"snapshot:add": 1}
assert list(swh_storage.journal_writer.journal.objects) == [
- ("snapshot", Snapshot.from_dict(data.empty_snapshot)),
- ("snapshot", Snapshot.from_dict(data.snapshot)),
+ ("snapshot", empty_snapshot),
+ ("snapshot", snapshot),
]
- def test_snapshot_add_validation(self, swh_storage):
- snap = copy.deepcopy(data.snapshot)
+ def test_snapshot_add_validation(self, swh_storage, sample_data_model):
+ snapshot = sample_data_model["snapshot"][0]
+
+ snap = snapshot.to_dict()
snap["branches"][b"foo"] = {"target_type": "revision"}
with pytest.raises(StorageArgumentException, match="target"):
swh_storage.snapshot_add([snap])
- snap = copy.deepcopy(data.snapshot)
+ snap = snapshot.to_dict()
snap["branches"][b"foo"] = {"target": b"\x42" * 20}
with pytest.raises(StorageArgumentException, match="target_type"):
swh_storage.snapshot_add([snap])
- def test_snapshot_add_count_branches(self, swh_storage):
- actual_result = swh_storage.snapshot_add([data.complete_snapshot])
+ def test_snapshot_add_count_branches(self, swh_storage, sample_data_model):
+ complete_snapshot = sample_data_model["snapshot"][2]
+
+ actual_result = swh_storage.snapshot_add([complete_snapshot])
assert actual_result == {"snapshot:add": 1}
- snp_id = data.complete_snapshot["id"]
- snp_size = swh_storage.snapshot_count_branches(snp_id)
+ snp_size = swh_storage.snapshot_count_branches(complete_snapshot.id)
expected_snp_size = {
"alias": 1,
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Dec 21 2024, 1:34 PM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3231950
Attached To
D3571: test_storage: snapshot: Use data model object
Event Timeline
Log In to Comment