diff --git a/swh/lister/cgit/lister.py b/swh/lister/cgit/lister.py
index 9a19cd5..c513884 100644
--- a/swh/lister/cgit/lister.py
+++ b/swh/lister/cgit/lister.py
@@ -1,213 +1,209 @@
 # Copyright (C) 2019-2021 The Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from datetime import datetime, timezone
 import logging
 import re
 from typing import Any, Dict, Iterator, List, Optional
 from urllib.parse import urljoin, urlparse

 from bs4 import BeautifulSoup
 import requests
 from requests.exceptions import HTTPError

 from swh.lister import USER_AGENT
 from swh.lister.pattern import CredentialsType, StatelessLister
 from swh.scheduler.interface import SchedulerInterface
 from swh.scheduler.model import ListedOrigin

 logger = logging.getLogger(__name__)

 Repositories = List[Dict[str, Any]]


 class CGitLister(StatelessLister[Repositories]):
     """Lister class for CGit repositories.

     This lister will retrieve the list of published git repositories by
     parsing the HTML page(s) of the index retrieved at `url`.

     The lister currently defines 2 listing behaviors:

     - If the `base_git_url` is provided, the listed origin urls are computed out
       of the base git url link and the one listed in the main listing page
       (resulting in fewer HTTP queries than the 2nd behavior below). This is
       expected to be the main deployed behavior.

     - Otherwise (with no `base_git_url`), for each git repository found, one
       extra HTTP query is made at the url listed in the main listing page to
       gather the published "Clone" URLs to be used as origin URL for that git
       repo. If several "Clone" urls are provided, prefer the http/https one, if
       any, otherwise fall back to the first one.

     """

     LISTER_NAME = "cgit"

     def __init__(
         self,
         scheduler: SchedulerInterface,
         url: str,
         instance: Optional[str] = None,
         credentials: Optional[CredentialsType] = None,
         base_git_url: Optional[str] = None,
     ):
         """Lister class for CGit repositories.

         Args:
             url: main URL of the CGit instance, i.e. url of the index
                 of published git repositories on this instance.
-            instance: Name of cgit instance. Defaults to url's hostname
+            instance: Name of cgit instance. Defaults to url's network location
                 if unset.
             base_git_url: Optional base git url which allows the origin url
                 computations.

         """
-        if not instance:
-            instance = urlparse(url).hostname
-        assert instance is not None  # Make mypy happy
-
         super().__init__(
             scheduler=scheduler, url=url, instance=instance, credentials=credentials,
         )
         self.session = requests.Session()
         self.session.headers.update(
             {"Accept": "application/html", "User-Agent": USER_AGENT}
         )
         self.base_git_url = base_git_url

     def _get_and_parse(self, url: str) -> BeautifulSoup:
         """Get the given url and parse the retrieved HTML using BeautifulSoup"""
         response = self.session.get(url)
         response.raise_for_status()
         return BeautifulSoup(response.text, features="html.parser")

     def get_pages(self) -> Iterator[Repositories]:
         """Generate git 'project' URLs found on the current CGit server

         The last_update date is retrieved on the list of repo page to avoid
         computing it from the repository details, which only give a date per branch
         """
         next_page: Optional[str] = self.url
         while next_page:
             bs_idx = self._get_and_parse(next_page)

             page_results = []

             for tr in bs_idx.find("div", {"class": "content"}).find_all(
                 "tr", {"class": ""}
             ):
                 repository_link = tr.find("a")["href"]
                 repo_url = None
                 git_url = None

                 base_url = urljoin(self.url, repository_link).strip("/")
                 if self.base_git_url:  # mapping provided
                     # computing git url
                     git_url = base_url.replace(self.url, self.base_git_url)
                 else:
                     # we compute the git detailed page url from which we will retrieve
                     # the git url (cf. self.get_origins_from_page)
                     repo_url = base_url

                 span = tr.find("span", {"class": re.compile("age-")})
                 last_updated_date = span.get("title") if span else None

                 page_results.append(
                     {
                         "url": repo_url,
                         "git_url": git_url,
                         "last_updated_date": last_updated_date,
                     }
                 )

             yield page_results

             try:
                 pager = bs_idx.find("ul", {"class": "pager"})

                 current_page = pager.find("a", {"class": "current"})
                 if current_page:
                     next_page = current_page.parent.next_sibling.a["href"]
                     next_page = urljoin(self.url, next_page)
             except (AttributeError, KeyError):
                 # no pager, or no next page
                 next_page = None

     def get_origins_from_page(
         self, repositories: Repositories
     ) -> Iterator[ListedOrigin]:
         """Convert a page of cgit repositories into a list of ListedOrigins."""
         assert self.lister_obj.id is not None

         for repo in repositories:
             origin_url = repo["git_url"] or self._get_origin_from_repository_url(
                 repo["url"]
             )
             if origin_url is None:
                 continue

             yield ListedOrigin(
                 lister_id=self.lister_obj.id,
                 url=origin_url,
                 visit_type="git",
                 last_update=_parse_last_updated_date(repo),
             )

     def _get_origin_from_repository_url(self, repository_url: str) -> Optional[str]:
         """Extract the git url from the repository page"""
         try:
             bs = self._get_and_parse(repository_url)
         except HTTPError as e:
             logger.warning(
                 "Unexpected HTTP status code %s on %s",
                 e.response.status_code,
                 e.response.url,
             )
             return None

         # origin urls are listed on the repository page
         # TODO check if forcing https is better or not ?
         # <link rel='vcs-git' href='git://...' title='...'/>
         # <link rel='vcs-git' href='http://...' title='...'/>
         # <link rel='vcs-git' href='https://...' title='...'/>
         urls = [x["href"] for x in bs.find_all("a", {"rel": "vcs-git"})]

         if not urls:
             return None

         # look for the http/https url, if any, and use it as origin_url
         for url in urls:
             if urlparse(url).scheme in ("http", "https"):
                 origin_url = url
                 break
         else:
             # otherwise, choose the first one
             origin_url = urls[0]
         return origin_url


 def _parse_last_updated_date(repository: Dict[str, Any]) -> Optional[datetime]:
     """Parse the last updated date"""
     date = repository.get("last_updated_date")
     if not date:
         return None

     parsed_date = None
     for date_format in ("%Y-%m-%d %H:%M:%S %z", "%Y-%m-%d %H:%M:%S (%Z)"):
         try:
             parsed_date = datetime.strptime(date, date_format)
             # force UTC to avoid naive datetime
             if not parsed_date.tzinfo:
                 parsed_date = parsed_date.replace(tzinfo=timezone.utc)
             break
         except Exception:
             pass

     if not parsed_date:
         logger.warning(
             "Could not parse %s last_updated date: %s", repository["url"], date,
         )

     return parsed_date
diff --git a/swh/lister/gitea/lister.py b/swh/lister/gitea/lister.py
index ebb7b6e..19ca4aa 100644
--- a/swh/lister/gitea/lister.py
+++ b/swh/lister/gitea/lister.py
@@ -1,142 +1,138 @@
 # Copyright (C) 2018-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import logging
 import random
 from typing import Any, Dict, Iterator, List, Optional
 from urllib.parse import urljoin

 import iso8601
 import requests
 from tenacity.before_sleep import before_sleep_log
-from urllib3.util import parse_url

 from swh.lister.utils import throttling_retry
 from swh.scheduler.interface import SchedulerInterface
 from swh.scheduler.model import ListedOrigin

 from .. import USER_AGENT
 from ..pattern import CredentialsType, StatelessLister

 logger = logging.getLogger(__name__)

 RepoListPage = List[Dict[str, Any]]


 class GiteaLister(StatelessLister[RepoListPage]):
     """List origins from Gitea.

     Gitea API documentation: https://try.gitea.io/api/swagger

     The API does pagination and provides navigation URLs through the 'Link' header.
     The default value for page size is the maximum value observed on the instances
     accessible at https://try.gitea.io/api/v1/ and https://codeberg.org/api/v1/."""

     LISTER_NAME = "gitea"

     REPO_LIST_PATH = "repos/search"

     def __init__(
         self,
         scheduler: SchedulerInterface,
         url: str,
         instance: Optional[str] = None,
         api_token: Optional[str] = None,
         page_size: int = 50,
         credentials: CredentialsType = None,
     ):
-        if instance is None:
-            instance = parse_url(url).host
-
         super().__init__(
             scheduler=scheduler, credentials=credentials, url=url, instance=instance,
         )

         self.query_params = {
             "sort": "id",
             "order": "asc",
             "limit": page_size,
             "page": 1,
         }

         self.session = requests.Session()
         self.session.headers.update(
             {"Accept": "application/json", "User-Agent": USER_AGENT,}
         )

         if api_token is None:
             if len(self.credentials) > 0:
                 cred = random.choice(self.credentials)
                 username = cred.get("username")
                 api_token = cred["password"]
                 logger.warning(
                     "Using authentication token from user %s", username or "???"
                 )
             else:
                 logger.warning(
                     "No authentication token set in configuration, using anonymous mode"
                 )

         if api_token:
             self.session.headers["Authorization"] = "Token %s" % api_token

     @throttling_retry(before_sleep=before_sleep_log(logger, logging.WARNING))
     def page_request(self, url: str, params: Dict[str, Any]) -> requests.Response:

         logger.info("Fetching URL %s with params %s", url, params)

         response = self.session.get(url, params=params)

         if response.status_code != 200:
             logger.warning(
                 "Unexpected HTTP status code %s on %s: %s",
                 response.status_code,
                 response.url,
                 response.content,
             )

         response.raise_for_status()

         return response

     @classmethod
     def results_simplified(cls, body: Dict[str, RepoListPage]) -> RepoListPage:
         fields_filter = ["id", "clone_url", "updated_at"]
         return [{k: r[k] for k in fields_filter} for r in body["data"]]

     def get_pages(self) -> Iterator[RepoListPage]:
         # base with trailing slash, path without leading slash for urljoin
         url: str = urljoin(self.url, self.REPO_LIST_PATH)

         response = self.page_request(url, self.query_params)

         while True:
             page_results = self.results_simplified(response.json())

             yield page_results

             assert len(response.links) > 0, "API changed: no Link header found"
             if "next" in response.links:
                 url = response.links["next"]["url"]
             else:
                 # last page
                 break

             response = self.page_request(url, {})

     def get_origins_from_page(self, page: RepoListPage) -> Iterator[ListedOrigin]:
         """Convert a page of Gitea repositories into a list of ListedOrigins.

         """
         assert self.lister_obj.id is not None

         for repo in page:
             last_update = iso8601.parse_date(repo["updated_at"])

             yield ListedOrigin(
                 lister_id=self.lister_obj.id,
                 url=repo["clone_url"],
                 visit_type="git",
                 last_update=last_update,
             )
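
The `get_pages` loop above leans on `requests` exposing the parsed RFC 5988 `Link` header as `response.links`; a minimal sketch of that pagination pattern (hypothetical endpoint, not the lister's actual code path):

    import requests

    def iter_json_pages(session: requests.Session, url: str, params: dict):
        # follow 'Link: <...>; rel="next"' until the server stops providing one
        response = session.get(url, params=params)
        while True:
            response.raise_for_status()
            yield response.json()
            next_link = response.links.get("next")
            if next_link is None:
                break  # last page
            response = session.get(next_link["url"])
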
""" attempt = retry_attempt(retry_state) if attempt.failed: exc = attempt.exception() return ( isinstance(exc, HTTPError) and exc.response.status_code == codes.forbidden and int(exc.response.headers.get("RateLimit-Remaining", "0")) == 0 ) return False def _parse_id_after(url: Optional[str]) -> Optional[int]: """Given an url, extract a return the 'id_after' query parameter associated value or None. This is the the repository id used for pagination purposes. """ if not url: return None # link: https://${project-api}/?...&id_after=2x... query_data = parse_qs(urlparse(url).query) page = query_data.get("id_after") if page and len(page) > 0: return int(page[0]) return None class GitLabLister(Lister[GitLabListerState, PageResult]): """List origins for a gitlab instance. By default, the lister runs in incremental mode: it lists all repositories, starting with the `last_seen_next_link` stored in the scheduler backend. Args: scheduler: a scheduler instance url: the api v4 url of the gitlab instance to visit (e.g. https://gitlab.com/api/v4/) - instance: a specific instance name (e.g. gitlab, tor, git-kernel, ...) + instance: a specific instance name (e.g. gitlab, tor, git-kernel, ...), + url network location will be used if not provided incremental: defines if incremental listing is activated or not """ LISTER_NAME = "gitlab" def __init__( self, scheduler, url: str, instance: Optional[str] = None, credentials: Optional[CredentialsType] = None, incremental: bool = False, ): - if instance is None: - instance = parse_url(url).host super().__init__( scheduler=scheduler, url=url.rstrip("/"), instance=instance, credentials=credentials, ) self.incremental = incremental self.last_page: Optional[str] = None self.session = requests.Session() self.session.headers.update( {"Accept": "application/json", "User-Agent": USER_AGENT} ) if len(self.credentials) > 0: cred = random.choice(self.credentials) logger.info( "Using %s credentials from user %s", self.instance, cred["username"] ) api_token = cred["password"] if api_token: self.session.headers["Authorization"] = f"Bearer {api_token}" def state_from_dict(self, d: Dict[str, Any]) -> GitLabListerState: return GitLabListerState(**d) def state_to_dict(self, state: GitLabListerState) -> Dict[str, Any]: return asdict(state) @throttling_retry( retry=_if_rate_limited, before_sleep=before_sleep_log(logger, logging.WARNING) ) def get_page_result(self, url: str) -> PageResult: logger.debug("Fetching URL %s", url) response = self.session.get(url) if response.status_code != 200: logger.warning( "Unexpected HTTP status code %s on %s: %s", response.status_code, response.url, response.content, ) response.raise_for_status() repositories: Tuple[Repository, ...] 
= tuple(response.json()) if hasattr(response, "links") and response.links.get("next"): next_page = response.links["next"]["url"] else: next_page = None return PageResult(repositories, next_page) def page_url(self, id_after: Optional[int] = None) -> str: parameters = { "pagination": "keyset", "order_by": "id", "sort": "asc", } if id_after is not None: parameters["id_after"] = str(id_after) return f"{self.url}/projects?{urlencode(parameters)}" def get_pages(self) -> Iterator[PageResult]: next_page: Optional[str] if self.incremental and self.state and self.state.last_seen_next_link: next_page = self.state.last_seen_next_link else: next_page = self.page_url() while next_page: self.last_page = next_page page_result = self.get_page_result(next_page) yield page_result next_page = page_result.next_page def get_origins_from_page(self, page_result: PageResult) -> Iterator[ListedOrigin]: assert self.lister_obj.id is not None repositories = page_result.repositories if page_result.repositories else [] for repo in repositories: yield ListedOrigin( lister_id=self.lister_obj.id, url=repo["http_url_to_repo"], visit_type="git", last_update=iso8601.parse_date(repo["last_activity_at"]), ) def commit_page(self, page_result: PageResult) -> None: """Update currently stored state using the latest listed "next" page if relevant. Relevancy is determined by the next_page link whose 'page' id must be strictly superior to the currently stored one. Note: this is a noop for full listing mode """ if self.incremental: # link: https://${project-api}/?...&page=2x... next_page = page_result.next_page if not next_page and self.last_page: next_page = self.last_page if next_page: id_after = _parse_id_after(next_page) previous_next_page = self.state.last_seen_next_link previous_id_after = _parse_id_after(previous_next_page) if previous_next_page is None or ( previous_id_after and id_after and previous_id_after < id_after ): self.state.last_seen_next_link = next_page def finalize(self) -> None: """finalize the lister state when relevant (see `fn:commit_page` for details) Note: this is a noop for full listing mode """ next_page = self.state.last_seen_next_link if self.incremental and next_page: # link: https://${project-api}/?...&page=2x... 
next_id_after = _parse_id_after(next_page) scheduler_state = self.get_state_from_scheduler() previous_next_id_after = _parse_id_after( scheduler_state.last_seen_next_link ) if (not previous_next_id_after and next_id_after) or ( previous_next_id_after and next_id_after and previous_next_id_after < next_id_after ): self.updated = True diff --git a/swh/lister/pattern.py b/swh/lister/pattern.py index 6f077e4..63a2fff 100644 --- a/swh/lister/pattern.py +++ b/swh/lister/pattern.py @@ -1,278 +1,284 @@ -# Copyright (C) 2020 The Software Heritage developers +# Copyright (C) 2020-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from __future__ import annotations + from dataclasses import dataclass from typing import Any, Dict, Generic, Iterable, Iterator, List, Optional, TypeVar +from urllib.parse import urlparse from swh.core.config import load_from_envvar from swh.core.utils import grouper from swh.scheduler import get_scheduler, model from swh.scheduler.interface import SchedulerInterface @dataclass class ListerStats: pages: int = 0 origins: int = 0 - def __add__(self, other: "ListerStats") -> "ListerStats": + def __add__(self, other: ListerStats) -> ListerStats: return self.__class__(self.pages + other.pages, self.origins + other.origins) - def __iadd__(self, other: "ListerStats"): + def __iadd__(self, other: ListerStats): self.pages += other.pages self.origins += other.origins def dict(self) -> Dict[str, int]: return {"pages": self.pages, "origins": self.origins} StateType = TypeVar("StateType") PageType = TypeVar("PageType") BackendStateType = Dict[str, Any] CredentialsType = Optional[Dict[str, Dict[str, List[Dict[str, str]]]]] class Lister(Generic[StateType, PageType]): """The base class for a Software Heritage lister. A lister scrapes a page by page list of origins from an upstream (a forge, the API of a package manager, ...), and massages the results of that scrape into a list of origins that are recorded by the scheduler backend. The main loop of the lister, :meth:`run`, basically revolves around the :meth:`get_pages` iterator, which sets up the lister state, then yields the scrape results page by page. The :meth:`get_origins_from_page` method converts the pages into a list of :class:`model.ListedOrigin`, sent to the scheduler at every page. The :meth:`commit_page` method can be used to update the lister state after a page of origins has been recorded in the scheduler backend. The :func:`finalize` method is called at lister teardown (whether the run has been successful or not) to update the local :attr:`state` object before it's sent to the database. This method must set the :attr:`updated` attribute if an updated state needs to be sent to the scheduler backend. This method can call :func:`get_state_from_scheduler` to refresh and merge the lister state from the scheduler before it's finalized (and potentially minimize the risk of race conditions between concurrent runs of the lister). The state of the lister is serialized and deserialized from the dict stored in the scheduler backend, using the :meth:`state_from_dict` and :meth:`state_to_dict` methods. Args: scheduler: the instance of the Scheduler being used to register the origins listed by this lister url: a URL representing this lister, e.g. 
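
The incremental-state comparisons in `commit_page` and `finalize` both reduce to comparing the `id_after` cursors of two next-page links; a standalone sketch of that extraction, with a hypothetical URL:

    from typing import Optional
    from urllib.parse import parse_qs, urlparse

    def parse_id_after(url: Optional[str]) -> Optional[int]:
        # keyset-pagination cursor: the id of the last repository already listed
        if not url:
            return None
        values = parse_qs(urlparse(url).query).get("id_after")
        return int(values[0]) if values else None

    link = "https://gitlab.com/api/v4/projects?pagination=keyset&order_by=id&sort=asc&id_after=42"
    assert parse_id_after(link) == 42
    assert parse_id_after(None) is None
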
diff --git a/swh/lister/pattern.py b/swh/lister/pattern.py
index 6f077e4..63a2fff 100644
--- a/swh/lister/pattern.py
+++ b/swh/lister/pattern.py
@@ -1,278 +1,284 @@
-# Copyright (C) 2020 The Software Heritage developers
+# Copyright (C) 2020-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

+from __future__ import annotations
+
 from dataclasses import dataclass
 from typing import Any, Dict, Generic, Iterable, Iterator, List, Optional, TypeVar
+from urllib.parse import urlparse

 from swh.core.config import load_from_envvar
 from swh.core.utils import grouper
 from swh.scheduler import get_scheduler, model
 from swh.scheduler.interface import SchedulerInterface


 @dataclass
 class ListerStats:
     pages: int = 0
     origins: int = 0

-    def __add__(self, other: "ListerStats") -> "ListerStats":
+    def __add__(self, other: ListerStats) -> ListerStats:
         return self.__class__(self.pages + other.pages, self.origins + other.origins)

-    def __iadd__(self, other: "ListerStats"):
+    def __iadd__(self, other: ListerStats):
         self.pages += other.pages
         self.origins += other.origins

     def dict(self) -> Dict[str, int]:
         return {"pages": self.pages, "origins": self.origins}


 StateType = TypeVar("StateType")
 PageType = TypeVar("PageType")

 BackendStateType = Dict[str, Any]
 CredentialsType = Optional[Dict[str, Dict[str, List[Dict[str, str]]]]]


 class Lister(Generic[StateType, PageType]):
     """The base class for a Software Heritage lister.

     A lister scrapes a page by page list of origins from an upstream (a forge, the API
     of a package manager, ...), and massages the results of that scrape into a list of
     origins that are recorded by the scheduler backend.

     The main loop of the lister, :meth:`run`, basically revolves around the
     :meth:`get_pages` iterator, which sets up the lister state, then yields the scrape
     results page by page. The :meth:`get_origins_from_page` method converts the pages
     into a list of :class:`model.ListedOrigin`, sent to the scheduler at every page.
     The :meth:`commit_page` method can be used to update the lister state after a page
     of origins has been recorded in the scheduler backend.

     The :func:`finalize` method is called at lister teardown (whether the run has been
     successful or not) to update the local :attr:`state` object before it's sent to
     the database. This method must set the :attr:`updated` attribute if an updated
     state needs to be sent to the scheduler backend. This method can call
     :func:`get_state_from_scheduler` to refresh and merge the lister state from the
     scheduler before it's finalized (and potentially minimize the risk of race
     conditions between concurrent runs of the lister).

     The state of the lister is serialized and deserialized from the dict stored in the
     scheduler backend, using the :meth:`state_from_dict` and :meth:`state_to_dict`
     methods.

     Args:
       scheduler: the instance of the Scheduler being used to register the
         origins listed by this lister
       url: a URL representing this lister, e.g. the API's base URL
-      instance: the instance name used, in conjunction with :attr:`LISTER_NAME`, to
-        uniquely identify this lister instance.
+      instance: the instance name, to uniquely identify this lister instance,
+        if not provided, the URL network location will be used
       credentials: dictionary of credentials for all listers. The first level
         identifies the :attr:`LISTER_NAME`, the second level the lister
         :attr:`instance`. The final level is a list of dicts containing the
         expected credentials for the given instance of that lister.

     Generic types:
       - *StateType*: concrete lister type; should usually be a :class:`dataclass` for
         stricter typing
       - *PageType*: type of scrape results; can usually be a :class:`requests.Response`,
         or a :class:`dict`

     """

     LISTER_NAME: str = ""

     def __init__(
         self,
         scheduler: SchedulerInterface,
         url: str,
-        instance: str,
+        instance: Optional[str] = None,
         credentials: CredentialsType = None,
     ):
         if not self.LISTER_NAME:
             raise ValueError("Must set the LISTER_NAME attribute on Lister classes")
         self.url = url
-        self.instance = instance
+        if instance is not None:
+            self.instance = instance
+        else:
+            self.instance = urlparse(url).netloc
         self.scheduler = scheduler

         if not credentials:
             credentials = {}
         self.credentials = list(
             credentials.get(self.LISTER_NAME, {}).get(self.instance, [])
         )

         # store the initial state of the lister
         self.state = self.get_state_from_scheduler()
         self.updated = False

     def run(self) -> ListerStats:
         """Run the lister.

         Returns:
             A counter with the number of pages and origins seen for this run
             of the lister.

         """
         full_stats = ListerStats()

         try:
             for page in self.get_pages():
                 full_stats.pages += 1
                 origins = self.get_origins_from_page(page)
                 full_stats.origins += self.send_origins(origins)
                 self.commit_page(page)
         finally:
             self.finalize()
             if self.updated:
                 self.set_state_in_scheduler()

         return full_stats

     def get_state_from_scheduler(self) -> StateType:
         """Update the state in the current instance from the state in the scheduler
         backend.

         This updates :attr:`lister_obj`, and returns its (deserialized) current state,
         to allow for comparison with the local state.

         Returns:
           the state retrieved from the scheduler backend
         """
         self.lister_obj = self.scheduler.get_or_create_lister(
             name=self.LISTER_NAME, instance_name=self.instance
         )
         return self.state_from_dict(self.lister_obj.current_state)

     def set_state_in_scheduler(self) -> None:
         """Update the state in the scheduler backend from the state of the current
         instance.

         Raises:
           swh.scheduler.exc.StaleData: in case of a race condition between concurrent
             listers (from :meth:`swh.scheduler.Scheduler.update_lister`).
         """
         self.lister_obj.current_state = self.state_to_dict(self.state)
         self.lister_obj = self.scheduler.update_lister(self.lister_obj)

     # State management to/from the scheduler

     def state_from_dict(self, d: BackendStateType) -> StateType:
         """Convert the state stored in the scheduler backend (as a dict), to the
         concrete StateType for this lister."""
         raise NotImplementedError

     def state_to_dict(self, state: StateType) -> BackendStateType:
         """Convert the StateType for this lister to its serialization as dict for
         storage in the scheduler.

         Values must be JSON-compatible as that's what the backend database expects.
         """
         raise NotImplementedError

     def finalize(self) -> None:
         """Custom hook to finalize the lister state before returning from the main
         loop.

         This method must set :attr:`updated` if the lister has done some work.

         If relevant, this method can use :meth:`get_state_from_scheduler` to merge the
         current lister state with the one from the scheduler backend, reducing the
         risk of race conditions if we're running concurrent listings.

         This method is called in a `finally` block, which means it will also run when
         the lister fails.
         """
         pass

     # Actual listing logic

     def get_pages(self) -> Iterator[PageType]:
         """Retrieve a list of pages of listed results. This is the main loop of the
         lister.

         Returns:
           an iterator of raw pages fetched from the platform currently being listed.
         """
         raise NotImplementedError

     def get_origins_from_page(self, page: PageType) -> Iterator[model.ListedOrigin]:
         """Extract a list of :class:`model.ListedOrigin` from a raw page of results.

         Args:
           page: a single page of results

         Returns:
           an iterator for the origins present on the given page of results
         """
         raise NotImplementedError

     def commit_page(self, page: PageType) -> None:
         """Custom hook called after the current page has been committed in the
         scheduler backend.

         This method can be used to update the state after a page of origins has been
         successfully recorded in the scheduler backend. If the new state should be
         recorded at the point the lister completes, the :attr:`updated` attribute must
         be set.
         """
         pass

     def send_origins(self, origins: Iterable[model.ListedOrigin]) -> int:
         """Record a list of :class:`model.ListedOrigin` in the scheduler.

         Returns:
           the number of listed origins recorded in the scheduler
         """
         count = 0
         for batch_origins in grouper(origins, n=1000):
             ret = self.scheduler.record_listed_origins(batch_origins)
             count += len(ret)
         return count

     @classmethod
     def from_config(cls, scheduler: Dict[str, Any], **config: Any):
         """Instantiate a lister from a configuration dict.

         This is basically a backwards-compatibility shim for the CLI.

         Args:
           scheduler: instantiation config for the scheduler
           config: the configuration dict for the lister, with the following keys:
             - credentials (optional): credentials list for the scheduler
             - any other kwargs passed to the lister.

         Returns:
           the instantiated lister
         """
         # Drop the legacy config keys which aren't used for this generation of listers.
         for legacy_key in ("storage", "lister", "celery"):
             config.pop(legacy_key, None)

         # Instantiate the scheduler
         scheduler_instance = get_scheduler(**scheduler)

         return cls(scheduler=scheduler_instance, **config)

     @classmethod
     def from_configfile(cls, **kwargs: Any):
         """Instantiate a lister from the configuration loaded from the
         SWH_CONFIG_FILENAME envvar, with potential extra keyword arguments if their
         value is not None.

         Args:
           kwargs: kwargs passed to the lister instantiation
         """
         config = dict(load_from_envvar())
         config.update({k: v for k, v in kwargs.items() if v is not None})
         return cls.from_config(**config)


 class StatelessLister(Lister[None, PageType], Generic[PageType]):
     def state_from_dict(self, d: BackendStateType) -> None:
         """Always return empty state"""
         return None

     def state_to_dict(self, state: None) -> BackendStateType:
         """Always set empty state"""
         return {}
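
Worth noting: the shared default uses `urlparse(url).netloc`, where the per-lister code this patch removes mixed `urlparse(url).hostname` (cgit) and urllib3's `parse_url(url).host` (gitea, gitlab, tuleap). Unlike `hostname`, `netloc` preserves the port, so instance names can differ slightly after this change; a quick comparison:

    from urllib.parse import urlparse

    url = "https://gitlab.example.org:8080/api/v4/"
    assert urlparse(url).netloc == "gitlab.example.org:8080"  # new default instance name
    assert urlparse(url).hostname == "gitlab.example.org"     # old cgit default, port dropped
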
diff --git a/swh/lister/phabricator/lister.py b/swh/lister/phabricator/lister.py
index 193239c..1dbee37 100644
--- a/swh/lister/phabricator/lister.py
+++ b/swh/lister/phabricator/lister.py
@@ -1,182 +1,183 @@
-# Copyright (C) 2019-2020 the Software Heritage developers
+# Copyright (C) 2019-2021 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from collections import defaultdict
 import logging
 import random
 from typing import Any, Dict, Iterator, List, Optional
 from urllib.parse import urljoin

 import requests

 from swh.lister import USER_AGENT
 from swh.lister.pattern import CredentialsType, StatelessLister
 from swh.scheduler.interface import SchedulerInterface
 from swh.scheduler.model import ListedOrigin

 logger = logging.getLogger(__name__)

 PageType = List[Dict[str, Any]]


 class PhabricatorLister(StatelessLister[PageType]):
     """
     List all repositories hosted on a Phabricator instance.

     Args:
         url: base URL of a phabricator forge
             (for instance https://forge.softwareheritage.org)
-        instance: string identifier for the listed forge
+        instance: string identifier for the listed forge,
+            URL network location will be used if not provided
         api_token: authentication token for Conduit API
     """

     LISTER_NAME = "phabricator"
     API_REPOSITORY_PATH = "/api/diffusion.repository.search"

     def __init__(
         self,
         scheduler: SchedulerInterface,
         url: str,
-        instance: str,
+        instance: Optional[str] = None,
         api_token: Optional[str] = None,
         credentials: CredentialsType = None,
     ):
         super().__init__(
             scheduler, urljoin(url, self.API_REPOSITORY_PATH), instance, credentials
         )

         self.session = requests.Session()
         self.session.headers.update(
             {"Accept": "application/json", "User-Agent": USER_AGENT}
         )

         if api_token is not None:
             self.api_token = api_token
         else:
             if not self.credentials:
                 raise ValueError(
                     f"No credentials found for phabricator instance {self.instance};"
                     " Please set them in the lister configuration file."
                 )

             self.api_token = random.choice(self.credentials)["password"]

     def get_request_params(self, after: Optional[str]) -> Dict[str, str]:
         """Get the query parameters for the request."""
         base_params = {
             # Stable order
             "order": "oldest",
             # Add all URIs to the response
             "attachments[uris]": "1",
             # API token from stored credentials
             "api.token": self.api_token,
         }

         if after is not None:
             base_params["after"] = after

         return base_params

     @staticmethod
     def filter_params(params: Dict[str, str]) -> Dict[str, str]:
         """Filter the parameters for debug purposes"""
         return {
             k: (v if k != "api.token" else "**redacted**") for k, v in params.items()
         }

     def get_pages(self) -> Iterator[PageType]:
         after: Optional[str] = None
         while True:
             params = self.get_request_params(after)
             logger.debug(
                 "Retrieving results on URI %s with parameters %s",
                 self.url,
                 self.filter_params(params),
             )
             response = self.session.post(self.url, data=params)

             if response.status_code != 200:
                 logger.warning(
                     "Unexpected HTTP status code %s on %s: %s",
                     response.status_code,
                     response.url,
                     response.content,
                 )

             response.raise_for_status()

             response_data = response.json()

             if response_data.get("result") is None:
                 logger.warning(
                     "Got unexpected response on %s: %s", response.url, response_data,
                 )
                 break

             result = response_data["result"]

             yield result["data"]
             after = None
             if "cursor" in result and "after" in result["cursor"]:
                 after = result["cursor"]["after"]

             if not after:
                 logger.debug("Empty `after` cursor. All done")
                 break

     def get_origins_from_page(self, page: PageType) -> Iterator[ListedOrigin]:
         assert self.lister_obj.id is not None

         for repo in page:
             url = get_repo_url(repo["attachments"]["uris"]["uris"])
             if url is None:
                 short_name: Optional[str] = None

                 for field in "shortName", "name", "callsign":
                     short_name = repo["fields"].get(field)
                     if short_name:
                         break

                 logger.warning(
                     "No valid url for repository [%s] (phid=%s)",
                     short_name or repo["phid"],
                     repo["phid"],
                 )
                 continue

             yield ListedOrigin(
                 lister_id=self.lister_obj.id,
                 url=url,
                 visit_type=repo["fields"]["vcs"],
                 # The "dateUpdated" field returned by the Phabricator API only refers
                 # to the repository metadata; We can't use it for our purposes.
                 last_update=None,
             )


 def get_repo_url(attachments: List[Dict[str, Any]]) -> Optional[str]:
     """
     Return url for a hosted repository from its uris attachments according
     to the following priority lists:
     * protocol: https > http
     * identifier: shortname > callsign > id
     """
     processed_urls = defaultdict(dict)  # type: Dict[str, Any]
     for uri in attachments:
         protocol = uri["fields"]["builtin"]["protocol"]
         url = uri["fields"]["uri"]["effective"]
         identifier = uri["fields"]["builtin"]["identifier"]
         if protocol in ("http", "https"):
             processed_urls[protocol][identifier] = url
         elif protocol is None:
             for protocol in ("https", "http"):
                 if url.startswith(protocol):
                     processed_urls[protocol]["undefined"] = url
                     break
     for protocol in ["https", "http"]:
         for identifier in ["shortname", "callsign", "id", "undefined"]:
             if protocol in processed_urls and identifier in processed_urls[protocol]:
                 return processed_urls[protocol][identifier]
     return None
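
A worked example of `get_repo_url`'s priority rules (https over http, then shortname > callsign > id). The payload shape below is inferred from the field accesses in the function; the values are illustrative only:

    uris = [
        {"fields": {"builtin": {"protocol": "http", "identifier": "shortname"},
                    "uri": {"effective": "http://forge.example.org/source/swh.git"}}},
        {"fields": {"builtin": {"protocol": "https", "identifier": "callsign"},
                    "uri": {"effective": "https://forge.example.org/diffusion/SWH/swh.git"}}},
    ]
    # https wins even though its identifier (callsign) ranks below shortname
    assert get_repo_url(uris) == "https://forge.example.org/diffusion/SWH/swh.git"
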
f"https://example.com/{pageno:02d}{i:03d}"} for i in range(10) ] def get_origins_from_page(self, page: PageType) -> Iterator[ListedOrigin]: assert self.lister_obj.id is not None for origin in page: yield ListedOrigin( lister_id=self.lister_obj.id, url=origin["url"], visit_type="git" ) def check_listed_origins(swh_scheduler, lister, stored_lister): """Check that the listed origins match the ones in the lister""" # Gather the origins that are supposed to be listed lister_urls = sorted( sum([[o["url"] for o in page] for page in lister.get_pages()], []) ) # And check the state of origins in the scheduler ret = swh_scheduler.get_listed_origins() assert ret.next_page_token is None assert len(ret.results) == len(lister_urls) for origin, expected_url in zip(ret.results, lister_urls): assert origin.url == expected_url assert origin.lister_id == stored_lister.id class RunnableLister(ListerMixin, InstantiableLister): """A lister that can be run.""" def state_to_dict(self, state: StateType) -> Dict[str, str]: return state def finalize(self) -> None: self.state["updated"] = "yes" self.updated = True def test_run(swh_scheduler): lister = RunnableLister( scheduler=swh_scheduler, url="https://example.com", instance="example.com" ) assert "updated" not in lister.state update_date = lister.lister_obj.updated run_result = lister.run() assert run_result.pages == 2 assert run_result.origins == 20 stored_lister = swh_scheduler.get_or_create_lister( name="test-pattern-lister", instance_name="example.com" ) # Check that the finalize operation happened assert stored_lister.updated > update_date assert stored_lister.current_state["updated"] == "yes" check_listed_origins(swh_scheduler, lister, stored_lister) class InstantiableStatelessLister(pattern.StatelessLister[PageType]): LISTER_NAME = "test-stateless-lister" def test_stateless_instantiation(swh_scheduler): lister = InstantiableStatelessLister( scheduler=swh_scheduler, url="https://example.com", instance="example.com", ) # check the lister was registered in the scheduler backend stored_lister = swh_scheduler.get_or_create_lister( name="test-stateless-lister", instance_name="example.com" ) assert stored_lister == lister.lister_obj assert stored_lister.current_state == {} assert lister.state is None with pytest.raises(NotImplementedError): lister.run() class RunnableStatelessLister(ListerMixin, InstantiableStatelessLister): def finalize(self): self.updated = True def test_stateless_run(swh_scheduler): lister = RunnableStatelessLister( scheduler=swh_scheduler, url="https://example.com", instance="example.com" ) update_date = lister.lister_obj.updated run_result = lister.run() assert run_result.pages == 2 assert run_result.origins == 20 stored_lister = swh_scheduler.get_or_create_lister( name="test-stateless-lister", instance_name="example.com" ) # Check that the finalize operation happened assert stored_lister.updated > update_date assert stored_lister.current_state == {} # And that all origins are stored check_listed_origins(swh_scheduler, lister, stored_lister) diff --git a/swh/lister/tuleap/lister.py b/swh/lister/tuleap/lister.py index 6145359..b630508 100644 --- a/swh/lister/tuleap/lister.py +++ b/swh/lister/tuleap/lister.py @@ -1,150 +1,146 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from typing import Any, Dict, Iterator, List, Optional from 
diff --git a/swh/lister/tuleap/lister.py b/swh/lister/tuleap/lister.py
index 6145359..b630508 100644
--- a/swh/lister/tuleap/lister.py
+++ b/swh/lister/tuleap/lister.py
@@ -1,150 +1,146 @@
 # Copyright (C) 2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import logging
 from typing import Any, Dict, Iterator, List, Optional
 from urllib.parse import urljoin

 import iso8601
 import requests
 from tenacity.before_sleep import before_sleep_log
-from urllib3.util import parse_url

 from swh.lister.utils import throttling_retry
 from swh.scheduler.interface import SchedulerInterface
 from swh.scheduler.model import ListedOrigin

 from .. import USER_AGENT
 from ..pattern import CredentialsType, StatelessLister

 logger = logging.getLogger(__name__)

 RepoPage = Dict[str, Any]


 class TuleapLister(StatelessLister[RepoPage]):
     """List origins from Tuleap.

     Tuleap provides SVN and Git repository hosting.

     Tuleap API getting started:
     https://tuleap.net/doc/en/user-guide/integration/rest.html
     Tuleap API reference: https://tuleap.net/api/explorer/

     Using the API we first request a list of projects, and from there request their
     associated repositories individually. Everything is paginated, code uses
     throttling at the individual GET call level."""

     LISTER_NAME = "tuleap"

     REPO_LIST_PATH = "/api"
     REPO_GIT_PATH = "plugins/git/"
     REPO_SVN_PATH = "plugins/svn/"

     def __init__(
         self,
         scheduler: SchedulerInterface,
         url: str,
         instance: Optional[str] = None,
         credentials: CredentialsType = None,
     ):
-        if instance is None:
-            instance = parse_url(url).host
-
         super().__init__(
             scheduler=scheduler, credentials=credentials, url=url, instance=instance,
         )

         self.session = requests.Session()
         self.session.headers.update(
             {"Accept": "application/json", "User-Agent": USER_AGENT,}
         )

     @throttling_retry(before_sleep=before_sleep_log(logger, logging.WARNING))
     def page_request(self, url: str, params: Dict[str, Any]) -> requests.Response:

         logger.info("Fetching URL %s with params %s", url, params)

         response = self.session.get(url, params=params)

         if response.status_code != 200:
             logger.warning(
                 "Unexpected HTTP status code %s on %s: %s",
                 response.status_code,
                 response.url,
                 response.content,
             )

         response.raise_for_status()

         return response

     @classmethod
     def results_simplified(cls, url: str, repo_type: str, repo: RepoPage) -> RepoPage:
         if repo_type == "git":
             prefix_url = TuleapLister.REPO_GIT_PATH
         else:
             prefix_url = TuleapLister.REPO_SVN_PATH
         rep = {
             "project": repo["name"],
             "type": repo_type,
             "uri": urljoin(url, f"{prefix_url}{repo['path']}"),
             "last_update_date": repo["last_update_date"],
         }
         return rep

     def _get_repositories(self, url_repo) -> List[Dict[str, Any]]:
         ret = self.page_request(url_repo, {})
         reps_list = ret.json()["repositories"]
         limit = int(ret.headers["X-PAGINATION-LIMIT-MAX"])
         offset = int(ret.headers["X-PAGINATION-LIMIT"])
         size = int(ret.headers["X-PAGINATION-SIZE"])
         while offset < size:
             url_offset = url_repo + "?offset=" + str(offset) + "&limit=" + str(limit)
             ret = self.page_request(url_offset, {}).json()
             reps_list = reps_list + ret["repositories"]
             offset += limit
         return reps_list

     def get_pages(self) -> Iterator[RepoPage]:
         # base with trailing slash, path without leading slash for urljoin
         url_api: str = urljoin(self.url, self.REPO_LIST_PATH)
         url_projects = url_api + "/projects/"

         # Get the list of projects.
         response = self.page_request(url_projects, {})
         projects_list = response.json()
         limit = int(response.headers["X-PAGINATION-LIMIT-MAX"])
         offset = int(response.headers["X-PAGINATION-LIMIT"])
         size = int(response.headers["X-PAGINATION-SIZE"])
         while offset < size:
             url_offset = (
                 url_projects + "?offset=" + str(offset) + "&limit=" + str(limit)
             )
             ret = self.page_request(url_offset, {}).json()
             projects_list = projects_list + ret
             offset += limit

         # Get list of repositories for each project.
         for p in projects_list:
             p_id = p["id"]

             # Fetch Git repositories for project
             url_git = url_projects + str(p_id) + "/git"
             repos = self._get_repositories(url_git)
             for repo in repos:
                 yield self.results_simplified(url_api, "git", repo)

     def get_origins_from_page(self, page: RepoPage) -> Iterator[ListedOrigin]:
         """Convert a page of Tuleap repositories into a list of ListedOrigins.

         """
         assert self.lister_obj.id is not None

         yield ListedOrigin(
             lister_id=self.lister_obj.id,
             url=page["uri"],
             visit_type=page["type"],
             last_update=iso8601.parse_date(page["last_update_date"]),
         )
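
The Tuleap pagination above is driven entirely by response headers; a condensed sketch of the offset arithmetic shared by `_get_repositories` and `get_pages` (header names taken from the code, values illustrative):

    def iter_offsets(size: int, first_offset: int, limit: int):
        # offsets requested after the first page, mirroring the while-loops above
        offset = first_offset
        while offset < size:
            yield offset
            offset += limit

    # e.g. X-PAGINATION-SIZE: 120, X-PAGINATION-LIMIT: 50, X-PAGINATION-LIMIT-MAX: 50
    assert list(iter_offsets(size=120, first_offset=50, limit=50)) == [50, 100]
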