Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_storage.py
Show First 20 Lines • Show All 1,962 Lines • ▼ Show 20 Lines | def test_origin_visit_get_by(self, swh_storage): | ||||
status="ongoing", | status="ongoing", | ||||
snapshot=data.snapshot["id"], | snapshot=data.snapshot["id"], | ||||
) | ) | ||||
] | ] | ||||
) | ) | ||||
# Add some other {origin, visit} entries | # Add some other {origin, visit} entries | ||||
visit2 = OriginVisit( | visit2 = OriginVisit( | ||||
origin=origin_url, | origin=origin_url, date=data.date_visit3, type=data.type_visit3, | ||||
date=data.date_visit3, | |||||
type=data.type_visit3, | |||||
status="ongoing", | |||||
snapshot=None, | |||||
) | ) | ||||
visit3 = OriginVisit( | visit3 = OriginVisit( | ||||
origin=origin_url2, | origin=origin_url2, date=data.date_visit3, type=data.type_visit3, | ||||
date=data.date_visit3, | |||||
type=data.type_visit3, | |||||
status="ongoing", | |||||
snapshot=None, | |||||
) | ) | ||||
swh_storage.origin_visit_add([visit2, visit3]) | swh_storage.origin_visit_add([visit2, visit3]) | ||||
# when | # when | ||||
visit1_metadata = { | visit1_metadata = { | ||||
"contents": 42, | "contents": 42, | ||||
"directories": 22, | "directories": 22, | ||||
} | } | ||||
▲ Show 20 Lines • Show All 437 Lines • ▼ Show 20 Lines | def test_snapshot_add_get_empty(self, swh_storage): | ||||
] | ] | ||||
for obj in expected_objects: | for obj in expected_objects: | ||||
assert obj in actual_objects | assert obj in actual_objects | ||||
def test_snapshot_add_get_complete(self, swh_storage): | def test_snapshot_add_get_complete(self, swh_storage): | ||||
origin_url = data.origin["url"] | origin_url = data.origin["url"] | ||||
origin_url = swh_storage.origin_add_one(data.origin) | origin_url = swh_storage.origin_add_one(data.origin) | ||||
visit = OriginVisit( | visit = OriginVisit( | ||||
origin=origin_url, | origin=origin_url, date=data.date_visit1, type=data.type_visit1, | ||||
date=data.date_visit1, | |||||
type=data.type_visit1, | |||||
status="ongoing", | |||||
snapshot=None, | |||||
) | ) | ||||
origin_visit1 = swh_storage.origin_visit_add([visit])[0] | origin_visit1 = swh_storage.origin_visit_add([visit])[0] | ||||
visit_id = origin_visit1.visit | visit_id = origin_visit1.visit | ||||
actual_result = swh_storage.snapshot_add([data.complete_snapshot]) | actual_result = swh_storage.snapshot_add([data.complete_snapshot]) | ||||
swh_storage.origin_visit_status_add( | swh_storage.origin_visit_status_add( | ||||
[ | [ | ||||
OriginVisitStatus( | OriginVisitStatus( | ||||
▲ Show 20 Lines • Show All 454 Lines • ▼ Show 20 Lines | def test_stat_counters(self, swh_storage): | ||||
if key != "content": | if key != "content": | ||||
assert counters[key] == 0 | assert counters[key] == 0 | ||||
assert counters["content"] == 1 | assert counters["content"] == 1 | ||||
# Add other objects. Check their counter increased as well. | # Add other objects. Check their counter increased as well. | ||||
origin_url = swh_storage.origin_add_one(data.origin2) | origin_url = swh_storage.origin_add_one(data.origin2) | ||||
visit = OriginVisit( | visit = OriginVisit( | ||||
origin=origin_url, | origin=origin_url, date=data.date_visit2, type=data.type_visit2, | ||||
date=data.date_visit2, | |||||
type=data.type_visit2, | |||||
status="ongoing", | |||||
snapshot=None, | |||||
) | ) | ||||
origin_visit1 = swh_storage.origin_visit_add([visit])[0] | origin_visit1 = swh_storage.origin_visit_add([visit])[0] | ||||
swh_storage.snapshot_add([data.snapshot]) | swh_storage.snapshot_add([data.snapshot]) | ||||
swh_storage.origin_visit_status_add( | swh_storage.origin_visit_status_add( | ||||
[ | [ | ||||
OriginVisitStatus( | OriginVisitStatus( | ||||
origin=origin_url, | origin=origin_url, | ||||
▲ Show 20 Lines • Show All 1,024 Lines • Show Last 20 Lines |