Page MenuHomeSoftware Heritage

D3664.id12904.diff
No OneTemporary

D3664.id12904.diff

diff --git a/swh/core/api/classes.py b/swh/core/api/classes.py
--- a/swh/core/api/classes.py
+++ b/swh/core/api/classes.py
@@ -6,7 +6,9 @@
from dataclasses import dataclass, field
from typing import (
+ Callable,
Generic,
+ Iterable,
List,
Optional,
TypeVar,
@@ -23,3 +25,20 @@
results: List[TResult] = field(default_factory=list)
next_page_token: Optional[TToken] = field(default=None)
+
+
+def stream_results(
+ f: Callable[..., PagedResult[TResult, TToken]], *args, **kwargs
+) -> Iterable[TResult]:
+ """Consume the paginated result and stream the page results
+
+ """
+ if "page_token" in kwargs:
+ raise TypeError('stream_results has no argument "page_token".')
+ page_token = None
+ while True:
+ page_result = f(*args, page_token=page_token, **kwargs)
+ yield from page_result.results
+ page_token = page_result.next_page_token
+ if page_token is None:
+ break
diff --git a/swh/core/api/tests/test_classes.py b/swh/core/api/tests/test_classes.py
new file mode 100644
--- /dev/null
+++ b/swh/core/api/tests/test_classes.py
@@ -0,0 +1,60 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from swh.core.api.classes import PagedResult as CorePagedResult, stream_results
+
+from typing import TypeVar
+
+
+T = TypeVar("T")
+TestPagedResult = CorePagedResult[T, bytes]
+
+
+def test_stream_results_no_result():
+ def paged_results(page_token) -> TestPagedResult:
+ return TestPagedResult(results=[], next_page_token=None)
+
+ # only 1 call, no pagination
+ actual_data = stream_results(paged_results)
+ assert list(actual_data) == []
+
+
+def test_stream_results_no_pagination():
+ input_data = [
+ {"url": "something"},
+ {"url": "something2"},
+ ]
+
+ def paged_results(page_token) -> TestPagedResult:
+ return TestPagedResult(results=input_data, next_page_token=None)
+
+ # only 1 call, no pagination
+ actual_data = stream_results(paged_results)
+ assert list(actual_data) == input_data
+
+
+def test_stream_results_pagination():
+ input_data = [
+ {"url": "something"},
+ {"url": "something2"},
+ ]
+ input_data2 = [
+ {"url": "something3"},
+ ]
+ input_data3 = [
+ {"url": "something4"},
+ ]
+
+ def page_results2(page_token=None) -> TestPagedResult:
+ result_per_token = {
+ None: TestPagedResult(results=input_data, next_page_token=b"two"),
+ b"two": TestPagedResult(results=input_data2, next_page_token=b"three"),
+ b"three": TestPagedResult(results=input_data3, next_page_token=None),
+ }
+ return result_per_token[page_token]
+
+ # multiple calls to solve the pagination calls
+ actual_data = stream_results(page_results2)
+ assert list(actual_data) == input_data + input_data2 + input_data3

File Metadata

Mime Type
text/plain
Expires
Sun, Aug 17, 10:54 PM (1 w, 3 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3220230

Event Timeline