Changeset View
Changeset View
Standalone View
Standalone View
swh/web/common/service.py
Show All 16 Lines | |||||
from swh.storage.algos.origin import origin_get_latest_visit_status | from swh.storage.algos.origin import origin_get_latest_visit_status | ||||
from swh.storage.algos.snapshot import snapshot_get_latest | from swh.storage.algos.snapshot import snapshot_get_latest | ||||
from swh.vault.exc import NotFoundExc as VaultNotFoundExc | from swh.vault.exc import NotFoundExc as VaultNotFoundExc | ||||
from swh.web import config | from swh.web import config | ||||
from swh.web.common import converters | from swh.web.common import converters | ||||
from swh.web.common import query | from swh.web.common import query | ||||
from swh.web.common.exc import BadInputExc, NotFoundExc | from swh.web.common.exc import BadInputExc, NotFoundExc | ||||
from swh.web.common.origin_visits import get_origin_visit | from swh.web.common.origin_visits import get_origin_visit | ||||
from swh.web.common.typing import OriginInfo, OriginVisitInfo, OriginMetadataInfo | from swh.web.common.typing import ( | ||||
OriginInfo, | |||||
OriginVisitInfo, | |||||
OriginMetadataInfo, | |||||
PagedResult, | |||||
) | |||||
search = config.search() | search = config.search() | ||||
storage = config.storage() | storage = config.storage() | ||||
vault = config.vault() | vault = config.vault() | ||||
idx_storage = config.indexer_storage() | idx_storage = config.indexer_storage() | ||||
▲ Show 20 Lines • Show All 204 Lines • ▼ Show 20 Lines | def lookup_origin(origin: OriginInfo) -> OriginInfo: | ||||
origins = [o for o in storage.origin_get(origin_urls) if o is not None] | origins = [o for o in storage.origin_get(origin_urls) if o is not None] | ||||
if not origins: | if not origins: | ||||
msg = "Origin with url %s not found!" % origin["url"] | msg = "Origin with url %s not found!" % origin["url"] | ||||
raise NotFoundExc(msg) | raise NotFoundExc(msg) | ||||
return converters.from_origin(origins[0].to_dict()) | return converters.from_origin(origins[0].to_dict()) | ||||
def lookup_origins( | def lookup_origins( | ||||
origin_from: int = 1, origin_count: int = 100 | page_token: Optional[str], limit: int = 100 | ||||
) -> Iterator[OriginInfo]: | ) -> PagedResult[OriginInfo]: | ||||
vlorentz: `-> PagedResultInfo[OriginInfo]` | |||||
Done Inline Actionsyes, fixed locally already, i waited for the build to finish. ardumont: yes, fixed locally already, i waited for the build to finish. | |||||
"""Get list of archived software origins in a paginated way. | """Get list of archived software origins in a paginated way. | ||||
Origins are sorted by id before returning them | Origins are sorted by id before returning them | ||||
Args: | Args: | ||||
origin_from (int): The minimum id of the origins to return | origin_from (int): The minimum id of the origins to return | ||||
origin_count (int): The maximum number of origins to return | origin_count (int): The maximum number of origins to return | ||||
Yields: | Returns: | ||||
origins information as dicts | Page of OriginInfo | ||||
""" | """ | ||||
origins = storage.origin_get_range(origin_from, origin_count) | page = storage.origin_list(page_token=page_token, limit=limit) | ||||
return map(converters.from_origin, origins) | return PagedResult( | ||||
[converters.from_origin(o.to_dict()) for o in page.results], | |||||
next_page_token=page.next_page_token, | |||||
) | |||||
def search_origin( | def search_origin( | ||||
url_pattern: str, | url_pattern: str, | ||||
limit: int = 50, | limit: int = 50, | ||||
with_visit: bool = False, | with_visit: bool = False, | ||||
page_token: Optional[str] = None, | page_token: Optional[str] = None, | ||||
) -> Tuple[List[OriginInfo], Optional[str]]: | ) -> Tuple[List[OriginInfo], Optional[str]]: | ||||
▲ Show 20 Lines • Show All 1,053 Lines • Show Last 20 Lines |
-> PagedResultInfo[OriginInfo]