Changeset View
Changeset View
Standalone View
Standalone View
swh/core/api/classes.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import asyncio | |||||
from dataclasses import dataclass, field | from dataclasses import dataclass, field | ||||
import itertools | import itertools | ||||
from typing import Callable, Generic, Iterable, List, Optional, TypeVar | from typing import ( | ||||
AsyncIterable, | |||||
Awaitable, | |||||
Callable, | |||||
Generic, | |||||
Iterable, | |||||
List, | |||||
Optional, | |||||
TypeVar, | |||||
) | |||||
TResult = TypeVar("TResult") | TResult = TypeVar("TResult") | ||||
TToken = TypeVar("TToken") | TToken = TypeVar("TToken") | ||||
@dataclass(eq=True) | @dataclass(eq=True) | ||||
class PagedResult(Generic[TResult, TToken]): | class PagedResult(Generic[TResult, TToken]): | ||||
"""Represents a page of results; with a token to get the next page""" | """Represents a page of results; with a token to get the next page""" | ||||
Show All 37 Lines | ) -> Optional[Iterable[TResult]]: | ||||
else: | else: | ||||
if res.next_page_token is None: | if res.next_page_token is None: | ||||
return iter(res.results) | return iter(res.results) | ||||
else: | else: | ||||
return itertools.chain( | return itertools.chain( | ||||
res.results, | res.results, | ||||
_stream_results(f, *args, page_token=res.next_page_token, **kwargs), | _stream_results(f, *args, page_token=res.next_page_token, **kwargs), | ||||
) | ) | ||||
async def asyncchain(iterable1, iterable2): | |||||
"""Like itertools.chain, but async""" | |||||
async for item in iterable1: | |||||
yield item | |||||
async for item in iterable2: | |||||
yield item | |||||
async def asynciter(iterable): | |||||
"""Like iter(), but async""" | |||||
for item in iterable: | |||||
yield item | |||||
async def _stream_results_async(f, *args, page_token, **kwargs): | |||||
"""Helper for stream_results_async() and stream_results_optional_async()""" | |||||
while True: | |||||
page_result = await f(*args, page_token=page_token, **kwargs) | |||||
for res in page_result.results: | |||||
yield res | |||||
page_token = page_result.next_page_token | |||||
if page_token is None: | |||||
break | |||||
async def stream_results_async( | |||||
f: Callable[..., Awaitable[PagedResult[TResult, TToken]]], *args, **kwargs | |||||
) -> AsyncIterable[TResult]: | |||||
"""Consume the paginated result and stream the page results | |||||
""" | |||||
if "page_token" in kwargs: | |||||
raise TypeError('stream_results has no argument "page_token".') | |||||
return await _stream_results(f, *args, page_token=None, **kwargs) | |||||
async def stream_results_optional_async( | |||||
f: Callable[..., Awaitable[Optional[PagedResult[TResult, TToken]]]], *args, **kwargs | |||||
) -> Awaitable[Optional[AsyncIterable[TResult]]]: | |||||
"""Like stream_results(), but for functions ``f`` that return an Optional. | |||||
""" | |||||
if "page_token" in kwargs: | |||||
raise TypeError('stream_results_optional has no argument "page_token".') | |||||
res = await f(*args, page_token=None, **kwargs) | |||||
if res is None: | |||||
future: asyncio.Future[None] = asyncio.Future(loop=asyncio.get_running_loop()) | |||||
future.set_result(None) | |||||
return future | |||||
else: | |||||
if res.next_page_token is None: | |||||
return asynciter(res.results) | |||||
else: | |||||
return asyncchain( | |||||
asynciter(res.results), | |||||
_stream_results_async( | |||||
f, *args, page_token=res.next_page_token, **kwargs | |||||
), | |||||
) |