Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7124335
D3571.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
24 KB
Subscribers
None
D3571.diff
View Options
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -31,6 +31,7 @@
from swh.model.model import (
Content,
Directory,
+ MetadataTargetType,
Origin,
OriginVisit,
OriginVisitStatus,
@@ -39,7 +40,6 @@
Revision,
SkippedContent,
Snapshot,
- MetadataTargetType,
)
from swh.model.hypothesis_strategies import objects
from swh.storage import get_storage
@@ -2346,38 +2346,39 @@
)
assert actual_origin_visit3 == ovs3
- def test_person_fullname_unicity(self, swh_storage):
- # given (person injection through revisions for example)
- revision = data.revision
-
+ def test_person_fullname_unicity(self, swh_storage, sample_data_model):
+ revision, rev2 = sample_data_model["revision"][0:2]
# create a revision with same committer fullname but wo name and email
- revision2 = copy.deepcopy(data.revision2)
- revision2["committer"] = dict(revision["committer"])
- revision2["committer"]["email"] = None
- revision2["committer"]["name"] = None
+ revision2 = attr.evolve(
+ rev2,
+ committer=Person(
+ fullname=revision.committer.fullname, name=None, email=None
+ ),
+ )
- swh_storage.revision_add([revision])
- swh_storage.revision_add([revision2])
+ swh_storage.revision_add([revision, revision2])
# when getting added revisions
- revisions = list(swh_storage.revision_get([revision["id"], revision2["id"]]))
+ revisions = list(swh_storage.revision_get([revision.id, revision2.id]))
- # then
- # check committers are the same
+ # then check committers are the same
assert revisions[0]["committer"] == revisions[1]["committer"]
- def test_snapshot_add_get_empty(self, swh_storage):
- swh_storage.origin_add([data.origin])
- origin_url = data.origin["url"]
+ def test_snapshot_add_get_empty(self, swh_storage, sample_data_model):
+ empty_snapshot = sample_data_model["snapshot"][1]
+ empty_snapshot_dict = empty_snapshot.to_dict()
+
+ origin = sample_data_model["origin"][0]
+ swh_storage.origin_add([origin])
ov1 = swh_storage.origin_visit_add(
[
OriginVisit(
- origin=origin_url, date=data.date_visit1, type=data.type_visit1,
+ origin=origin.url, date=data.date_visit1, type=data.type_visit1,
)
]
)[0]
- actual_result = swh_storage.snapshot_add([data.empty_snapshot])
+ actual_result = swh_storage.snapshot_add([empty_snapshot])
assert actual_result == {"snapshot:add": 1}
date_now = now()
@@ -2385,24 +2386,24 @@
swh_storage.origin_visit_status_add(
[
OriginVisitStatus(
- origin=origin_url,
+ origin=origin.url,
visit=ov1.visit,
date=date_now,
status="full",
- snapshot=data.empty_snapshot["id"],
+ snapshot=empty_snapshot.id,
)
]
)
- by_id = swh_storage.snapshot_get(data.empty_snapshot["id"])
- assert by_id == {**data.empty_snapshot, "next_branch": None}
+ by_id = swh_storage.snapshot_get(empty_snapshot.id)
+ assert by_id == {**empty_snapshot_dict, "next_branch": None}
- by_ov = swh_storage.snapshot_get_by_origin_visit(origin_url, ov1.visit)
- assert by_ov == {**data.empty_snapshot, "next_branch": None}
+ by_ov = swh_storage.snapshot_get_by_origin_visit(origin.url, ov1.visit)
+ assert by_ov == {**empty_snapshot_dict, "next_branch": None}
ovs1 = OriginVisitStatus.from_dict(
{
- "origin": origin_url,
+ "origin": origin.url,
"date": data.date_visit1,
"visit": ov1.visit,
"status": "created",
@@ -2412,77 +2413,82 @@
)
ovs2 = OriginVisitStatus.from_dict(
{
- "origin": origin_url,
+ "origin": origin.url,
"date": date_now,
"visit": ov1.visit,
"status": "full",
"metadata": None,
- "snapshot": data.empty_snapshot["id"],
+ "snapshot": empty_snapshot.id,
}
)
actual_objects = list(swh_storage.journal_writer.journal.objects)
expected_objects = [
- ("origin", Origin.from_dict(data.origin)),
+ ("origin", origin),
("origin_visit", ov1),
("origin_visit_status", ovs1,),
- ("snapshot", Snapshot.from_dict(data.empty_snapshot)),
+ ("snapshot", empty_snapshot),
("origin_visit_status", ovs2,),
]
for obj in expected_objects:
assert obj in actual_objects
- def test_snapshot_add_get_complete(self, swh_storage):
- origin_url = data.origin["url"]
- swh_storage.origin_add([data.origin])
+ def test_snapshot_add_get_complete(self, swh_storage, sample_data_model):
+ complete_snapshot = sample_data_model["snapshot"][2]
+ complete_snapshot_dict = complete_snapshot.to_dict()
+ origin = sample_data_model["origin"][0]
+
+ swh_storage.origin_add([origin])
visit = OriginVisit(
- origin=origin_url, date=data.date_visit1, type=data.type_visit1,
+ origin=origin.url, date=data.date_visit1, type=data.type_visit1,
)
origin_visit1 = swh_storage.origin_visit_add([visit])[0]
visit_id = origin_visit1.visit
- actual_result = swh_storage.snapshot_add([data.complete_snapshot])
+ actual_result = swh_storage.snapshot_add([complete_snapshot])
swh_storage.origin_visit_status_add(
[
OriginVisitStatus(
- origin=origin_url,
+ origin=origin.url,
visit=origin_visit1.visit,
date=now(),
status="ongoing",
- snapshot=data.complete_snapshot["id"],
+ snapshot=complete_snapshot.id,
)
]
)
assert actual_result == {"snapshot:add": 1}
- by_id = swh_storage.snapshot_get(data.complete_snapshot["id"])
- assert by_id == {**data.complete_snapshot, "next_branch": None}
+ by_id = swh_storage.snapshot_get(complete_snapshot.id)
+ assert by_id == {**complete_snapshot_dict, "next_branch": None}
- by_ov = swh_storage.snapshot_get_by_origin_visit(origin_url, visit_id)
- assert by_ov == {**data.complete_snapshot, "next_branch": None}
+ by_ov = swh_storage.snapshot_get_by_origin_visit(origin.url, visit_id)
+ assert by_ov == {**complete_snapshot_dict, "next_branch": None}
- def test_snapshot_add_many(self, swh_storage):
- actual_result = swh_storage.snapshot_add(
- [data.snapshot, data.complete_snapshot]
- )
+ def test_snapshot_add_many(self, swh_storage, sample_data_model):
+ snapshot, _, complete_snapshot = sample_data_model["snapshot"][:3]
+
+ actual_result = swh_storage.snapshot_add([snapshot, complete_snapshot])
assert actual_result == {"snapshot:add": 2}
- assert {
- **data.complete_snapshot,
+ assert swh_storage.snapshot_get(complete_snapshot.id) == {
+ **complete_snapshot.to_dict(),
"next_branch": None,
- } == swh_storage.snapshot_get(data.complete_snapshot["id"])
+ }
- assert {**data.snapshot, "next_branch": None} == swh_storage.snapshot_get(
- data.snapshot["id"]
- )
+ assert swh_storage.snapshot_get(snapshot.id) == {
+ **snapshot.to_dict(),
+ "next_branch": None,
+ }
swh_storage.refresh_stat_counters()
assert swh_storage.stat_counters()["snapshot"] == 2
- def test_snapshot_add_many_from_generator(self, swh_storage):
+ def test_snapshot_add_many_from_generator(self, swh_storage, sample_data_model):
+ snapshot, _, complete_snapshot = sample_data_model["snapshot"][:3]
+
def _snp_gen():
- yield data.snapshot
- yield data.complete_snapshot
+ yield from [snapshot, complete_snapshot]
actual_result = swh_storage.snapshot_add(_snp_gen())
assert actual_result == {"snapshot:add": 2}
@@ -2490,59 +2496,65 @@
swh_storage.refresh_stat_counters()
assert swh_storage.stat_counters()["snapshot"] == 2
- def test_snapshot_add_many_incremental(self, swh_storage):
- actual_result = swh_storage.snapshot_add([data.complete_snapshot])
+ def test_snapshot_add_many_incremental(self, swh_storage, sample_data_model):
+ snapshot, _, complete_snapshot = sample_data_model["snapshot"][:3]
+
+ actual_result = swh_storage.snapshot_add([complete_snapshot])
assert actual_result == {"snapshot:add": 1}
- actual_result2 = swh_storage.snapshot_add(
- [data.snapshot, data.complete_snapshot]
- )
+ actual_result2 = swh_storage.snapshot_add([snapshot, complete_snapshot])
assert actual_result2 == {"snapshot:add": 1}
- assert {
- **data.complete_snapshot,
+ assert swh_storage.snapshot_get(complete_snapshot.id) == {
+ **complete_snapshot.to_dict(),
"next_branch": None,
- } == swh_storage.snapshot_get(data.complete_snapshot["id"])
+ }
- assert {**data.snapshot, "next_branch": None} == swh_storage.snapshot_get(
- data.snapshot["id"]
- )
+ assert swh_storage.snapshot_get(snapshot.id) == {
+ **snapshot.to_dict(),
+ "next_branch": None,
+ }
- def test_snapshot_add_twice(self, swh_storage):
- actual_result = swh_storage.snapshot_add([data.empty_snapshot])
+ def test_snapshot_add_twice(self, swh_storage, sample_data_model):
+ snapshot, empty_snapshot = sample_data_model["snapshot"][:2]
+
+ actual_result = swh_storage.snapshot_add([empty_snapshot])
assert actual_result == {"snapshot:add": 1}
assert list(swh_storage.journal_writer.journal.objects) == [
- ("snapshot", Snapshot.from_dict(data.empty_snapshot))
+ ("snapshot", empty_snapshot)
]
- actual_result = swh_storage.snapshot_add([data.snapshot])
+ actual_result = swh_storage.snapshot_add([snapshot])
assert actual_result == {"snapshot:add": 1}
assert list(swh_storage.journal_writer.journal.objects) == [
- ("snapshot", Snapshot.from_dict(data.empty_snapshot)),
- ("snapshot", Snapshot.from_dict(data.snapshot)),
+ ("snapshot", empty_snapshot),
+ ("snapshot", snapshot),
]
- def test_snapshot_add_validation(self, swh_storage):
- snap = copy.deepcopy(data.snapshot)
+ def test_snapshot_add_validation(self, swh_storage, sample_data_model):
+ snapshot = sample_data_model["snapshot"][0]
+
+ snap = snapshot.to_dict()
snap["branches"][b"foo"] = {"target_type": "revision"}
with pytest.raises(StorageArgumentException, match="target"):
swh_storage.snapshot_add([snap])
- snap = copy.deepcopy(data.snapshot)
+ snap = snapshot.to_dict()
snap["branches"][b"foo"] = {"target": b"\x42" * 20}
with pytest.raises(StorageArgumentException, match="target_type"):
swh_storage.snapshot_add([snap])
- def test_snapshot_add_count_branches(self, swh_storage):
- actual_result = swh_storage.snapshot_add([data.complete_snapshot])
+ def test_snapshot_add_count_branches(self, swh_storage, sample_data_model):
+ complete_snapshot = sample_data_model["snapshot"][2]
+
+ actual_result = swh_storage.snapshot_add([complete_snapshot])
assert actual_result == {"snapshot:add": 1}
- snp_id = data.complete_snapshot["id"]
- snp_size = swh_storage.snapshot_count_branches(snp_id)
+ snp_size = swh_storage.snapshot_count_branches(complete_snapshot.id)
expected_snp_size = {
"alias": 1,
@@ -2555,11 +2567,13 @@
}
assert snp_size == expected_snp_size
- def test_snapshot_add_get_paginated(self, swh_storage):
- swh_storage.snapshot_add([data.complete_snapshot])
+ def test_snapshot_add_get_paginated(self, swh_storage, sample_data_model):
+ complete_snapshot = sample_data_model["snapshot"][2]
+
+ swh_storage.snapshot_add([complete_snapshot])
- snp_id = data.complete_snapshot["id"]
- branches = data.complete_snapshot["branches"]
+ snp_id = complete_snapshot.id
+ branches = complete_snapshot.to_dict()["branches"]
branch_names = list(sorted(branches))
# Test branch_from
@@ -2601,29 +2615,31 @@
assert snapshot == expected_snapshot
- def test_snapshot_add_get_filtered(self, swh_storage):
- swh_storage.origin_add([data.origin])
- origin_url = data.origin["url"]
+ def test_snapshot_add_get_filtered(self, swh_storage, sample_data_model):
+ origin = sample_data_model["origin"][0]
+ complete_snapshot = sample_data_model["snapshot"][2]
+
+ swh_storage.origin_add([origin])
visit = OriginVisit(
- origin=origin_url, date=data.date_visit1, type=data.type_visit1,
+ origin=origin.url, date=data.date_visit1, type=data.type_visit1,
)
origin_visit1 = swh_storage.origin_visit_add([visit])[0]
- swh_storage.snapshot_add([data.complete_snapshot])
+ swh_storage.snapshot_add([complete_snapshot])
swh_storage.origin_visit_status_add(
[
OriginVisitStatus(
- origin=origin_url,
+ origin=origin.url,
visit=origin_visit1.visit,
date=now(),
status="ongoing",
- snapshot=data.complete_snapshot["id"],
+ snapshot=complete_snapshot.id,
)
]
)
- snp_id = data.complete_snapshot["id"]
- branches = data.complete_snapshot["branches"]
+ snp_id = complete_snapshot.id
+ branches = complete_snapshot.to_dict()["branches"]
snapshot = swh_storage.snapshot_get_branches(
snp_id, target_types=["release", "revision"]
@@ -2655,11 +2671,15 @@
assert snapshot == expected_snapshot
- def test_snapshot_add_get_filtered_and_paginated(self, swh_storage):
- swh_storage.snapshot_add([data.complete_snapshot])
+ def test_snapshot_add_get_filtered_and_paginated(
+ self, swh_storage, sample_data_model
+ ):
+ complete_snapshot = sample_data_model["snapshot"][2]
+
+ swh_storage.snapshot_add([complete_snapshot])
- snp_id = data.complete_snapshot["id"]
- branches = data.complete_snapshot["branches"]
+ snp_id = complete_snapshot.id
+ branches = complete_snapshot.to_dict()["branches"]
branch_names = list(sorted(branches))
# Test branch_from
@@ -2722,8 +2742,9 @@
assert snapshot == expected_snapshot
- def test_snapshot_add_get_branch_by_type(self, swh_storage):
- snapshot = copy.deepcopy(data.complete_snapshot)
+ def test_snapshot_add_get_branch_by_type(self, swh_storage, sample_data_model):
+ complete_snapshot = sample_data_model["snapshot"][2]
+ snapshot = complete_snapshot.to_dict()
alias1 = b"alias1"
alias2 = b"alias2"
@@ -2740,10 +2761,11 @@
"target_type": "alias",
}
- swh_storage.snapshot_add([snapshot])
+ new_snapshot = Snapshot.from_dict(snapshot)
+ swh_storage.snapshot_add([new_snapshot])
branches = swh_storage.snapshot_get_branches(
- snapshot["id"],
+ new_snapshot.id,
target_types=["alias"],
branches_from=alias1,
branches_count=1,
@@ -2752,11 +2774,13 @@
assert len(branches) == 1
assert alias1 in branches
- def test_snapshot_add_get(self, swh_storage):
- swh_storage.origin_add([data.origin])
- origin_url = data.origin["url"]
+ def test_snapshot_add_get(self, swh_storage, sample_data_model):
+ snapshot = sample_data_model["snapshot"][0]
+ origin = sample_data_model["origin"][0]
+
+ swh_storage.origin_add([origin])
visit = OriginVisit(
- origin=origin_url, date=data.date_visit1, type=data.type_visit1,
+ origin=origin.url, date=data.date_visit1, type=data.type_visit1,
)
origin_visit1 = swh_storage.origin_visit_add([visit])[0]
visit_id = origin_visit1.visit
@@ -2765,80 +2789,85 @@
swh_storage.origin_visit_status_add(
[
OriginVisitStatus(
- origin=origin_url,
+ origin=origin.url,
visit=origin_visit1.visit,
date=now(),
status="ongoing",
- snapshot=data.snapshot["id"],
+ snapshot=snapshot.id,
)
]
)
- by_id = swh_storage.snapshot_get(data.snapshot["id"])
- assert by_id == {**data.snapshot, "next_branch": None}
+ expected_snapshot = {**snapshot.to_dict(), "next_branch": None}
+
+ by_id = swh_storage.snapshot_get(snapshot.id)
+ assert by_id == expected_snapshot
- by_ov = swh_storage.snapshot_get_by_origin_visit(origin_url, visit_id)
- assert by_ov == {**data.snapshot, "next_branch": None}
+ by_ov = swh_storage.snapshot_get_by_origin_visit(origin.url, visit_id)
+ assert by_ov == expected_snapshot
- origin_visit_info = swh_storage.origin_visit_get_by(origin_url, visit_id)
- assert origin_visit_info["snapshot"] == data.snapshot["id"]
+ origin_visit_info = swh_storage.origin_visit_get_by(origin.url, visit_id)
+ assert origin_visit_info["snapshot"] == snapshot.id
+
+ def test_snapshot_add_twice__by_origin_visit(self, swh_storage, sample_data_model):
+ snapshot = sample_data_model["snapshot"][0]
+ origin = sample_data_model["origin"][0]
- def test_snapshot_add_twice__by_origin_visit(self, swh_storage):
- swh_storage.origin_add([data.origin])
- origin_url = data.origin["url"]
+ swh_storage.origin_add([origin])
ov1 = swh_storage.origin_visit_add(
[
OriginVisit(
- origin=origin_url, date=data.date_visit1, type=data.type_visit1,
+ origin=origin.url, date=data.date_visit1, type=data.type_visit1,
)
]
)[0]
- swh_storage.snapshot_add([data.snapshot])
+ swh_storage.snapshot_add([snapshot])
date_now2 = now()
swh_storage.origin_visit_status_add(
[
OriginVisitStatus(
- origin=origin_url,
+ origin=origin.url,
visit=ov1.visit,
date=date_now2,
status="ongoing",
- snapshot=data.snapshot["id"],
+ snapshot=snapshot.id,
)
]
)
- by_ov1 = swh_storage.snapshot_get_by_origin_visit(origin_url, ov1.visit)
- assert by_ov1 == {**data.snapshot, "next_branch": None}
+ expected_snapshot = {**snapshot.to_dict(), "next_branch": None}
+
+ by_ov1 = swh_storage.snapshot_get_by_origin_visit(origin.url, ov1.visit)
+ assert by_ov1 == expected_snapshot
ov2 = swh_storage.origin_visit_add(
[
OriginVisit(
- origin=origin_url, date=data.date_visit2, type=data.type_visit2,
+ origin=origin.url, date=data.date_visit2, type=data.type_visit2,
)
]
)[0]
- swh_storage.snapshot_add([data.snapshot])
date_now4 = now()
swh_storage.origin_visit_status_add(
[
OriginVisitStatus(
- origin=origin_url,
+ origin=origin.url,
visit=ov2.visit,
date=date_now4,
status="ongoing",
- snapshot=data.snapshot["id"],
+ snapshot=snapshot.id,
)
]
)
- by_ov2 = swh_storage.snapshot_get_by_origin_visit(origin_url, ov2.visit)
- assert by_ov2 == {**data.snapshot, "next_branch": None}
+ by_ov2 = swh_storage.snapshot_get_by_origin_visit(origin.url, ov2.visit)
+ assert by_ov2 == expected_snapshot
ovs1 = OriginVisitStatus.from_dict(
{
- "origin": origin_url,
+ "origin": origin.url,
"date": data.date_visit1,
"visit": ov1.visit,
"status": "created",
@@ -2848,17 +2877,17 @@
)
ovs2 = OriginVisitStatus.from_dict(
{
- "origin": origin_url,
+ "origin": origin.url,
"date": date_now2,
"visit": ov1.visit,
"status": "ongoing",
"metadata": None,
- "snapshot": data.snapshot["id"],
+ "snapshot": snapshot.id,
}
)
ovs3 = OriginVisitStatus.from_dict(
{
- "origin": origin_url,
+ "origin": origin.url,
"date": data.date_visit2,
"visit": ov2.visit,
"status": "created",
@@ -2868,20 +2897,20 @@
)
ovs4 = OriginVisitStatus.from_dict(
{
- "origin": origin_url,
+ "origin": origin.url,
"date": date_now4,
"visit": ov2.visit,
"status": "ongoing",
"metadata": None,
- "snapshot": data.snapshot["id"],
+ "snapshot": snapshot.id,
}
)
actual_objects = list(swh_storage.journal_writer.journal.objects)
expected_objects = [
- ("origin", Origin.from_dict(data.origin)),
+ ("origin", origin),
("origin_visit", ov1),
("origin_visit_status", ovs1),
- ("snapshot", Snapshot.from_dict(data.snapshot)),
+ ("snapshot", snapshot),
("origin_visit_status", ovs2),
("origin_visit", ov2),
("origin_visit_status", ovs3),
@@ -2891,26 +2920,24 @@
for obj in expected_objects:
assert obj in actual_objects
- def test_snapshot_get_random(self, swh_storage):
- swh_storage.snapshot_add(
- [data.snapshot, data.empty_snapshot, data.complete_snapshot]
- )
+ def test_snapshot_get_random(self, swh_storage, sample_data_model):
+ snapshot, empty_snapshot, complete_snapshot = sample_data_model["snapshot"][:3]
+ swh_storage.snapshot_add([snapshot, empty_snapshot, complete_snapshot])
assert swh_storage.snapshot_get_random() in {
- data.snapshot["id"],
- data.empty_snapshot["id"],
- data.complete_snapshot["id"],
+ snapshot.id,
+ empty_snapshot.id,
+ complete_snapshot.id,
}
- def test_snapshot_missing(self, swh_storage):
- snap = data.snapshot
- missing_snap = data.empty_snapshot
- snapshots = [snap["id"], missing_snap["id"]]
- swh_storage.snapshot_add([snap])
+ def test_snapshot_missing(self, swh_storage, sample_data_model):
+ snapshot, missing_snapshot = sample_data_model["snapshot"][:2]
+ snapshots = [snapshot.id, missing_snapshot.id]
+ swh_storage.snapshot_add([snapshot])
missing_snapshots = swh_storage.snapshot_missing(snapshots)
- assert list(missing_snapshots) == [missing_snap["id"]]
+ assert list(missing_snapshots) == [missing_snapshot.id]
def test_stat_counters(self, swh_storage):
expected_keys = ["content", "directory", "origin", "revision"]
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Dec 21 2024, 7:47 AM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3221200
Attached To
D3571: test_storage: snapshot: Use data model object
Event Timeline
Log In to Comment