diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py --- a/swh/storage/tests/test_storage.py +++ b/swh/storage/tests/test_storage.py @@ -2346,38 +2346,39 @@ ) assert actual_origin_visit3 == ovs3 - def test_person_fullname_unicity(self, swh_storage): - # given (person injection through revisions for example) - revision = data.revision - + def test_person_fullname_unicity(self, swh_storage, sample_data_model): + revision, rev2 = sample_data_model["revision"][0:2] # create a revision with same committer fullname but wo name and email - revision2 = copy.deepcopy(data.revision2) - revision2["committer"] = dict(revision["committer"]) - revision2["committer"]["email"] = None - revision2["committer"]["name"] = None + revision2 = attr.evolve( + rev2, + committer=Person( + fullname=revision.committer.fullname, name=None, email=None + ), + ) - swh_storage.revision_add([revision]) - swh_storage.revision_add([revision2]) + swh_storage.revision_add([revision, revision2]) # when getting added revisions - revisions = list(swh_storage.revision_get([revision["id"], revision2["id"]])) + revisions = list(swh_storage.revision_get([revision.id, revision2.id])) - # then - # check committers are the same + # then check committers are the same assert revisions[0]["committer"] == revisions[1]["committer"] - def test_snapshot_add_get_empty(self, swh_storage): - swh_storage.origin_add([data.origin]) - origin_url = data.origin["url"] + def test_snapshot_add_get_empty(self, swh_storage, sample_data_model): + empty_snapshot = sample_data_model["snapshot"][1] + empty_snapshot_dict = empty_snapshot.to_dict() + + origin = sample_data_model["origin"][0] + swh_storage.origin_add([origin]) ov1 = swh_storage.origin_visit_add( [ OriginVisit( - origin=origin_url, date=data.date_visit1, type=data.type_visit1, + origin=origin.url, date=data.date_visit1, type=data.type_visit1, ) ] )[0] - actual_result = swh_storage.snapshot_add([data.empty_snapshot]) + actual_result = swh_storage.snapshot_add([empty_snapshot]) assert actual_result == {"snapshot:add": 1} date_now = now() @@ -2385,24 +2386,24 @@ swh_storage.origin_visit_status_add( [ OriginVisitStatus( - origin=origin_url, + origin=origin.url, visit=ov1.visit, date=date_now, status="full", - snapshot=data.empty_snapshot["id"], + snapshot=empty_snapshot.id, ) ] ) - by_id = swh_storage.snapshot_get(data.empty_snapshot["id"]) - assert by_id == {**data.empty_snapshot, "next_branch": None} + by_id = swh_storage.snapshot_get(empty_snapshot.id) + assert by_id == {**empty_snapshot_dict, "next_branch": None} - by_ov = swh_storage.snapshot_get_by_origin_visit(origin_url, ov1.visit) - assert by_ov == {**data.empty_snapshot, "next_branch": None} + by_ov = swh_storage.snapshot_get_by_origin_visit(origin.url, ov1.visit) + assert by_ov == {**empty_snapshot_dict, "next_branch": None} ovs1 = OriginVisitStatus.from_dict( { - "origin": origin_url, + "origin": origin.url, "date": data.date_visit1, "visit": ov1.visit, "status": "created", @@ -2412,77 +2413,82 @@ ) ovs2 = OriginVisitStatus.from_dict( { - "origin": origin_url, + "origin": origin.url, "date": date_now, "visit": ov1.visit, "status": "full", "metadata": None, - "snapshot": data.empty_snapshot["id"], + "snapshot": empty_snapshot.id, } ) actual_objects = list(swh_storage.journal_writer.journal.objects) expected_objects = [ - ("origin", Origin.from_dict(data.origin)), + ("origin", origin), ("origin_visit", ov1), ("origin_visit_status", ovs1,), - ("snapshot", Snapshot.from_dict(data.empty_snapshot)), + ("snapshot", empty_snapshot), ("origin_visit_status", ovs2,), ] for obj in expected_objects: assert obj in actual_objects - def test_snapshot_add_get_complete(self, swh_storage): - origin_url = data.origin["url"] - swh_storage.origin_add([data.origin]) + def test_snapshot_add_get_complete(self, swh_storage, sample_data_model): + complete_snapshot = sample_data_model["snapshot"][2] + complete_snapshot_dict = complete_snapshot.to_dict() + origin = sample_data_model["origin"][0] + + swh_storage.origin_add([origin]) visit = OriginVisit( - origin=origin_url, date=data.date_visit1, type=data.type_visit1, + origin=origin.url, date=data.date_visit1, type=data.type_visit1, ) origin_visit1 = swh_storage.origin_visit_add([visit])[0] visit_id = origin_visit1.visit - actual_result = swh_storage.snapshot_add([data.complete_snapshot]) + actual_result = swh_storage.snapshot_add([complete_snapshot]) swh_storage.origin_visit_status_add( [ OriginVisitStatus( - origin=origin_url, + origin=origin.url, visit=origin_visit1.visit, date=now(), status="ongoing", - snapshot=data.complete_snapshot["id"], + snapshot=complete_snapshot.id, ) ] ) assert actual_result == {"snapshot:add": 1} - by_id = swh_storage.snapshot_get(data.complete_snapshot["id"]) - assert by_id == {**data.complete_snapshot, "next_branch": None} + by_id = swh_storage.snapshot_get(complete_snapshot.id) + assert by_id == {**complete_snapshot_dict, "next_branch": None} - by_ov = swh_storage.snapshot_get_by_origin_visit(origin_url, visit_id) - assert by_ov == {**data.complete_snapshot, "next_branch": None} + by_ov = swh_storage.snapshot_get_by_origin_visit(origin.url, visit_id) + assert by_ov == {**complete_snapshot_dict, "next_branch": None} - def test_snapshot_add_many(self, swh_storage): - actual_result = swh_storage.snapshot_add( - [data.snapshot, data.complete_snapshot] - ) + def test_snapshot_add_many(self, swh_storage, sample_data_model): + snapshot, _, complete_snapshot = sample_data_model["snapshot"][:3] + + actual_result = swh_storage.snapshot_add([snapshot, complete_snapshot]) assert actual_result == {"snapshot:add": 2} - assert { - **data.complete_snapshot, + assert swh_storage.snapshot_get(complete_snapshot.id) == { + **complete_snapshot.to_dict(), "next_branch": None, - } == swh_storage.snapshot_get(data.complete_snapshot["id"]) + } - assert {**data.snapshot, "next_branch": None} == swh_storage.snapshot_get( - data.snapshot["id"] - ) + assert swh_storage.snapshot_get(snapshot.id) == { + **snapshot.to_dict(), + "next_branch": None, + } swh_storage.refresh_stat_counters() assert swh_storage.stat_counters()["snapshot"] == 2 - def test_snapshot_add_many_from_generator(self, swh_storage): + def test_snapshot_add_many_from_generator(self, swh_storage, sample_data_model): + snapshot, _, complete_snapshot = sample_data_model["snapshot"][:3] + def _snp_gen(): - yield data.snapshot - yield data.complete_snapshot + yield from [snapshot, complete_snapshot] actual_result = swh_storage.snapshot_add(_snp_gen()) assert actual_result == {"snapshot:add": 2} @@ -2490,59 +2496,65 @@ swh_storage.refresh_stat_counters() assert swh_storage.stat_counters()["snapshot"] == 2 - def test_snapshot_add_many_incremental(self, swh_storage): - actual_result = swh_storage.snapshot_add([data.complete_snapshot]) + def test_snapshot_add_many_incremental(self, swh_storage, sample_data_model): + snapshot, _, complete_snapshot = sample_data_model["snapshot"][:3] + + actual_result = swh_storage.snapshot_add([complete_snapshot]) assert actual_result == {"snapshot:add": 1} - actual_result2 = swh_storage.snapshot_add( - [data.snapshot, data.complete_snapshot] - ) + actual_result2 = swh_storage.snapshot_add([snapshot, complete_snapshot]) assert actual_result2 == {"snapshot:add": 1} - assert { - **data.complete_snapshot, + assert swh_storage.snapshot_get(complete_snapshot.id) == { + **complete_snapshot.to_dict(), "next_branch": None, - } == swh_storage.snapshot_get(data.complete_snapshot["id"]) + } - assert {**data.snapshot, "next_branch": None} == swh_storage.snapshot_get( - data.snapshot["id"] - ) + assert swh_storage.snapshot_get(snapshot.id) == { + **snapshot.to_dict(), + "next_branch": None, + } - def test_snapshot_add_twice(self, swh_storage): - actual_result = swh_storage.snapshot_add([data.empty_snapshot]) + def test_snapshot_add_twice(self, swh_storage, sample_data_model): + snapshot, empty_snapshot = sample_data_model["snapshot"][:2] + + actual_result = swh_storage.snapshot_add([empty_snapshot]) assert actual_result == {"snapshot:add": 1} assert list(swh_storage.journal_writer.journal.objects) == [ - ("snapshot", Snapshot.from_dict(data.empty_snapshot)) + ("snapshot", empty_snapshot) ] - actual_result = swh_storage.snapshot_add([data.snapshot]) + actual_result = swh_storage.snapshot_add([snapshot]) assert actual_result == {"snapshot:add": 1} assert list(swh_storage.journal_writer.journal.objects) == [ - ("snapshot", Snapshot.from_dict(data.empty_snapshot)), - ("snapshot", Snapshot.from_dict(data.snapshot)), + ("snapshot", empty_snapshot), + ("snapshot", snapshot), ] - def test_snapshot_add_validation(self, swh_storage): - snap = copy.deepcopy(data.snapshot) + def test_snapshot_add_validation(self, swh_storage, sample_data_model): + snapshot = sample_data_model["snapshot"][0] + + snap = snapshot.to_dict() snap["branches"][b"foo"] = {"target_type": "revision"} with pytest.raises(StorageArgumentException, match="target"): swh_storage.snapshot_add([snap]) - snap = copy.deepcopy(data.snapshot) + snap = snapshot.to_dict() snap["branches"][b"foo"] = {"target": b"\x42" * 20} with pytest.raises(StorageArgumentException, match="target_type"): swh_storage.snapshot_add([snap]) - def test_snapshot_add_count_branches(self, swh_storage): - actual_result = swh_storage.snapshot_add([data.complete_snapshot]) + def test_snapshot_add_count_branches(self, swh_storage, sample_data_model): + complete_snapshot = sample_data_model["snapshot"][2] + + actual_result = swh_storage.snapshot_add([complete_snapshot]) assert actual_result == {"snapshot:add": 1} - snp_id = data.complete_snapshot["id"] - snp_size = swh_storage.snapshot_count_branches(snp_id) + snp_size = swh_storage.snapshot_count_branches(complete_snapshot.id) expected_snp_size = { "alias": 1,