Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_storage.py
Show First 20 Lines • Show All 1,307 Lines • ▼ Show 20 Lines | def test_origin_visit_get_all(self, swh_storage, sample_data): | ||||
assert actual_page.results == [ov2] | assert actual_page.results == [ov2] | ||||
actual_page = swh_storage.origin_visit_get( | actual_page = swh_storage.origin_visit_get( | ||||
origin.url, page_token=next_page_token, order=ListOrder.DESC | origin.url, page_token=next_page_token, order=ListOrder.DESC | ||||
) | ) | ||||
assert actual_page.next_page_token is None | assert actual_page.next_page_token is None | ||||
assert actual_page.results == [ov1] | assert actual_page.results == [ov1] | ||||
def test_origin_visit_status_get__unknown_cases(self, swh_storage, sample_data): | |||||
origin = sample_data.origin | |||||
actual_page = swh_storage.origin_visit_status_get("foobar", 1) | |||||
assert actual_page.next_page_token is None | |||||
assert actual_page.results == [] | |||||
actual_page = swh_storage.origin_visit_status_get(origin.url, 1) | |||||
assert actual_page.next_page_token is None | |||||
assert actual_page.results == [] | |||||
origin = sample_data.origin | |||||
swh_storage.origin_add([origin]) | |||||
ov1 = swh_storage.origin_visit_add( | |||||
[ | |||||
OriginVisit( | |||||
origin=origin.url, | |||||
date=sample_data.date_visit1, | |||||
type=sample_data.type_visit1, | |||||
), | |||||
] | |||||
)[0] | |||||
actual_page = swh_storage.origin_visit_status_get(origin.url, ov1.visit + 10) | |||||
assert actual_page.next_page_token is None | |||||
assert actual_page.results == [] | |||||
def test_origin_visit_status_get_all(self, swh_storage, sample_data): | |||||
origin = sample_data.origin | |||||
swh_storage.origin_add([origin]) | |||||
date_visit3 = round_to_milliseconds(now()) | |||||
date_visit1 = date_visit3 - datetime.timedelta(hours=2) | |||||
date_visit2 = date_visit3 - datetime.timedelta(hours=1) | |||||
assert date_visit1 < date_visit2 < date_visit3 | |||||
ov1 = swh_storage.origin_visit_add( | |||||
[ | |||||
OriginVisit( | |||||
origin=origin.url, date=date_visit1, type=sample_data.type_visit1, | |||||
), | |||||
] | |||||
)[0] | |||||
ovs1 = OriginVisitStatus( | |||||
origin=origin.url, | |||||
visit=ov1.visit, | |||||
date=date_visit1, | |||||
status="created", | |||||
snapshot=None, | |||||
) | |||||
ovs2 = OriginVisitStatus( | |||||
origin=origin.url, | |||||
visit=ov1.visit, | |||||
date=date_visit2, | |||||
status="partial", | |||||
snapshot=None, | |||||
) | |||||
ovs3 = OriginVisitStatus( | |||||
origin=origin.url, | |||||
vlorentz: assert date_visit1 < date_visit2 < date_visit3 | |||||
visit=ov1.visit, | |||||
date=date_visit3, | |||||
status="full", | |||||
snapshot=sample_data.snapshot.id, | |||||
metadata={}, | |||||
) | |||||
swh_storage.origin_visit_status_add([ovs2, ovs3]) | |||||
# order asc, no token, no limit | |||||
actual_page = swh_storage.origin_visit_status_get(origin.url, ov1.visit) | |||||
assert actual_page.next_page_token is None | |||||
assert actual_page.results == [ovs1, ovs2, ovs3] | |||||
# order asc, no token, limit | |||||
actual_page = swh_storage.origin_visit_status_get( | |||||
origin.url, ov1.visit, limit=2 | |||||
) | |||||
next_page_token = actual_page.next_page_token | |||||
assert next_page_token is not None | |||||
assert actual_page.results == [ovs1, ovs2] | |||||
# order asc, token, no limit | |||||
actual_page = swh_storage.origin_visit_status_get( | |||||
origin.url, ov1.visit, page_token=next_page_token | |||||
) | |||||
assert actual_page.next_page_token is None | |||||
assert actual_page.results == [ovs3] | |||||
# order asc, no token, limit | |||||
actual_page = swh_storage.origin_visit_status_get( | |||||
origin.url, ov1.visit, limit=1 | |||||
) | |||||
next_page_token = actual_page.next_page_token | |||||
assert next_page_token is not None | |||||
assert actual_page.results == [ovs1] | |||||
actual_page = swh_storage.origin_visit_status_get( | |||||
origin.url, ov1.visit, page_token=next_page_token | |||||
) | |||||
assert actual_page.next_page_token is None | |||||
assert actual_page.results == [ovs2, ovs3] | |||||
# order asc, token, limit | |||||
actual_page = swh_storage.origin_visit_status_get( | |||||
origin.url, ov1.visit, page_token=next_page_token, limit=2 | |||||
) | |||||
assert actual_page.next_page_token is None | |||||
assert actual_page.results == [ovs2, ovs3] | |||||
# order asc, no token, limit | |||||
actual_page = swh_storage.origin_visit_status_get( | |||||
origin.url, ov1.visit, limit=2 | |||||
) | |||||
next_page_token = actual_page.next_page_token | |||||
assert next_page_token is not None | |||||
assert actual_page.results == [ovs1, ovs2] | |||||
actual_page = swh_storage.origin_visit_status_get( | |||||
origin.url, ov1.visit, page_token=next_page_token, limit=1 | |||||
) | |||||
assert actual_page.next_page_token is None | |||||
assert actual_page.results == [ovs3] | |||||
# order desc, no token, no limit | |||||
actual_page = swh_storage.origin_visit_status_get( | |||||
origin.url, ov1.visit, order=ListOrder.DESC | |||||
) | |||||
assert actual_page.next_page_token is None | |||||
assert actual_page.results == [ovs3, ovs2, ovs1] | |||||
# order desc, no token, limit | |||||
actual_page = swh_storage.origin_visit_status_get( | |||||
origin.url, ov1.visit, limit=2, order=ListOrder.DESC | |||||
) | |||||
next_page_token = actual_page.next_page_token | |||||
assert next_page_token is not None | |||||
assert actual_page.results == [ovs3, ovs2] | |||||
actual_page = swh_storage.origin_visit_status_get( | |||||
origin.url, ov1.visit, page_token=next_page_token, order=ListOrder.DESC | |||||
) | |||||
assert actual_page.next_page_token is None | |||||
assert actual_page.results == [ovs1] | |||||
# order desc, no token, limit | |||||
actual_page = swh_storage.origin_visit_status_get( | |||||
origin.url, ov1.visit, order=ListOrder.DESC, limit=1 | |||||
) | |||||
next_page_token = actual_page.next_page_token | |||||
assert next_page_token is not None | |||||
assert actual_page.results == [ovs3] | |||||
# order desc, token, no limit | |||||
actual_page = swh_storage.origin_visit_status_get( | |||||
origin.url, ov1.visit, page_token=next_page_token, order=ListOrder.DESC | |||||
) | |||||
assert actual_page.next_page_token is None | |||||
assert actual_page.results == [ovs2, ovs1] | |||||
# order desc, token, limit | |||||
actual_page = swh_storage.origin_visit_status_get( | |||||
origin.url, | |||||
ov1.visit, | |||||
page_token=next_page_token, | |||||
order=ListOrder.DESC, | |||||
limit=1, | |||||
) | |||||
next_page_token = actual_page.next_page_token | |||||
assert next_page_token is not None | |||||
assert actual_page.results == [ovs2] | |||||
actual_page = swh_storage.origin_visit_status_get( | |||||
origin.url, ov1.visit, page_token=next_page_token, order=ListOrder.DESC | |||||
) | |||||
assert actual_page.next_page_token is None | |||||
assert actual_page.results == [ovs1] | |||||
def test_origin_visit_status_get_random(self, swh_storage, sample_data): | def test_origin_visit_status_get_random(self, swh_storage, sample_data): | ||||
origins = sample_data.origins[:2] | origins = sample_data.origins[:2] | ||||
swh_storage.origin_add(origins) | swh_storage.origin_add(origins) | ||||
# Add some random visits within the selection range | # Add some random visits within the selection range | ||||
visits = self._generate_random_visits() | visits = self._generate_random_visits() | ||||
visit_type = "git" | visit_type = "git" | ||||
▲ Show 20 Lines • Show All 2,853 Lines • Show Last 20 Lines |
assert date_visit1 < date_visit2 < date_visit3