Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_storage.py
Show First 20 Lines • Show All 1,207 Lines • ▼ Show 20 Lines | def test_origin_visit_get_all(self, swh_storage, sample_data): | ||||
OriginVisit( | OriginVisit( | ||||
origin=origin.url, | origin=origin.url, | ||||
date=sample_data.date_visit2, | date=sample_data.date_visit2, | ||||
type=sample_data.type_visit2, | type=sample_data.type_visit2, | ||||
), | ), | ||||
] | ] | ||||
) | ) | ||||
# order asc, no pagination, no limit | # order asc, no token, no limit | ||||
actual_page = swh_storage.origin_visit_get(origin.url) | actual_page = swh_storage.origin_visit_get(origin.url) | ||||
assert actual_page.next_page_token is None | assert actual_page.next_page_token is None | ||||
assert actual_page == PagedResult(results=[ov1, ov2, ov3]) | assert actual_page.results == [ov1, ov2, ov3] | ||||
# order asc, no pagination, limit | # order asc, no token, limit | ||||
actual_page = swh_storage.origin_visit_get(origin.url, limit=2) | actual_page = swh_storage.origin_visit_get(origin.url, limit=2) | ||||
next_page_token = actual_page.next_page_token | next_page_token = actual_page.next_page_token | ||||
assert next_page_token is not None | assert next_page_token is not None | ||||
assert actual_page.results == [ov1, ov2] | assert actual_page.results == [ov1, ov2] | ||||
# order asc, pagination, no limit | # order asc, token, no limit | ||||
actual_page = swh_storage.origin_visit_get( | actual_page = swh_storage.origin_visit_get( | ||||
origin.url, page_token=next_page_token | origin.url, page_token=next_page_token | ||||
) | ) | ||||
assert actual_page.next_page_token is None | assert actual_page.next_page_token is None | ||||
assert actual_page.results == [ov3] | assert actual_page.results == [ov3] | ||||
assert actual_page == PagedResult(results=[ov3]) | |||||
next_page_token = str(ov1.visit) | next_page_token = str(ov1.visit) | ||||
actual_page = swh_storage.origin_visit_get( | actual_page = swh_storage.origin_visit_get( | ||||
origin.url, page_token=next_page_token | origin.url, page_token=next_page_token | ||||
) | ) | ||||
assert actual_page.next_page_token is None | assert actual_page.next_page_token is None | ||||
assert actual_page == PagedResult(results=[ov2, ov3]) | assert actual_page.results == [ov2, ov3] | ||||
# order asc, pagination, limit | # order asc, token, limit | ||||
actual_page = swh_storage.origin_visit_get( | actual_page = swh_storage.origin_visit_get( | ||||
origin.url, page_token=next_page_token, limit=2 | origin.url, page_token=next_page_token, limit=2 | ||||
) | ) | ||||
assert actual_page.next_page_token is None | assert actual_page.next_page_token is None | ||||
assert actual_page.results == [ov2, ov3] | assert actual_page.results == [ov2, ov3] | ||||
assert actual_page == PagedResult(results=[ov2, ov3]) | |||||
next_page_token = str(ov2.visit) | next_page_token = str(ov2.visit) | ||||
actual_page = swh_storage.origin_visit_get( | actual_page = swh_storage.origin_visit_get( | ||||
origin.url, page_token=next_page_token, limit=1 | origin.url, page_token=next_page_token, limit=1 | ||||
) | ) | ||||
assert actual_page.next_page_token is None | assert actual_page.next_page_token is None | ||||
assert actual_page == PagedResult(results=[ov3]) | assert actual_page.results == [ov3] | ||||
# order desc, no pagination, no limit | # order desc, no token, no limit | ||||
actual_page = swh_storage.origin_visit_get(origin.url, order=ListOrder.DESC) | actual_page = swh_storage.origin_visit_get(origin.url, order=ListOrder.DESC) | ||||
assert actual_page.next_page_token is None | assert actual_page.next_page_token is None | ||||
assert actual_page == PagedResult(results=[ov3, ov2, ov1]) | assert actual_page.results == [ov3, ov2, ov1] | ||||
# order desc, no pagination, limit | # order desc, no token, limit | ||||
actual_page = swh_storage.origin_visit_get( | actual_page = swh_storage.origin_visit_get( | ||||
origin.url, limit=2, order=ListOrder.DESC | origin.url, limit=2, order=ListOrder.DESC | ||||
) | ) | ||||
next_page_token = actual_page.next_page_token | next_page_token = actual_page.next_page_token | ||||
assert next_page_token is not None | assert next_page_token is not None | ||||
assert actual_page.results == [ov3, ov2] | assert actual_page.results == [ov3, ov2] | ||||
actual_page = swh_storage.origin_visit_get( | actual_page = swh_storage.origin_visit_get( | ||||
origin.url, page_token=next_page_token, order=ListOrder.DESC | origin.url, page_token=next_page_token, order=ListOrder.DESC | ||||
) | ) | ||||
assert actual_page.next_page_token is None | assert actual_page.next_page_token is None | ||||
assert actual_page.results == [ov1] | assert actual_page.results == [ov1] | ||||
assert actual_page == PagedResult(results=[ov1]) | |||||
# order desc, pagination, no limit | # order desc, token, no limit | ||||
next_page_token = str(ov3.visit) | next_page_token = str(ov3.visit) | ||||
actual_page = swh_storage.origin_visit_get( | actual_page = swh_storage.origin_visit_get( | ||||
origin.url, page_token=next_page_token, order=ListOrder.DESC | origin.url, page_token=next_page_token, order=ListOrder.DESC | ||||
) | ) | ||||
assert actual_page.next_page_token is None | assert actual_page.next_page_token is None | ||||
assert actual_page == PagedResult(results=[ov2, ov1]) | assert actual_page.results == [ov2, ov1] | ||||
# order desc, pagination, limit | # order desc, token, limit | ||||
actual_page = swh_storage.origin_visit_get( | actual_page = swh_storage.origin_visit_get( | ||||
origin.url, page_token=next_page_token, order=ListOrder.DESC, limit=1 | origin.url, page_token=next_page_token, order=ListOrder.DESC, limit=1 | ||||
) | ) | ||||
next_page_token = actual_page.next_page_token | next_page_token = actual_page.next_page_token | ||||
assert next_page_token is not None | assert next_page_token is not None | ||||
assert actual_page.results == [ov2] | assert actual_page.results == [ov2] | ||||
actual_page = swh_storage.origin_visit_get( | actual_page = swh_storage.origin_visit_get( | ||||
origin.url, page_token=next_page_token, order=ListOrder.DESC | origin.url, page_token=next_page_token, order=ListOrder.DESC | ||||
) | ) | ||||
assert actual_page == PagedResult(results=[ov1]) | assert actual_page.next_page_token is None | ||||
assert actual_page.results == [ov1] | |||||
def test_origin_visit_status_get_random(self, swh_storage, sample_data): | def test_origin_visit_status_get_random(self, swh_storage, sample_data): | ||||
origins = sample_data.origins[:2] | origins = sample_data.origins[:2] | ||||
swh_storage.origin_add(origins) | swh_storage.origin_add(origins) | ||||
# Add some random visits within the selection range | # Add some random visits within the selection range | ||||
visits = self._generate_random_visits() | visits = self._generate_random_visits() | ||||
visit_type = "git" | visit_type = "git" | ||||
▲ Show 20 Lines • Show All 2,854 Lines • Show Last 20 Lines |