diff --git a/swh/lister/cgit/lister.py b/swh/lister/cgit/lister.py
index 9a19cd5..c513884 100644
--- a/swh/lister/cgit/lister.py
+++ b/swh/lister/cgit/lister.py
@@ -1,213 +1,209 @@
# Copyright (C) 2019-2021 The Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timezone
import logging
import re
from typing import Any, Dict, Iterator, List, Optional
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
import requests
from requests.exceptions import HTTPError
from swh.lister import USER_AGENT
from swh.lister.pattern import CredentialsType, StatelessLister
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
logger = logging.getLogger(__name__)
Repositories = List[Dict[str, Any]]
class CGitLister(StatelessLister[Repositories]):
"""Lister class for CGit repositories.
This lister will retrieve the list of published git repositories by
parsing the HTML page(s) of the index retrieved at `url`.
The lister currently defines 2 listing behaviors:
- If the `base_git_url` is provided, the listed origin urls are computed out of the
base git url link and the one listed in the main listed page (resulting in less
HTTP queries than the 2nd behavior below). This is expected to be the main
deployed behavior.
- Otherwise (with no `base_git_url`), for each found git repository listed, one
extra HTTP query is made at the given url found in the main listing page to gather
published "Clone" URLs to be used as origin URL for that git repo. If several
"Clone" urls are provided, prefer the http/https one, if any, otherwise fallback
to the first one.
"""
LISTER_NAME = "cgit"
def __init__(
self,
scheduler: SchedulerInterface,
url: str,
instance: Optional[str] = None,
credentials: Optional[CredentialsType] = None,
base_git_url: Optional[str] = None,
):
"""Lister class for CGit repositories.
Args:
url: main URL of the CGit instance, i.e. url of the index
of published git repositories on this instance.
- instance: Name of cgit instance. Defaults to url's hostname
+ instance: Name of cgit instance. Defaults to url's network location
if unset.
base_git_url: Optional base git url which allows the origin url
computations.
"""
- if not instance:
- instance = urlparse(url).hostname
- assert instance is not None # Make mypy happy
-
super().__init__(
scheduler=scheduler, url=url, instance=instance, credentials=credentials,
)
self.session = requests.Session()
self.session.headers.update(
{"Accept": "application/html", "User-Agent": USER_AGENT}
)
self.base_git_url = base_git_url
def _get_and_parse(self, url: str) -> BeautifulSoup:
"""Get the given url and parse the retrieved HTML using BeautifulSoup"""
response = self.session.get(url)
response.raise_for_status()
return BeautifulSoup(response.text, features="html.parser")
def get_pages(self) -> Iterator[Repositories]:
"""Generate git 'project' URLs found on the current CGit server
The last_update date is retrieved on the list of repo page to avoid
to compute it on the repository details which only give a date per branch
"""
next_page: Optional[str] = self.url
while next_page:
bs_idx = self._get_and_parse(next_page)
page_results = []
for tr in bs_idx.find("div", {"class": "content"}).find_all(
"tr", {"class": ""}
):
repository_link = tr.find("a")["href"]
repo_url = None
git_url = None
base_url = urljoin(self.url, repository_link).strip("/")
if self.base_git_url: # mapping provided
# computing git url
git_url = base_url.replace(self.url, self.base_git_url)
else:
# we compute the git detailed page url from which we will retrieve
# the git url (cf. self.get_origins_from_page)
repo_url = base_url
span = tr.find("span", {"class": re.compile("age-")})
last_updated_date = span.get("title") if span else None
page_results.append(
{
"url": repo_url,
"git_url": git_url,
"last_updated_date": last_updated_date,
}
)
yield page_results
try:
pager = bs_idx.find("ul", {"class": "pager"})
current_page = pager.find("a", {"class": "current"})
if current_page:
next_page = current_page.parent.next_sibling.a["href"]
next_page = urljoin(self.url, next_page)
except (AttributeError, KeyError):
# no pager, or no next page
next_page = None
def get_origins_from_page(
self, repositories: Repositories
) -> Iterator[ListedOrigin]:
"""Convert a page of cgit repositories into a list of ListedOrigins."""
assert self.lister_obj.id is not None
for repo in repositories:
origin_url = repo["git_url"] or self._get_origin_from_repository_url(
repo["url"]
)
if origin_url is None:
continue
yield ListedOrigin(
lister_id=self.lister_obj.id,
url=origin_url,
visit_type="git",
last_update=_parse_last_updated_date(repo),
)
def _get_origin_from_repository_url(self, repository_url: str) -> Optional[str]:
"""Extract the git url from the repository page"""
try:
bs = self._get_and_parse(repository_url)
except HTTPError as e:
logger.warning(
"Unexpected HTTP status code %s on %s",
e.response.status_code,
e.response.url,
)
return None
# origin urls are listed on the repository page
# TODO check if forcing https is better or not ?
#
#
#
urls = [x["href"] for x in bs.find_all("a", {"rel": "vcs-git"})]
if not urls:
return None
# look for the http/https url, if any, and use it as origin_url
for url in urls:
if urlparse(url).scheme in ("http", "https"):
origin_url = url
break
else:
# otherwise, choose the first one
origin_url = urls[0]
return origin_url
def _parse_last_updated_date(repository: Dict[str, Any]) -> Optional[datetime]:
"""Parse the last updated date"""
date = repository.get("last_updated_date")
if not date:
return None
parsed_date = None
for date_format in ("%Y-%m-%d %H:%M:%S %z", "%Y-%m-%d %H:%M:%S (%Z)"):
try:
parsed_date = datetime.strptime(date, date_format)
# force UTC to avoid naive datetime
if not parsed_date.tzinfo:
parsed_date = parsed_date.replace(tzinfo=timezone.utc)
break
except Exception:
pass
if not parsed_date:
logger.warning(
"Could not parse %s last_updated date: %s", repository["url"], date,
)
return parsed_date
diff --git a/swh/lister/gitea/lister.py b/swh/lister/gitea/lister.py
index ebb7b6e..19ca4aa 100644
--- a/swh/lister/gitea/lister.py
+++ b/swh/lister/gitea/lister.py
@@ -1,142 +1,138 @@
# Copyright (C) 2018-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
import random
from typing import Any, Dict, Iterator, List, Optional
from urllib.parse import urljoin
import iso8601
import requests
from tenacity.before_sleep import before_sleep_log
-from urllib3.util import parse_url
from swh.lister.utils import throttling_retry
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
from .. import USER_AGENT
from ..pattern import CredentialsType, StatelessLister
logger = logging.getLogger(__name__)
RepoListPage = List[Dict[str, Any]]
class GiteaLister(StatelessLister[RepoListPage]):
"""List origins from Gitea.
Gitea API documentation: https://try.gitea.io/api/swagger
The API does pagination and provides navigation URLs through the 'Link' header.
The default value for page size is the maximum value observed on the instances
accessible at https://try.gitea.io/api/v1/ and https://codeberg.org/api/v1/."""
LISTER_NAME = "gitea"
REPO_LIST_PATH = "repos/search"
def __init__(
self,
scheduler: SchedulerInterface,
url: str,
instance: Optional[str] = None,
api_token: Optional[str] = None,
page_size: int = 50,
credentials: CredentialsType = None,
):
- if instance is None:
- instance = parse_url(url).host
-
super().__init__(
scheduler=scheduler, credentials=credentials, url=url, instance=instance,
)
self.query_params = {
"sort": "id",
"order": "asc",
"limit": page_size,
"page": 1,
}
self.session = requests.Session()
self.session.headers.update(
{"Accept": "application/json", "User-Agent": USER_AGENT,}
)
if api_token is None:
if len(self.credentials) > 0:
cred = random.choice(self.credentials)
username = cred.get("username")
api_token = cred["password"]
logger.warning(
"Using authentication token from user %s", username or "???"
)
else:
logger.warning(
"No authentication token set in configuration, using anonymous mode"
)
if api_token:
self.session.headers["Authorization"] = "Token %s" % api_token
@throttling_retry(before_sleep=before_sleep_log(logger, logging.WARNING))
def page_request(self, url: str, params: Dict[str, Any]) -> requests.Response:
logger.info("Fetching URL %s with params %s", url, params)
response = self.session.get(url, params=params)
if response.status_code != 200:
logger.warning(
"Unexpected HTTP status code %s on %s: %s",
response.status_code,
response.url,
response.content,
)
response.raise_for_status()
return response
@classmethod
def results_simplified(cls, body: Dict[str, RepoListPage]) -> RepoListPage:
fields_filter = ["id", "clone_url", "updated_at"]
return [{k: r[k] for k in fields_filter} for r in body["data"]]
def get_pages(self) -> Iterator[RepoListPage]:
# base with trailing slash, path without leading slash for urljoin
url: str = urljoin(self.url, self.REPO_LIST_PATH)
response = self.page_request(url, self.query_params)
while True:
page_results = self.results_simplified(response.json())
yield page_results
assert len(response.links) > 0, "API changed: no Link header found"
if "next" in response.links:
url = response.links["next"]["url"]
else:
# last page
break
response = self.page_request(url, {})
def get_origins_from_page(self, page: RepoListPage) -> Iterator[ListedOrigin]:
"""Convert a page of Gitea repositories into a list of ListedOrigins.
"""
assert self.lister_obj.id is not None
for repo in page:
last_update = iso8601.parse_date(repo["updated_at"])
yield ListedOrigin(
lister_id=self.lister_obj.id,
url=repo["clone_url"],
visit_type="git",
last_update=last_update,
)
diff --git a/swh/lister/gitlab/lister.py b/swh/lister/gitlab/lister.py
index 2528e25..7adf73b 100644
--- a/swh/lister/gitlab/lister.py
+++ b/swh/lister/gitlab/lister.py
@@ -1,239 +1,237 @@
# Copyright (C) 2018-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from dataclasses import asdict, dataclass
import logging
import random
from typing import Any, Dict, Iterator, Optional, Tuple
from urllib.parse import parse_qs, urlencode, urlparse
import iso8601
import requests
from requests.exceptions import HTTPError
from requests.status_codes import codes
from tenacity.before_sleep import before_sleep_log
-from urllib3.util import parse_url
from swh.lister import USER_AGENT
from swh.lister.pattern import CredentialsType, Lister
from swh.lister.utils import retry_attempt, throttling_retry
from swh.scheduler.model import ListedOrigin
logger = logging.getLogger(__name__)
@dataclass
class GitLabListerState:
"""State of the GitLabLister"""
last_seen_next_link: Optional[str] = None
"""Last link header (not visited yet) during an incremental pass
"""
Repository = Dict[str, Any]
@dataclass
class PageResult:
"""Result from a query to a gitlab project api page."""
repositories: Optional[Tuple[Repository, ...]] = None
next_page: Optional[str] = None
def _if_rate_limited(retry_state) -> bool:
"""Custom tenacity retry predicate for handling HTTP responses with status code 403
with specific ratelimit header.
"""
attempt = retry_attempt(retry_state)
if attempt.failed:
exc = attempt.exception()
return (
isinstance(exc, HTTPError)
and exc.response.status_code == codes.forbidden
and int(exc.response.headers.get("RateLimit-Remaining", "0")) == 0
)
return False
def _parse_id_after(url: Optional[str]) -> Optional[int]:
"""Given an url, extract a return the 'id_after' query parameter associated value
or None.
This is the the repository id used for pagination purposes.
"""
if not url:
return None
# link: https://${project-api}/?...&id_after=2x...
query_data = parse_qs(urlparse(url).query)
page = query_data.get("id_after")
if page and len(page) > 0:
return int(page[0])
return None
class GitLabLister(Lister[GitLabListerState, PageResult]):
"""List origins for a gitlab instance.
By default, the lister runs in incremental mode: it lists all repositories,
starting with the `last_seen_next_link` stored in the scheduler backend.
Args:
scheduler: a scheduler instance
url: the api v4 url of the gitlab instance to visit (e.g.
https://gitlab.com/api/v4/)
- instance: a specific instance name (e.g. gitlab, tor, git-kernel, ...)
+ instance: a specific instance name (e.g. gitlab, tor, git-kernel, ...),
+ url network location will be used if not provided
incremental: defines if incremental listing is activated or not
"""
LISTER_NAME = "gitlab"
def __init__(
self,
scheduler,
url: str,
instance: Optional[str] = None,
credentials: Optional[CredentialsType] = None,
incremental: bool = False,
):
- if instance is None:
- instance = parse_url(url).host
super().__init__(
scheduler=scheduler,
url=url.rstrip("/"),
instance=instance,
credentials=credentials,
)
self.incremental = incremental
self.last_page: Optional[str] = None
self.session = requests.Session()
self.session.headers.update(
{"Accept": "application/json", "User-Agent": USER_AGENT}
)
if len(self.credentials) > 0:
cred = random.choice(self.credentials)
logger.info(
"Using %s credentials from user %s", self.instance, cred["username"]
)
api_token = cred["password"]
if api_token:
self.session.headers["Authorization"] = f"Bearer {api_token}"
def state_from_dict(self, d: Dict[str, Any]) -> GitLabListerState:
return GitLabListerState(**d)
def state_to_dict(self, state: GitLabListerState) -> Dict[str, Any]:
return asdict(state)
@throttling_retry(
retry=_if_rate_limited, before_sleep=before_sleep_log(logger, logging.WARNING)
)
def get_page_result(self, url: str) -> PageResult:
logger.debug("Fetching URL %s", url)
response = self.session.get(url)
if response.status_code != 200:
logger.warning(
"Unexpected HTTP status code %s on %s: %s",
response.status_code,
response.url,
response.content,
)
response.raise_for_status()
repositories: Tuple[Repository, ...] = tuple(response.json())
if hasattr(response, "links") and response.links.get("next"):
next_page = response.links["next"]["url"]
else:
next_page = None
return PageResult(repositories, next_page)
def page_url(self, id_after: Optional[int] = None) -> str:
parameters = {
"pagination": "keyset",
"order_by": "id",
"sort": "asc",
}
if id_after is not None:
parameters["id_after"] = str(id_after)
return f"{self.url}/projects?{urlencode(parameters)}"
def get_pages(self) -> Iterator[PageResult]:
next_page: Optional[str]
if self.incremental and self.state and self.state.last_seen_next_link:
next_page = self.state.last_seen_next_link
else:
next_page = self.page_url()
while next_page:
self.last_page = next_page
page_result = self.get_page_result(next_page)
yield page_result
next_page = page_result.next_page
def get_origins_from_page(self, page_result: PageResult) -> Iterator[ListedOrigin]:
assert self.lister_obj.id is not None
repositories = page_result.repositories if page_result.repositories else []
for repo in repositories:
yield ListedOrigin(
lister_id=self.lister_obj.id,
url=repo["http_url_to_repo"],
visit_type="git",
last_update=iso8601.parse_date(repo["last_activity_at"]),
)
def commit_page(self, page_result: PageResult) -> None:
"""Update currently stored state using the latest listed "next" page if relevant.
Relevancy is determined by the next_page link whose 'page' id must be strictly
superior to the currently stored one.
Note: this is a noop for full listing mode
"""
if self.incremental:
# link: https://${project-api}/?...&page=2x...
next_page = page_result.next_page
if not next_page and self.last_page:
next_page = self.last_page
if next_page:
id_after = _parse_id_after(next_page)
previous_next_page = self.state.last_seen_next_link
previous_id_after = _parse_id_after(previous_next_page)
if previous_next_page is None or (
previous_id_after and id_after and previous_id_after < id_after
):
self.state.last_seen_next_link = next_page
def finalize(self) -> None:
"""finalize the lister state when relevant (see `fn:commit_page` for details)
Note: this is a noop for full listing mode
"""
next_page = self.state.last_seen_next_link
if self.incremental and next_page:
# link: https://${project-api}/?...&page=2x...
next_id_after = _parse_id_after(next_page)
scheduler_state = self.get_state_from_scheduler()
previous_next_id_after = _parse_id_after(
scheduler_state.last_seen_next_link
)
if (not previous_next_id_after and next_id_after) or (
previous_next_id_after
and next_id_after
and previous_next_id_after < next_id_after
):
self.updated = True
diff --git a/swh/lister/pattern.py b/swh/lister/pattern.py
index 6f077e4..63a2fff 100644
--- a/swh/lister/pattern.py
+++ b/swh/lister/pattern.py
@@ -1,278 +1,284 @@
-# Copyright (C) 2020 The Software Heritage developers
+# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from __future__ import annotations
+
from dataclasses import dataclass
from typing import Any, Dict, Generic, Iterable, Iterator, List, Optional, TypeVar
+from urllib.parse import urlparse
from swh.core.config import load_from_envvar
from swh.core.utils import grouper
from swh.scheduler import get_scheduler, model
from swh.scheduler.interface import SchedulerInterface
@dataclass
class ListerStats:
pages: int = 0
origins: int = 0
- def __add__(self, other: "ListerStats") -> "ListerStats":
+ def __add__(self, other: ListerStats) -> ListerStats:
return self.__class__(self.pages + other.pages, self.origins + other.origins)
- def __iadd__(self, other: "ListerStats"):
+ def __iadd__(self, other: ListerStats):
self.pages += other.pages
self.origins += other.origins
def dict(self) -> Dict[str, int]:
return {"pages": self.pages, "origins": self.origins}
StateType = TypeVar("StateType")
PageType = TypeVar("PageType")
BackendStateType = Dict[str, Any]
CredentialsType = Optional[Dict[str, Dict[str, List[Dict[str, str]]]]]
class Lister(Generic[StateType, PageType]):
"""The base class for a Software Heritage lister.
A lister scrapes a page by page list of origins from an upstream (a forge, the API
of a package manager, ...), and massages the results of that scrape into a list of
origins that are recorded by the scheduler backend.
The main loop of the lister, :meth:`run`, basically revolves around the
:meth:`get_pages` iterator, which sets up the lister state, then yields the scrape
results page by page. The :meth:`get_origins_from_page` method converts the pages
into a list of :class:`model.ListedOrigin`, sent to the scheduler at every page. The
:meth:`commit_page` method can be used to update the lister state after a page of
origins has been recorded in the scheduler backend.
The :func:`finalize` method is called at lister teardown (whether the run has
been successful or not) to update the local :attr:`state` object before it's sent to
the database. This method must set the :attr:`updated` attribute if an updated
state needs to be sent to the scheduler backend. This method can call
:func:`get_state_from_scheduler` to refresh and merge the lister state from the
scheduler before it's finalized (and potentially minimize the risk of race
conditions between concurrent runs of the lister).
The state of the lister is serialized and deserialized from the dict stored in the
scheduler backend, using the :meth:`state_from_dict` and :meth:`state_to_dict`
methods.
Args:
scheduler: the instance of the Scheduler being used to register the
origins listed by this lister
url: a URL representing this lister, e.g. the API's base URL
- instance: the instance name used, in conjunction with :attr:`LISTER_NAME`, to
- uniquely identify this lister instance.
+ instance: the instance name, to uniquely identify this lister instance,
+ if not provided the URL network location will be used
credentials: dictionary of credentials for all listers. The first level
identifies the :attr:`LISTER_NAME`, the second level the lister
:attr:`instance`. The final level is a list of dicts containing the
expected credentials for the given instance of that lister.
Generic types:
- *StateType*: concrete lister type; should usually be a :class:`dataclass` for
stricter typing
- *PageType*: type of scrape results; can usually be a :class:`requests.Response`,
or a :class:`dict`
"""
LISTER_NAME: str = ""
def __init__(
self,
scheduler: SchedulerInterface,
url: str,
- instance: str,
+ instance: Optional[str] = None,
credentials: CredentialsType = None,
):
if not self.LISTER_NAME:
raise ValueError("Must set the LISTER_NAME attribute on Lister classes")
self.url = url
- self.instance = instance
+ if instance is not None:
+ self.instance = instance
+ else:
+ self.instance = urlparse(url).netloc
self.scheduler = scheduler
if not credentials:
credentials = {}
self.credentials = list(
credentials.get(self.LISTER_NAME, {}).get(self.instance, [])
)
# store the initial state of the lister
self.state = self.get_state_from_scheduler()
self.updated = False
def run(self) -> ListerStats:
"""Run the lister.
Returns:
A counter with the number of pages and origins seen for this run
of the lister.
"""
full_stats = ListerStats()
try:
for page in self.get_pages():
full_stats.pages += 1
origins = self.get_origins_from_page(page)
full_stats.origins += self.send_origins(origins)
self.commit_page(page)
finally:
self.finalize()
if self.updated:
self.set_state_in_scheduler()
return full_stats
def get_state_from_scheduler(self) -> StateType:
"""Update the state in the current instance from the state in the scheduler backend.
This updates :attr:`lister_obj`, and returns its (deserialized) current state,
to allow for comparison with the local state.
Returns:
the state retrieved from the scheduler backend
"""
self.lister_obj = self.scheduler.get_or_create_lister(
name=self.LISTER_NAME, instance_name=self.instance
)
return self.state_from_dict(self.lister_obj.current_state)
def set_state_in_scheduler(self) -> None:
"""Update the state in the scheduler backend from the state of the current
instance.
Raises:
swh.scheduler.exc.StaleData: in case of a race condition between
concurrent listers (from :meth:`swh.scheduler.Scheduler.update_lister`).
"""
self.lister_obj.current_state = self.state_to_dict(self.state)
self.lister_obj = self.scheduler.update_lister(self.lister_obj)
# State management to/from the scheduler
def state_from_dict(self, d: BackendStateType) -> StateType:
"""Convert the state stored in the scheduler backend (as a dict),
to the concrete StateType for this lister."""
raise NotImplementedError
def state_to_dict(self, state: StateType) -> BackendStateType:
"""Convert the StateType for this lister to its serialization as dict for
storage in the scheduler.
Values must be JSON-compatible as that's what the backend database expects.
"""
raise NotImplementedError
def finalize(self) -> None:
"""Custom hook to finalize the lister state before returning from the main loop.
This method must set :attr:`updated` if the lister has done some work.
If relevant, this method can use :meth`get_state_from_scheduler` to merge the
current lister state with the one from the scheduler backend, reducing the risk
of race conditions if we're running concurrent listings.
This method is called in a `finally` block, which means it will also run when
the lister fails.
"""
pass
# Actual listing logic
def get_pages(self) -> Iterator[PageType]:
"""Retrieve a list of pages of listed results. This is the main loop of the lister.
Returns:
an iterator of raw pages fetched from the platform currently being listed.
"""
raise NotImplementedError
def get_origins_from_page(self, page: PageType) -> Iterator[model.ListedOrigin]:
"""Extract a list of :class:`model.ListedOrigin` from a raw page of results.
Args:
page: a single page of results
Returns:
an iterator for the origins present on the given page of results
"""
raise NotImplementedError
def commit_page(self, page: PageType) -> None:
"""Custom hook called after the current page has been committed in the scheduler
backend.
This method can be used to update the state after a page of origins has been
successfully recorded in the scheduler backend. If the new state should be
recorded at the point the lister completes, the :attr:`updated` attribute must
be set.
"""
pass
def send_origins(self, origins: Iterable[model.ListedOrigin]) -> int:
"""Record a list of :class:`model.ListedOrigin` in the scheduler.
Returns:
the number of listed origins recorded in the scheduler
"""
count = 0
for batch_origins in grouper(origins, n=1000):
ret = self.scheduler.record_listed_origins(batch_origins)
count += len(ret)
return count
@classmethod
def from_config(cls, scheduler: Dict[str, Any], **config: Any):
"""Instantiate a lister from a configuration dict.
This is basically a backwards-compatibility shim for the CLI.
Args:
scheduler: instantiation config for the scheduler
config: the configuration dict for the lister, with the following keys:
- credentials (optional): credentials list for the scheduler
- any other kwargs passed to the lister.
Returns:
the instantiated lister
"""
# Drop the legacy config keys which aren't used for this generation of listers.
for legacy_key in ("storage", "lister", "celery"):
config.pop(legacy_key, None)
# Instantiate the scheduler
scheduler_instance = get_scheduler(**scheduler)
return cls(scheduler=scheduler_instance, **config)
@classmethod
def from_configfile(cls, **kwargs: Any):
"""Instantiate a lister from the configuration loaded from the
SWH_CONFIG_FILENAME envvar, with potential extra keyword arguments
if their value is not None.
Args:
kwargs: kwargs passed to the lister instantiation
"""
config = dict(load_from_envvar())
config.update({k: v for k, v in kwargs.items() if v is not None})
return cls.from_config(**config)
class StatelessLister(Lister[None, PageType], Generic[PageType]):
def state_from_dict(self, d: BackendStateType) -> None:
"""Always return empty state"""
return None
def state_to_dict(self, state: None) -> BackendStateType:
"""Always set empty state"""
return {}
diff --git a/swh/lister/phabricator/lister.py b/swh/lister/phabricator/lister.py
index 193239c..1dbee37 100644
--- a/swh/lister/phabricator/lister.py
+++ b/swh/lister/phabricator/lister.py
@@ -1,182 +1,183 @@
-# Copyright (C) 2019-2020 the Software Heritage developers
+# Copyright (C) 2019-2021 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
import logging
import random
from typing import Any, Dict, Iterator, List, Optional
from urllib.parse import urljoin
import requests
from swh.lister import USER_AGENT
from swh.lister.pattern import CredentialsType, StatelessLister
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
logger = logging.getLogger(__name__)
PageType = List[Dict[str, Any]]
class PhabricatorLister(StatelessLister[PageType]):
"""
List all repositories hosted on a Phabricator instance.
Args:
url: base URL of a phabricator forge
(for instance https://forge.softwareheritage.org)
- instance: string identifier for the listed forge
+ instance: string identifier for the listed forge,
+ URL network location will be used if not provided
api_token: authentication token for Conduit API
"""
LISTER_NAME = "phabricator"
API_REPOSITORY_PATH = "/api/diffusion.repository.search"
def __init__(
self,
scheduler: SchedulerInterface,
url: str,
- instance: str,
+ instance: Optional[str] = None,
api_token: Optional[str] = None,
credentials: CredentialsType = None,
):
super().__init__(
scheduler, urljoin(url, self.API_REPOSITORY_PATH), instance, credentials
)
self.session = requests.Session()
self.session.headers.update(
{"Accept": "application/json", "User-Agent": USER_AGENT}
)
if api_token is not None:
self.api_token = api_token
else:
if not self.credentials:
raise ValueError(
f"No credentials found for phabricator instance {self.instance};"
" Please set them in the lister configuration file."
)
self.api_token = random.choice(self.credentials)["password"]
def get_request_params(self, after: Optional[str]) -> Dict[str, str]:
"""Get the query parameters for the request."""
base_params = {
# Stable order
"order": "oldest",
# Add all URIs to the response
"attachments[uris]": "1",
# API token from stored credentials
"api.token": self.api_token,
}
if after is not None:
base_params["after"] = after
return base_params
@staticmethod
def filter_params(params: Dict[str, str]) -> Dict[str, str]:
"""Filter the parameters for debug purposes"""
return {
k: (v if k != "api.token" else "**redacted**") for k, v in params.items()
}
def get_pages(self) -> Iterator[PageType]:
after: Optional[str] = None
while True:
params = self.get_request_params(after)
logger.debug(
"Retrieving results on URI %s with parameters %s",
self.url,
self.filter_params(params),
)
response = self.session.post(self.url, data=params)
if response.status_code != 200:
logger.warning(
"Unexpected HTTP status code %s on %s: %s",
response.status_code,
response.url,
response.content,
)
response.raise_for_status()
response_data = response.json()
if response_data.get("result") is None:
logger.warning(
"Got unexpected response on %s: %s", response.url, response_data,
)
break
result = response_data["result"]
yield result["data"]
after = None
if "cursor" in result and "after" in result["cursor"]:
after = result["cursor"]["after"]
if not after:
logger.debug("Empty `after` cursor. All done")
break
def get_origins_from_page(self, page: PageType) -> Iterator[ListedOrigin]:
assert self.lister_obj.id is not None
for repo in page:
url = get_repo_url(repo["attachments"]["uris"]["uris"])
if url is None:
short_name: Optional[str] = None
for field in "shortName", "name", "callsign":
short_name = repo["fields"].get(field)
if short_name:
break
logger.warning(
"No valid url for repository [%s] (phid=%s)",
short_name or repo["phid"],
repo["phid"],
)
continue
yield ListedOrigin(
lister_id=self.lister_obj.id,
url=url,
visit_type=repo["fields"]["vcs"],
# The "dateUpdated" field returned by the Phabricator API only refers to
# the repository metadata; We can't use it for our purposes.
last_update=None,
)
def get_repo_url(attachments: List[Dict[str, Any]]) -> Optional[str]:
"""
Return url for a hosted repository from its uris attachments according
to the following priority lists:
* protocol: https > http
* identifier: shortname > callsign > id
"""
processed_urls = defaultdict(dict) # type: Dict[str, Any]
for uri in attachments:
protocol = uri["fields"]["builtin"]["protocol"]
url = uri["fields"]["uri"]["effective"]
identifier = uri["fields"]["builtin"]["identifier"]
if protocol in ("http", "https"):
processed_urls[protocol][identifier] = url
elif protocol is None:
for protocol in ("https", "http"):
if url.startswith(protocol):
processed_urls[protocol]["undefined"] = url
break
for protocol in ["https", "http"]:
for identifier in ["shortname", "callsign", "id", "undefined"]:
if protocol in processed_urls and identifier in processed_urls[protocol]:
return processed_urls[protocol][identifier]
return None
diff --git a/swh/lister/tests/test_pattern.py b/swh/lister/tests/test_pattern.py
index 2ff15e9..795b715 100644
--- a/swh/lister/tests/test_pattern.py
+++ b/swh/lister/tests/test_pattern.py
@@ -1,186 +1,198 @@
-# Copyright (C) 2020 The Software Heritage developers
+# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import TYPE_CHECKING, Any, Dict, Iterator, List
import pytest
from swh.lister import pattern
from swh.scheduler.model import ListedOrigin
StateType = Dict[str, str]
OriginType = Dict[str, str]
PageType = List[OriginType]
class InstantiableLister(pattern.Lister[StateType, PageType]):
"""A lister that can only be instantiated, not run."""
LISTER_NAME = "test-pattern-lister"
def state_from_dict(self, d: Dict[str, str]) -> StateType:
return d
def test_instantiation(swh_scheduler):
lister = InstantiableLister(
scheduler=swh_scheduler, url="https://example.com", instance="example.com"
)
# check the lister was registered in the scheduler backend
stored_lister = swh_scheduler.get_or_create_lister(
name="test-pattern-lister", instance_name="example.com"
)
assert stored_lister == lister.lister_obj
with pytest.raises(NotImplementedError):
lister.run()
+def test_lister_instance_name(swh_scheduler):
+ lister = InstantiableLister(
+ scheduler=swh_scheduler, url="https://example.org", instance="example"
+ )
+
+ assert lister.instance == "example"
+
+ lister = InstantiableLister(scheduler=swh_scheduler, url="https://example.org")
+
+ assert lister.instance == "example.org"
+
+
def test_instantiation_from_configfile(swh_scheduler, mocker):
mock_load_from_envvar = mocker.patch("swh.lister.pattern.load_from_envvar")
mock_get_scheduler = mocker.patch("swh.lister.pattern.get_scheduler")
mock_load_from_envvar.return_value = {
"scheduler": {},
"url": "foo",
"instance": "bar",
}
mock_get_scheduler.return_value = swh_scheduler
lister = InstantiableLister.from_configfile()
assert lister.url == "foo"
assert lister.instance == "bar"
lister = InstantiableLister.from_configfile(url="bar", instance="foo")
assert lister.url == "bar"
assert lister.instance == "foo"
lister = InstantiableLister.from_configfile(url=None, instance="foo")
assert lister.url == "foo"
assert lister.instance == "foo"
if TYPE_CHECKING:
_Base = pattern.Lister[Any, PageType]
else:
_Base = object
class ListerMixin(_Base):
def get_pages(self) -> Iterator[PageType]:
for pageno in range(2):
yield [
{"url": f"https://example.com/{pageno:02d}{i:03d}"} for i in range(10)
]
def get_origins_from_page(self, page: PageType) -> Iterator[ListedOrigin]:
assert self.lister_obj.id is not None
for origin in page:
yield ListedOrigin(
lister_id=self.lister_obj.id, url=origin["url"], visit_type="git"
)
def check_listed_origins(swh_scheduler, lister, stored_lister):
"""Check that the listed origins match the ones in the lister"""
# Gather the origins that are supposed to be listed
lister_urls = sorted(
sum([[o["url"] for o in page] for page in lister.get_pages()], [])
)
# And check the state of origins in the scheduler
ret = swh_scheduler.get_listed_origins()
assert ret.next_page_token is None
assert len(ret.results) == len(lister_urls)
for origin, expected_url in zip(ret.results, lister_urls):
assert origin.url == expected_url
assert origin.lister_id == stored_lister.id
class RunnableLister(ListerMixin, InstantiableLister):
"""A lister that can be run."""
def state_to_dict(self, state: StateType) -> Dict[str, str]:
return state
def finalize(self) -> None:
self.state["updated"] = "yes"
self.updated = True
def test_run(swh_scheduler):
lister = RunnableLister(
scheduler=swh_scheduler, url="https://example.com", instance="example.com"
)
assert "updated" not in lister.state
update_date = lister.lister_obj.updated
run_result = lister.run()
assert run_result.pages == 2
assert run_result.origins == 20
stored_lister = swh_scheduler.get_or_create_lister(
name="test-pattern-lister", instance_name="example.com"
)
# Check that the finalize operation happened
assert stored_lister.updated > update_date
assert stored_lister.current_state["updated"] == "yes"
check_listed_origins(swh_scheduler, lister, stored_lister)
class InstantiableStatelessLister(pattern.StatelessLister[PageType]):
LISTER_NAME = "test-stateless-lister"
def test_stateless_instantiation(swh_scheduler):
lister = InstantiableStatelessLister(
scheduler=swh_scheduler, url="https://example.com", instance="example.com",
)
# check the lister was registered in the scheduler backend
stored_lister = swh_scheduler.get_or_create_lister(
name="test-stateless-lister", instance_name="example.com"
)
assert stored_lister == lister.lister_obj
assert stored_lister.current_state == {}
assert lister.state is None
with pytest.raises(NotImplementedError):
lister.run()
class RunnableStatelessLister(ListerMixin, InstantiableStatelessLister):
def finalize(self):
self.updated = True
def test_stateless_run(swh_scheduler):
lister = RunnableStatelessLister(
scheduler=swh_scheduler, url="https://example.com", instance="example.com"
)
update_date = lister.lister_obj.updated
run_result = lister.run()
assert run_result.pages == 2
assert run_result.origins == 20
stored_lister = swh_scheduler.get_or_create_lister(
name="test-stateless-lister", instance_name="example.com"
)
# Check that the finalize operation happened
assert stored_lister.updated > update_date
assert stored_lister.current_state == {}
# And that all origins are stored
check_listed_origins(swh_scheduler, lister, stored_lister)
diff --git a/swh/lister/tuleap/lister.py b/swh/lister/tuleap/lister.py
index 6145359..b630508 100644
--- a/swh/lister/tuleap/lister.py
+++ b/swh/lister/tuleap/lister.py
@@ -1,150 +1,146 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from typing import Any, Dict, Iterator, List, Optional
from urllib.parse import urljoin
import iso8601
import requests
from tenacity.before_sleep import before_sleep_log
-from urllib3.util import parse_url
from swh.lister.utils import throttling_retry
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
from .. import USER_AGENT
from ..pattern import CredentialsType, StatelessLister
logger = logging.getLogger(__name__)
RepoPage = Dict[str, Any]
class TuleapLister(StatelessLister[RepoPage]):
"""List origins from Tuleap.
Tuleap provides SVN and Git repositories hosting.
Tuleap API getting started:
https://tuleap.net/doc/en/user-guide/integration/rest.html
Tuleap API reference:
https://tuleap.net/api/explorer/
Using the API we first request a list of projects, and from there request their
associated repositories individually. Everything is paginated, code uses throttling
at the individual GET call level."""
LISTER_NAME = "tuleap"
REPO_LIST_PATH = "/api"
REPO_GIT_PATH = "plugins/git/"
REPO_SVN_PATH = "plugins/svn/"
def __init__(
self,
scheduler: SchedulerInterface,
url: str,
instance: Optional[str] = None,
credentials: CredentialsType = None,
):
- if instance is None:
- instance = parse_url(url).host
-
super().__init__(
scheduler=scheduler, credentials=credentials, url=url, instance=instance,
)
self.session = requests.Session()
self.session.headers.update(
{"Accept": "application/json", "User-Agent": USER_AGENT,}
)
@throttling_retry(before_sleep=before_sleep_log(logger, logging.WARNING))
def page_request(self, url: str, params: Dict[str, Any]) -> requests.Response:
logger.info("Fetching URL %s with params %s", url, params)
response = self.session.get(url, params=params)
if response.status_code != 200:
logger.warning(
"Unexpected HTTP status code %s on %s: %s",
response.status_code,
response.url,
response.content,
)
response.raise_for_status()
return response
@classmethod
def results_simplified(cls, url: str, repo_type: str, repo: RepoPage) -> RepoPage:
if repo_type == "git":
prefix_url = TuleapLister.REPO_GIT_PATH
else:
prefix_url = TuleapLister.REPO_SVN_PATH
rep = {
"project": repo["name"],
"type": repo_type,
"uri": urljoin(url, f"{prefix_url}{repo['path']}"),
"last_update_date": repo["last_update_date"],
}
return rep
def _get_repositories(self, url_repo) -> List[Dict[str, Any]]:
ret = self.page_request(url_repo, {})
reps_list = ret.json()["repositories"]
limit = int(ret.headers["X-PAGINATION-LIMIT-MAX"])
offset = int(ret.headers["X-PAGINATION-LIMIT"])
size = int(ret.headers["X-PAGINATION-SIZE"])
while offset < size:
url_offset = url_repo + "?offset=" + str(offset) + "&limit=" + str(limit)
ret = self.page_request(url_offset, {}).json()
reps_list = reps_list + ret["repositories"]
offset += limit
return reps_list
def get_pages(self) -> Iterator[RepoPage]:
# base with trailing slash, path without leading slash for urljoin
url_api: str = urljoin(self.url, self.REPO_LIST_PATH)
url_projects = url_api + "/projects/"
# Get the list of projects.
response = self.page_request(url_projects, {})
projects_list = response.json()
limit = int(response.headers["X-PAGINATION-LIMIT-MAX"])
offset = int(response.headers["X-PAGINATION-LIMIT"])
size = int(response.headers["X-PAGINATION-SIZE"])
while offset < size:
url_offset = (
url_projects + "?offset=" + str(offset) + "&limit=" + str(limit)
)
ret = self.page_request(url_offset, {}).json()
projects_list = projects_list + ret
offset += limit
# Get list of repositories for each project.
for p in projects_list:
p_id = p["id"]
# Fetch Git repositories for project
url_git = url_projects + str(p_id) + "/git"
repos = self._get_repositories(url_git)
for repo in repos:
yield self.results_simplified(url_api, "git", repo)
def get_origins_from_page(self, page: RepoPage) -> Iterator[ListedOrigin]:
"""Convert a page of Tuleap repositories into a list of ListedOrigins.
"""
assert self.lister_obj.id is not None
yield ListedOrigin(
lister_id=self.lister_obj.id,
url=page["uri"],
visit_type=page["type"],
last_update=iso8601.parse_date(page["last_update_date"]),
)