Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/algos/origin.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from typing import Iterator, List, Optional, Tuple | from typing import Iterator, List, Optional, Tuple | ||||
from swh.core.api.classes import stream_results | from swh.core.api.classes import stream_results | ||||
from swh.model.model import Origin, OriginVisit, OriginVisitStatus | from swh.model.model import Origin, OriginVisit, OriginVisitStatus | ||||
from swh.storage.interface import ListOrder, StorageInterface | from swh.storage.interface import ListOrder, StorageInterface | ||||
def iter_origins( | def iter_origins( | ||||
storage: StorageInterface, | storage: StorageInterface, page_token: Optional[str] = None, limit: int = 10000, | ||||
origin_from: int = 1, | |||||
origin_to: Optional[int] = None, | |||||
batch_size: int = 10000, | |||||
) -> Iterator[Origin]: | ) -> Iterator[Origin]: | ||||
"""Iterates over all origins in the storage. | """Iterates over origins in the storage. | ||||
Args: | Args: | ||||
storage: the storage object used for queries. | storage: the storage object used for queries. | ||||
origin_from: lower interval boundary | page_token: opaque token used for pagination | ||||
origin_to: upper interval boundary | limit: maximum number of origins per query | ||||
batch_size: number of origins per query | |||||
Yields: | Yields: | ||||
origin within the boundary [origin_to, origin_from] in batch_size | origin within the boundary [origin_to, origin_from] in batch_size | ||||
""" | """ | ||||
start = origin_from | |||||
while True: | while True: | ||||
if origin_to: | page_result = storage.origin_list(page_token=page_token, limit=limit) | ||||
origin_count = min(origin_to - start, batch_size) | origins = page_result.results | ||||
else: | |||||
origin_count = batch_size | |||||
origins = list( | |||||
storage.origin_get_range(origin_from=start, origin_count=origin_count) | |||||
) | |||||
if not origins: | if not origins: | ||||
break | break | ||||
start = origins[-1]["id"] + 1 | yield from origins | ||||
for origin in origins: | page_token = page_result.next_page_token | ||||
del origin["id"] | if not page_token: | ||||
yield Origin.from_dict(origin) | |||||
if origin_to and start > origin_to: | |||||
break | break | ||||
vlorentz: use `stream_results` | |||||
Done Inline ActionsI cannot, can I? D3682 needs it. That's why I opened this like this at least. ardumont: I cannot, can I?
D3682 needs it. That's why I opened this like this at least. | |||||
Done Inline ActionsAlso, as mentioned in the scheduler diff, we can add here the logger.info about the page_token (or in stream_results directly). I don't really see which is best? Right now, it's needed for the origins so i gather here is enough. ardumont: Also, as mentioned in the scheduler diff, we can add here the logger.info about the page_token… | |||||
Not Done Inline ActionsI think we should impelement iter_origins directly in the scheduler CLI as it has a rather specific need vlorentz: I think we should impelement `iter_origins` directly in the scheduler CLI as it has a rather… | |||||
Done Inline ActionsOk, so here, just plain stream_results call as the origin-visit and status in the same module at the end. sounds like a plan to me. Thanks for your input ;) ardumont: Ok, so here, just plain stream_results call as the origin-visit and status in the same module… | |||||
Not Done Inline Actions👍 vlorentz: 👍 | |||||
def origin_get_latest_visit_status( | def origin_get_latest_visit_status( | ||||
storage: StorageInterface, | storage: StorageInterface, | ||||
origin_url: str, | origin_url: str, | ||||
type: Optional[str] = None, | type: Optional[str] = None, | ||||
allowed_statuses: Optional[List[str]] = None, | allowed_statuses: Optional[List[str]] = None, | ||||
require_snapshot: bool = False, | require_snapshot: bool = False, | ||||
▲ Show 20 Lines • Show All 63 Lines • Show Last 20 Lines |
use stream_results