Page MenuHomeSoftware Heritage

D3571.diff
No OneTemporary

D3571.diff

diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -31,6 +31,7 @@
from swh.model.model import (
Content,
Directory,
+ MetadataTargetType,
Origin,
OriginVisit,
OriginVisitStatus,
@@ -39,7 +40,6 @@
Revision,
SkippedContent,
Snapshot,
- MetadataTargetType,
)
from swh.model.hypothesis_strategies import objects
from swh.storage import get_storage
@@ -2346,38 +2346,39 @@
)
assert actual_origin_visit3 == ovs3
- def test_person_fullname_unicity(self, swh_storage):
- # given (person injection through revisions for example)
- revision = data.revision
-
+ def test_person_fullname_unicity(self, swh_storage, sample_data_model):
+ revision, rev2 = sample_data_model["revision"][0:2]
# create a revision with same committer fullname but wo name and email
- revision2 = copy.deepcopy(data.revision2)
- revision2["committer"] = dict(revision["committer"])
- revision2["committer"]["email"] = None
- revision2["committer"]["name"] = None
+ revision2 = attr.evolve(
+ rev2,
+ committer=Person(
+ fullname=revision.committer.fullname, name=None, email=None
+ ),
+ )
- swh_storage.revision_add([revision])
- swh_storage.revision_add([revision2])
+ swh_storage.revision_add([revision, revision2])
# when getting added revisions
- revisions = list(swh_storage.revision_get([revision["id"], revision2["id"]]))
+ revisions = list(swh_storage.revision_get([revision.id, revision2.id]))
- # then
- # check committers are the same
+ # then check committers are the same
assert revisions[0]["committer"] == revisions[1]["committer"]
- def test_snapshot_add_get_empty(self, swh_storage):
- swh_storage.origin_add([data.origin])
- origin_url = data.origin["url"]
+ def test_snapshot_add_get_empty(self, swh_storage, sample_data_model):
+ empty_snapshot = sample_data_model["snapshot"][1]
+ empty_snapshot_dict = empty_snapshot.to_dict()
+
+ origin = sample_data_model["origin"][0]
+ swh_storage.origin_add([origin])
ov1 = swh_storage.origin_visit_add(
[
OriginVisit(
- origin=origin_url, date=data.date_visit1, type=data.type_visit1,
+ origin=origin.url, date=data.date_visit1, type=data.type_visit1,
)
]
)[0]
- actual_result = swh_storage.snapshot_add([data.empty_snapshot])
+ actual_result = swh_storage.snapshot_add([empty_snapshot])
assert actual_result == {"snapshot:add": 1}
date_now = now()
@@ -2385,24 +2386,24 @@
swh_storage.origin_visit_status_add(
[
OriginVisitStatus(
- origin=origin_url,
+ origin=origin.url,
visit=ov1.visit,
date=date_now,
status="full",
- snapshot=data.empty_snapshot["id"],
+ snapshot=empty_snapshot.id,
)
]
)
- by_id = swh_storage.snapshot_get(data.empty_snapshot["id"])
- assert by_id == {**data.empty_snapshot, "next_branch": None}
+ by_id = swh_storage.snapshot_get(empty_snapshot.id)
+ assert by_id == {**empty_snapshot_dict, "next_branch": None}
- by_ov = swh_storage.snapshot_get_by_origin_visit(origin_url, ov1.visit)
- assert by_ov == {**data.empty_snapshot, "next_branch": None}
+ by_ov = swh_storage.snapshot_get_by_origin_visit(origin.url, ov1.visit)
+ assert by_ov == {**empty_snapshot_dict, "next_branch": None}
ovs1 = OriginVisitStatus.from_dict(
{
- "origin": origin_url,
+ "origin": origin.url,
"date": data.date_visit1,
"visit": ov1.visit,
"status": "created",
@@ -2412,77 +2413,82 @@
)
ovs2 = OriginVisitStatus.from_dict(
{
- "origin": origin_url,
+ "origin": origin.url,
"date": date_now,
"visit": ov1.visit,
"status": "full",
"metadata": None,
- "snapshot": data.empty_snapshot["id"],
+ "snapshot": empty_snapshot.id,
}
)
actual_objects = list(swh_storage.journal_writer.journal.objects)
expected_objects = [
- ("origin", Origin.from_dict(data.origin)),
+ ("origin", origin),
("origin_visit", ov1),
("origin_visit_status", ovs1,),
- ("snapshot", Snapshot.from_dict(data.empty_snapshot)),
+ ("snapshot", empty_snapshot),
("origin_visit_status", ovs2,),
]
for obj in expected_objects:
assert obj in actual_objects
- def test_snapshot_add_get_complete(self, swh_storage):
- origin_url = data.origin["url"]
- swh_storage.origin_add([data.origin])
+ def test_snapshot_add_get_complete(self, swh_storage, sample_data_model):
+ complete_snapshot = sample_data_model["snapshot"][2]
+ complete_snapshot_dict = complete_snapshot.to_dict()
+ origin = sample_data_model["origin"][0]
+
+ swh_storage.origin_add([origin])
visit = OriginVisit(
- origin=origin_url, date=data.date_visit1, type=data.type_visit1,
+ origin=origin.url, date=data.date_visit1, type=data.type_visit1,
)
origin_visit1 = swh_storage.origin_visit_add([visit])[0]
visit_id = origin_visit1.visit
- actual_result = swh_storage.snapshot_add([data.complete_snapshot])
+ actual_result = swh_storage.snapshot_add([complete_snapshot])
swh_storage.origin_visit_status_add(
[
OriginVisitStatus(
- origin=origin_url,
+ origin=origin.url,
visit=origin_visit1.visit,
date=now(),
status="ongoing",
- snapshot=data.complete_snapshot["id"],
+ snapshot=complete_snapshot.id,
)
]
)
assert actual_result == {"snapshot:add": 1}
- by_id = swh_storage.snapshot_get(data.complete_snapshot["id"])
- assert by_id == {**data.complete_snapshot, "next_branch": None}
+ by_id = swh_storage.snapshot_get(complete_snapshot.id)
+ assert by_id == {**complete_snapshot_dict, "next_branch": None}
- by_ov = swh_storage.snapshot_get_by_origin_visit(origin_url, visit_id)
- assert by_ov == {**data.complete_snapshot, "next_branch": None}
+ by_ov = swh_storage.snapshot_get_by_origin_visit(origin.url, visit_id)
+ assert by_ov == {**complete_snapshot_dict, "next_branch": None}
- def test_snapshot_add_many(self, swh_storage):
- actual_result = swh_storage.snapshot_add(
- [data.snapshot, data.complete_snapshot]
- )
+ def test_snapshot_add_many(self, swh_storage, sample_data_model):
+ snapshot, _, complete_snapshot = sample_data_model["snapshot"][:3]
+
+ actual_result = swh_storage.snapshot_add([snapshot, complete_snapshot])
assert actual_result == {"snapshot:add": 2}
- assert {
- **data.complete_snapshot,
+ assert swh_storage.snapshot_get(complete_snapshot.id) == {
+ **complete_snapshot.to_dict(),
"next_branch": None,
- } == swh_storage.snapshot_get(data.complete_snapshot["id"])
+ }
- assert {**data.snapshot, "next_branch": None} == swh_storage.snapshot_get(
- data.snapshot["id"]
- )
+ assert swh_storage.snapshot_get(snapshot.id) == {
+ **snapshot.to_dict(),
+ "next_branch": None,
+ }
swh_storage.refresh_stat_counters()
assert swh_storage.stat_counters()["snapshot"] == 2
- def test_snapshot_add_many_from_generator(self, swh_storage):
+ def test_snapshot_add_many_from_generator(self, swh_storage, sample_data_model):
+ snapshot, _, complete_snapshot = sample_data_model["snapshot"][:3]
+
def _snp_gen():
- yield data.snapshot
- yield data.complete_snapshot
+ yield from [snapshot, complete_snapshot]
actual_result = swh_storage.snapshot_add(_snp_gen())
assert actual_result == {"snapshot:add": 2}
@@ -2490,59 +2496,65 @@
swh_storage.refresh_stat_counters()
assert swh_storage.stat_counters()["snapshot"] == 2
- def test_snapshot_add_many_incremental(self, swh_storage):
- actual_result = swh_storage.snapshot_add([data.complete_snapshot])
+ def test_snapshot_add_many_incremental(self, swh_storage, sample_data_model):
+ snapshot, _, complete_snapshot = sample_data_model["snapshot"][:3]
+
+ actual_result = swh_storage.snapshot_add([complete_snapshot])
assert actual_result == {"snapshot:add": 1}
- actual_result2 = swh_storage.snapshot_add(
- [data.snapshot, data.complete_snapshot]
- )
+ actual_result2 = swh_storage.snapshot_add([snapshot, complete_snapshot])
assert actual_result2 == {"snapshot:add": 1}
- assert {
- **data.complete_snapshot,
+ assert swh_storage.snapshot_get(complete_snapshot.id) == {
+ **complete_snapshot.to_dict(),
"next_branch": None,
- } == swh_storage.snapshot_get(data.complete_snapshot["id"])
+ }
- assert {**data.snapshot, "next_branch": None} == swh_storage.snapshot_get(
- data.snapshot["id"]
- )
+ assert swh_storage.snapshot_get(snapshot.id) == {
+ **snapshot.to_dict(),
+ "next_branch": None,
+ }
- def test_snapshot_add_twice(self, swh_storage):
- actual_result = swh_storage.snapshot_add([data.empty_snapshot])
+ def test_snapshot_add_twice(self, swh_storage, sample_data_model):
+ snapshot, empty_snapshot = sample_data_model["snapshot"][:2]
+
+ actual_result = swh_storage.snapshot_add([empty_snapshot])
assert actual_result == {"snapshot:add": 1}
assert list(swh_storage.journal_writer.journal.objects) == [
- ("snapshot", Snapshot.from_dict(data.empty_snapshot))
+ ("snapshot", empty_snapshot)
]
- actual_result = swh_storage.snapshot_add([data.snapshot])
+ actual_result = swh_storage.snapshot_add([snapshot])
assert actual_result == {"snapshot:add": 1}
assert list(swh_storage.journal_writer.journal.objects) == [
- ("snapshot", Snapshot.from_dict(data.empty_snapshot)),
- ("snapshot", Snapshot.from_dict(data.snapshot)),
+ ("snapshot", empty_snapshot),
+ ("snapshot", snapshot),
]
- def test_snapshot_add_validation(self, swh_storage):
- snap = copy.deepcopy(data.snapshot)
+ def test_snapshot_add_validation(self, swh_storage, sample_data_model):
+ snapshot = sample_data_model["snapshot"][0]
+
+ snap = snapshot.to_dict()
snap["branches"][b"foo"] = {"target_type": "revision"}
with pytest.raises(StorageArgumentException, match="target"):
swh_storage.snapshot_add([snap])
- snap = copy.deepcopy(data.snapshot)
+ snap = snapshot.to_dict()
snap["branches"][b"foo"] = {"target": b"\x42" * 20}
with pytest.raises(StorageArgumentException, match="target_type"):
swh_storage.snapshot_add([snap])
- def test_snapshot_add_count_branches(self, swh_storage):
- actual_result = swh_storage.snapshot_add([data.complete_snapshot])
+ def test_snapshot_add_count_branches(self, swh_storage, sample_data_model):
+ complete_snapshot = sample_data_model["snapshot"][2]
+
+ actual_result = swh_storage.snapshot_add([complete_snapshot])
assert actual_result == {"snapshot:add": 1}
- snp_id = data.complete_snapshot["id"]
- snp_size = swh_storage.snapshot_count_branches(snp_id)
+ snp_size = swh_storage.snapshot_count_branches(complete_snapshot.id)
expected_snp_size = {
"alias": 1,
@@ -2555,11 +2567,13 @@
}
assert snp_size == expected_snp_size
- def test_snapshot_add_get_paginated(self, swh_storage):
- swh_storage.snapshot_add([data.complete_snapshot])
+ def test_snapshot_add_get_paginated(self, swh_storage, sample_data_model):
+ complete_snapshot = sample_data_model["snapshot"][2]
+
+ swh_storage.snapshot_add([complete_snapshot])
- snp_id = data.complete_snapshot["id"]
- branches = data.complete_snapshot["branches"]
+ snp_id = complete_snapshot.id
+ branches = complete_snapshot.to_dict()["branches"]
branch_names = list(sorted(branches))
# Test branch_from
@@ -2601,29 +2615,31 @@
assert snapshot == expected_snapshot
- def test_snapshot_add_get_filtered(self, swh_storage):
- swh_storage.origin_add([data.origin])
- origin_url = data.origin["url"]
+ def test_snapshot_add_get_filtered(self, swh_storage, sample_data_model):
+ origin = sample_data_model["origin"][0]
+ complete_snapshot = sample_data_model["snapshot"][2]
+
+ swh_storage.origin_add([origin])
visit = OriginVisit(
- origin=origin_url, date=data.date_visit1, type=data.type_visit1,
+ origin=origin.url, date=data.date_visit1, type=data.type_visit1,
)
origin_visit1 = swh_storage.origin_visit_add([visit])[0]
- swh_storage.snapshot_add([data.complete_snapshot])
+ swh_storage.snapshot_add([complete_snapshot])
swh_storage.origin_visit_status_add(
[
OriginVisitStatus(
- origin=origin_url,
+ origin=origin.url,
visit=origin_visit1.visit,
date=now(),
status="ongoing",
- snapshot=data.complete_snapshot["id"],
+ snapshot=complete_snapshot.id,
)
]
)
- snp_id = data.complete_snapshot["id"]
- branches = data.complete_snapshot["branches"]
+ snp_id = complete_snapshot.id
+ branches = complete_snapshot.to_dict()["branches"]
snapshot = swh_storage.snapshot_get_branches(
snp_id, target_types=["release", "revision"]
@@ -2655,11 +2671,15 @@
assert snapshot == expected_snapshot
- def test_snapshot_add_get_filtered_and_paginated(self, swh_storage):
- swh_storage.snapshot_add([data.complete_snapshot])
+ def test_snapshot_add_get_filtered_and_paginated(
+ self, swh_storage, sample_data_model
+ ):
+ complete_snapshot = sample_data_model["snapshot"][2]
+
+ swh_storage.snapshot_add([complete_snapshot])
- snp_id = data.complete_snapshot["id"]
- branches = data.complete_snapshot["branches"]
+ snp_id = complete_snapshot.id
+ branches = complete_snapshot.to_dict()["branches"]
branch_names = list(sorted(branches))
# Test branch_from
@@ -2722,8 +2742,9 @@
assert snapshot == expected_snapshot
- def test_snapshot_add_get_branch_by_type(self, swh_storage):
- snapshot = copy.deepcopy(data.complete_snapshot)
+ def test_snapshot_add_get_branch_by_type(self, swh_storage, sample_data_model):
+ complete_snapshot = sample_data_model["snapshot"][2]
+ snapshot = complete_snapshot.to_dict()
alias1 = b"alias1"
alias2 = b"alias2"
@@ -2740,10 +2761,11 @@
"target_type": "alias",
}
- swh_storage.snapshot_add([snapshot])
+ new_snapshot = Snapshot.from_dict(snapshot)
+ swh_storage.snapshot_add([new_snapshot])
branches = swh_storage.snapshot_get_branches(
- snapshot["id"],
+ new_snapshot.id,
target_types=["alias"],
branches_from=alias1,
branches_count=1,
@@ -2752,11 +2774,13 @@
assert len(branches) == 1
assert alias1 in branches
- def test_snapshot_add_get(self, swh_storage):
- swh_storage.origin_add([data.origin])
- origin_url = data.origin["url"]
+ def test_snapshot_add_get(self, swh_storage, sample_data_model):
+ snapshot = sample_data_model["snapshot"][0]
+ origin = sample_data_model["origin"][0]
+
+ swh_storage.origin_add([origin])
visit = OriginVisit(
- origin=origin_url, date=data.date_visit1, type=data.type_visit1,
+ origin=origin.url, date=data.date_visit1, type=data.type_visit1,
)
origin_visit1 = swh_storage.origin_visit_add([visit])[0]
visit_id = origin_visit1.visit
@@ -2765,80 +2789,85 @@
swh_storage.origin_visit_status_add(
[
OriginVisitStatus(
- origin=origin_url,
+ origin=origin.url,
visit=origin_visit1.visit,
date=now(),
status="ongoing",
- snapshot=data.snapshot["id"],
+ snapshot=snapshot.id,
)
]
)
- by_id = swh_storage.snapshot_get(data.snapshot["id"])
- assert by_id == {**data.snapshot, "next_branch": None}
+ expected_snapshot = {**snapshot.to_dict(), "next_branch": None}
+
+ by_id = swh_storage.snapshot_get(snapshot.id)
+ assert by_id == expected_snapshot
- by_ov = swh_storage.snapshot_get_by_origin_visit(origin_url, visit_id)
- assert by_ov == {**data.snapshot, "next_branch": None}
+ by_ov = swh_storage.snapshot_get_by_origin_visit(origin.url, visit_id)
+ assert by_ov == expected_snapshot
- origin_visit_info = swh_storage.origin_visit_get_by(origin_url, visit_id)
- assert origin_visit_info["snapshot"] == data.snapshot["id"]
+ origin_visit_info = swh_storage.origin_visit_get_by(origin.url, visit_id)
+ assert origin_visit_info["snapshot"] == snapshot.id
+
+ def test_snapshot_add_twice__by_origin_visit(self, swh_storage, sample_data_model):
+ snapshot = sample_data_model["snapshot"][0]
+ origin = sample_data_model["origin"][0]
- def test_snapshot_add_twice__by_origin_visit(self, swh_storage):
- swh_storage.origin_add([data.origin])
- origin_url = data.origin["url"]
+ swh_storage.origin_add([origin])
ov1 = swh_storage.origin_visit_add(
[
OriginVisit(
- origin=origin_url, date=data.date_visit1, type=data.type_visit1,
+ origin=origin.url, date=data.date_visit1, type=data.type_visit1,
)
]
)[0]
- swh_storage.snapshot_add([data.snapshot])
+ swh_storage.snapshot_add([snapshot])
date_now2 = now()
swh_storage.origin_visit_status_add(
[
OriginVisitStatus(
- origin=origin_url,
+ origin=origin.url,
visit=ov1.visit,
date=date_now2,
status="ongoing",
- snapshot=data.snapshot["id"],
+ snapshot=snapshot.id,
)
]
)
- by_ov1 = swh_storage.snapshot_get_by_origin_visit(origin_url, ov1.visit)
- assert by_ov1 == {**data.snapshot, "next_branch": None}
+ expected_snapshot = {**snapshot.to_dict(), "next_branch": None}
+
+ by_ov1 = swh_storage.snapshot_get_by_origin_visit(origin.url, ov1.visit)
+ assert by_ov1 == expected_snapshot
ov2 = swh_storage.origin_visit_add(
[
OriginVisit(
- origin=origin_url, date=data.date_visit2, type=data.type_visit2,
+ origin=origin.url, date=data.date_visit2, type=data.type_visit2,
)
]
)[0]
- swh_storage.snapshot_add([data.snapshot])
date_now4 = now()
swh_storage.origin_visit_status_add(
[
OriginVisitStatus(
- origin=origin_url,
+ origin=origin.url,
visit=ov2.visit,
date=date_now4,
status="ongoing",
- snapshot=data.snapshot["id"],
+ snapshot=snapshot.id,
)
]
)
- by_ov2 = swh_storage.snapshot_get_by_origin_visit(origin_url, ov2.visit)
- assert by_ov2 == {**data.snapshot, "next_branch": None}
+ by_ov2 = swh_storage.snapshot_get_by_origin_visit(origin.url, ov2.visit)
+ assert by_ov2 == expected_snapshot
ovs1 = OriginVisitStatus.from_dict(
{
- "origin": origin_url,
+ "origin": origin.url,
"date": data.date_visit1,
"visit": ov1.visit,
"status": "created",
@@ -2848,17 +2877,17 @@
)
ovs2 = OriginVisitStatus.from_dict(
{
- "origin": origin_url,
+ "origin": origin.url,
"date": date_now2,
"visit": ov1.visit,
"status": "ongoing",
"metadata": None,
- "snapshot": data.snapshot["id"],
+ "snapshot": snapshot.id,
}
)
ovs3 = OriginVisitStatus.from_dict(
{
- "origin": origin_url,
+ "origin": origin.url,
"date": data.date_visit2,
"visit": ov2.visit,
"status": "created",
@@ -2868,20 +2897,20 @@
)
ovs4 = OriginVisitStatus.from_dict(
{
- "origin": origin_url,
+ "origin": origin.url,
"date": date_now4,
"visit": ov2.visit,
"status": "ongoing",
"metadata": None,
- "snapshot": data.snapshot["id"],
+ "snapshot": snapshot.id,
}
)
actual_objects = list(swh_storage.journal_writer.journal.objects)
expected_objects = [
- ("origin", Origin.from_dict(data.origin)),
+ ("origin", origin),
("origin_visit", ov1),
("origin_visit_status", ovs1),
- ("snapshot", Snapshot.from_dict(data.snapshot)),
+ ("snapshot", snapshot),
("origin_visit_status", ovs2),
("origin_visit", ov2),
("origin_visit_status", ovs3),
@@ -2891,26 +2920,24 @@
for obj in expected_objects:
assert obj in actual_objects
- def test_snapshot_get_random(self, swh_storage):
- swh_storage.snapshot_add(
- [data.snapshot, data.empty_snapshot, data.complete_snapshot]
- )
+ def test_snapshot_get_random(self, swh_storage, sample_data_model):
+ snapshot, empty_snapshot, complete_snapshot = sample_data_model["snapshot"][:3]
+ swh_storage.snapshot_add([snapshot, empty_snapshot, complete_snapshot])
assert swh_storage.snapshot_get_random() in {
- data.snapshot["id"],
- data.empty_snapshot["id"],
- data.complete_snapshot["id"],
+ snapshot.id,
+ empty_snapshot.id,
+ complete_snapshot.id,
}
- def test_snapshot_missing(self, swh_storage):
- snap = data.snapshot
- missing_snap = data.empty_snapshot
- snapshots = [snap["id"], missing_snap["id"]]
- swh_storage.snapshot_add([snap])
+ def test_snapshot_missing(self, swh_storage, sample_data_model):
+ snapshot, missing_snapshot = sample_data_model["snapshot"][:2]
+ snapshots = [snapshot.id, missing_snapshot.id]
+ swh_storage.snapshot_add([snapshot])
missing_snapshots = swh_storage.snapshot_missing(snapshots)
- assert list(missing_snapshots) == [missing_snap["id"]]
+ assert list(missing_snapshots) == [missing_snapshot.id]
def test_stat_counters(self, swh_storage):
expected_keys = ["content", "directory", "origin", "revision"]

File Metadata

Mime Type
text/plain
Expires
Dec 21 2024, 7:47 AM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3221200

Event Timeline