diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py --- a/swh/storage/tests/test_storage.py +++ b/swh/storage/tests/test_storage.py @@ -31,6 +31,7 @@ from swh.model.model import ( Content, Directory, + MetadataTargetType, Origin, OriginVisit, OriginVisitStatus, @@ -39,7 +40,6 @@ Revision, SkippedContent, Snapshot, - MetadataTargetType, ) from swh.model.hypothesis_strategies import objects from swh.storage import get_storage @@ -2346,38 +2346,39 @@ ) assert actual_origin_visit3 == ovs3 - def test_person_fullname_unicity(self, swh_storage): - # given (person injection through revisions for example) - revision = data.revision - + def test_person_fullname_unicity(self, swh_storage, sample_data_model): + revision, rev2 = sample_data_model["revision"][0:2] # create a revision with same committer fullname but wo name and email - revision2 = copy.deepcopy(data.revision2) - revision2["committer"] = dict(revision["committer"]) - revision2["committer"]["email"] = None - revision2["committer"]["name"] = None + revision2 = attr.evolve( + rev2, + committer=Person( + fullname=revision.committer.fullname, name=None, email=None + ), + ) - swh_storage.revision_add([revision]) - swh_storage.revision_add([revision2]) + swh_storage.revision_add([revision, revision2]) # when getting added revisions - revisions = list(swh_storage.revision_get([revision["id"], revision2["id"]])) + revisions = list(swh_storage.revision_get([revision.id, revision2.id])) - # then - # check committers are the same + # then check committers are the same assert revisions[0]["committer"] == revisions[1]["committer"] - def test_snapshot_add_get_empty(self, swh_storage): - swh_storage.origin_add([data.origin]) - origin_url = data.origin["url"] + def test_snapshot_add_get_empty(self, swh_storage, sample_data_model): + empty_snapshot = sample_data_model["snapshot"][1] + empty_snapshot_dict = empty_snapshot.to_dict() + + origin = sample_data_model["origin"][0] + swh_storage.origin_add([origin]) ov1 = swh_storage.origin_visit_add( [ OriginVisit( - origin=origin_url, date=data.date_visit1, type=data.type_visit1, + origin=origin.url, date=data.date_visit1, type=data.type_visit1, ) ] )[0] - actual_result = swh_storage.snapshot_add([data.empty_snapshot]) + actual_result = swh_storage.snapshot_add([empty_snapshot]) assert actual_result == {"snapshot:add": 1} date_now = now() @@ -2385,24 +2386,24 @@ swh_storage.origin_visit_status_add( [ OriginVisitStatus( - origin=origin_url, + origin=origin.url, visit=ov1.visit, date=date_now, status="full", - snapshot=data.empty_snapshot["id"], + snapshot=empty_snapshot.id, ) ] ) - by_id = swh_storage.snapshot_get(data.empty_snapshot["id"]) - assert by_id == {**data.empty_snapshot, "next_branch": None} + by_id = swh_storage.snapshot_get(empty_snapshot.id) + assert by_id == {**empty_snapshot_dict, "next_branch": None} - by_ov = swh_storage.snapshot_get_by_origin_visit(origin_url, ov1.visit) - assert by_ov == {**data.empty_snapshot, "next_branch": None} + by_ov = swh_storage.snapshot_get_by_origin_visit(origin.url, ov1.visit) + assert by_ov == {**empty_snapshot_dict, "next_branch": None} ovs1 = OriginVisitStatus.from_dict( { - "origin": origin_url, + "origin": origin.url, "date": data.date_visit1, "visit": ov1.visit, "status": "created", @@ -2412,77 +2413,82 @@ ) ovs2 = OriginVisitStatus.from_dict( { - "origin": origin_url, + "origin": origin.url, "date": date_now, "visit": ov1.visit, "status": "full", "metadata": None, - "snapshot": data.empty_snapshot["id"], + "snapshot": empty_snapshot.id, } ) actual_objects = list(swh_storage.journal_writer.journal.objects) expected_objects = [ - ("origin", Origin.from_dict(data.origin)), + ("origin", origin), ("origin_visit", ov1), ("origin_visit_status", ovs1,), - ("snapshot", Snapshot.from_dict(data.empty_snapshot)), + ("snapshot", empty_snapshot), ("origin_visit_status", ovs2,), ] for obj in expected_objects: assert obj in actual_objects - def test_snapshot_add_get_complete(self, swh_storage): - origin_url = data.origin["url"] - swh_storage.origin_add([data.origin]) + def test_snapshot_add_get_complete(self, swh_storage, sample_data_model): + complete_snapshot = sample_data_model["snapshot"][2] + complete_snapshot_dict = complete_snapshot.to_dict() + origin = sample_data_model["origin"][0] + + swh_storage.origin_add([origin]) visit = OriginVisit( - origin=origin_url, date=data.date_visit1, type=data.type_visit1, + origin=origin.url, date=data.date_visit1, type=data.type_visit1, ) origin_visit1 = swh_storage.origin_visit_add([visit])[0] visit_id = origin_visit1.visit - actual_result = swh_storage.snapshot_add([data.complete_snapshot]) + actual_result = swh_storage.snapshot_add([complete_snapshot]) swh_storage.origin_visit_status_add( [ OriginVisitStatus( - origin=origin_url, + origin=origin.url, visit=origin_visit1.visit, date=now(), status="ongoing", - snapshot=data.complete_snapshot["id"], + snapshot=complete_snapshot.id, ) ] ) assert actual_result == {"snapshot:add": 1} - by_id = swh_storage.snapshot_get(data.complete_snapshot["id"]) - assert by_id == {**data.complete_snapshot, "next_branch": None} + by_id = swh_storage.snapshot_get(complete_snapshot.id) + assert by_id == {**complete_snapshot_dict, "next_branch": None} - by_ov = swh_storage.snapshot_get_by_origin_visit(origin_url, visit_id) - assert by_ov == {**data.complete_snapshot, "next_branch": None} + by_ov = swh_storage.snapshot_get_by_origin_visit(origin.url, visit_id) + assert by_ov == {**complete_snapshot_dict, "next_branch": None} - def test_snapshot_add_many(self, swh_storage): - actual_result = swh_storage.snapshot_add( - [data.snapshot, data.complete_snapshot] - ) + def test_snapshot_add_many(self, swh_storage, sample_data_model): + snapshot, _, complete_snapshot = sample_data_model["snapshot"][:3] + + actual_result = swh_storage.snapshot_add([snapshot, complete_snapshot]) assert actual_result == {"snapshot:add": 2} - assert { - **data.complete_snapshot, + assert swh_storage.snapshot_get(complete_snapshot.id) == { + **complete_snapshot.to_dict(), "next_branch": None, - } == swh_storage.snapshot_get(data.complete_snapshot["id"]) + } - assert {**data.snapshot, "next_branch": None} == swh_storage.snapshot_get( - data.snapshot["id"] - ) + assert swh_storage.snapshot_get(snapshot.id) == { + **snapshot.to_dict(), + "next_branch": None, + } swh_storage.refresh_stat_counters() assert swh_storage.stat_counters()["snapshot"] == 2 - def test_snapshot_add_many_from_generator(self, swh_storage): + def test_snapshot_add_many_from_generator(self, swh_storage, sample_data_model): + snapshot, _, complete_snapshot = sample_data_model["snapshot"][:3] + def _snp_gen(): - yield data.snapshot - yield data.complete_snapshot + yield from [snapshot, complete_snapshot] actual_result = swh_storage.snapshot_add(_snp_gen()) assert actual_result == {"snapshot:add": 2} @@ -2490,59 +2496,65 @@ swh_storage.refresh_stat_counters() assert swh_storage.stat_counters()["snapshot"] == 2 - def test_snapshot_add_many_incremental(self, swh_storage): - actual_result = swh_storage.snapshot_add([data.complete_snapshot]) + def test_snapshot_add_many_incremental(self, swh_storage, sample_data_model): + snapshot, _, complete_snapshot = sample_data_model["snapshot"][:3] + + actual_result = swh_storage.snapshot_add([complete_snapshot]) assert actual_result == {"snapshot:add": 1} - actual_result2 = swh_storage.snapshot_add( - [data.snapshot, data.complete_snapshot] - ) + actual_result2 = swh_storage.snapshot_add([snapshot, complete_snapshot]) assert actual_result2 == {"snapshot:add": 1} - assert { - **data.complete_snapshot, + assert swh_storage.snapshot_get(complete_snapshot.id) == { + **complete_snapshot.to_dict(), "next_branch": None, - } == swh_storage.snapshot_get(data.complete_snapshot["id"]) + } - assert {**data.snapshot, "next_branch": None} == swh_storage.snapshot_get( - data.snapshot["id"] - ) + assert swh_storage.snapshot_get(snapshot.id) == { + **snapshot.to_dict(), + "next_branch": None, + } - def test_snapshot_add_twice(self, swh_storage): - actual_result = swh_storage.snapshot_add([data.empty_snapshot]) + def test_snapshot_add_twice(self, swh_storage, sample_data_model): + snapshot, empty_snapshot = sample_data_model["snapshot"][:2] + + actual_result = swh_storage.snapshot_add([empty_snapshot]) assert actual_result == {"snapshot:add": 1} assert list(swh_storage.journal_writer.journal.objects) == [ - ("snapshot", Snapshot.from_dict(data.empty_snapshot)) + ("snapshot", empty_snapshot) ] - actual_result = swh_storage.snapshot_add([data.snapshot]) + actual_result = swh_storage.snapshot_add([snapshot]) assert actual_result == {"snapshot:add": 1} assert list(swh_storage.journal_writer.journal.objects) == [ - ("snapshot", Snapshot.from_dict(data.empty_snapshot)), - ("snapshot", Snapshot.from_dict(data.snapshot)), + ("snapshot", empty_snapshot), + ("snapshot", snapshot), ] - def test_snapshot_add_validation(self, swh_storage): - snap = copy.deepcopy(data.snapshot) + def test_snapshot_add_validation(self, swh_storage, sample_data_model): + snapshot = sample_data_model["snapshot"][0] + + snap = snapshot.to_dict() snap["branches"][b"foo"] = {"target_type": "revision"} with pytest.raises(StorageArgumentException, match="target"): swh_storage.snapshot_add([snap]) - snap = copy.deepcopy(data.snapshot) + snap = snapshot.to_dict() snap["branches"][b"foo"] = {"target": b"\x42" * 20} with pytest.raises(StorageArgumentException, match="target_type"): swh_storage.snapshot_add([snap]) - def test_snapshot_add_count_branches(self, swh_storage): - actual_result = swh_storage.snapshot_add([data.complete_snapshot]) + def test_snapshot_add_count_branches(self, swh_storage, sample_data_model): + complete_snapshot = sample_data_model["snapshot"][2] + + actual_result = swh_storage.snapshot_add([complete_snapshot]) assert actual_result == {"snapshot:add": 1} - snp_id = data.complete_snapshot["id"] - snp_size = swh_storage.snapshot_count_branches(snp_id) + snp_size = swh_storage.snapshot_count_branches(complete_snapshot.id) expected_snp_size = { "alias": 1, @@ -2555,11 +2567,13 @@ } assert snp_size == expected_snp_size - def test_snapshot_add_get_paginated(self, swh_storage): - swh_storage.snapshot_add([data.complete_snapshot]) + def test_snapshot_add_get_paginated(self, swh_storage, sample_data_model): + complete_snapshot = sample_data_model["snapshot"][2] + + swh_storage.snapshot_add([complete_snapshot]) - snp_id = data.complete_snapshot["id"] - branches = data.complete_snapshot["branches"] + snp_id = complete_snapshot.id + branches = complete_snapshot.to_dict()["branches"] branch_names = list(sorted(branches)) # Test branch_from @@ -2601,29 +2615,31 @@ assert snapshot == expected_snapshot - def test_snapshot_add_get_filtered(self, swh_storage): - swh_storage.origin_add([data.origin]) - origin_url = data.origin["url"] + def test_snapshot_add_get_filtered(self, swh_storage, sample_data_model): + origin = sample_data_model["origin"][0] + complete_snapshot = sample_data_model["snapshot"][2] + + swh_storage.origin_add([origin]) visit = OriginVisit( - origin=origin_url, date=data.date_visit1, type=data.type_visit1, + origin=origin.url, date=data.date_visit1, type=data.type_visit1, ) origin_visit1 = swh_storage.origin_visit_add([visit])[0] - swh_storage.snapshot_add([data.complete_snapshot]) + swh_storage.snapshot_add([complete_snapshot]) swh_storage.origin_visit_status_add( [ OriginVisitStatus( - origin=origin_url, + origin=origin.url, visit=origin_visit1.visit, date=now(), status="ongoing", - snapshot=data.complete_snapshot["id"], + snapshot=complete_snapshot.id, ) ] ) - snp_id = data.complete_snapshot["id"] - branches = data.complete_snapshot["branches"] + snp_id = complete_snapshot.id + branches = complete_snapshot.to_dict()["branches"] snapshot = swh_storage.snapshot_get_branches( snp_id, target_types=["release", "revision"] @@ -2655,11 +2671,15 @@ assert snapshot == expected_snapshot - def test_snapshot_add_get_filtered_and_paginated(self, swh_storage): - swh_storage.snapshot_add([data.complete_snapshot]) + def test_snapshot_add_get_filtered_and_paginated( + self, swh_storage, sample_data_model + ): + complete_snapshot = sample_data_model["snapshot"][2] + + swh_storage.snapshot_add([complete_snapshot]) - snp_id = data.complete_snapshot["id"] - branches = data.complete_snapshot["branches"] + snp_id = complete_snapshot.id + branches = complete_snapshot.to_dict()["branches"] branch_names = list(sorted(branches)) # Test branch_from @@ -2722,8 +2742,9 @@ assert snapshot == expected_snapshot - def test_snapshot_add_get_branch_by_type(self, swh_storage): - snapshot = copy.deepcopy(data.complete_snapshot) + def test_snapshot_add_get_branch_by_type(self, swh_storage, sample_data_model): + complete_snapshot = sample_data_model["snapshot"][2] + snapshot = complete_snapshot.to_dict() alias1 = b"alias1" alias2 = b"alias2" @@ -2740,10 +2761,11 @@ "target_type": "alias", } - swh_storage.snapshot_add([snapshot]) + new_snapshot = Snapshot.from_dict(snapshot) + swh_storage.snapshot_add([new_snapshot]) branches = swh_storage.snapshot_get_branches( - snapshot["id"], + new_snapshot.id, target_types=["alias"], branches_from=alias1, branches_count=1, @@ -2752,11 +2774,13 @@ assert len(branches) == 1 assert alias1 in branches - def test_snapshot_add_get(self, swh_storage): - swh_storage.origin_add([data.origin]) - origin_url = data.origin["url"] + def test_snapshot_add_get(self, swh_storage, sample_data_model): + snapshot = sample_data_model["snapshot"][0] + origin = sample_data_model["origin"][0] + + swh_storage.origin_add([origin]) visit = OriginVisit( - origin=origin_url, date=data.date_visit1, type=data.type_visit1, + origin=origin.url, date=data.date_visit1, type=data.type_visit1, ) origin_visit1 = swh_storage.origin_visit_add([visit])[0] visit_id = origin_visit1.visit @@ -2765,80 +2789,85 @@ swh_storage.origin_visit_status_add( [ OriginVisitStatus( - origin=origin_url, + origin=origin.url, visit=origin_visit1.visit, date=now(), status="ongoing", - snapshot=data.snapshot["id"], + snapshot=snapshot.id, ) ] ) - by_id = swh_storage.snapshot_get(data.snapshot["id"]) - assert by_id == {**data.snapshot, "next_branch": None} + expected_snapshot = {**snapshot.to_dict(), "next_branch": None} + + by_id = swh_storage.snapshot_get(snapshot.id) + assert by_id == expected_snapshot - by_ov = swh_storage.snapshot_get_by_origin_visit(origin_url, visit_id) - assert by_ov == {**data.snapshot, "next_branch": None} + by_ov = swh_storage.snapshot_get_by_origin_visit(origin.url, visit_id) + assert by_ov == expected_snapshot - origin_visit_info = swh_storage.origin_visit_get_by(origin_url, visit_id) - assert origin_visit_info["snapshot"] == data.snapshot["id"] + origin_visit_info = swh_storage.origin_visit_get_by(origin.url, visit_id) + assert origin_visit_info["snapshot"] == snapshot.id + + def test_snapshot_add_twice__by_origin_visit(self, swh_storage, sample_data_model): + snapshot = sample_data_model["snapshot"][0] + origin = sample_data_model["origin"][0] - def test_snapshot_add_twice__by_origin_visit(self, swh_storage): - swh_storage.origin_add([data.origin]) - origin_url = data.origin["url"] + swh_storage.origin_add([origin]) ov1 = swh_storage.origin_visit_add( [ OriginVisit( - origin=origin_url, date=data.date_visit1, type=data.type_visit1, + origin=origin.url, date=data.date_visit1, type=data.type_visit1, ) ] )[0] - swh_storage.snapshot_add([data.snapshot]) + swh_storage.snapshot_add([snapshot]) date_now2 = now() swh_storage.origin_visit_status_add( [ OriginVisitStatus( - origin=origin_url, + origin=origin.url, visit=ov1.visit, date=date_now2, status="ongoing", - snapshot=data.snapshot["id"], + snapshot=snapshot.id, ) ] ) - by_ov1 = swh_storage.snapshot_get_by_origin_visit(origin_url, ov1.visit) - assert by_ov1 == {**data.snapshot, "next_branch": None} + expected_snapshot = {**snapshot.to_dict(), "next_branch": None} + + by_ov1 = swh_storage.snapshot_get_by_origin_visit(origin.url, ov1.visit) + assert by_ov1 == expected_snapshot ov2 = swh_storage.origin_visit_add( [ OriginVisit( - origin=origin_url, date=data.date_visit2, type=data.type_visit2, + origin=origin.url, date=data.date_visit2, type=data.type_visit2, ) ] )[0] - swh_storage.snapshot_add([data.snapshot]) date_now4 = now() swh_storage.origin_visit_status_add( [ OriginVisitStatus( - origin=origin_url, + origin=origin.url, visit=ov2.visit, date=date_now4, status="ongoing", - snapshot=data.snapshot["id"], + snapshot=snapshot.id, ) ] ) - by_ov2 = swh_storage.snapshot_get_by_origin_visit(origin_url, ov2.visit) - assert by_ov2 == {**data.snapshot, "next_branch": None} + by_ov2 = swh_storage.snapshot_get_by_origin_visit(origin.url, ov2.visit) + assert by_ov2 == expected_snapshot ovs1 = OriginVisitStatus.from_dict( { - "origin": origin_url, + "origin": origin.url, "date": data.date_visit1, "visit": ov1.visit, "status": "created", @@ -2848,17 +2877,17 @@ ) ovs2 = OriginVisitStatus.from_dict( { - "origin": origin_url, + "origin": origin.url, "date": date_now2, "visit": ov1.visit, "status": "ongoing", "metadata": None, - "snapshot": data.snapshot["id"], + "snapshot": snapshot.id, } ) ovs3 = OriginVisitStatus.from_dict( { - "origin": origin_url, + "origin": origin.url, "date": data.date_visit2, "visit": ov2.visit, "status": "created", @@ -2868,20 +2897,20 @@ ) ovs4 = OriginVisitStatus.from_dict( { - "origin": origin_url, + "origin": origin.url, "date": date_now4, "visit": ov2.visit, "status": "ongoing", "metadata": None, - "snapshot": data.snapshot["id"], + "snapshot": snapshot.id, } ) actual_objects = list(swh_storage.journal_writer.journal.objects) expected_objects = [ - ("origin", Origin.from_dict(data.origin)), + ("origin", origin), ("origin_visit", ov1), ("origin_visit_status", ovs1), - ("snapshot", Snapshot.from_dict(data.snapshot)), + ("snapshot", snapshot), ("origin_visit_status", ovs2), ("origin_visit", ov2), ("origin_visit_status", ovs3), @@ -2891,26 +2920,24 @@ for obj in expected_objects: assert obj in actual_objects - def test_snapshot_get_random(self, swh_storage): - swh_storage.snapshot_add( - [data.snapshot, data.empty_snapshot, data.complete_snapshot] - ) + def test_snapshot_get_random(self, swh_storage, sample_data_model): + snapshot, empty_snapshot, complete_snapshot = sample_data_model["snapshot"][:3] + swh_storage.snapshot_add([snapshot, empty_snapshot, complete_snapshot]) assert swh_storage.snapshot_get_random() in { - data.snapshot["id"], - data.empty_snapshot["id"], - data.complete_snapshot["id"], + snapshot.id, + empty_snapshot.id, + complete_snapshot.id, } - def test_snapshot_missing(self, swh_storage): - snap = data.snapshot - missing_snap = data.empty_snapshot - snapshots = [snap["id"], missing_snap["id"]] - swh_storage.snapshot_add([snap]) + def test_snapshot_missing(self, swh_storage, sample_data_model): + snapshot, missing_snapshot = sample_data_model["snapshot"][:2] + snapshots = [snapshot.id, missing_snapshot.id] + swh_storage.snapshot_add([snapshot]) missing_snapshots = swh_storage.snapshot_missing(snapshots) - assert list(missing_snapshots) == [missing_snap["id"]] + assert list(missing_snapshots) == [missing_snapshot.id] def test_stat_counters(self, swh_storage): expected_keys = ["content", "directory", "origin", "revision"]