Page MenuHomeSoftware Heritage
Paste P1109

stream_results_optional
ActivePublic

Authored by vlorentz on Jul 30 2021, 3:21 PM.
def stream_results_optional(
    f: Callable[..., Optional[PagedResult[TResult, TToken]]], *args, **kwargs
) -> Optional[Iterable[TResult]]:
    """Like stream_results(), but for functions ``f`` that return an Optional.

    ``f`` is called as ``f(*args, page_token=<token>, **kwargs)``: first with
    ``page_token=None``, then with the ``next_page_token`` of the previously
    returned page, until a page whose ``next_page_token`` is None is reached.

    The first page is fetched eagerly, at call time, because that is the only
    way to know whether to return None or an iterable. Subsequent pages are
    fetched lazily while the returned iterable is consumed.

    Args:
        f: paginated function; must accept a ``page_token`` keyword argument
            and return either None or an object with ``results`` and
            ``next_page_token`` attributes.
        *args: positional arguments forwarded to every call of ``f``.
        **kwargs: keyword arguments forwarded to every call of ``f``; must
            not contain ``page_token``.

    Returns:
        None if the first call to ``f`` returns None; otherwise an iterable
        over the ``results`` of all pages, in order.

    Raises:
        TypeError: if ``page_token`` is passed in ``kwargs`` (raised at call
            time).
        ValueError: while iterating, if ``f`` returns None for any page after
            the first one.
    """
    if "page_token" in kwargs:
        raise TypeError('stream_results_optional has no argument "page_token".')

    # This function must NOT itself contain ``yield``: a generator function
    # can never return None to its caller, which would defeat the Optional
    # return type. The lazy part therefore lives in the inner generator.
    first_page = f(*args, page_token=None, **kwargs)
    if first_page is None:
        return None

    def _stream_pages() -> Iterable[TResult]:
        page_result = first_page
        # ``i`` is the index of the page about to be fetched (page 0 is
        # ``first_page``); it is only used in the error message below.
        for i in itertools.count(1):
            yield from page_result.results
            page_token = page_result.next_page_token
            if page_token is None:
                return
            page_result = f(*args, page_token=page_token, **kwargs)
            if page_result is None:
                # None is only meaningful for the very first page; getting it
                # mid-stream means ``f`` is inconsistent.
                raise ValueError(
                    f"First iteration(s) of {f!r} return non-None, but iteration "
                    f"{i} (with page_token={page_token!r}) returned None."
                )

    return _stream_pages()