Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/pattern.py
- This file was added.
# Copyright (C) 2020 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
from dataclasses import dataclass | |||||
from typing import ( | |||||
Any, | |||||
Dict, | |||||
Generic, | |||||
Iterable, | |||||
Iterator, | |||||
List, | |||||
TypeVar, | |||||
) | |||||
from swh.core import config | |||||
from swh.scheduler import get_scheduler | |||||
from swh.scheduler import model | |||||
@dataclass | |||||
class ListerStats: | |||||
pages: int = 0 | |||||
origins: int = 0 | |||||
def __add__(self, other: "ListerStats") -> "ListerStats": | |||||
return self.__class__(self.pages + other.pages, self.origins + other.origins) | |||||
def __iadd__(self, other: "ListerStats"): | |||||
self.pages += other.pages | |||||
self.origins += other.origins | |||||
def dict(self) -> Dict[str, int]: | |||||
return {"pages": self.pages, "origins": self.origins} | |||||
StateType = TypeVar("StateType") | |||||
PageType = TypeVar("PageType") | |||||
BackendStateType = Dict[str, Any] | |||||
class Lister(Generic[StateType, PageType]): | |||||
"""The base class for a Software Heritage lister. | |||||
A lister scrapes a page by page list of origins from an upstream (a forge, the API | |||||
of a package manager, ...), and massages the results of that scrape into a list of | |||||
origins that are recorded by the scheduler backend. | |||||
The main loop of the lister, :meth:`run`, basically revolves around the | |||||
:meth:`get_pages` iterator, which sets up the lister state, then yields the scrape | |||||
results page by page. The :meth:`get_origins_from_page` method converts the pages | |||||
into a list of :class:`model.ListedOrigin`, sent to the scheduler at every page. The | |||||
:meth:`commit_page` method can be used to update the lister state after a page of | |||||
origins has been recorded in the scheduler backend. | |||||
The :func:`finalize_state` method is called at lister teardown (whether the run has | |||||
been successful or not) to update the local :attr:`state` object before it's sent to | |||||
the database. This method must set the :attr:`state_updated` attribute if an updated | |||||
state needs to be sent to the scheduler backend. This method can call | |||||
:func:`get_state_from_scheduler` to refresh and merge the lister state from the | |||||
scheduler before it's finalized (and potentially minimize the risk of race | |||||
conditions between concurrent runs of the lister). | |||||
The state of the lister is serialized and deserialized from the dict stored in the | |||||
scheduler backend, using the :meth:`state_from_dict` and :meth:`state_to_dict` | |||||
methods. | |||||
Args: | |||||
url: a URL representing this lister, e.g. the API's base URL | |||||
instance: the instance name used, in conjunction with :attr:`LISTER_NAME`, to | |||||
uniquely identify this lister instance. | |||||
Generic types: | |||||
- *StateType*: concrete lister type; should usually be a :class:`dataclass` for | |||||
stricter typing | |||||
- *PageType*: type of scrape results; can usually be a :class:`requests.Response`, | |||||
or a :class:`dict` | |||||
""" | |||||
LISTER_NAME: str = "" | |||||
vlorentz: I would go for TypeError instead | |||||
def __init__(self, url: str, instance: str): | |||||
if not self.LISTER_NAME: | |||||
raise ValueError("Must set the LISTER_NAME attribute on Lister classes") | |||||
self.url = url | |||||
self.instance = instance | |||||
self.config = self.load_config() | |||||
scheduler_config = self.config.get( | |||||
"scheduler", {"cls": "remote", "args": {"url": "http://localhost:5008/"}} | |||||
) | |||||
self.scheduler = get_scheduler(**scheduler_config) | |||||
# store the initial state of the lister | |||||
self.state = self.get_state_from_scheduler() | |||||
self.state_updated = False | |||||
def config_base_filename(self) -> str: | |||||
return "lister_%s" % self.LISTER_NAME | |||||
def load_config(self) -> Dict[str, Any]: | |||||
"""Load the configuration from the configured filename""" | |||||
return config.SWHConfig().parse_config_file( | |||||
base_filename=self.config_base_filename(), | |||||
) | |||||
def get_credentials(self) -> List[Dict[str, str]]: | |||||
"""Get the credentials for the current instance of the lister""" | |||||
return ( | |||||
self.config.get("credentials", {}) | |||||
.get(self.LISTER_NAME, {}) | |||||
.get(self.instance, []) | |||||
) | |||||
def run(self) -> ListerStats: | |||||
"""Run the lister. | |||||
Returns: | |||||
Not Done Inline ActionsWouldn't it make more sense to do self.config.get(self.LISTER_NAME, {}).get(self.instance, {}).get("credentials", [])? vlorentz: Wouldn't it make more sense to do `self.config.get(self.LISTER_NAME, {}).get(self.instance, {}). | |||||
Done Inline ActionsI used this schema to be backwards-compatible with the current config. I think there's space for a more extensible config schema, but overall we haven't really needed more than a credentials key for now. Maybe I'll want to change this when I reach the Debian lister. We'll see. olasd: I used this schema to be backwards-compatible with the current config. I think there's space… | |||||
Done Inline Actionsok vlorentz: ok | |||||
A counter with the number of pages and origins seen for this run | |||||
of the lister. | |||||
""" | |||||
full_stats = ListerStats() | |||||
try: | |||||
for page in self.get_pages(): | |||||
full_stats.pages += 1 | |||||
origins = self.get_origins_from_page(page) | |||||
full_stats.origins += self.send_origins(origins) | |||||
self.commit_page(page) | |||||
finally: | |||||
self.finalize_state() | |||||
if self.state_updated: | |||||
self.set_state_in_scheduler() | |||||
return full_stats | |||||
def get_state_from_scheduler(self) -> StateType: | |||||
"""Update the state in the current instance from the state in the scheduler backend. | |||||
This updates :attr:`lister_obj`, and returns its (deserialized) current state, | |||||
to allow for comparison with the local state. | |||||
Returns: | |||||
the state retrieved from the scheduler backend | |||||
""" | |||||
self.lister_obj = self.scheduler.get_or_create_lister( | |||||
name=self.LISTER_NAME, instance_name=self.instance | |||||
) | |||||
return self.state_from_dict(self.lister_obj.current_state) | |||||
def set_state_in_scheduler(self) -> None: | |||||
"""Update the state in the scheduler backend from the state of the current instance. | |||||
Raises: | |||||
:class:`swh.scheduler.exc.StaleData` in case of a race condition between | |||||
concurrent listers (from :meth:`swh.scheduler.Scheduler.update_lister`). | |||||
""" | |||||
if self.state is None: | |||||
raise ValueError("Current state unset!") | |||||
self.lister_obj.current_state = self.state_to_dict(self.state) | |||||
self.lister_obj = self.scheduler.update_lister(self.lister_obj) | |||||
# State management to/from the scheduler | |||||
def state_from_dict(self, d: BackendStateType) -> StateType: | |||||
"""Convert the state stored in the scheduler backend (as a dict), | |||||
to the concrete StateType for this lister.""" | |||||
raise NotImplementedError | |||||
def state_to_dict(self, state: StateType) -> BackendStateType: | |||||
"""Convert the StateType for this lister to its serialization as dict for | |||||
storage in the scheduler. | |||||
Values must be JSON-compatible as that's what the backend database expects. | |||||
""" | |||||
raise NotImplementedError | |||||
def finalize_state(self) -> None: | |||||
"""Custom hook to finalize the lister state before returning from the main loop. | |||||
This method must set :attr:`state_updated` if the updated state must be sent to | |||||
the scheduler backend. | |||||
If relevant, this method can use :meth`get_state_from_scheduler` to merge the | |||||
current lister state with the one from the scheduler backend, reducing the risk | |||||
of race conditions if we're running concurrent listings. | |||||
This method is called in a `finally` block, which means it will also run when | |||||
the lister fails. | |||||
""" | |||||
pass | |||||
# Actual listing logic | |||||
def get_pages(self) -> Iterator[PageType]: | |||||
"""Retrieve a list of pages of listed results. This is the main loop of the lister. | |||||
Returns: | |||||
an iterator of raw pages fetched from the platform currently being listed. | |||||
""" | |||||
raise NotImplementedError | |||||
def get_origins_from_page(self, page: PageType) -> Iterator[model.ListedOrigin]: | |||||
"""Extract a list of :class:`model.ListedOrigin` from a raw page of results. | |||||
Args: | |||||
page: a single page of results | |||||
Returns: | |||||
an iterator for the origins present on the given page of results | |||||
""" | |||||
raise NotImplementedError | |||||
def commit_page(self, page: PageType) -> None: | |||||
"""Custom hook called after the current page has been committed in the scheduler | |||||
backend. | |||||
This method can be used to update the state after a page of origins has been | |||||
successfully recorded in the scheduler backend. If the new state should be | |||||
recorded at the point the lister completes, the :attr:`state_updated` attribute | |||||
must be set. | |||||
""" | |||||
pass | |||||
def send_origins(self, origins: Iterable[model.ListedOrigin]) -> int: | |||||
"""Record a list of :class:`model.ListedOrigin` in the scheduler. | |||||
Returns: | |||||
the number of listed origins recorded in the scheduler | |||||
""" | |||||
ret = self.scheduler.record_listed_origins(origins) | |||||
return len(ret) |
I would go for TypeError instead