diff --git a/docs/new_lister_template.py b/docs/new_lister_template.py
index 32d2152..20e3e90 100644
--- a/docs/new_lister_template.py
+++ b/docs/new_lister_template.py
@@ -1,166 +1,165 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from dataclasses import asdict, dataclass
import logging
from typing import Any, Dict, Iterator, List
from urllib.parse import urljoin
import requests
from tenacity.before_sleep import before_sleep_log
from swh.lister.utils import throttling_retry
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
from .. import USER_AGENT
from ..pattern import CredentialsType, Lister
logger = logging.getLogger(__name__)
# Alias for the page results returned by the `get_pages` method of the lister.
NewForgeListerPage = List[Dict[str, Any]]
@dataclass
class NewForgeListerState:
- """The NewForgeLister instance state. This is used for incremental listing.
-
- """
+ """The NewForgeLister instance state. This is used for incremental listing."""
current: str = ""
"""Id of the last origin listed on an incremental pass"""
# If there is no need to keep state, subclass StatelessLister[NewForgeListerPage]
class NewForgeLister(Lister[NewForgeListerState, NewForgeListerPage]):
- """List origins from the "NewForge" forge.
-
- """
+ """List origins from the "NewForge" forge."""
# Part of the lister API; identifies this lister
LISTER_NAME = ""
# (Optional) VCS type of the origins listed by this lister, if constant
VISIT_TYPE = ""
# Instance URLs include the hostname and the common path prefix of processed URLs
EXAMPLE_BASE_URL = "https://netloc/api/v1/"
# Path of a specific resource to process, to join the base URL with
EXAMPLE_PATH = "origins/list"
def __init__(
self,
# Required
scheduler: SchedulerInterface,
# Instance URL, required for multi-instance listers (e.g. gitlab, ...)
url: str,
# Instance name (free form) required for multi-instance listers,
# or computed from `url`
instance: str,
# Required whether or not the lister supports authentication
credentials: CredentialsType = None,
):
super().__init__(
- scheduler=scheduler, credentials=credentials, url=url, instance=instance,
+ scheduler=scheduler,
+ credentials=credentials,
+ url=url,
+ instance=instance,
)
self.session = requests.Session()
# Declaring the USER_AGENT is more sysadmin-friendly for the forge we list
self.session.headers.update(
{"Accept": "application/json", "User-Agent": USER_AGENT}
)
def state_from_dict(self, d: Dict[str, Any]) -> NewForgeListerState:
return NewForgeListerState(**d)
def state_to_dict(self, state: NewForgeListerState) -> Dict[str, Any]:
return asdict(state)
@throttling_retry(before_sleep=before_sleep_log(logger, logging.WARNING))
def page_request(self, url, params) -> requests.Response:
# Do the network resource request under a retrying decorator
# to handle rate limiting and transient errors up to a limit.
# `throttling_retry` by default uses the `requests` library to check
# only for rate limiting and uses a base-10 exponential waiting strategy.
# This can be customized by passing waiting, retrying and logging strategies
# as functions. See the `tenacity` library documentation.
# Log listed URL to ease debugging
logger.debug("Fetching URL %s with params %s", url, params)
response = self.session.get(url, params=params)
if response.status_code != 200:
# Log response content to ease debugging
logger.warning(
"Unexpected HTTP status code %s on %s: %s",
response.status_code,
response.url,
response.content,
)
# The lister must fail on blocking errors
response.raise_for_status()
return response
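# --- Hedged illustration (not part of the template) --------------------------
# The comment above notes that the retry behaviour can be customized with
# `tenacity` strategies. A standalone sketch using only documented `tenacity`
# primitives could look as follows; the wait/stop values and the helper name
# `example_page_request` are arbitrary choices made for the example.
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    wait=wait_exponential(multiplier=1, max=60),
    stop=stop_after_attempt(5),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    reraise=True,
)
def example_page_request(
    session: requests.Session, url: str, params: Dict[str, Any]
) -> requests.Response:
    # Same request/raise pattern as `page_request` above, written as a
    # module-level helper so the snippet is self-contained.
    response = session.get(url, params=params)
    response.raise_for_status()
    return response
# ------------------------------------------------------------------------------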
def get_pages(self) -> Iterator[NewForgeListerPage]:
# The algorithm depends on the service, but should request data reliably,
# following pagination if relevant and yielding pages in a streaming fashion.
# If incremental listing is supported, initialize from saved lister state.
# Make use of any next page URL provided.
# Simplify the results early to ease testing and debugging.
# Initialize from the lister saved state
current = ""
if self.state.current is not None:
current = self.state.current
# Construct the URL of a service endpoint; the lister can have others to fetch
url = urljoin(self.url, self.EXAMPLE_PATH)
while current is not None:
# Parametrize the request for incremental listing
body = self.page_request(url, {"current": current}).json()
# Simplify the page if possible to only the necessary elements
# and yield it
yield body
# Get the next page parameter or end the loop when there is none
current = body.get("next")
def get_origins_from_page(self, page: NewForgeListerPage) -> Iterator[ListedOrigin]:
"""Convert a page of NewForgeLister repositories into a list of ListedOrigins"""
assert self.lister_obj.id is not None
for element in page:
yield ListedOrigin(
# Required. Should use this value.
lister_id=self.lister_obj.id,
# Required. Visit type of the currently processed origin
visit_type=self.VISIT_TYPE,
# Required. URL corresponding to the origin for loaders to ingest
url=...,
# Should get it if the service provides it and if it induces no
# substantial additional processing cost
last_update=...,
)
def commit_page(self, page: NewForgeListerPage) -> None:
# Update the lister state to the latest `current`
current = page[-1]["current"]
if current > self.state.current:
self.state.current = current
def finalize(self) -> None:
# Pull fresh lister state from the scheduler backend, in case multiple
# listers run concurrently
scheduler_state = self.get_state_from_scheduler()
# Update the lister state in the backend only if `current` is fresher than
# the one stored in the database.
if self.state.current > scheduler_state.current:
self.updated = True
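# --- Hedged usage sketch (not part of the template) ---------------------------
# Once the methods above are implemented, a lister run follows the pattern used
# by the tests in this patch: instantiate with a scheduler and the instance URL,
# then call `run()`, which returns a `ListerStats` with page and origin counts.
# The `instance` name below is an arbitrary example.
def example_run(swh_scheduler: SchedulerInterface) -> None:
    lister = NewForgeLister(
        scheduler=swh_scheduler,
        url=NewForgeLister.EXAMPLE_BASE_URL,
        instance="example",
    )
    stats = lister.run()
    logger.info("Listed %s origins over %s pages", stats.origins, stats.pages)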
diff --git a/swh/lister/bitbucket/lister.py b/swh/lister/bitbucket/lister.py
index ab364a9..6a99699 100644
--- a/swh/lister/bitbucket/lister.py
+++ b/swh/lister/bitbucket/lister.py
@@ -1,200 +1,198 @@
# Copyright (C) 2017-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from dataclasses import asdict, dataclass
from datetime import datetime
import logging
import random
from typing import Any, Dict, Iterator, List, Optional
from urllib import parse
import iso8601
import requests
from tenacity.before_sleep import before_sleep_log
from swh.lister.utils import throttling_retry
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
from .. import USER_AGENT
from ..pattern import CredentialsType, Lister
logger = logging.getLogger(__name__)
@dataclass
class BitbucketListerState:
"""State of Bitbucket lister"""
last_repo_cdate: Optional[datetime] = None
"""Creation date and time of the last listed repository during an
incremental pass"""
class BitbucketLister(Lister[BitbucketListerState, List[Dict[str, Any]]]):
"""List origins from Bitbucket using its API.
Bitbucket API has the following rate-limit configuration:
* 60 requests per hour for anonymous users
* 1000 requests per hour for authenticated users
The lister works in anonymous mode by default, but Bitbucket account
credentials can be provided to perform authenticated requests.
"""
LISTER_NAME = "bitbucket"
INSTANCE = "bitbucket"
API_URL = "https://api.bitbucket.org/2.0/repositories"
def __init__(
self,
scheduler: SchedulerInterface,
page_size: int = 1000,
incremental: bool = True,
credentials: CredentialsType = None,
):
super().__init__(
scheduler=scheduler,
credentials=credentials,
url=self.API_URL,
instance=self.INSTANCE,
)
self.incremental = incremental
self.url_params: Dict[str, Any] = {
"pagelen": page_size,
# only return needed JSON fields in bitbucket API responses
# (also prevents HTTP 500 errors when listing)
"fields": (
"next,values.links.clone.href,values.scm,values.updated_on,"
"values.created_on"
),
}
self.session = requests.Session()
self.session.headers.update(
{"Accept": "application/json", "User-Agent": USER_AGENT}
)
if len(self.credentials) > 0:
cred = random.choice(self.credentials)
logger.warning("Using Bitbucket credentials from user %s", cred["username"])
self.set_credentials(cred["username"], cred["password"])
else:
logger.warning("No credentials set in configuration, using anonymous mode")
def state_from_dict(self, d: Dict[str, Any]) -> BitbucketListerState:
last_repo_cdate = d.get("last_repo_cdate")
if last_repo_cdate is not None:
d["last_repo_cdate"] = iso8601.parse_date(last_repo_cdate)
return BitbucketListerState(**d)
def state_to_dict(self, state: BitbucketListerState) -> Dict[str, Any]:
d = asdict(state)
last_repo_cdate = d.get("last_repo_cdate")
if last_repo_cdate is not None:
d["last_repo_cdate"] = last_repo_cdate.isoformat()
return d
def set_credentials(self, username: Optional[str], password: Optional[str]) -> None:
"""Set basic authentication headers with given credentials."""
if username is not None and password is not None:
self.session.auth = (username, password)
@throttling_retry(before_sleep=before_sleep_log(logger, logging.DEBUG))
def page_request(self, last_repo_cdate: str) -> requests.Response:
self.url_params["after"] = last_repo_cdate
logger.debug("Fetching URL %s with params %s", self.url, self.url_params)
response = self.session.get(self.url, params=self.url_params)
if response.status_code != 200:
logger.warning(
"Unexpected HTTP status code %s on %s: %s",
response.status_code,
response.url,
response.content,
)
response.raise_for_status()
return response
def get_pages(self) -> Iterator[List[Dict[str, Any]]]:
last_repo_cdate: str = "1970-01-01"
if (
self.incremental
and self.state is not None
and self.state.last_repo_cdate is not None
):
last_repo_cdate = self.state.last_repo_cdate.isoformat()
while True:
body = self.page_request(last_repo_cdate).json()
yield body["values"]
next_page_url = body.get("next")
if next_page_url is not None:
next_page_url = parse.urlparse(next_page_url)
if not next_page_url.query:
logger.warning("Failed to parse url %s", next_page_url)
break
last_repo_cdate = parse.parse_qs(next_page_url.query)["after"][0]
else:
# last page
break
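# Illustration (the "next" URL below is a representative example, not real
# data) of how the `after` cursor is recovered from the pagination URL with
# only the stdlib parsing used above:
#
#   >>> next_url = (
#   ...     "https://api.bitbucket.org/2.0/repositories"
#   ...     "?pagelen=10&after=2008-07-12T07%3A44%3A01"
#   ... )
#   >>> parse.parse_qs(parse.urlparse(next_url).query)["after"][0]
#   '2008-07-12T07:44:01'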
def get_origins_from_page(
self, page: List[Dict[str, Any]]
) -> Iterator[ListedOrigin]:
- """Convert a page of Bitbucket repositories into a list of ListedOrigins.
-
- """
+ """Convert a page of Bitbucket repositories into a list of ListedOrigins."""
assert self.lister_obj.id is not None
for repo in page:
last_update = iso8601.parse_date(repo["updated_on"])
origin_url = repo["links"]["clone"][0]["href"]
origin_type = repo["scm"]
yield ListedOrigin(
lister_id=self.lister_obj.id,
url=origin_url,
visit_type=origin_type,
last_update=last_update,
)
def commit_page(self, page: List[Dict[str, Any]]) -> None:
"""Update the currently stored state using the latest listed page."""
if self.incremental:
last_repo = page[-1]
last_repo_cdate = iso8601.parse_date(last_repo["created_on"])
if (
self.state.last_repo_cdate is None
or last_repo_cdate > self.state.last_repo_cdate
):
self.state.last_repo_cdate = last_repo_cdate
def finalize(self) -> None:
if self.incremental:
scheduler_state = self.get_state_from_scheduler()
if self.state.last_repo_cdate is None:
return
# Update the lister state in the backend only if the last repository creation
# date seen in the current run is more recent than the one stored in the database.
if (
scheduler_state.last_repo_cdate is None
or self.state.last_repo_cdate > scheduler_state.last_repo_cdate
):
self.updated = True
diff --git a/swh/lister/bitbucket/tests/test_lister.py b/swh/lister/bitbucket/tests/test_lister.py
index ee5d79e..c568dbf 100644
--- a/swh/lister/bitbucket/tests/test_lister.py
+++ b/swh/lister/bitbucket/tests/test_lister.py
@@ -1,181 +1,184 @@
# Copyright (C) 2017-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime
import json
import os
import pytest
from swh.lister.bitbucket.lister import BitbucketLister
@pytest.fixture
def bb_api_repositories_page1(datadir):
data_file_path = os.path.join(datadir, "bb_api_repositories_page1.json")
with open(data_file_path, "r") as data_file:
return json.load(data_file)
@pytest.fixture
def bb_api_repositories_page2(datadir):
data_file_path = os.path.join(datadir, "bb_api_repositories_page2.json")
with open(data_file_path, "r") as data_file:
return json.load(data_file)
def _check_listed_origins(lister_origins, scheduler_origins):
"""Asserts that the two collections have the same origins from the point of view of
the lister"""
sorted_lister_origins = list(sorted(lister_origins))
sorted_scheduler_origins = list(sorted(scheduler_origins))
assert len(sorted_lister_origins) == len(sorted_scheduler_origins)
for lo, so in zip(sorted_lister_origins, sorted_scheduler_origins):
assert lo.url == so.url
assert lo.last_update == so.last_update
def test_bitbucket_incremental_lister(
swh_scheduler,
requests_mock,
mocker,
bb_api_repositories_page1,
bb_api_repositories_page2,
):
"""Simple Bitbucket listing with two pages containing 10 origins"""
requests_mock.get(
BitbucketLister.API_URL,
- [{"json": bb_api_repositories_page1}, {"json": bb_api_repositories_page2},],
+ [
+ {"json": bb_api_repositories_page1},
+ {"json": bb_api_repositories_page2},
+ ],
)
lister = BitbucketLister(scheduler=swh_scheduler, page_size=10)
# First listing
stats = lister.run()
scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results
assert stats.pages == 2
assert stats.origins == 20
assert len(scheduler_origins) == 20
assert lister.updated
lister_state = lister.get_state_from_scheduler()
last_repo_cdate = lister_state.last_repo_cdate.isoformat()
assert hasattr(lister_state, "last_repo_cdate")
assert last_repo_cdate == bb_api_repositories_page2["values"][-1]["created_on"]
# Second listing, restarting from last state
lister.session.get = mocker.spy(lister.session, "get")
lister.run()
url_params = lister.url_params
url_params["after"] = last_repo_cdate
lister.session.get.assert_called_once_with(lister.API_URL, params=url_params)
all_origins = (
bb_api_repositories_page1["values"] + bb_api_repositories_page2["values"]
)
_check_listed_origins(lister.get_origins_from_page(all_origins), scheduler_origins)
def test_bitbucket_lister_rate_limit_hit(
swh_scheduler,
requests_mock,
mocker,
bb_api_repositories_page1,
bb_api_repositories_page2,
):
"""Simple Bitbucket listing with two pages containing 10 origins"""
requests_mock.get(
BitbucketLister.API_URL,
[
{"json": bb_api_repositories_page1, "status_code": 200},
{"json": None, "status_code": 429},
{"json": None, "status_code": 429},
{"json": bb_api_repositories_page2, "status_code": 200},
],
)
lister = BitbucketLister(scheduler=swh_scheduler, page_size=10)
mocker.patch.object(lister.page_request.retry, "sleep")
stats = lister.run()
assert stats.pages == 2
assert stats.origins == 20
assert len(swh_scheduler.get_listed_origins(lister.lister_obj.id).results) == 20
def test_bitbucket_full_lister(
swh_scheduler,
requests_mock,
mocker,
bb_api_repositories_page1,
bb_api_repositories_page2,
):
"""Simple Bitbucket listing with two pages containing 10 origins"""
requests_mock.get(
BitbucketLister.API_URL,
[
{"json": bb_api_repositories_page1},
{"json": bb_api_repositories_page2},
{"json": bb_api_repositories_page1},
{"json": bb_api_repositories_page2},
],
)
credentials = {"bitbucket": {"bitbucket": [{"username": "u", "password": "p"}]}}
lister = BitbucketLister(
scheduler=swh_scheduler, page_size=10, incremental=True, credentials=credentials
)
assert lister.session.auth is not None
# First do an incremental run to have an initial lister state
stats = lister.run()
last_lister_state = lister.get_state_from_scheduler()
assert stats.origins == 20
# Then do the full run and verify lister state did not change
# Modify the last listed repo dates to check they will not be saved
# to the lister state after this run
last_page2_repo = bb_api_repositories_page2["values"][-1]
last_page2_repo["created_on"] = datetime.now().isoformat()
last_page2_repo["updated_on"] = datetime.now().isoformat()
lister = BitbucketLister(scheduler=swh_scheduler, page_size=10, incremental=False)
assert lister.session.auth is None
stats = lister.run()
assert stats.pages == 2
assert stats.origins == 20
scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results
# 20 because scheduler upserts based on (id, type, url)
assert len(scheduler_origins) == 20
# Modification on created_on SHOULD NOT impact lister state
assert lister.get_state_from_scheduler() == last_lister_state
# Modification on updated_on SHOULD impact the listed origins' last_update
all_origins = (
bb_api_repositories_page1["values"] + bb_api_repositories_page2["values"]
)
_check_listed_origins(lister.get_origins_from_page(all_origins), scheduler_origins)
diff --git a/swh/lister/bitbucket/tests/test_tasks.py b/swh/lister/bitbucket/tests/test_tasks.py
index a646aa7..892e7db 100644
--- a/swh/lister/bitbucket/tests/test_tasks.py
+++ b/swh/lister/bitbucket/tests/test_tasks.py
@@ -1,50 +1,58 @@
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from unittest.mock import patch
from swh.lister.pattern import ListerStats
def test_ping(swh_scheduler_celery_app, swh_scheduler_celery_worker):
res = swh_scheduler_celery_app.send_task("swh.lister.bitbucket.tasks.ping")
assert res
res.wait()
assert res.successful()
assert res.result == "OK"
@patch("swh.lister.bitbucket.tasks.BitbucketLister")
def test_incremental_listing(
lister, swh_scheduler_celery_app, swh_scheduler_celery_worker
):
lister.from_configfile.return_value = lister
lister.run.return_value = ListerStats(pages=5, origins=5000)
res = swh_scheduler_celery_app.send_task(
"swh.lister.bitbucket.tasks.IncrementalBitBucketLister",
- kwargs=dict(page_size=100, username="username", password="password",),
+ kwargs=dict(
+ page_size=100,
+ username="username",
+ password="password",
+ ),
)
assert res
res.wait()
assert res.successful()
lister.run.assert_called_once()
@patch("swh.lister.bitbucket.tasks.BitbucketLister")
def test_full_listing(lister, swh_scheduler_celery_app, swh_scheduler_celery_worker):
lister.from_configfile.return_value = lister
lister.run.return_value = ListerStats(pages=5, origins=5000)
res = swh_scheduler_celery_app.send_task(
"swh.lister.bitbucket.tasks.FullBitBucketRelister",
- kwargs=dict(page_size=100, username="username", password="password",),
+ kwargs=dict(
+ page_size=100,
+ username="username",
+ password="password",
+ ),
)
assert res
res.wait()
assert res.successful()
lister.run.assert_called_once()
diff --git a/swh/lister/cgit/lister.py b/swh/lister/cgit/lister.py
index 3195f2a..c0d9113 100644
--- a/swh/lister/cgit/lister.py
+++ b/swh/lister/cgit/lister.py
@@ -1,212 +1,217 @@
# Copyright (C) 2019-2021 The Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timezone
import logging
import re
from typing import Any, Dict, Iterator, List, Optional
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
import requests
from requests.exceptions import HTTPError
from tenacity.before_sleep import before_sleep_log
from swh.lister import USER_AGENT
from swh.lister.pattern import CredentialsType, StatelessLister
from swh.lister.utils import throttling_retry
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
logger = logging.getLogger(__name__)
Repositories = List[Dict[str, Any]]
class CGitLister(StatelessLister[Repositories]):
"""Lister class for CGit repositories.
This lister will retrieve the list of published git repositories by
parsing the HTML page(s) of the index retrieved at `url`.
The lister currently defines 2 listing behaviors:
- If `base_git_url` is provided, the listed origin urls are computed from the
base git url and the repository link found in the main listing page (resulting
in fewer HTTP queries than the second behavior below). This is expected to be
the main deployed behavior.
- Otherwise (with no `base_git_url`), for each listed git repository, one extra
HTTP query is made at the url found in the main listing page to gather the
published "Clone" URLs to be used as the origin URL for that git repo. If several
"Clone" urls are provided, the http/https one is preferred, if any; otherwise the
first one is used as a fallback.
"""
LISTER_NAME = "cgit"
def __init__(
self,
scheduler: SchedulerInterface,
url: str,
instance: Optional[str] = None,
credentials: Optional[CredentialsType] = None,
base_git_url: Optional[str] = None,
):
"""Lister class for CGit repositories.
Args:
url: main URL of the CGit instance, i.e. url of the index
of published git repositories on this instance.
instance: Name of cgit instance. Defaults to url's network location
if unset.
base_git_url: Optional base git url from which the origin urls are computed.
"""
super().__init__(
- scheduler=scheduler, url=url, instance=instance, credentials=credentials,
+ scheduler=scheduler,
+ url=url,
+ instance=instance,
+ credentials=credentials,
)
self.session = requests.Session()
self.session.headers.update(
{"Accept": "application/html", "User-Agent": USER_AGENT}
)
self.base_git_url = base_git_url
@throttling_retry(before_sleep=before_sleep_log(logger, logging.DEBUG))
def _get_and_parse(self, url: str) -> BeautifulSoup:
"""Get the given url and parse the retrieved HTML using BeautifulSoup"""
response = self.session.get(url)
response.raise_for_status()
return BeautifulSoup(response.text, features="html.parser")
def get_pages(self) -> Iterator[Repositories]:
"""Generate git 'project' URLs found on the current CGit server
- The last_update date is retrieved on the list of repo page to avoid
- to compute it on the repository details which only give a date per branch
+ The last_update date is retrieved from the repository list page, to avoid
+ computing it from the repository details, which only give a date per branch
"""
next_page: Optional[str] = self.url
while next_page:
bs_idx = self._get_and_parse(next_page)
page_results = []
for tr in bs_idx.find("div", {"class": "content"}).find_all(
"tr", {"class": ""}
):
repository_link = tr.find("a")["href"]
repo_url = None
git_url = None
base_url = urljoin(self.url, repository_link).strip("/")
if self.base_git_url: # mapping provided
# computing git url
git_url = base_url.replace(self.url, self.base_git_url)
else:
# we compute the git detailed page url from which we will retrieve
# the git url (cf. self.get_origins_from_page)
repo_url = base_url
span = tr.find("span", {"class": re.compile("age-")})
last_updated_date = span.get("title") if span else None
page_results.append(
{
"url": repo_url,
"git_url": git_url,
"last_updated_date": last_updated_date,
}
)
yield page_results
try:
pager = bs_idx.find("ul", {"class": "pager"})
current_page = pager.find("a", {"class": "current"})
if current_page:
next_page = current_page.parent.next_sibling.a["href"]
next_page = urljoin(self.url, next_page)
except (AttributeError, KeyError):
# no pager, or no next page
next_page = None
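# Illustration of the `base_git_url` mapping above, using one of the
# (url, base_git_url) pairs exercised in this patch's tests; the repository
# name "some-repo.git" is a made-up example:
#
#   >>> url, base_git_url = "https://jff.email/cgit/", "git://jff.email/opt/git/"
#   >>> base_url = "https://jff.email/cgit/some-repo.git"
#   >>> base_url.replace(url, base_git_url)
#   'git://jff.email/opt/git/some-repo.git'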
def get_origins_from_page(
self, repositories: Repositories
) -> Iterator[ListedOrigin]:
"""Convert a page of cgit repositories into a list of ListedOrigins."""
assert self.lister_obj.id is not None
for repo in repositories:
origin_url = repo["git_url"] or self._get_origin_from_repository_url(
repo["url"]
)
if origin_url is None:
continue
yield ListedOrigin(
lister_id=self.lister_obj.id,
url=origin_url,
visit_type="git",
last_update=_parse_last_updated_date(repo),
)
def _get_origin_from_repository_url(self, repository_url: str) -> Optional[str]:
"""Extract the git url from the repository page"""
try:
bs = self._get_and_parse(repository_url)
except HTTPError as e:
logger.warning(
"Unexpected HTTP status code %s on %s",
e.response.status_code,
e.response.url,
)
return None
# origin urls are listed on the repository page
# TODO check if forcing https is better or not ?
urls = [x["href"] for x in bs.find_all("a", {"rel": "vcs-git"})]
if not urls:
return None
# look for the http/https url, if any, and use it as origin_url
for url in urls:
if urlparse(url).scheme in ("http", "https"):
origin_url = url
break
else:
# otherwise, choose the first one
origin_url = urls[0]
return origin_url
def _parse_last_updated_date(repository: Dict[str, Any]) -> Optional[datetime]:
"""Parse the last updated date"""
date = repository.get("last_updated_date")
if not date:
return None
parsed_date = None
for date_format in ("%Y-%m-%d %H:%M:%S %z", "%Y-%m-%d %H:%M:%S (%Z)"):
try:
parsed_date = datetime.strptime(date, date_format)
# force UTC to avoid naive datetime
if not parsed_date.tzinfo:
parsed_date = parsed_date.replace(tzinfo=timezone.utc)
break
except Exception:
pass
if not parsed_date:
logger.warning(
- "Could not parse %s last_updated date: %s", repository["url"], date,
+ "Could not parse %s last_updated date: %s",
+ repository["url"],
+ date,
)
return parsed_date
diff --git a/swh/lister/cgit/tests/test_lister.py b/swh/lister/cgit/tests/test_lister.py
index 7f402c0..f996333 100644
--- a/swh/lister/cgit/tests/test_lister.py
+++ b/swh/lister/cgit/tests/test_lister.py
@@ -1,259 +1,267 @@
# Copyright (C) 2019-2021 The Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timedelta, timezone
import os
from typing import List
import pytest
from swh.core.pytest_plugin import requests_mock_datadir_factory
from swh.lister import __version__
from swh.lister.cgit.lister import CGitLister, _parse_last_updated_date
from swh.lister.pattern import ListerStats
def test_lister_cgit_get_pages_one_page(requests_mock_datadir, swh_scheduler):
url = "https://git.savannah.gnu.org/cgit/"
lister_cgit = CGitLister(swh_scheduler, url=url)
repos: List[List[str]] = list(lister_cgit.get_pages())
flattened_repos = sum(repos, [])
assert len(flattened_repos) == 977
assert flattened_repos[0]["url"] == "https://git.savannah.gnu.org/cgit/elisp-es.git"
# note the url below is NOT a subpath of /cgit/
assert (
flattened_repos[-1]["url"] == "https://git.savannah.gnu.org/path/to/yetris.git"
) # noqa
# note the url below is NOT on the same server
assert flattened_repos[-2]["url"] == "http://example.org/cgit/xstarcastle.git"
def test_lister_cgit_get_pages_with_pages(requests_mock_datadir, swh_scheduler):
url = "https://git.tizen/cgit/"
lister_cgit = CGitLister(swh_scheduler, url=url)
repos: List[List[str]] = list(lister_cgit.get_pages())
flattened_repos = sum(repos, [])
# we should have 16 repos (listed on 3 pages)
assert len(repos) == 3
assert len(flattened_repos) == 16
def test_lister_cgit_run_with_page(requests_mock_datadir, swh_scheduler):
"""cgit lister supports pagination"""
url = "https://git.tizen/cgit/"
lister_cgit = CGitLister(swh_scheduler, url=url)
stats = lister_cgit.run()
expected_nb_origins = 16
assert stats == ListerStats(pages=3, origins=expected_nb_origins)
# test page parsing
scheduler_origins = swh_scheduler.get_listed_origins(
lister_cgit.lister_obj.id
).results
assert len(scheduler_origins) == expected_nb_origins
# test listed repositories
for listed_origin in scheduler_origins:
assert listed_origin.visit_type == "git"
assert listed_origin.url.startswith("https://git.tizen")
# test user agent content
assert len(requests_mock_datadir.request_history) != 0
for request in requests_mock_datadir.request_history:
assert "User-Agent" in request.headers
user_agent = request.headers["User-Agent"]
assert "Software Heritage Lister" in user_agent
assert __version__ in user_agent
def test_lister_cgit_run_populates_last_update(requests_mock_datadir, swh_scheduler):
"""cgit lister returns last updated date"""
url = "https://git.tizen/cgit"
urls_without_date = [
f"https://git.tizen.org/cgit/{suffix_url}"
- for suffix_url in ["All-Projects", "All-Users", "Lock-Projects",]
+ for suffix_url in [
+ "All-Projects",
+ "All-Users",
+ "Lock-Projects",
+ ]
]
lister_cgit = CGitLister(swh_scheduler, url=url)
stats = lister_cgit.run()
expected_nb_origins = 16
assert stats == ListerStats(pages=3, origins=expected_nb_origins)
# test page parsing
scheduler_origins = swh_scheduler.get_listed_origins(
lister_cgit.lister_obj.id
).results
assert len(scheduler_origins) == expected_nb_origins
# test listed repositories
for listed_origin in scheduler_origins:
if listed_origin.url in urls_without_date:
assert listed_origin.last_update is None
else:
assert listed_origin.last_update is not None
@pytest.mark.parametrize(
"date_str,expected_date",
[
({}, None),
("unexpected date", None),
("2020-0140-10 10:10:10 (GMT)", None),
(
"2020-01-10 10:10:10 (GMT)",
datetime(
year=2020,
month=1,
day=10,
hour=10,
minute=10,
second=10,
tzinfo=timezone.utc,
),
),
(
"2019-08-04 05:10:41 +0100",
datetime(
year=2019,
month=8,
day=4,
hour=5,
minute=10,
second=41,
tzinfo=timezone(timedelta(hours=1)),
),
),
],
)
def test_lister_cgit_date_parsing(date_str, expected_date):
"""test cgit lister date parsing"""
repository = {"url": "url", "last_updated_date": date_str}
assert _parse_last_updated_date(repository) == expected_date
requests_mock_datadir_missing_url = requests_mock_datadir_factory(
- ignore_urls=["https://git.tizen/cgit/adaptation/ap_samsung/audio-hal-e4x12",]
+ ignore_urls=[
+ "https://git.tizen/cgit/adaptation/ap_samsung/audio-hal-e4x12",
+ ]
)
def test_lister_cgit_get_origin_from_repo_failing(
requests_mock_datadir_missing_url, swh_scheduler
):
url = "https://git.tizen/cgit/"
lister_cgit = CGitLister(swh_scheduler, url=url)
stats = lister_cgit.run()
expected_nb_origins = 15
assert stats == ListerStats(pages=3, origins=expected_nb_origins)
@pytest.mark.parametrize(
"credentials, expected_credentials",
[
(None, []),
({"key": "value"}, []),
(
{"cgit": {"tizen": [{"username": "user", "password": "pass"}]}},
[{"username": "user", "password": "pass"}],
),
],
)
def test_lister_cgit_instantiation_with_credentials(
credentials, expected_credentials, swh_scheduler
):
url = "https://git.tizen/cgit/"
lister = CGitLister(
swh_scheduler, url=url, instance="tizen", credentials=credentials
)
# Credentials are allowed in constructor
assert lister.credentials == expected_credentials
def test_lister_cgit_from_configfile(swh_scheduler_config, mocker):
load_from_envvar = mocker.patch("swh.lister.pattern.load_from_envvar")
load_from_envvar.return_value = {
"scheduler": {"cls": "local", **swh_scheduler_config},
"url": "https://git.tizen/cgit/",
"instance": "tizen",
"credentials": {},
}
lister = CGitLister.from_configfile()
assert lister.scheduler is not None
assert lister.credentials is not None
@pytest.mark.parametrize(
"url,base_git_url,expected_nb_origins",
[
("https://git.eclipse.org/c", "https://eclipse.org/r", 5),
("https://git.baserock.org/cgit/", "https://git.baserock.org/git/", 3),
("https://jff.email/cgit/", "git://jff.email/opt/git/", 6),
],
)
def test_lister_cgit_with_base_git_url(
url, base_git_url, expected_nb_origins, requests_mock_datadir, swh_scheduler
):
- """With base git url provided, listed urls should be the computed origin urls
-
- """
- lister_cgit = CGitLister(swh_scheduler, url=url, base_git_url=base_git_url,)
+ """With base git url provided, listed urls should be the computed origin urls"""
+ lister_cgit = CGitLister(
+ swh_scheduler,
+ url=url,
+ base_git_url=base_git_url,
+ )
stats = lister_cgit.run()
assert stats == ListerStats(pages=1, origins=expected_nb_origins)
# test page parsing
scheduler_origins = swh_scheduler.get_listed_origins(
lister_cgit.lister_obj.id
).results
assert len(scheduler_origins) == expected_nb_origins
# test listed repositories
for listed_origin in scheduler_origins:
assert listed_origin.visit_type == "git"
assert listed_origin.url.startswith(base_git_url)
assert (
listed_origin.url.startswith(url) is False
), f"url should be mapped to {base_git_url}"
def test_lister_cgit_get_pages_with_pages_and_retry(
requests_mock_datadir, requests_mock, datadir, mocker, swh_scheduler
):
url = "https://git.tizen/cgit/"
with open(os.path.join(datadir, "https_git.tizen/cgit,ofs=50"), "rb") as page:
requests_mock.get(
f"{url}?ofs=50",
[
{"content": None, "status_code": 429},
{"content": None, "status_code": 429},
{"content": page.read(), "status_code": 200},
],
)
lister_cgit = CGitLister(swh_scheduler, url=url)
mocker.patch.object(lister_cgit._get_and_parse.retry, "sleep")
repos: List[List[str]] = list(lister_cgit.get_pages())
flattened_repos = sum(repos, [])
# we should have 16 repos (listed on 3 pages)
assert len(repos) == 3
assert len(flattened_repos) == 16
diff --git a/swh/lister/cgit/tests/test_tasks.py b/swh/lister/cgit/tests/test_tasks.py
index b9a00cd..ce08c69 100644
--- a/swh/lister/cgit/tests/test_tasks.py
+++ b/swh/lister/cgit/tests/test_tasks.py
@@ -1,35 +1,36 @@
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from swh.lister.pattern import ListerStats
def test_cgit_ping(swh_scheduler_celery_app, swh_scheduler_celery_worker):
res = swh_scheduler_celery_app.send_task("swh.lister.cgit.tasks.ping")
assert res
res.wait()
assert res.successful()
assert res.result == "OK"
def test_cgit_lister_task(
swh_scheduler_celery_app, swh_scheduler_celery_worker, mocker
):
# setup the mocked CGitLister
lister = mocker.patch("swh.lister.cgit.tasks.CGitLister")
lister.from_configfile.return_value = lister
lister.run.return_value = ListerStats(pages=10, origins=500)
kwargs = dict(url="https://git.kernel.org/", instance="kernel", base_git_url=None)
res = swh_scheduler_celery_app.send_task(
- "swh.lister.cgit.tasks.CGitListerTask", kwargs=kwargs,
+ "swh.lister.cgit.tasks.CGitListerTask",
+ kwargs=kwargs,
)
assert res
res.wait()
assert res.successful()
lister.from_configfile.assert_called_once_with(**kwargs)
lister.run.assert_called_once_with()
diff --git a/swh/lister/cli.py b/swh/lister/cli.py
index 770eeeb..ec7655e 100644
--- a/swh/lister/cli.py
+++ b/swh/lister/cli.py
@@ -1,69 +1,72 @@
# Copyright (C) 2018-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from copy import deepcopy
import logging
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import os
import click
from swh.core.cli import CONTEXT_SETTINGS
from swh.core.cli import swh as swh_cli_group
from swh.lister import SUPPORTED_LISTERS, get_lister
logger = logging.getLogger(__name__)
@swh_cli_group.group(name="lister", context_settings=CONTEXT_SETTINGS)
@click.option(
"--config-file",
"-C",
default=None,
- type=click.Path(exists=True, dir_okay=False,),
+ type=click.Path(
+ exists=True,
+ dir_okay=False,
+ ),
help="Configuration file.",
)
@click.pass_context
def lister(ctx, config_file):
"""Software Heritage Lister tools."""
from swh.core import config
ctx.ensure_object(dict)
if not config_file:
config_file = os.environ.get("SWH_CONFIG_FILENAME")
conf = config.read(config_file)
ctx.obj["config"] = conf
@lister.command(
name="run",
context_settings=CONTEXT_SETTINGS,
help="Trigger a full listing run for a particular forge "
"instance. The output of this listing results in "
'"oneshot" tasks in the scheduler db with a priority '
"defined by the user",
)
@click.option(
"--lister", "-l", help="Lister to run", type=click.Choice(SUPPORTED_LISTERS)
)
@click.argument("options", nargs=-1)
@click.pass_context
def run(ctx, lister, options):
from swh.scheduler.cli.utils import parse_options
config = deepcopy(ctx.obj["config"])
if options:
config.update(parse_options(options)[1])
get_lister(lister, **config).run()
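# Hedged usage example: the lister name comes from SUPPORTED_LISTERS and the
# remaining key=value arguments are forwarded to the lister constructor through
# `parse_options`. The config file name and URL below are placeholders.
#
#   swh lister -C lister.yml run -l cgit url=https://git.example.org/cgit/ instance=example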
if __name__ == "__main__":
lister()
diff --git a/swh/lister/cran/lister.py b/swh/lister/cran/lister.py
index 97d24cf..e9f937a 100644
--- a/swh/lister/cran/lister.py
+++ b/swh/lister/cran/lister.py
@@ -1,148 +1,149 @@
# Copyright (C) 2019-2021 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timezone
import json
import logging
import subprocess
from typing import Dict, Iterator, List, Optional, Tuple
import pkg_resources
from swh.lister.pattern import CredentialsType, StatelessLister
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
logger = logging.getLogger(__name__)
CRAN_MIRROR = "https://cran.r-project.org"
PageType = List[Dict[str, str]]
class CRANLister(StatelessLister[PageType]):
"""
List all packages hosted on The Comprehensive R Archive Network.
"""
LISTER_NAME = "CRAN"
def __init__(
self,
scheduler: SchedulerInterface,
credentials: Optional[CredentialsType] = None,
):
super().__init__(
scheduler, url=CRAN_MIRROR, instance="cran", credentials=credentials
)
def get_pages(self) -> Iterator[PageType]:
"""
Yields a single page containing the info of all CRAN packages.
"""
yield read_cran_data()
def get_origins_from_page(self, page: PageType) -> Iterator[ListedOrigin]:
assert self.lister_obj.id is not None
seen_urls = set()
for package_info in page:
origin_url, artifact_url = compute_origin_urls(package_info)
if origin_url in seen_urls:
# prevent listing an origin multiple times;
# the most recent version is listed first
continue
seen_urls.add(origin_url)
yield ListedOrigin(
lister_id=self.lister_obj.id,
url=origin_url,
visit_type="cran",
last_update=parse_packaged_date(package_info),
extra_loader_arguments={
"artifacts": [
{
"url": artifact_url,
"version": package_info["Version"],
"package": package_info["Package"],
}
]
},
)
def read_cran_data() -> List[Dict[str, str]]:
"""
Runs an R script which uses the built-in API to return a JSON response
containing data about the R packages.
Returns:
List of Dict about R packages. For example::
[
{
'Package': 'A3',
'Version': '1.0.0'
},
{
'Package': 'abbyyR',
'Version': '0.5.4'
},
...
]
"""
filepath = pkg_resources.resource_filename("swh.lister.cran", "list_all_packages.R")
logger.debug("Executing R script %s", filepath)
response = subprocess.run(filepath, stdout=subprocess.PIPE, shell=False)
return json.loads(response.stdout.decode("utf-8"))
def compute_origin_urls(package_info: Dict[str, str]) -> Tuple[str, str]:
"""Compute the package url from the repo dict.
Args:
repo: dict with key 'Package', 'Version'
Returns:
the tuple project url, artifact url
"""
package = package_info["Package"]
version = package_info["Version"]
origin_url = f"{CRAN_MIRROR}/package={package}"
artifact_url = f"{CRAN_MIRROR}/src/contrib/{package}_{version}.tar.gz"
return origin_url, artifact_url
def parse_packaged_date(package_info: Dict[str, str]) -> Optional[datetime]:
packaged_at_str = package_info.get("Packaged", "")
packaged_at = None
if packaged_at_str:
packaged_at_str = packaged_at_str.replace(" UTC", "")
# Packaged field possible formats:
# - "%Y-%m-%d %H:%M:%S[.%f] UTC; ",
# - "%a %b %d %H:%M:%S %Y; "
for date_format in (
"%Y-%m-%d %H:%M:%S",
"%Y-%m-%d %H:%M:%S.%f",
"%a %b %d %H:%M:%S %Y",
):
try:
packaged_at = datetime.strptime(
- packaged_at_str.split(";")[0], date_format,
+ packaged_at_str.split(";")[0],
+ date_format,
).replace(tzinfo=timezone.utc)
break
except Exception:
continue
if packaged_at is None:
logger.debug(
"Could not parse %s package release date: %s",
package_info["Package"],
packaged_at_str,
)
return packaged_at
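# Illustration of the two "Packaged" formats handled above, reusing the sample
# values exercised by this patch's tests:
#
#   "2017-04-26 11:36:15 UTC; Jonathan"  -> 2017-04-26 11:36:15+00:00
#   "Thu Mar 30 10:48:35 2006; hornik"   -> 2006-03-30 10:48:35+00:00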
diff --git a/swh/lister/cran/tests/test_lister.py b/swh/lister/cran/tests/test_lister.py
index fa0b463..a0bebfc 100644
--- a/swh/lister/cran/tests/test_lister.py
+++ b/swh/lister/cran/tests/test_lister.py
@@ -1,158 +1,163 @@
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timezone
import json
from os import path
import pytest
from swh.lister.cran.lister import (
CRAN_MIRROR,
CRANLister,
compute_origin_urls,
parse_packaged_date,
)
def test_cran_compute_origin_urls():
pack = "something"
vers = "0.0.1"
- origin_url, artifact_url = compute_origin_urls({"Package": pack, "Version": vers,})
+ origin_url, artifact_url = compute_origin_urls(
+ {
+ "Package": pack,
+ "Version": vers,
+ }
+ )
assert origin_url == f"{CRAN_MIRROR}/package={pack}"
assert artifact_url == f"{CRAN_MIRROR}/src/contrib/{pack}_{vers}.tar.gz"
def test_cran_compute_origin_urls_failure():
for incomplete_repo in [{"Version": "0.0.1"}, {"Package": "package"}, {}]:
with pytest.raises(KeyError):
compute_origin_urls(incomplete_repo)
def test_parse_packaged_date():
common_date_format = {
"Package": "test",
"Packaged": "2017-04-26 11:36:15 UTC; Jonathan",
}
assert parse_packaged_date(common_date_format) == datetime(
year=2017, month=4, day=26, hour=11, minute=36, second=15, tzinfo=timezone.utc
)
common_date_format = {
"Package": "test",
"Packaged": "2017-04-26 11:36:15.123456 UTC; Jonathan",
}
assert parse_packaged_date(common_date_format) == datetime(
year=2017,
month=4,
day=26,
hour=11,
minute=36,
second=15,
microsecond=123456,
tzinfo=timezone.utc,
)
old_date_format = {
"Package": "test",
"Packaged": "Thu Mar 30 10:48:35 2006; hornik",
}
assert parse_packaged_date(old_date_format) == datetime(
year=2006, month=3, day=30, hour=10, minute=48, second=35, tzinfo=timezone.utc
)
invalid_date_format = {
"Package": "test",
"Packaged": "foo",
}
assert parse_packaged_date(invalid_date_format) is None
missing_date = {
"Package": "test",
}
assert parse_packaged_date(missing_date) is None
def test_cran_lister_cran(datadir, swh_scheduler, mocker):
with open(path.join(datadir, "list-r-packages.json")) as f:
cran_data = json.loads(f.read())
lister = CRANLister(swh_scheduler)
mock_cran = mocker.patch("swh.lister.cran.lister.read_cran_data")
mock_cran.return_value = cran_data
stats = lister.run()
assert stats.pages == 1
assert stats.origins == len(cran_data)
scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results
assert len(scheduler_origins) == len(cran_data)
for package_info in cran_data:
origin_url, artifact_url = compute_origin_urls(package_info)
filtered_origins = [o for o in scheduler_origins if o.url == origin_url]
assert len(filtered_origins) == 1
assert filtered_origins[0].extra_loader_arguments == {
"artifacts": [
{
"url": artifact_url,
"version": package_info["Version"],
"package": package_info["Package"],
}
]
}
assert filtered_origins[0].last_update == parse_packaged_date(package_info)
def test_cran_lister_duplicated_origins(datadir, swh_scheduler, mocker):
with open(path.join(datadir, "list-r-packages.json")) as f:
cran_data = json.loads(f.read())
lister = CRANLister(swh_scheduler)
mock_cran = mocker.patch("swh.lister.cran.lister.read_cran_data")
mock_cran.return_value = cran_data + cran_data
stats = lister.run()
assert stats.pages == 1
assert stats.origins == len(cran_data)
@pytest.mark.parametrize(
"credentials, expected_credentials",
[
(None, []),
({"key": "value"}, []),
(
{"CRAN": {"cran": [{"username": "user", "password": "pass"}]}},
[{"username": "user", "password": "pass"}],
),
],
)
def test_lister_cran_instantiation_with_credentials(
credentials, expected_credentials, swh_scheduler
):
lister = CRANLister(swh_scheduler, credentials=credentials)
# Credentials are allowed in constructor
assert lister.credentials == expected_credentials
def test_lister_cran_from_configfile(swh_scheduler_config, mocker):
load_from_envvar = mocker.patch("swh.lister.pattern.load_from_envvar")
load_from_envvar.return_value = {
"scheduler": {"cls": "local", **swh_scheduler_config},
"credentials": {},
}
lister = CRANLister.from_configfile()
assert lister.scheduler is not None
assert lister.credentials is not None
diff --git a/swh/lister/crates/lister.py b/swh/lister/crates/lister.py
index 5a95049..d0c6984 100644
--- a/swh/lister/crates/lister.py
+++ b/swh/lister/crates/lister.py
@@ -1,138 +1,145 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import logging
from pathlib import Path
import subprocess
from typing import Any, Dict, Iterator, List
import iso8601
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
from ..pattern import CredentialsType, StatelessLister
logger = logging.getLogger(__name__)
# Alias for the page results returned by the `get_pages` method of the lister.
CratesListerPage = List[Dict[str, Any]]
class CratesLister(StatelessLister[CratesListerPage]):
"""List origins from the "crates.io" forge.
It basically fetches https://github.com/rust-lang/crates.io-index.git to a
temp directory and then walks through each file to get the crate's info.
"""
# Part of the lister API; identifies this lister
LISTER_NAME = "crates"
# (Optional) VCS type of the origins listed by this lister, if constant
VISIT_TYPE = "rust-crate"
INSTANCE = "crates"
INDEX_REPOSITORY_URL = "https://github.com/rust-lang/crates.io-index.git"
DESTINATION_PATH = Path("/tmp/crates.io-index")
CRATE_FILE_URL_PATTERN = (
"https://static.crates.io/crates/{crate}/{crate}-{version}.crate"
)
def __init__(
- self, scheduler: SchedulerInterface, credentials: CredentialsType = None,
+ self,
+ scheduler: SchedulerInterface,
+ credentials: CredentialsType = None,
):
super().__init__(
scheduler=scheduler,
credentials=credentials,
url=self.INDEX_REPOSITORY_URL,
instance=self.INSTANCE,
)
def get_index_repository(self) -> None:
"""Get crates.io-index repository up to date running git command."""
subprocess.check_call(
- ["git", "clone", self.INDEX_REPOSITORY_URL, self.DESTINATION_PATH,]
+ [
+ "git",
+ "clone",
+ self.INDEX_REPOSITORY_URL,
+ self.DESTINATION_PATH,
+ ]
)
def get_crates_index(self) -> List[Path]:
"""Build a sorted list of file paths excluding dotted directories and
dotted files.
Each file path corresponds to a crate that lists all available
versions.
"""
crates_index = sorted(
path
for path in self.DESTINATION_PATH.rglob("*")
if not any(part.startswith(".") for part in path.parts)
and path.is_file()
and path != self.DESTINATION_PATH / "config.json"
)
return crates_index
def get_pages(self) -> Iterator[CratesListerPage]:
"""Yield an iterator sorted by name in ascending order of pages.
Each page is a list of crate versions with:
- name: Name of the crate
- version: Version
- checksum: Checksum
- crate_file: Url of the crate file
- last_update: Date of the last commit of the corresponding index
file
"""
# Fetch crates.io index repository
self.get_index_repository()
# Get a list of all crates files from the index repository
crates_index = self.get_crates_index()
logger.debug("found %s crates in crates_index", len(crates_index))
for crate in crates_index:
page = []
# %cI is for strict iso8601 date formatting
last_update_str = subprocess.check_output(
["git", "log", "-1", "--pretty=format:%cI", str(crate)],
cwd=self.DESTINATION_PATH,
)
last_update = iso8601.parse_date(last_update_str.decode().strip())
with crate.open("rb") as current_file:
for line in current_file:
data = json.loads(line)
# pick only the data we need
page.append(
dict(
name=data["name"],
version=data["vers"],
checksum=data["cksum"],
crate_file=self.CRATE_FILE_URL_PATTERN.format(
crate=data["name"], version=data["vers"]
),
last_update=last_update,
)
)
yield page
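# Hedged illustration of one crates.io-index line and the simplified entry
# built above; the crate name, version and checksum are made-up examples:
#
#   {"name": "example-crate", "vers": "0.1.0", "cksum": "ab12...", ...}
#
# becomes, with `last_update` taken from the index file's last commit date:
#
#   {
#       "name": "example-crate",
#       "version": "0.1.0",
#       "checksum": "ab12...",
#       "crate_file": "https://static.crates.io/crates/example-crate/example-crate-0.1.0.crate",
#       "last_update": <datetime parsed from `git log -1 --pretty=format:%cI`>,
#   }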
def get_origins_from_page(self, page: CratesListerPage) -> Iterator[ListedOrigin]:
"""Iterate on all crate pages and yield ListedOrigin instances."""
assert self.lister_obj.id is not None
for version in page:
yield ListedOrigin(
lister_id=self.lister_obj.id,
visit_type=self.VISIT_TYPE,
url=version["crate_file"],
last_update=version["last_update"],
extra_loader_arguments={
"name": version["name"],
"version": version["version"],
"checksum": version["checksum"],
},
)
diff --git a/swh/lister/debian/tests/test_lister.py b/swh/lister/debian/tests/test_lister.py
index 695d7f1..6f2711d 100644
--- a/swh/lister/debian/tests/test_lister.py
+++ b/swh/lister/debian/tests/test_lister.py
@@ -1,259 +1,265 @@
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
from datetime import datetime
from email.utils import formatdate, parsedate_to_datetime
import os
from pathlib import Path
from typing import Dict, List, Set, Tuple
from debian.deb822 import Sources
import pytest
from swh.lister.debian.lister import (
DebianLister,
DebianOrigin,
PkgName,
PkgVersion,
Suite,
)
from swh.scheduler.interface import SchedulerInterface
# Those tests use sample debian Sources files whose content has been extracted
# from the real Sources files from stretch, buster and bullseye suite.
# They contain the following package source info
# - stretch:
# * dh-elpa (versions: 0.0.18, 0.0.19, 0.0.20),
# * git (version: 1:2.11.0-3+deb9u7)
# - buster:
# * git (version: 1:2.20.1-2+deb10u3),
# * subversion (version: 1.10.4-1+deb10u1)
# - bullseye:
# * git (version: 1:2.29.2-1)
# * subversion (version: 1.14.0-3)
# * hg-git (version: 0.9.0-2)
_mirror_url = "http://deb.debian.org/debian"
_suites = ["stretch", "buster", "bullseye"]
_components = ["main", "foo"]
_last_modified = {}
SourcesText = str
def _debian_sources_content(datadir: str, suite: Suite) -> SourcesText:
return Path(datadir, f"Sources_{suite}").read_text()
@pytest.fixture
def debian_sources(datadir: str) -> Dict[Suite, SourcesText]:
return {suite: _debian_sources_content(datadir, suite) for suite in _suites}
# suite -> package name -> list of versions
DebianSuitePkgSrcInfo = Dict[Suite, Dict[PkgName, List[Sources]]]
def _init_test(
swh_scheduler: SchedulerInterface,
debian_sources: Dict[Suite, SourcesText],
requests_mock,
) -> Tuple[DebianLister, DebianSuitePkgSrcInfo]:
lister = DebianLister(
scheduler=swh_scheduler,
mirror_url=_mirror_url,
suites=list(debian_sources.keys()),
components=_components,
)
suite_pkg_info: DebianSuitePkgSrcInfo = {}
for i, (suite, sources) in enumerate(debian_sources.items()):
# ensure a different date is generated for each suite
last_modified = formatdate(timeval=datetime.now().timestamp() + i, usegmt=True)
suite_pkg_info[suite] = defaultdict(list)
for pkg_src in Sources.iter_paragraphs(sources):
suite_pkg_info[suite][pkg_src["Package"]].append(pkg_src)
# backup package last update date
global _last_modified
_last_modified[pkg_src["Package"]] = last_modified
for idx_url, compression in lister.debian_index_urls(suite, _components[0]):
if compression:
requests_mock.get(idx_url, status_code=404)
else:
requests_mock.get(
- idx_url, text=sources, headers={"Last-Modified": last_modified},
+ idx_url,
+ text=sources,
+ headers={"Last-Modified": last_modified},
)
for idx_url, _ in lister.debian_index_urls(suite, _components[1]):
requests_mock.get(idx_url, status_code=404)
return lister, suite_pkg_info
def _check_listed_origins(
swh_scheduler: SchedulerInterface,
lister: DebianLister,
suite_pkg_info: DebianSuitePkgSrcInfo,
lister_previous_state: Dict[PkgName, Set[PkgVersion]],
) -> Set[DebianOrigin]:
scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results
origin_urls = set()
# iterate on each debian suite for the main component
for suite, pkg_info in suite_pkg_info.items():
# iterate on each package
for package_name, pkg_srcs in pkg_info.items():
# iterate on each package version info
for pkg_src in pkg_srcs:
# build package version key
package_version_key = f"{suite}/{_components[0]}/{pkg_src['Version']}"
# if the package or its version was not previously listed, its info should
# have been sent to the scheduler database
if (
package_name not in lister_previous_state
or package_version_key not in lister_previous_state[package_name]
):
# build origin url
origin_url = lister.origin_url_for_package(package_name)
origin_urls.add(origin_url)
# get ListerOrigin object from scheduler database
filtered_origins = [
scheduler_origin
for scheduler_origin in scheduler_origins
if scheduler_origin.url == origin_url
]
assert filtered_origins
expected_last_update = parsedate_to_datetime(
_last_modified[pkg_src["Package"]]
)
assert filtered_origins[0].last_update == expected_last_update
packages = filtered_origins[0].extra_loader_arguments["packages"]
# check the version info is available
assert package_version_key in packages
# check package files URIs are available
for file in pkg_src["files"]:
filename = file["name"]
file_uri = os.path.join(
_mirror_url, pkg_src["Directory"], filename
)
package_files = packages[package_version_key]["files"]
assert filename in package_files
assert package_files[filename]["uri"] == file_uri
# check listed package version is in lister state
assert package_name in lister.state.package_versions
assert (
package_version_key
in lister.state.package_versions[package_name]
)
return origin_urls
def test_lister_debian_all_suites(
swh_scheduler: SchedulerInterface,
debian_sources: Dict[Suite, SourcesText],
requests_mock,
):
"""
Simulate a full listing of main component packages for all debian suites.
"""
lister, suite_pkg_info = _init_test(swh_scheduler, debian_sources, requests_mock)
stats = lister.run()
origin_urls = _check_listed_origins(
swh_scheduler, lister, suite_pkg_info, lister_previous_state={}
)
assert stats.pages == len(_suites) * len(_components)
assert stats.origins == len(origin_urls)
stats = lister.run()
assert stats.pages == len(_suites) * len(_components)
assert stats.origins == 0
@pytest.mark.parametrize(
"suites_params",
- [[_suites[:1]], [_suites[:1], _suites[:2]], [_suites[:1], _suites[:2], _suites],],
+ [
+ [_suites[:1]],
+ [_suites[:1], _suites[:2]],
+ [_suites[:1], _suites[:2], _suites],
+ ],
)
def test_lister_debian_updated_packages(
swh_scheduler: SchedulerInterface,
debian_sources: Dict[Suite, SourcesText],
requests_mock,
suites_params: List[Suite],
):
"""
Simulate incremental listing of main component packages by adding a new suite
to process between each listing operation.
"""
lister_previous_state: Dict[PkgName, Set[PkgVersion]] = {}
for idx, suites in enumerate(suites_params):
sources = {suite: debian_sources[suite] for suite in suites}
lister, suite_pkg_info = _init_test(swh_scheduler, sources, requests_mock)
stats = lister.run()
origin_urls = _check_listed_origins(
swh_scheduler,
lister,
suite_pkg_info,
lister_previous_state=lister_previous_state,
)
assert stats.pages == len(sources) * len(_components)
assert stats.origins == len(origin_urls)
lister_previous_state = lister.state.package_versions
# only new packages or packages with new versions should be listed
if len(suites) > 1 and idx < len(suites) - 1:
assert stats.origins == 0
else:
assert stats.origins != 0
@pytest.mark.parametrize(
"credentials, expected_credentials",
[
(None, []),
({"key": "value"}, []),
(
{"debian": {"Debian": [{"username": "user", "password": "pass"}]}},
[{"username": "user", "password": "pass"}],
),
],
)
def test_lister_debian_instantiation_with_credentials(
credentials, expected_credentials, swh_scheduler
):
lister = DebianLister(swh_scheduler, credentials=credentials)
# Credentials are allowed in constructor
assert lister.credentials == expected_credentials
def test_lister_debian_from_configfile(swh_scheduler_config, mocker):
load_from_envvar = mocker.patch("swh.lister.pattern.load_from_envvar")
load_from_envvar.return_value = {
"scheduler": {"cls": "local", **swh_scheduler_config},
"credentials": {},
}
lister = DebianLister.from_configfile()
assert lister.scheduler is not None
assert lister.credentials is not None
diff --git a/swh/lister/gitea/lister.py b/swh/lister/gitea/lister.py
index 19ca4aa..25bea4e 100644
--- a/swh/lister/gitea/lister.py
+++ b/swh/lister/gitea/lister.py
@@ -1,138 +1,142 @@
# Copyright (C) 2018-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
import random
from typing import Any, Dict, Iterator, List, Optional
from urllib.parse import urljoin
import iso8601
import requests
from tenacity.before_sleep import before_sleep_log
from swh.lister.utils import throttling_retry
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
from .. import USER_AGENT
from ..pattern import CredentialsType, StatelessLister
logger = logging.getLogger(__name__)
RepoListPage = List[Dict[str, Any]]
class GiteaLister(StatelessLister[RepoListPage]):
"""List origins from Gitea.
Gitea API documentation: https://try.gitea.io/api/swagger
The API does pagination and provides navigation URLs through the 'Link' header.
The default value for page size is the maximum value observed on the instances
accessible at https://try.gitea.io/api/v1/ and https://codeberg.org/api/v1/."""
LISTER_NAME = "gitea"
REPO_LIST_PATH = "repos/search"
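    # get_pages joins the instance URL with REPO_LIST_PATH using urljoin; as an
    # illustration, url="https://try.gitea.io/api/v1/" (trailing slash required
    # for urljoin) yields page URLs under "https://try.gitea.io/api/v1/repos/search".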
def __init__(
self,
scheduler: SchedulerInterface,
url: str,
instance: Optional[str] = None,
api_token: Optional[str] = None,
page_size: int = 50,
credentials: CredentialsType = None,
):
super().__init__(
- scheduler=scheduler, credentials=credentials, url=url, instance=instance,
+ scheduler=scheduler,
+ credentials=credentials,
+ url=url,
+ instance=instance,
)
self.query_params = {
"sort": "id",
"order": "asc",
"limit": page_size,
"page": 1,
}
self.session = requests.Session()
self.session.headers.update(
- {"Accept": "application/json", "User-Agent": USER_AGENT,}
+ {
+ "Accept": "application/json",
+ "User-Agent": USER_AGENT,
+ }
)
if api_token is None:
if len(self.credentials) > 0:
cred = random.choice(self.credentials)
username = cred.get("username")
api_token = cred["password"]
logger.warning(
"Using authentication token from user %s", username or "???"
)
else:
logger.warning(
"No authentication token set in configuration, using anonymous mode"
)
if api_token:
self.session.headers["Authorization"] = "Token %s" % api_token
@throttling_retry(before_sleep=before_sleep_log(logger, logging.WARNING))
def page_request(self, url: str, params: Dict[str, Any]) -> requests.Response:
logger.info("Fetching URL %s with params %s", url, params)
response = self.session.get(url, params=params)
if response.status_code != 200:
logger.warning(
"Unexpected HTTP status code %s on %s: %s",
response.status_code,
response.url,
response.content,
)
response.raise_for_status()
return response
@classmethod
def results_simplified(cls, body: Dict[str, RepoListPage]) -> RepoListPage:
fields_filter = ["id", "clone_url", "updated_at"]
return [{k: r[k] for k in fields_filter} for r in body["data"]]
def get_pages(self) -> Iterator[RepoListPage]:
# base with trailing slash, path without leading slash for urljoin
url: str = urljoin(self.url, self.REPO_LIST_PATH)
response = self.page_request(url, self.query_params)
while True:
page_results = self.results_simplified(response.json())
yield page_results
assert len(response.links) > 0, "API changed: no Link header found"
if "next" in response.links:
url = response.links["next"]["url"]
else:
# last page
break
response = self.page_request(url, {})
def get_origins_from_page(self, page: RepoListPage) -> Iterator[ListedOrigin]:
- """Convert a page of Gitea repositories into a list of ListedOrigins.
-
- """
+ """Convert a page of Gitea repositories into a list of ListedOrigins."""
assert self.lister_obj.id is not None
for repo in page:
last_update = iso8601.parse_date(repo["updated_at"])
yield ListedOrigin(
lister_id=self.lister_obj.id,
url=repo["clone_url"],
visit_type="git",
last_update=last_update,
)
diff --git a/swh/lister/gitea/tests/test_tasks.py b/swh/lister/gitea/tests/test_tasks.py
index 458bc7e..a204cb1 100644
--- a/swh/lister/gitea/tests/test_tasks.py
+++ b/swh/lister/gitea/tests/test_tasks.py
@@ -1,59 +1,61 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from unittest.mock import patch
from swh.lister.pattern import ListerStats
def test_ping(swh_scheduler_celery_app, swh_scheduler_celery_worker):
res = swh_scheduler_celery_app.send_task("swh.lister.gitea.tasks.ping")
assert res
res.wait()
assert res.successful()
assert res.result == "OK"
@patch("swh.lister.gitea.tasks.GiteaLister")
def test_full_listing(lister, swh_scheduler_celery_app, swh_scheduler_celery_worker):
lister.from_configfile.return_value = lister
lister.run.return_value = ListerStats(pages=10, origins=500)
kwargs = dict(url="https://try.gitea.io/api/v1")
res = swh_scheduler_celery_app.send_task(
- "swh.lister.gitea.tasks.FullGiteaRelister", kwargs=kwargs,
+ "swh.lister.gitea.tasks.FullGiteaRelister",
+ kwargs=kwargs,
)
assert res
res.wait()
assert res.successful()
actual_kwargs = dict(**kwargs, instance=None, api_token=None, page_size=None)
lister.from_configfile.assert_called_once_with(**actual_kwargs)
lister.run.assert_called_once_with()
@patch("swh.lister.gitea.tasks.GiteaLister")
def test_full_listing_params(
lister, swh_scheduler_celery_app, swh_scheduler_celery_worker
):
lister.from_configfile.return_value = lister
lister.run.return_value = ListerStats(pages=10, origins=500)
kwargs = dict(
url="https://0xacab.org/api/v4",
instance="0xacab",
api_token="test",
page_size=50,
)
res = swh_scheduler_celery_app.send_task(
- "swh.lister.gitea.tasks.FullGiteaRelister", kwargs=kwargs,
+ "swh.lister.gitea.tasks.FullGiteaRelister",
+ kwargs=kwargs,
)
assert res
res.wait()
assert res.successful()
lister.from_configfile.assert_called_once_with(**kwargs)
lister.run.assert_called_once_with()
diff --git a/swh/lister/github/lister.py b/swh/lister/github/lister.py
index f4246a2..2e051f1 100644
--- a/swh/lister/github/lister.py
+++ b/swh/lister/github/lister.py
@@ -1,357 +1,357 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from dataclasses import asdict, dataclass
import datetime
import logging
import random
import time
from typing import Any, Dict, Iterator, List, Optional, Set
from urllib.parse import parse_qs, urlparse
import iso8601
import requests
from tenacity import (
retry,
retry_any,
retry_if_exception_type,
retry_if_result,
wait_exponential,
)
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
from .. import USER_AGENT
from ..pattern import CredentialsType, Lister
logger = logging.getLogger(__name__)
def init_session(session: Optional[requests.Session] = None) -> requests.Session:
"""Initialize a requests session with the proper headers for requests to
GitHub."""
if not session:
session = requests.Session()
session.headers.update(
{"Accept": "application/vnd.github.v3+json", "User-Agent": USER_AGENT}
)
return session
class RateLimited(Exception):
def __init__(self, response):
self.reset_time: Optional[int]
# Figure out how long we need to sleep because of that rate limit
ratelimit_reset = response.headers.get("X-Ratelimit-Reset")
retry_after = response.headers.get("Retry-After")
if ratelimit_reset is not None:
self.reset_time = int(ratelimit_reset)
elif retry_after is not None:
self.reset_time = int(time.time()) + int(retry_after) + 1
else:
logger.warning(
"Received a rate-limit-like status code %s, but no rate-limit "
"headers set. Response content: %s",
response.status_code,
response.content,
)
self.reset_time = None
self.response = response
@retry(
wait=wait_exponential(multiplier=1, min=4, max=10),
retry=retry_any(
# ChunkedEncodingErrors happen when the TLS connection gets reset, e.g.
# when running the lister on a connection with high latency
retry_if_exception_type(requests.exceptions.ChunkedEncodingError),
# 502 status codes happen for a Server Error, sometimes
retry_if_result(lambda r: r.status_code == 502),
),
)
def github_request(
url: str, token: Optional[str] = None, session: Optional[requests.Session] = None
) -> requests.Response:
session = init_session(session)
headers = {}
if token:
headers["Authorization"] = f"token {token}"
response = session.get(url, headers=headers)
anonymous = token is None and "Authorization" not in session.headers
if (
# GitHub returns inconsistent status codes between unauthenticated
# rate limit and authenticated rate limits. Handle both.
response.status_code == 429
or (anonymous and response.status_code == 403)
):
raise RateLimited(response)
return response
@dataclass
class GitHubListerState:
"""State of the GitHub lister"""
last_seen_id: int = 0
"""Numeric id of the last repository listed on an incremental pass"""
class GitHubLister(Lister[GitHubListerState, List[Dict[str, Any]]]):
"""List origins from GitHub.
By default, the lister runs in incremental mode: it lists all repositories,
starting with the `last_seen_id` stored in the scheduler backend.
Providing the `first_id` and `last_id` arguments enables the "relisting" mode: in
that mode, the lister finds the origins present in the range **excluding**
`first_id` and **including** `last_id`. In this mode, the lister can overrun the
`last_id`: it will always record all the origins seen in a given page. As the lister
is fully idempotent, this is not a practical problem. Once relisting completes, the
lister state in the scheduler backend is not updated.
When the config contains a set of credentials, we shuffle this list at the beginning
of the listing. To follow GitHub's `abuse rate limit policy`_, we keep using the
same token over and over again, until its rate limit runs out. Once that happens, we
switch to the next token over in our shuffled list.
    When a request fails with a rate limit exception for all tokens, we pause the
    listing until the latest X-Ratelimit-Reset time reported across all tokens.
When the credentials aren't set in the lister config, the lister can run in
anonymous mode too (e.g. for testing purposes).
.. _abuse rate limit policy: https://developer.github.com/v3/guides/best-practices-for-integrators/#dealing-with-abuse-rate-limits
Args:
first_id: the id of the first repo to list
last_id: stop listing after seeing a repo with an id higher than this value.
- """ # noqa: E501
+ """ # noqa: B950
LISTER_NAME = "github"
API_URL = "https://api.github.com/repositories"
PAGE_SIZE = 1000
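    # Credentials, when configured, follow the generic lister layout; an
    # illustrative (non-functional) example:
    #   {"github": {"github": [{"username": "user1", "password": "<api token>"}]}}
    # Each entry provides a "username" plus a "password" (or "token") field,
    # consumed by set_next_session_token below.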
def __init__(
self,
scheduler: SchedulerInterface,
credentials: CredentialsType = None,
first_id: Optional[int] = None,
last_id: Optional[int] = None,
):
super().__init__(
scheduler=scheduler,
credentials=credentials,
url=self.API_URL,
instance="github",
)
self.first_id = first_id
self.last_id = last_id
self.relisting = self.first_id is not None or self.last_id is not None
self.session = init_session()
random.shuffle(self.credentials)
self.anonymous = not self.credentials
if self.anonymous:
logger.warning("No tokens set in configuration, using anonymous mode")
self.token_index = -1
self.current_user: Optional[str] = None
if not self.anonymous:
# Initialize the first token value in the session headers
self.set_next_session_token()
def set_next_session_token(self) -> None:
"""Update the current authentication token with the next one in line."""
self.token_index = (self.token_index + 1) % len(self.credentials)
auth = self.credentials[self.token_index]
if "password" in auth:
token = auth["password"]
else:
token = auth["token"]
self.current_user = auth["username"]
logger.debug("Using authentication token for user %s", self.current_user)
self.session.headers.update({"Authorization": f"token {token}"})
def state_from_dict(self, d: Dict[str, Any]) -> GitHubListerState:
return GitHubListerState(**d)
def state_to_dict(self, state: GitHubListerState) -> Dict[str, Any]:
return asdict(state)
def get_pages(self) -> Iterator[List[Dict[str, Any]]]:
current_id = 0
if self.first_id is not None:
current_id = self.first_id
elif self.state is not None:
current_id = self.state.last_seen_id
current_url = f"{self.API_URL}?since={current_id}&per_page={self.PAGE_SIZE}"
while self.last_id is None or current_id < self.last_id:
logger.debug("Getting page %s", current_url)
# The following for/else loop handles rate limiting; if successful,
# it provides the rest of the function with a `response` object.
#
# If all tokens are rate-limited, we sleep until the reset time,
# then `continue` into another iteration of the outer while loop,
# attempting to get data from the same URL again.
max_attempts = 1 if self.anonymous else len(self.credentials)
reset_times: Dict[int, int] = {} # token index -> time
for attempt in range(max_attempts):
try:
response = github_request(current_url, session=self.session)
break
except RateLimited as e:
reset_info = "(unknown reset)"
if e.reset_time is not None:
reset_times[self.token_index] = e.reset_time
reset_info = "(resetting in %ss)" % (e.reset_time - time.time())
if not self.anonymous:
logger.info(
"Rate limit exhausted for current user %s %s",
self.current_user,
reset_info,
)
# Use next token in line
self.set_next_session_token()
# Wait one second to avoid triggering GitHub's abuse rate limits
time.sleep(1)
else:
# All tokens have been rate-limited. What do we do?
if not reset_times:
logger.warning(
"No X-Ratelimit-Reset value found in responses for any token; "
"Giving up."
)
break
sleep_time = max(reset_times.values()) - time.time() + 1
logger.info(
"Rate limits exhausted for all tokens. Sleeping for %f seconds.",
sleep_time,
)
time.sleep(sleep_time)
# This goes back to the outer page-by-page loop, doing one more
# iteration on the same page
continue
# We've successfully retrieved a (non-ratelimited) `response`. We
# still need to check it for validity.
if response.status_code != 200:
logger.warning(
"Got unexpected status_code %s: %s",
response.status_code,
response.content,
)
break
yield response.json()
if "next" not in response.links:
# No `next` link, we've reached the end of the world
logger.debug(
"No next link found in the response headers, all caught up"
)
break
# GitHub strongly advises to use the next link directly. We still
# parse it to get the id of the last repository we've reached so
# far.
next_url = response.links["next"]["url"]
parsed_url = urlparse(next_url)
if not parsed_url.query:
logger.warning("Failed to parse url %s", next_url)
break
parsed_query = parse_qs(parsed_url.query)
current_id = int(parsed_query["since"][0])
current_url = next_url
def get_origins_from_page(
self, page: List[Dict[str, Any]]
) -> Iterator[ListedOrigin]:
"""Convert a page of GitHub repositories into a list of ListedOrigins.
This records the html_url, as well as the pushed_at value if it exists.
"""
assert self.lister_obj.id is not None
seen_in_page: Set[str] = set()
for repo in page:
if not repo:
# null repositories in listings happen sometimes...
continue
if repo["html_url"] in seen_in_page:
continue
seen_in_page.add(repo["html_url"])
pushed_at_str = repo.get("pushed_at")
pushed_at: Optional[datetime.datetime] = None
if pushed_at_str:
pushed_at = iso8601.parse_date(pushed_at_str)
yield ListedOrigin(
lister_id=self.lister_obj.id,
url=repo["html_url"],
visit_type="git",
last_update=pushed_at,
)
def commit_page(self, page: List[Dict[str, Any]]):
"""Update the currently stored state using the latest listed page"""
if self.relisting:
# Don't update internal state when relisting
return
if not page:
# Sometimes, when you reach the end of the world, GitHub returns an empty
# page of repositories
return
last_id = page[-1]["id"]
if last_id > self.state.last_seen_id:
self.state.last_seen_id = last_id
def finalize(self):
if self.relisting:
return
# Pull fresh lister state from the scheduler backend
scheduler_state = self.get_state_from_scheduler()
# Update the lister state in the backend only if the last seen id of
# the current run is higher than that stored in the database.
if self.state.last_seen_id > scheduler_state.last_seen_id:
self.updated = True
diff --git a/swh/lister/gitlab/tests/test_lister.py b/swh/lister/gitlab/tests/test_lister.py
index 10144a7..80650b8 100644
--- a/swh/lister/gitlab/tests/test_lister.py
+++ b/swh/lister/gitlab/tests/test_lister.py
@@ -1,351 +1,357 @@
# Copyright (C) 2017-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import logging
from pathlib import Path
from typing import Dict, List
import pytest
from requests.status_codes import codes
from swh.lister import USER_AGENT
from swh.lister.gitlab.lister import GitLabLister, _parse_id_after
from swh.lister.pattern import ListerStats
from swh.lister.tests.test_utils import assert_sleep_calls
from swh.lister.utils import WAIT_EXP_BASE
logger = logging.getLogger(__name__)
def api_url(instance: str) -> str:
return f"https://{instance}/api/v4/"
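# requests_mock additional matcher: only intercept requests that carry the
# lister's User-Agent header.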
def _match_request(request):
return request.headers.get("User-Agent") == USER_AGENT
def test_lister_gitlab(datadir, swh_scheduler, requests_mock):
- """Gitlab lister supports full listing
-
- """
+ """Gitlab lister supports full listing"""
instance = "gitlab.com"
lister = GitLabLister(swh_scheduler, url=api_url(instance), instance=instance)
response = gitlab_page_response(datadir, instance, 1)
requests_mock.get(
- lister.page_url(), [{"json": response}], additional_matcher=_match_request,
+ lister.page_url(),
+ [{"json": response}],
+ additional_matcher=_match_request,
)
listed_result = lister.run()
expected_nb_origins = len(response)
assert listed_result == ListerStats(pages=1, origins=expected_nb_origins)
scheduler_origins = lister.scheduler.get_listed_origins(
lister.lister_obj.id
).results
assert len(scheduler_origins) == expected_nb_origins
for listed_origin in scheduler_origins:
assert listed_origin.visit_type == "git"
assert listed_origin.url.startswith(f"https://{instance}")
assert listed_origin.last_update is not None
def test_lister_gitlab_heptapod(datadir, swh_scheduler, requests_mock):
- """Heptapod lister happily lists hg, hg_git as hg and git origins
-
- """
+ """Heptapod lister happily lists hg, hg_git as hg and git origins"""
name = "heptapod"
instance = "foss.heptapod.net"
lister = GitLabLister(
swh_scheduler, url=api_url(instance), name=name, instance=instance
)
assert lister.LISTER_NAME == name
response = gitlab_page_response(datadir, instance, 1)
requests_mock.get(
- lister.page_url(), [{"json": response}], additional_matcher=_match_request,
+ lister.page_url(),
+ [{"json": response}],
+ additional_matcher=_match_request,
)
listed_result = lister.run()
expected_nb_origins = len(response)
for entry in response:
assert entry["vcs_type"] in ("hg", "hg_git")
assert listed_result == ListerStats(pages=1, origins=expected_nb_origins)
scheduler_origins = lister.scheduler.get_listed_origins(
lister.lister_obj.id
).results
assert len(scheduler_origins) == expected_nb_origins
for listed_origin in scheduler_origins:
assert listed_origin.visit_type == "hg"
assert listed_origin.url.startswith(f"https://{instance}")
assert listed_origin.last_update is not None
def gitlab_page_response(datadir, instance: str, id_after: int) -> List[Dict]:
"""Return list of repositories (out of test dataset)"""
datapath = Path(datadir, f"https_{instance}", f"api_response_page{id_after}.json")
    return json.loads(datapath.read_text()) if datapath.exists() else []
def test_lister_gitlab_with_pages(swh_scheduler, requests_mock, datadir):
- """Gitlab lister supports pagination
-
- """
+ """Gitlab lister supports pagination"""
instance = "gite.lirmm.fr"
lister = GitLabLister(swh_scheduler, url=api_url(instance))
response1 = gitlab_page_response(datadir, instance, 1)
response2 = gitlab_page_response(datadir, instance, 2)
requests_mock.get(
lister.page_url(),
[{"json": response1, "headers": {"Link": f"<{lister.page_url(2)}>; rel=next"}}],
additional_matcher=_match_request,
)
requests_mock.get(
- lister.page_url(2), [{"json": response2}], additional_matcher=_match_request,
+ lister.page_url(2),
+ [{"json": response2}],
+ additional_matcher=_match_request,
)
listed_result = lister.run()
expected_nb_origins = len(response1) + len(response2)
assert listed_result == ListerStats(pages=2, origins=expected_nb_origins)
scheduler_origins = lister.scheduler.get_listed_origins(
lister.lister_obj.id
).results
assert len(scheduler_origins) == expected_nb_origins
for listed_origin in scheduler_origins:
assert listed_origin.visit_type == "git"
assert listed_origin.url.startswith(f"https://{instance}")
assert listed_origin.last_update is not None
def test_lister_gitlab_incremental(swh_scheduler, requests_mock, datadir):
- """Gitlab lister supports incremental visits
-
- """
+ """Gitlab lister supports incremental visits"""
instance = "gite.lirmm.fr"
url = api_url(instance)
lister = GitLabLister(swh_scheduler, url=url, instance=instance, incremental=True)
url_page1 = lister.page_url()
response1 = gitlab_page_response(datadir, instance, 1)
url_page2 = lister.page_url(2)
response2 = gitlab_page_response(datadir, instance, 2)
url_page3 = lister.page_url(3)
response3 = gitlab_page_response(datadir, instance, 3)
requests_mock.get(
url_page1,
[{"json": response1, "headers": {"Link": f"<{url_page2}>; rel=next"}}],
additional_matcher=_match_request,
)
requests_mock.get(
- url_page2, [{"json": response2}], additional_matcher=_match_request,
+ url_page2,
+ [{"json": response2}],
+ additional_matcher=_match_request,
)
listed_result = lister.run()
expected_nb_origins = len(response1) + len(response2)
assert listed_result == ListerStats(pages=2, origins=expected_nb_origins)
assert lister.state.last_seen_next_link == url_page2
lister2 = GitLabLister(swh_scheduler, url=url, instance=instance, incremental=True)
# Lister will start back at the last stop
requests_mock.get(
url_page2,
[{"json": response2, "headers": {"Link": f"<{url_page3}>; rel=next"}}],
additional_matcher=_match_request,
)
requests_mock.get(
- url_page3, [{"json": response3}], additional_matcher=_match_request,
+ url_page3,
+ [{"json": response3}],
+ additional_matcher=_match_request,
)
listed_result2 = lister2.run()
assert listed_result2 == ListerStats(
pages=2, origins=len(response2) + len(response3)
)
assert lister2.state.last_seen_next_link == url_page3
assert lister.lister_obj.id == lister2.lister_obj.id
scheduler_origins = lister2.scheduler.get_listed_origins(
lister2.lister_obj.id
).results
assert len(scheduler_origins) == len(response1) + len(response2) + len(response3)
for listed_origin in scheduler_origins:
assert listed_origin.visit_type == "git"
assert listed_origin.url.startswith(f"https://{instance}")
assert listed_origin.last_update is not None
def test_lister_gitlab_rate_limit(swh_scheduler, requests_mock, datadir, mocker):
- """Gitlab lister supports rate-limit
-
- """
+ """Gitlab lister supports rate-limit"""
instance = "gite.lirmm.fr"
url = api_url(instance)
lister = GitLabLister(swh_scheduler, url=url, instance=instance)
url_page1 = lister.page_url()
response1 = gitlab_page_response(datadir, instance, 1)
url_page2 = lister.page_url(2)
response2 = gitlab_page_response(datadir, instance, 2)
requests_mock.get(
url_page1,
[{"json": response1, "headers": {"Link": f"<{url_page2}>; rel=next"}}],
additional_matcher=_match_request,
)
requests_mock.get(
url_page2,
[
# rate limited twice
{"status_code": codes.forbidden, "headers": {"RateLimit-Remaining": "0"}},
{"status_code": codes.forbidden, "headers": {"RateLimit-Remaining": "0"}},
# ok
{"json": response2},
],
additional_matcher=_match_request,
)
# To avoid this test being too slow, we mock sleep within the retry behavior
mock_sleep = mocker.patch.object(lister.get_page_result.retry, "sleep")
listed_result = lister.run()
expected_nb_origins = len(response1) + len(response2)
assert listed_result == ListerStats(pages=2, origins=expected_nb_origins)
assert_sleep_calls(mocker, mock_sleep, [1, WAIT_EXP_BASE])
@pytest.mark.parametrize("status_code", [502, 503, 520])
def test_lister_gitlab_http_errors(
swh_scheduler, requests_mock, datadir, mocker, status_code
):
- """Gitlab lister should retry requests when encountering HTTP 50x errors
-
- """
+ """Gitlab lister should retry requests when encountering HTTP 50x errors"""
instance = "gite.lirmm.fr"
url = api_url(instance)
lister = GitLabLister(swh_scheduler, url=url, instance=instance)
url_page1 = lister.page_url()
response1 = gitlab_page_response(datadir, instance, 1)
url_page2 = lister.page_url(2)
response2 = gitlab_page_response(datadir, instance, 2)
requests_mock.get(
url_page1,
[{"json": response1, "headers": {"Link": f"<{url_page2}>; rel=next"}}],
additional_matcher=_match_request,
)
requests_mock.get(
url_page2,
[
# first request ends up with error
{"status_code": status_code},
# second request is ok
{"json": response2},
],
additional_matcher=_match_request,
)
# To avoid this test being too slow, we mock sleep within the retry behavior
mock_sleep = mocker.patch.object(lister.get_page_result.retry, "sleep")
listed_result = lister.run()
expected_nb_origins = len(response1) + len(response2)
assert listed_result == ListerStats(pages=2, origins=expected_nb_origins)
assert_sleep_calls(mocker, mock_sleep, [1])
def test_lister_gitlab_http_error_500(swh_scheduler, requests_mock, datadir):
- """Gitlab lister should skip buggy URL and move to next page.
-
- """
+ """Gitlab lister should skip buggy URL and move to next page."""
instance = "gite.lirmm.fr"
url = api_url(instance)
lister = GitLabLister(swh_scheduler, url=url, instance=instance)
url_page1 = lister.page_url()
response1 = gitlab_page_response(datadir, instance, 1)
url_page2 = lister.page_url(lister.per_page)
url_page3 = lister.page_url(2 * lister.per_page)
response3 = gitlab_page_response(datadir, instance, 3)
requests_mock.get(
url_page1,
[{"json": response1, "headers": {"Link": f"<{url_page2}>; rel=next"}}],
additional_matcher=_match_request,
)
requests_mock.get(
- url_page2, [{"status_code": 500},], additional_matcher=_match_request,
+ url_page2,
+ [
+ {"status_code": 500},
+ ],
+ additional_matcher=_match_request,
)
requests_mock.get(
- url_page3, [{"json": response3}], additional_matcher=_match_request,
+ url_page3,
+ [{"json": response3}],
+ additional_matcher=_match_request,
)
listed_result = lister.run()
expected_nb_origins = len(response1) + len(response3)
assert listed_result == ListerStats(pages=2, origins=expected_nb_origins)
def test_lister_gitlab_credentials(swh_scheduler):
- """Gitlab lister supports credentials configuration
-
- """
+ """Gitlab lister supports credentials configuration"""
instance = "gitlab"
credentials = {
"gitlab": {instance: [{"username": "user", "password": "api-token"}]}
}
url = api_url(instance)
lister = GitLabLister(
scheduler=swh_scheduler, url=url, instance=instance, credentials=credentials
)
assert lister.session.headers["Authorization"] == "Bearer api-token"
-@pytest.mark.parametrize("url", [api_url("gitlab").rstrip("/"), api_url("gitlab"),])
+@pytest.mark.parametrize(
+ "url",
+ [
+ api_url("gitlab").rstrip("/"),
+ api_url("gitlab"),
+ ],
+)
def test_lister_gitlab_url_computation(url, swh_scheduler):
lister = GitLabLister(scheduler=swh_scheduler, url=url)
assert not lister.url.endswith("/")
page_url = lister.page_url()
# ensure the generated url contains the separated /
assert page_url.startswith(f"{lister.url}/projects")
@pytest.mark.parametrize(
"url,expected_result",
[
(None, None),
("http://dummy/?query=1", None),
("http://dummy/?foo=bar&id_after=1&some=result", 1),
("http://dummy/?foo=bar&id_after=&some=result", None),
],
)
def test__parse_id_after(url, expected_result):
assert _parse_id_after(url) == expected_result
diff --git a/swh/lister/gitlab/tests/test_tasks.py b/swh/lister/gitlab/tests/test_tasks.py
index 38d5e92..3d0b2a3 100644
--- a/swh/lister/gitlab/tests/test_tasks.py
+++ b/swh/lister/gitlab/tests/test_tasks.py
@@ -1,47 +1,48 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import pytest
from swh.lister.pattern import ListerStats
def test_ping(swh_scheduler_celery_app, swh_scheduler_celery_worker):
res = swh_scheduler_celery_app.send_task("swh.lister.gitlab.tasks.ping")
assert res
res.wait()
assert res.successful()
assert res.result == "OK"
@pytest.mark.parametrize(
"task_name,incremental",
[("IncrementalGitLabLister", True), ("FullGitLabRelister", False)],
)
def test_task_lister_gitlab(
task_name,
incremental,
swh_scheduler_celery_app,
swh_scheduler_celery_worker,
mocker,
):
stats = ListerStats(pages=10, origins=200)
mock_lister = mocker.patch("swh.lister.gitlab.tasks.GitLabLister")
mock_lister.from_configfile.return_value = mock_lister
mock_lister.run.return_value = ListerStats(pages=10, origins=200)
kwargs = dict(url="https://gitweb.torproject.org/")
res = swh_scheduler_celery_app.send_task(
- f"swh.lister.gitlab.tasks.{task_name}", kwargs=kwargs,
+ f"swh.lister.gitlab.tasks.{task_name}",
+ kwargs=kwargs,
)
assert res
res.wait()
assert res.successful()
mock_lister.from_configfile.assert_called_once_with(
incremental=incremental, **kwargs
)
mock_lister.run.assert_called_once_with()
assert res.result == stats.dict()
diff --git a/swh/lister/gnu/lister.py b/swh/lister/gnu/lister.py
index 3d35829..65eca1f 100644
--- a/swh/lister/gnu/lister.py
+++ b/swh/lister/gnu/lister.py
@@ -1,73 +1,75 @@
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from typing import Any, Iterator, Mapping, Optional
import iso8601
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
from ..pattern import CredentialsType, StatelessLister
from .tree import GNUTree
logger = logging.getLogger(__name__)
GNUPageType = Mapping[str, Any]
class GNULister(StatelessLister[GNUPageType]):
"""
List all GNU projects and associated artifacts.
"""
LISTER_NAME = "GNU"
GNU_FTP_URL = "https://ftp.gnu.org"
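    # The whole listing is driven by the tree.json.gz manifest fetched from the
    # FTP root in get_pages, i.e. f"{GNU_FTP_URL}/tree.json.gz".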
def __init__(
- self, scheduler: SchedulerInterface, credentials: CredentialsType = None,
+ self,
+ scheduler: SchedulerInterface,
+ credentials: CredentialsType = None,
):
super().__init__(
scheduler=scheduler,
url=self.GNU_FTP_URL,
instance="GNU",
credentials=credentials,
)
        # No side-effect calls in the constructor; if extra state is needed, then,
        # as recommended by the pattern docstring, it must be set up in get_pages.
self.gnu_tree: Optional[GNUTree] = None
def get_pages(self) -> Iterator[GNUPageType]:
"""
Yield a single page listing all GNU projects.
"""
# first fetch the manifest to parse
self.gnu_tree = GNUTree(f"{self.url}/tree.json.gz")
yield self.gnu_tree.projects
def get_origins_from_page(self, page: GNUPageType) -> Iterator[ListedOrigin]:
"""
Iterate on all GNU projects and yield ListedOrigin instances.
"""
assert self.lister_obj.id is not None
assert self.gnu_tree is not None
artifacts = self.gnu_tree.artifacts
for project_name, project_info in page.items():
origin_url = project_info["url"]
last_update = iso8601.parse_date(project_info["time_modified"])
logger.debug("Found origin %s last updated on %s", origin_url, last_update)
yield ListedOrigin(
lister_id=self.lister_obj.id,
url=origin_url,
visit_type="tar",
last_update=last_update,
extra_loader_arguments={"artifacts": artifacts[project_name]},
)
diff --git a/swh/lister/gnu/tree.py b/swh/lister/gnu/tree.py
index ba74e04..f414ef3 100644
--- a/swh/lister/gnu/tree.py
+++ b/swh/lister/gnu/tree.py
@@ -1,336 +1,332 @@
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timezone
import gzip
import json
import logging
from os import path
from pathlib import Path
import re
from typing import Any, List, Mapping, Sequence, Tuple
from urllib.parse import urlparse
import requests
logger = logging.getLogger(__name__)
class GNUTree:
- """Gnu Tree's representation
-
- """
+ """Gnu Tree's representation"""
def __init__(self, url: str):
self.url = url # filepath or uri
u = urlparse(url)
self.base_url = "%s://%s" % (u.scheme, u.netloc)
# Interesting top level directories
self.top_level_directories = ["gnu", "old-gnu"]
# internal state
self._artifacts = {} # type: Mapping[str, Any]
self._projects = {} # type: Mapping[str, Any]
@property
def projects(self) -> Mapping[str, Any]:
if not self._projects:
self._projects, self._artifacts = self._load()
return self._projects
@property
def artifacts(self) -> Mapping[str, Any]:
if not self._artifacts:
self._projects, self._artifacts = self._load()
return self._artifacts
def _load(self) -> Tuple[Mapping[str, Any], Mapping[str, Any]]:
"""Compute projects and artifacts per project
Returns:
            Tuple of a projects dict (key: project url, value: the associated
            information) and an artifacts dict (key: project url, value: the
            list of artifact info dicts)
"""
projects = {}
artifacts = {}
raw_data = load_raw_data(self.url)[0]
for directory in raw_data["contents"]:
if directory["name"] not in self.top_level_directories:
continue
infos = directory["contents"]
for info in infos:
if info["type"] == "directory":
package_url = "%s/%s/%s/" % (
self.base_url,
directory["name"],
info["name"],
)
package_artifacts = find_artifacts(info["contents"], package_url)
if package_artifacts != []:
repo_details = {
"name": info["name"],
"url": package_url,
"time_modified": format_date(info["time"]),
}
artifacts[package_url] = package_artifacts
projects[package_url] = repo_details
return projects, artifacts
def find_artifacts(
filesystem: List[Mapping[str, Any]], url: str
) -> List[Mapping[str, Any]]:
"""Recursively list artifacts present in the folder and subfolders for a
particular package url.
Args:
        filesystem: File structure of the package root directory, as a list of
            dicts each describing a file or directory (keys: name, size, time,
            type).
url: URL of the corresponding package
    Returns:
List of tarball urls and their associated metadata (time, length,
etc...). For example:
.. code-block:: python
[
{
'url': 'https://ftp.gnu.org/gnu/3dldf/3DLDF-1.1.3.tar.gz',
'time': 1071002600,
'filename': '3DLDF-1.1.3.tar.gz',
'version': '1.1.3',
'length': 543
},
{
'url': 'https://ftp.gnu.org/gnu/3dldf/3DLDF-1.1.4.tar.gz',
'time': 1071078759,
                    'filename': '3DLDF-1.1.4.tar.gz',
'version': '1.1.4',
'length': 456
},
{
'url': 'https://ftp.gnu.org/gnu/3dldf/3DLDF-1.1.5.tar.gz',
'time': 1074278633,
'filename': '3DLDF-1.1.5.tar.gz',
                    'version': '1.1.5',
'length': 251
},
...
]
"""
artifacts = [] # type: List[Mapping[str, Any]]
for info_file in filesystem:
filetype = info_file["type"]
filename = info_file["name"]
if filetype == "file":
if check_filename_is_archive(filename):
uri = url + filename
artifacts.append(
{
"url": uri,
"filename": filename,
"time": format_date(info_file["time"]),
"length": int(info_file["size"]),
"version": get_version(filename),
}
)
# It will recursively check for artifacts in all sub-folders
elif filetype == "directory":
tarballs_in_dir = find_artifacts(
info_file["contents"], url + filename + "/"
)
artifacts.extend(tarballs_in_dir)
return artifacts
def check_filename_is_archive(filename: str) -> bool:
"""
    Check the extension of the file: if the file is a zip archive or a
    .tar.x archive, where x could be anything, return True.
    Args:
        filename: name of the file whose extension needs to be checked.
Returns:
Whether filename is an archive or not
Example:
>>> check_filename_is_archive('abc.zip')
True
>>> check_filename_is_archive('abc.tar.gz')
True
>>> check_filename_is_archive('bac.tar')
True
>>> check_filename_is_archive('abc.tar.gz.sig')
False
>>> check_filename_is_archive('foobar.tar.')
False
"""
file_suffixes = Path(filename).suffixes
if len(file_suffixes) == 1 and file_suffixes[-1] in (".zip", ".tar"):
return True
elif len(file_suffixes) > 1:
if file_suffixes[-1] == ".zip" or file_suffixes[-2] == ".tar":
return True
return False
# to recognize existing naming pattern
EXTENSIONS = [
"zip",
"tar",
"gz",
"tgz",
"bz2",
"bzip2",
"lzma",
"lz",
"xz",
"Z",
"7z",
]
VERSION_KEYWORDS = [
"cygwin_me",
"w32",
"win32",
"nt",
"cygwin",
"mingw",
"latest",
"alpha",
"beta",
"release",
"stable",
"hppa",
"solaris",
"sunos",
"sun4u",
"sparc",
"sun",
"aix",
"ibm",
"rs6000",
"i386",
"i686",
"linux",
"redhat",
"linuxlibc",
"mips",
"powerpc",
"macos",
"apple",
"darwin",
"macosx",
"powermacintosh",
"unknown",
"netbsd",
"freebsd",
"sgi",
"irix",
]
# Match a filename into components.
#
# We use Debian's release number heuristic: A release number starts
# with a digit, and is followed by alphanumeric characters or any of
# ., +, :, ~ and -
#
# We hardcode a list of possible extensions, as this release number
# scheme would match them too... We match on any combination of those.
#
# Greedy matching is done right to left (we only match the extension
# greedily with +, software_name and release_number are matched lazily
# with +? and *?).
PATTERN = r"""
^
(?:
# We have a software name and a release number, separated with a
# -, _ or dot.
    (?P<software_name1>.+?[-_.])
    (?P<release_number>({vkeywords}|[0-9][0-9a-zA-Z_.+:~-]*?)+)
|
    # We couldn't match a release number, put everything in the
    # software name.
    (?P<software_name2>.+?)
)
(?P<extension>(?:\.(?:{extensions}))+)
$
""".format(
extensions="|".join(EXTENSIONS),
vkeywords="|".join("%s[-]?" % k for k in VERSION_KEYWORDS),
)
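# As an illustration, "8sync-0.2.0.tar.gz" matches with software_name1="8sync-",
# release_number="0.2.0" and extension=".tar.gz" (see the get_version doctests below).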
def get_version(uri: str) -> str:
"""Extract branch name from tarball uri
Args:
uri (str): Tarball URI
Returns:
Version detected
Example:
>>> uri = 'https://ftp.gnu.org/gnu/8sync/8sync-0.2.0.tar.gz'
>>> get_version(uri)
'0.2.0'
>>> uri = '8sync-0.3.0.tar.gz'
>>> get_version(uri)
'0.3.0'
"""
filename = path.split(uri)[-1]
m = re.match(PATTERN, filename, flags=re.VERBOSE | re.IGNORECASE)
if m:
d = m.groupdict()
if d["software_name1"] and d["release_number"]:
return d["release_number"]
if d["software_name2"]:
return d["software_name2"]
return ""
def load_raw_data(url: str) -> Sequence[Mapping]:
"""Load the raw json from the tree.json.gz
Args:
url: Tree.json.gz url or path
Returns:
The raw json list
"""
if url.startswith("http://") or url.startswith("https://"):
response = requests.get(url, allow_redirects=True)
if not response.ok:
raise ValueError("Error during query to %s" % url)
raw = gzip.decompress(response.content)
else:
with gzip.open(url, "r") as f:
raw = f.read()
raw_data = json.loads(raw.decode("utf-8"))
return raw_data
def format_date(timestamp: str) -> str:
- """Format a string timestamp to an isoformat string
-
- """
+ """Format a string timestamp to an isoformat string"""
return datetime.fromtimestamp(int(timestamp), tz=timezone.utc).isoformat()
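# Illustrative example: format_date("0") returns "1970-01-01T00:00:00+00:00".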
diff --git a/swh/lister/launchpad/lister.py b/swh/lister/launchpad/lister.py
index 8078175..b134303 100644
--- a/swh/lister/launchpad/lister.py
+++ b/swh/lister/launchpad/lister.py
@@ -1,208 +1,209 @@
# Copyright (C) 2020-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from dataclasses import dataclass
from datetime import datetime
import logging
from typing import Any, Dict, Iterator, Optional, Tuple
import iso8601
from launchpadlib.launchpad import Launchpad
from lazr.restfulclient.errors import RestfulError
from lazr.restfulclient.resource import Collection
from tenacity.before_sleep import before_sleep_log
from swh.lister.utils import retry_if_exception, throttling_retry
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
from ..pattern import CredentialsType, Lister
logger = logging.getLogger(__name__)
VcsType = str
LaunchpadPageType = Tuple[VcsType, Collection]
SUPPORTED_VCS_TYPES = ("git", "bzr")
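# One page is yielded per supported VCS type, and the lister state keeps one
# last-modified date per type (see LaunchpadListerState below).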
@dataclass
class LaunchpadListerState:
"""State of Launchpad lister"""
git_date_last_modified: Optional[datetime] = None
"""modification date of last updated git repository since last listing"""
bzr_date_last_modified: Optional[datetime] = None
"""modification date of last updated bzr repository since last listing"""
def origin(vcs_type: str, repo: Any) -> str:
"""Determine the origin url out of a repository with a given vcs_type"""
return repo.git_https_url if vcs_type == "git" else repo.web_link
def retry_if_restful_error(retry_state):
return retry_if_exception(retry_state, lambda e: isinstance(e, RestfulError))
class LaunchpadLister(Lister[LaunchpadListerState, LaunchpadPageType]):
"""
List repositories from Launchpad (git or bzr).
Args:
scheduler: instance of SchedulerInterface
incremental: defines if incremental listing should be used, in that case
only modified or new repositories since last incremental listing operation
will be returned
"""
LISTER_NAME = "launchpad"
def __init__(
self,
scheduler: SchedulerInterface,
incremental: bool = False,
credentials: CredentialsType = None,
):
super().__init__(
scheduler=scheduler,
url="https://launchpad.net/",
instance="launchpad",
credentials=credentials,
)
self.incremental = incremental
self.date_last_modified: Dict[str, Optional[datetime]] = {
"git": None,
"bzr": None,
}
def state_from_dict(self, d: Dict[str, Any]) -> LaunchpadListerState:
for vcs_type in SUPPORTED_VCS_TYPES:
key = f"{vcs_type}_date_last_modified"
date_last_modified = d.get(key)
if date_last_modified is not None:
d[key] = iso8601.parse_date(date_last_modified)
return LaunchpadListerState(**d)
def state_to_dict(self, state: LaunchpadListerState) -> Dict[str, Any]:
d: Dict[str, Optional[str]] = {}
for vcs_type in SUPPORTED_VCS_TYPES:
attribute_name = f"{vcs_type}_date_last_modified"
d[attribute_name] = None
if hasattr(state, attribute_name):
date_last_modified = getattr(state, attribute_name)
if date_last_modified is not None:
d[attribute_name] = date_last_modified.isoformat()
return d
@throttling_retry(
retry=retry_if_restful_error,
before_sleep=before_sleep_log(logger, logging.WARNING),
)
def _page_request(
self, launchpad, vcs_type: str, date_last_modified: Optional[datetime]
) -> Optional[Collection]:
"""Querying the page of results for a given vcs_type since the date_last_modified. If
some issues occurs, this will deal with the retrying policy.
"""
get_vcs_fns = {
"git": launchpad.git_repositories.getRepositories,
"bzr": launchpad.branches.getBranches,
}
return get_vcs_fns[vcs_type](
- order_by="most neglected first", modified_since_date=date_last_modified,
+ order_by="most neglected first",
+ modified_since_date=date_last_modified,
)
def get_pages(self) -> Iterator[LaunchpadPageType]:
"""
        Yield pages of git/bzr repositories hosted on Launchpad, sorted by last
        modification date in ascending order.
"""
launchpad = Launchpad.login_anonymously(
"softwareheritage", "production", version="devel"
)
if self.incremental:
self.date_last_modified = {
"git": self.state.git_date_last_modified,
"bzr": self.state.bzr_date_last_modified,
}
for vcs_type in SUPPORTED_VCS_TYPES:
try:
result = self._page_request(
launchpad, vcs_type, self.date_last_modified[vcs_type]
)
except RestfulError as e:
logger.warning("Listing %s origins raised %s", vcs_type, e)
result = None
if not result:
continue
yield vcs_type, result
def get_origins_from_page(self, page: LaunchpadPageType) -> Iterator[ListedOrigin]:
"""
        Iterate over the git or bzr repositories in the page and yield ListedOrigin instances.
"""
assert self.lister_obj.id is not None
vcs_type, repos = page
try:
for repo in repos:
origin_url = origin(vcs_type, repo)
# filter out origins with invalid URL
if not origin_url.startswith("https://"):
continue
last_update = repo.date_last_modified
self.date_last_modified[vcs_type] = last_update
logger.debug(
"Found origin %s with type %s last updated on %s",
origin_url,
vcs_type,
last_update,
)
yield ListedOrigin(
lister_id=self.lister_obj.id,
visit_type=vcs_type,
url=origin_url,
last_update=last_update,
)
except RestfulError as e:
logger.warning("Listing %s origins raised %s", vcs_type, e)
def finalize(self) -> None:
git_date_last_modified = self.date_last_modified["git"]
bzr_date_last_modified = self.date_last_modified["bzr"]
if git_date_last_modified is None and bzr_date_last_modified is None:
return
if self.incremental and (
self.state.git_date_last_modified is None
or (
git_date_last_modified is not None
and git_date_last_modified > self.state.git_date_last_modified
)
):
self.state.git_date_last_modified = git_date_last_modified
if self.incremental and (
self.state.bzr_date_last_modified is None
or (
bzr_date_last_modified is not None
and bzr_date_last_modified > self.state.bzr_date_last_modified
)
):
self.state.bzr_date_last_modified = self.date_last_modified["bzr"]
self.updated = True
diff --git a/swh/lister/launchpad/tests/test_lister.py b/swh/lister/launchpad/tests/test_lister.py
index 59fe605..0768dd5 100644
--- a/swh/lister/launchpad/tests/test_lister.py
+++ b/swh/lister/launchpad/tests/test_lister.py
@@ -1,256 +1,264 @@
# Copyright (C) 2020-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime
import json
from pathlib import Path
from typing import List
from lazr.restfulclient.errors import RestfulError
import pytest
from ..lister import LaunchpadLister, origin
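# Minimal stand-ins for the collection/entry objects returned by launchpadlib,
# exposing only the attributes these tests rely on.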
class _Repo:
def __init__(self, d: dict):
for key in d.keys():
if key == "date_last_modified":
setattr(self, key, datetime.fromisoformat(d[key]))
else:
setattr(self, key, d[key])
class _Collection:
entries: List[_Repo] = []
def __init__(self, file):
self.entries = [_Repo(r) for r in file]
def __getitem__(self, key):
return self.entries[key]
def __len__(self):
return len(self.entries)
def _launchpad_response(datadir, datafile):
return _Collection(json.loads(Path(datadir, datafile).read_text()))
@pytest.fixture
def launchpad_response1(datadir):
return _launchpad_response(datadir, "launchpad_response1.json")
@pytest.fixture
def launchpad_response2(datadir):
return _launchpad_response(datadir, "launchpad_response2.json")
@pytest.fixture
def launchpad_bzr_response(datadir):
return _launchpad_response(datadir, "launchpad_bzr_response.json")
def _mock_launchpad(mocker, launchpad_response, launchpad_bzr_response=None):
mock_launchpad = mocker.patch("swh.lister.launchpad.lister.Launchpad")
mock_getRepositories = mock_launchpad.git_repositories.getRepositories
if isinstance(launchpad_response, Exception):
mock_getRepositories.side_effect = launchpad_response
else:
mock_getRepositories.return_value = launchpad_response
mock_getBranches = mock_launchpad.branches.getBranches
if launchpad_bzr_response is not None:
if isinstance(launchpad_bzr_response, Exception):
mock_getBranches.side_effect = launchpad_bzr_response
else:
mock_getBranches.return_value = launchpad_bzr_response
else:
mock_getBranches.return_value = [] # empty page
mock_launchpad.login_anonymously.return_value = mock_launchpad
return mock_getRepositories, mock_getBranches
def _check_listed_origins(scheduler_origins, launchpad_response, vcs_type="git"):
for repo in launchpad_response:
filtered_origins = [
o for o in scheduler_origins if o.url == origin(vcs_type, repo)
]
assert len(filtered_origins) == 1
assert filtered_origins[0].last_update == repo.date_last_modified
assert filtered_origins[0].visit_type == vcs_type
def test_lister_from_configfile(swh_scheduler_config, mocker):
load_from_envvar = mocker.patch("swh.lister.pattern.load_from_envvar")
load_from_envvar.return_value = {
"scheduler": {"cls": "local", **swh_scheduler_config},
"credentials": {},
}
lister = LaunchpadLister.from_configfile()
assert lister.scheduler is not None
assert lister.credentials is not None
def test_launchpad_full_lister(
swh_scheduler, mocker, launchpad_response1, launchpad_bzr_response
):
mock_getRepositories, mock_getBranches = _mock_launchpad(
mocker, launchpad_response1, launchpad_bzr_response
)
lister = LaunchpadLister(scheduler=swh_scheduler)
stats = lister.run()
assert not lister.incremental
assert lister.updated
assert stats.pages == 1 + 1, "Expects 1 page for git origins, another for bzr ones"
assert stats.origins == len(launchpad_response1) + len(launchpad_bzr_response)
mock_getRepositories.assert_called_once_with(
order_by="most neglected first", modified_since_date=None
)
mock_getBranches.assert_called_once_with(
order_by="most neglected first", modified_since_date=None
)
scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results
assert len(scheduler_origins) == len(launchpad_response1) + len(
launchpad_bzr_response
)
_check_listed_origins(scheduler_origins, launchpad_response1)
_check_listed_origins(scheduler_origins, launchpad_bzr_response, vcs_type="bzr")
def test_launchpad_incremental_lister(
swh_scheduler,
mocker,
launchpad_response1,
launchpad_response2,
launchpad_bzr_response,
):
mock_getRepositories, mock_getBranches = _mock_launchpad(
mocker, launchpad_response1, launchpad_bzr_response
)
lister = LaunchpadLister(scheduler=swh_scheduler, incremental=True)
stats = lister.run()
assert lister.incremental
assert lister.updated
assert stats.pages == 1 + 1, "Expects 1 page for git origins, another for bzr ones"
len_first_runs = len(launchpad_response1) + len(launchpad_bzr_response)
assert stats.origins == len_first_runs
mock_getRepositories.assert_called_once_with(
order_by="most neglected first", modified_since_date=None
)
mock_getBranches.assert_called_once_with(
order_by="most neglected first", modified_since_date=None
)
lister_state = lister.get_state_from_scheduler()
assert (
lister_state.git_date_last_modified
== launchpad_response1[-1].date_last_modified
)
assert (
lister_state.bzr_date_last_modified
== launchpad_bzr_response[-1].date_last_modified
)
mock_getRepositories, mock_getBranches = _mock_launchpad(
mocker, launchpad_response2
)
lister = LaunchpadLister(scheduler=swh_scheduler, incremental=True)
stats = lister.run()
assert lister.incremental
assert lister.updated
assert stats.pages == 1, "Empty bzr page response is ignored"
assert stats.origins == len(launchpad_response2)
mock_getRepositories.assert_called_once_with(
order_by="most neglected first",
modified_since_date=lister_state.git_date_last_modified,
)
scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results
assert len(scheduler_origins) == len_first_runs + len(launchpad_response2)
_check_listed_origins(scheduler_origins, launchpad_response1)
_check_listed_origins(scheduler_origins, launchpad_bzr_response, vcs_type="bzr")
_check_listed_origins(scheduler_origins, launchpad_response2)
def test_launchpad_lister_invalid_url_filtering(
- swh_scheduler, mocker,
+ swh_scheduler,
+ mocker,
):
- invalid_origin = [_Repo({"git_https_url": "tag:launchpad.net:2008:redacted",})]
+ invalid_origin = [
+ _Repo(
+ {
+ "git_https_url": "tag:launchpad.net:2008:redacted",
+ }
+ )
+ ]
_mock_launchpad(mocker, invalid_origin)
lister = LaunchpadLister(scheduler=swh_scheduler)
stats = lister.run()
assert not lister.updated
    assert stats.pages == 1, "Empty pages are ignored (only 1 git page of results)"
assert stats.origins == 0
def test_launchpad_lister_duplicated_origin(
- swh_scheduler, mocker,
+ swh_scheduler,
+ mocker,
):
origin = _Repo(
{
"git_https_url": "https://git.launchpad.net/test",
"date_last_modified": "2021-01-14 21:05:31.231406+00:00",
}
)
origins = [origin, origin]
_mock_launchpad(mocker, origins)
lister = LaunchpadLister(scheduler=swh_scheduler)
stats = lister.run()
assert lister.updated
    assert stats.pages == 1, "Empty bzr pages are ignored (only 1 git page of results)"
assert stats.origins == 1
def test_launchpad_lister_raise_during_listing(
swh_scheduler, mocker, launchpad_response1, launchpad_bzr_response
):
lister = LaunchpadLister(scheduler=swh_scheduler)
# Exponential retries take a long time, so stub time.sleep
mocker.patch.object(lister._page_request.retry, "sleep")
mock_getRepositories, mock_getBranches = _mock_launchpad(
mocker,
RestfulError("Refuse to list git page"), # breaks git page listing
launchpad_bzr_response,
)
stats = lister.run()
assert lister.updated
assert stats.pages == 1
assert stats.origins == len(launchpad_bzr_response)
mock_getRepositories, mock_getBranches = _mock_launchpad(
mocker,
launchpad_response1,
RestfulError("Refuse to list bzr"), # breaks bzr page listing
)
lister = LaunchpadLister(scheduler=swh_scheduler)
stats = lister.run()
assert lister.updated
assert stats.pages == 1
assert stats.origins == len(launchpad_response1)
diff --git a/swh/lister/maven/lister.py b/swh/lister/maven/lister.py
index 2d57550..a940a4d 100644
--- a/swh/lister/maven/lister.py
+++ b/swh/lister/maven/lister.py
@@ -1,361 +1,372 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
import logging
import re
from typing import Any, Dict, Iterator, Optional
from urllib.parse import urljoin
import requests
from tenacity.before_sleep import before_sleep_log
from urllib3.util import parse_url
import xmltodict
from swh.lister.utils import throttling_retry
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
from .. import USER_AGENT
from ..pattern import CredentialsType, Lister
logger = logging.getLogger(__name__)
RepoPage = Dict[str, Any]
@dataclass
class MavenListerState:
"""State of the MavenLister"""
last_seen_doc: int = -1
"""Last doc ID ingested during an incremental pass
"""
last_seen_pom: int = -1
"""Last doc ID related to a pom and ingested during
an incremental pass
"""
class MavenLister(Lister[MavenListerState, RepoPage]):
"""List origins from a Maven repository.
Maven Central provides artifacts for Java builds.
It includes POM files and source archives, which we download to get
the source code of artifacts and links to their scm repository.
    This lister yields origins of type git/svn/hg, or whatever the artifacts
    use as repository type, plus maven types for the maven loader (tgz, jar)."""
LISTER_NAME = "maven"
def __init__(
self,
scheduler: SchedulerInterface,
url: str,
        index_url: Optional[str] = None,
instance: Optional[str] = None,
credentials: CredentialsType = None,
incremental: bool = True,
):
"""Lister class for Maven repositories.
Args:
url: main URL of the Maven repository, i.e. url of the base index
used to fetch maven artifacts. For Maven central use
https://repo1.maven.org/maven2/
index_url: the URL to download the exported text indexes from.
Would typically be a local host running the export docker image.
See README.md in this directory for more information.
instance: Name of maven instance. Defaults to url's network location
if unset.
incremental: bool, defaults to True. Defines if incremental listing
is activated or not.
"""
self.BASE_URL = url
self.INDEX_URL = index_url
self.incremental = incremental
if instance is None:
instance = parse_url(url).host
super().__init__(
- scheduler=scheduler, credentials=credentials, url=url, instance=instance,
+ scheduler=scheduler,
+ credentials=credentials,
+ url=url,
+ instance=instance,
)
self.session = requests.Session()
self.session.headers.update(
- {"Accept": "application/json", "User-Agent": USER_AGENT,}
+ {
+ "Accept": "application/json",
+ "User-Agent": USER_AGENT,
+ }
)
def state_from_dict(self, d: Dict[str, Any]) -> MavenListerState:
return MavenListerState(**d)
def state_to_dict(self, state: MavenListerState) -> Dict[str, Any]:
return asdict(state)
@throttling_retry(before_sleep=before_sleep_log(logger, logging.WARNING))
def page_request(self, url: str, params: Dict[str, Any]) -> requests.Response:
logger.info("Fetching URL %s with params %s", url, params)
response = self.session.get(url, params=params)
if response.status_code != 200:
logger.warning(
"Unexpected HTTP status code %s on %s: %s",
response.status_code,
response.url,
response.content,
)
response.raise_for_status()
return response
def get_pages(self) -> Iterator[RepoPage]:
- """ Retrieve and parse exported maven indexes to
+ """Retrieve and parse exported maven indexes to
identify all pom files and src archives.
"""
# Example of returned RepoPage's:
# [
# {
# "type": "maven",
# "url": "https://maven.xwiki.org/..-5.4.2-sources.jar",
# "time": 1626109619335,
# "gid": "org.xwiki.platform",
# "aid": "xwiki-platform-wikistream-events-xwiki",
# "version": "5.4.2"
# },
# {
# "type": "scm",
# "url": "scm:git:git://github.com/openengsb/openengsb-framework.git",
# "project": "openengsb-framework",
# },
# ...
# ]
# Download the main text index file.
logger.info("Downloading text index from %s.", self.INDEX_URL)
assert self.INDEX_URL is not None
response = requests.get(self.INDEX_URL, stream=True)
response.raise_for_status()
# Prepare regexes to parse index exports.
# Parse doc id.
# Example line: "doc 13"
re_doc = re.compile(r"^doc (?P<doc>\d+)$")
# Parse gid, aid, version, classifier, extension.
# Example line: " value al.aldi|sprova4j|0.1.0|sources|jar"
re_val = re.compile(
    r"^\s{4}value (?P<gid>[^|]+)\|(?P<aid>[^|]+)\|(?P<version>[^|]+)\|"
    + r"(?P<classifier>[^|]+)\|(?P<ext>[^|]+)$"
)
# Parse last modification time.
# Example line: " value jar|1626109619335|14316|2|2|0|jar"
re_time = re.compile(
    r"^\s{4}value ([^|]+)\|(?P<mtime>[^|]+)\|([^|]+)\|([^|]+)\|([^|]+)"
    + r"\|([^|]+)\|([^|]+)$"
)
# Read file line by line and process it
out_pom: Dict = {}
jar_src: Dict = {}
doc_id: int = 0
jar_src["doc"] = None
url_src = None
iterator = response.iter_lines(chunk_size=1024)
for line_bytes in iterator:
# Read the index text export and get URLs and SCMs.
line = line_bytes.decode(errors="ignore")
m_doc = re_doc.match(line)
if m_doc is not None:
doc_id = int(m_doc.group("doc"))
if (
self.incremental
and self.state
and self.state.last_seen_doc
and self.state.last_seen_doc >= doc_id
):
# jar_src["doc"] contains the id of the current document, whatever
# its type (scm or jar).
jar_src["doc"] = None
else:
jar_src["doc"] = doc_id
else:
# In incremental mode, we don't record any line that comes
# before our last recorded doc id.
if self.incremental and jar_src["doc"] is None:
continue
m_val = re_val.match(line)
if m_val is not None:
(gid, aid, version, classifier, ext) = m_val.groups()
ext = ext.strip()
path = "/".join(gid.split("."))
if classifier == "NA" and ext.lower() == "pom":
# In incremental mode, we don't record any line that comes
# before our last recorded doc id.
if (
self.incremental
and self.state
and self.state.last_seen_pom
and self.state.last_seen_pom >= doc_id
):
continue
url_path = f"{path}/{aid}/{version}/{aid}-{version}.{ext}"
- url_pom = urljoin(self.BASE_URL, url_path,)
+ url_pom = urljoin(
+ self.BASE_URL,
+ url_path,
+ )
out_pom[url_pom] = doc_id
elif (
classifier.lower() == "sources" or ("src" in classifier)
) and ext.lower() in ("zip", "jar"):
url_path = (
f"{path}/{aid}/{version}/{aid}-{version}-{classifier}.{ext}"
)
url_src = urljoin(self.BASE_URL, url_path)
jar_src["gid"] = gid
jar_src["aid"] = aid
jar_src["version"] = version
else:
m_time = re_time.match(line)
if m_time is not None and url_src is not None:
time = m_time.group("mtime")
jar_src["time"] = int(time)
artifact_metadata_d = {
"type": "maven",
"url": url_src,
**jar_src,
}
logger.debug(
"* Yielding jar %s: %s", url_src, artifact_metadata_d
)
yield artifact_metadata_d
url_src = None
logger.info("Found %s poms.", len(out_pom))
# Now fetch pom files and scan them for scm info.
logger.info("Fetching poms..")
for pom in out_pom:
text = self.page_request(pom, {})
try:
project = xmltodict.parse(text.content.decode())
if "scm" in project["project"]:
if "connection" in project["project"]["scm"]:
scm = project["project"]["scm"]["connection"]
gid = project["project"]["groupId"]
aid = project["project"]["artifactId"]
artifact_metadata_d = {
"type": "scm",
"doc": out_pom[pom],
"url": scm,
"project": f"{gid}.{aid}",
}
logger.debug("* Yielding pom %s: %s", pom, artifact_metadata_d)
yield artifact_metadata_d
else:
logger.debug("No scm.connection in pom %s", pom)
else:
logger.debug("No scm in pom %s", pom)
except xmltodict.expat.ExpatError as error:
logger.info("Could not parse POM %s XML: %s. Next.", pom, error)
def get_origins_from_page(self, page: RepoPage) -> Iterator[ListedOrigin]:
- """Convert a page of Maven repositories into a list of ListedOrigins.
-
- """
+ """Convert a page of Maven repositories into a list of ListedOrigins."""
assert self.lister_obj.id is not None
scm_types_ok = ("git", "svn", "hg", "cvs", "bzr")
if page["type"] == "scm":
# If origin is a scm url: detect scm type and yield.
# Note that the official format is:
# scm:git:git://github.com/openengsb/openengsb-framework.git
# but many, many projects directly put the repo url, so we have to
# detect the content to match it properly.
m_scm = re.match(r"^scm:(?P<type>[^:]+):(?P<url>.*)$", page["url"])
if m_scm is not None:
scm_type = m_scm.group("type")
if scm_type in scm_types_ok:
scm_url = m_scm.group("url")
origin = ListedOrigin(
- lister_id=self.lister_obj.id, url=scm_url, visit_type=scm_type,
+ lister_id=self.lister_obj.id,
+ url=scm_url,
+ visit_type=scm_type,
)
yield origin
else:
if page["url"].endswith(".git"):
origin = ListedOrigin(
- lister_id=self.lister_obj.id, url=page["url"], visit_type="git",
+ lister_id=self.lister_obj.id,
+ url=page["url"],
+ visit_type="git",
)
yield origin
else:
# Origin is a source archive:
last_update_dt = None
last_update_dt_tz = None
last_update_iso = ""
last_update_seconds = str(page["time"])[:-3]
try:
last_update_dt = datetime.fromtimestamp(int(last_update_seconds))
last_update_dt_tz = last_update_dt.astimezone(timezone.utc)
except OverflowError:
logger.warning("- Failed to convert datetime %s.", last_update_seconds)
if last_update_dt:
last_update_iso = last_update_dt_tz.isoformat()
origin = ListedOrigin(
lister_id=self.lister_obj.id,
url=page["url"],
visit_type=page["type"],
last_update=last_update_dt_tz,
extra_loader_arguments={
"artifacts": [
{
"time": last_update_iso,
"gid": page["gid"],
"aid": page["aid"],
"version": page["version"],
"base_url": self.BASE_URL,
}
]
},
)
yield origin
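A short standalone sketch (illustrative values only) of the two conversions performed above: detecting the vcs type from an `scm:` connection string, and turning the index's epoch-millisecond timestamp into an aware datetime:
import re
from datetime import datetime, timezone

scm_re = re.compile(r"^scm:(?P<type>[^:]+):(?P<url>.*)$")
m = scm_re.match("scm:git:git://github.com/openengsb/openengsb-framework.git")
assert m is not None and m.group("type") == "git"

# page["time"] is in milliseconds; dropping the last three digits gives seconds.
time_ms = 1626109619335
dt = datetime.fromtimestamp(int(str(time_ms)[:-3])).astimezone(timezone.utc)
assert dt.isoformat() == "2021-07-12T17:06:59+00:00"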
def commit_page(self, page: RepoPage) -> None:
"""Update currently stored state using the latest listed doc.
Note: this is a noop for full listing mode
"""
if self.incremental and self.state:
# We need to differentiate the two state counters according
# to the type of origin.
if page["type"] == "maven" and page["doc"] > self.state.last_seen_doc:
self.state.last_seen_doc = page["doc"]
elif page["type"] == "scm" and page["doc"] > self.state.last_seen_pom:
self.state.last_seen_doc = page["doc"]
self.state.last_seen_pom = page["doc"]
def finalize(self) -> None:
"""Finalize the lister state, set update if any progress has been made.
Note: this is a noop for full listing mode
"""
if self.incremental and self.state:
last_seen_doc = self.state.last_seen_doc
last_seen_pom = self.state.last_seen_pom
scheduler_state = self.get_state_from_scheduler()
if last_seen_doc and last_seen_pom:
if (scheduler_state.last_seen_doc < last_seen_doc) or (
scheduler_state.last_seen_pom < last_seen_pom
):
self.updated = True
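To make the incremental bookkeeping concrete, here is a small sketch (hypothetical doc ids) of how the two state counters advance as pages are committed, following the same rules as `commit_page` above:
state = MavenListerState()
for page in ({"type": "maven", "doc": 1}, {"type": "scm", "doc": 3}):
    if page["type"] == "maven" and page["doc"] > state.last_seen_doc:
        state.last_seen_doc = page["doc"]
    elif page["type"] == "scm" and page["doc"] > state.last_seen_pom:
        state.last_seen_doc = state.last_seen_pom = page["doc"]
assert (state.last_seen_doc, state.last_seen_pom) == (3, 3)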
diff --git a/swh/lister/maven/tests/test_lister.py b/swh/lister/maven/tests/test_lister.py
index c81ee96..c8142ef 100644
--- a/swh/lister/maven/tests/test_lister.py
+++ b/swh/lister/maven/tests/test_lister.py
@@ -1,320 +1,325 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import timezone
from pathlib import Path
import iso8601
import pytest
import requests
from swh.lister.maven.lister import MavenLister
MVN_URL = "https://repo1.maven.org/maven2/" # main maven repo url
INDEX_URL = "http://indexes/export.fld" # index directory url
URL_POM_1 = MVN_URL + "al/aldi/sprova4j/0.1.0/sprova4j-0.1.0.pom"
URL_POM_2 = MVN_URL + "al/aldi/sprova4j/0.1.1/sprova4j-0.1.1.pom"
URL_POM_3 = MVN_URL + "com/arangodb/arangodb-graphql/1.2/arangodb-graphql-1.2.pom"
LIST_GIT = (
"git://github.com/aldialimucaj/sprova4j.git",
"https://github.com/aldialimucaj/sprova4j.git",
)
LIST_GIT_INCR = ("git://github.com/ArangoDB-Community/arangodb-graphql-java.git",)
LIST_SRC = (
MVN_URL + "al/aldi/sprova4j/0.1.0/sprova4j-0.1.0-sources.jar",
MVN_URL + "al/aldi/sprova4j/0.1.1/sprova4j-0.1.1-sources.jar",
)
LIST_SRC_DATA = (
{
"type": "maven",
"url": "https://repo1.maven.org/maven2/al/aldi/sprova4j"
+ "/0.1.0/sprova4j-0.1.0-sources.jar",
"time": "2021-07-12T17:06:59+00:00",
"gid": "al.aldi",
"aid": "sprova4j",
"version": "0.1.0",
},
{
"type": "maven",
"url": "https://repo1.maven.org/maven2/al/aldi/sprova4j"
+ "/0.1.1/sprova4j-0.1.1-sources.jar",
"time": "2021-07-12T17:37:05+00:00",
"gid": "al.aldi",
"aid": "sprova4j",
"version": "0.1.1",
},
)
@pytest.fixture
def maven_index(datadir) -> str:
return Path(datadir, "http_indexes", "export.fld").read_text()
@pytest.fixture
def maven_index_incr(datadir) -> str:
return Path(datadir, "http_indexes", "export_incr.fld").read_text()
@pytest.fixture
def maven_pom_1(datadir) -> str:
return Path(datadir, "https_maven.org", "sprova4j-0.1.0.pom").read_text()
@pytest.fixture
def maven_pom_1_malformed(datadir) -> str:
return Path(datadir, "https_maven.org", "sprova4j-0.1.0.malformed.pom").read_text()
@pytest.fixture
def maven_pom_2(datadir) -> str:
return Path(datadir, "https_maven.org", "sprova4j-0.1.1.pom").read_text()
@pytest.fixture
def maven_pom_3(datadir) -> str:
return Path(datadir, "https_maven.org", "arangodb-graphql-1.2.pom").read_text()
def test_maven_full_listing(
- swh_scheduler, requests_mock, mocker, maven_index, maven_pom_1, maven_pom_2,
+ swh_scheduler,
+ requests_mock,
+ mocker,
+ maven_index,
+ maven_pom_1,
+ maven_pom_2,
):
"""Covers full listing of multiple pages, checking page results and listed
origins, statelessness."""
lister = MavenLister(
scheduler=swh_scheduler,
url=MVN_URL,
instance="maven.org",
index_url=INDEX_URL,
incremental=False,
)
# Set up test.
index_text = maven_index
requests_mock.get(INDEX_URL, text=index_text)
requests_mock.get(URL_POM_1, text=maven_pom_1)
requests_mock.get(URL_POM_2, text=maven_pom_2)
# Then run the lister.
stats = lister.run()
# Start test checks.
assert stats.pages == 4
assert stats.origins == 4
scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results
origin_urls = [origin.url for origin in scheduler_origins]
assert sorted(origin_urls) == sorted(LIST_GIT + LIST_SRC)
for origin in scheduler_origins:
if origin.visit_type == "maven":
for src in LIST_SRC_DATA:
if src.get("url") == origin.url:
last_update_src = iso8601.parse_date(src.get("time")).astimezone(
tz=timezone.utc
)
assert last_update_src == origin.last_update
artifact = origin.extra_loader_arguments["artifacts"][0]
assert src.get("time") == artifact["time"]
assert src.get("gid") == artifact["gid"]
assert src.get("aid") == artifact["aid"]
assert src.get("version") == artifact["version"]
assert MVN_URL == artifact["base_url"]
break
else:
raise AssertionError(
"Could not find scheduler origin in referenced origins."
)
scheduler_state = lister.get_state_from_scheduler()
assert scheduler_state is not None
assert scheduler_state.last_seen_doc == -1
assert scheduler_state.last_seen_pom == -1
def test_maven_full_listing_malformed(
swh_scheduler,
requests_mock,
mocker,
maven_index,
maven_pom_1_malformed,
maven_pom_2,
):
"""Covers full listing of multiple pages, checking page results with a malformed
scm entry in pom."""
lister = MavenLister(
scheduler=swh_scheduler,
url=MVN_URL,
instance="maven.org",
index_url=INDEX_URL,
incremental=False,
)
# Set up test.
index_text = maven_index
requests_mock.get(INDEX_URL, text=index_text)
requests_mock.get(URL_POM_1, text=maven_pom_1_malformed)
requests_mock.get(URL_POM_2, text=maven_pom_2)
# Then run the lister.
stats = lister.run()
# Start test checks.
assert stats.pages == 4
assert stats.origins == 3
scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results
origin_urls = [origin.url for origin in scheduler_origins]
LIST_SRC_1 = ("https://github.com/aldialimucaj/sprova4j.git",)
assert sorted(origin_urls) == sorted(LIST_SRC_1 + LIST_SRC)
for origin in scheduler_origins:
if origin.visit_type == "maven":
for src in LIST_SRC_DATA:
if src.get("url") == origin.url:
artifact = origin.extra_loader_arguments["artifacts"][0]
assert src.get("time") == artifact["time"]
assert src.get("gid") == artifact["gid"]
assert src.get("aid") == artifact["aid"]
assert src.get("version") == artifact["version"]
assert MVN_URL == artifact["base_url"]
break
else:
raise AssertionError(
"Could not find scheduler origin in referenced origins."
)
scheduler_state = lister.get_state_from_scheduler()
assert scheduler_state is not None
assert scheduler_state.last_seen_doc == -1
assert scheduler_state.last_seen_pom == -1
def test_maven_incremental_listing(
swh_scheduler,
requests_mock,
mocker,
maven_index,
maven_index_incr,
maven_pom_1,
maven_pom_2,
maven_pom_3,
):
"""Covers full listing of multiple pages, checking page results and listed
origins, with a second updated run for statefulness."""
lister = MavenLister(
scheduler=swh_scheduler,
url=MVN_URL,
instance="maven.org",
index_url=INDEX_URL,
incremental=True,
)
# Set up test.
requests_mock.get(INDEX_URL, text=maven_index)
requests_mock.get(URL_POM_1, text=maven_pom_1)
requests_mock.get(URL_POM_2, text=maven_pom_2)
# Then run the lister.
stats = lister.run()
# Start test checks.
assert lister.incremental
assert lister.updated
assert stats.pages == 4
assert stats.origins == 4
# Second execution of the lister, incremental mode
lister = MavenLister(
scheduler=swh_scheduler,
url=MVN_URL,
instance="maven.org",
index_url=INDEX_URL,
incremental=True,
)
scheduler_state = lister.get_state_from_scheduler()
assert scheduler_state is not None
assert scheduler_state.last_seen_doc == 3
assert scheduler_state.last_seen_pom == 3
# Set up test.
requests_mock.get(INDEX_URL, text=maven_index_incr)
requests_mock.get(URL_POM_3, text=maven_pom_3)
# Then run the lister.
stats = lister.run()
# Start test checks.
assert lister.incremental
assert lister.updated
assert stats.pages == 1
assert stats.origins == 1
scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results
origin_urls = [origin.url for origin in scheduler_origins]
assert sorted(origin_urls) == sorted(LIST_SRC + LIST_GIT + LIST_GIT_INCR)
for origin in scheduler_origins:
if origin.visit_type == "maven":
for src in LIST_SRC_DATA:
if src.get("url") == origin.url:
artifact = origin.extra_loader_arguments["artifacts"][0]
assert src.get("time") == artifact["time"]
assert src.get("gid") == artifact["gid"]
assert src.get("aid") == artifact["aid"]
assert src.get("version") == artifact["version"]
break
else:
raise AssertionError
scheduler_state = lister.get_state_from_scheduler()
assert scheduler_state is not None
assert scheduler_state.last_seen_doc == 4
assert scheduler_state.last_seen_pom == 4
@pytest.mark.parametrize("http_code", [400, 404, 500, 502])
def test_maven_list_http_error(
swh_scheduler, requests_mock, mocker, maven_index, http_code
):
"""Test handling of some common HTTP errors:
- 400: Bad request.
- 404: Resource not found.
- 500: Internal server error.
- 502: Bad gateway or proxy error.
"""
lister = MavenLister(scheduler=swh_scheduler, url=MVN_URL, index_url=INDEX_URL)
# Test failure of index retrieval.
requests_mock.get(INDEX_URL, status_code=http_code)
with pytest.raises(requests.HTTPError):
lister.run()
# Test failure of artefacts retrieval.
requests_mock.get(INDEX_URL, text=maven_index)
requests_mock.get(URL_POM_1, status_code=http_code)
with pytest.raises(requests.HTTPError):
lister.run()
# If the maven_index step succeeded but not the get_pom step,
# then we get only the 2 maven-jar origins (and not the 2 additional
# src origins).
scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results
assert len(scheduler_origins) == 2
diff --git a/swh/lister/maven/tests/test_tasks.py b/swh/lister/maven/tests/test_tasks.py
index 864c00d..b95dfda 100644
--- a/swh/lister/maven/tests/test_tasks.py
+++ b/swh/lister/maven/tests/test_tasks.py
@@ -1,45 +1,46 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import pytest
from swh.lister.pattern import ListerStats
def test_ping(swh_scheduler_celery_app, swh_scheduler_celery_worker):
res = swh_scheduler_celery_app.send_task("swh.lister.maven.tasks.ping")
assert res
res.wait()
assert res.successful()
assert res.result == "OK"
@pytest.mark.parametrize(
"task_name,incremental",
[("IncrementalMavenLister", True), ("FullMavenLister", False)],
)
def test_task_lister_maven(
task_name,
incremental,
swh_scheduler_celery_app,
swh_scheduler_celery_worker,
mocker,
):
lister = mocker.patch("swh.lister.maven.tasks.MavenLister")
lister.from_configfile.return_value = lister
lister.run.return_value = ListerStats(pages=10, origins=500)
kwargs = dict(
url="https://repo1.maven.org/maven2/", index_url="http://indexes/export.fld"
)
res = swh_scheduler_celery_app.send_task(
- f"swh.lister.maven.tasks.{task_name}", kwargs=kwargs,
+ f"swh.lister.maven.tasks.{task_name}",
+ kwargs=kwargs,
)
assert res
res.wait()
assert res.successful()
lister.from_configfile.assert_called_once_with(incremental=incremental, **kwargs)
lister.run.assert_called_once_with()
diff --git a/swh/lister/npm/tests/test_lister.py b/swh/lister/npm/tests/test_lister.py
index 8ceb86c..1c20b33 100644
--- a/swh/lister/npm/tests/test_lister.py
+++ b/swh/lister/npm/tests/test_lister.py
@@ -1,200 +1,207 @@
# Copyright (C) 2018-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from itertools import chain
import json
from pathlib import Path
import iso8601
import pytest
from requests.exceptions import HTTPError
from swh.lister import USER_AGENT
from swh.lister.npm.lister import NpmLister, NpmListerState
@pytest.fixture
def npm_full_listing_page1(datadir):
return json.loads(Path(datadir, "npm_full_page1.json").read_text())
@pytest.fixture
def npm_full_listing_page2(datadir):
return json.loads(Path(datadir, "npm_full_page2.json").read_text())
@pytest.fixture
def npm_incremental_listing_page1(datadir):
return json.loads(Path(datadir, "npm_incremental_page1.json").read_text())
@pytest.fixture
def npm_incremental_listing_page2(datadir):
return json.loads(Path(datadir, "npm_incremental_page2.json").read_text())
def _check_listed_npm_packages(lister, packages, scheduler_origins):
for package in packages:
package_name = package["doc"]["name"]
latest_version = package["doc"]["dist-tags"]["latest"]
package_last_update = iso8601.parse_date(package["doc"]["time"][latest_version])
origin_url = lister.PACKAGE_URL_TEMPLATE.format(package_name=package_name)
scheduler_origin = [o for o in scheduler_origins if o.url == origin_url]
assert scheduler_origin
assert scheduler_origin[0].last_update == package_last_update
def _match_request(request):
return request.headers.get("User-Agent") == USER_AGENT
def _url_params(page_size, **kwargs):
params = {"limit": page_size, "include_docs": "true"}
params.update(**kwargs)
return params
def test_npm_lister_full(
swh_scheduler, requests_mock, mocker, npm_full_listing_page1, npm_full_listing_page2
):
"""Simulate a full listing of four npm packages in two pages"""
page_size = 2
lister = NpmLister(scheduler=swh_scheduler, page_size=page_size, incremental=False)
requests_mock.get(
lister.API_FULL_LISTING_URL,
- [{"json": npm_full_listing_page1}, {"json": npm_full_listing_page2},],
+ [
+ {"json": npm_full_listing_page1},
+ {"json": npm_full_listing_page2},
+ ],
additional_matcher=_match_request,
)
spy_get = mocker.spy(lister.session, "get")
stats = lister.run()
assert stats.pages == 2
assert stats.origins == page_size * stats.pages
spy_get.assert_has_calls(
[
mocker.call(
lister.API_FULL_LISTING_URL,
params=_url_params(page_size + 1, startkey='""'),
),
mocker.call(
lister.API_FULL_LISTING_URL,
params=_url_params(
page_size + 1,
startkey=f'"{npm_full_listing_page1["rows"][-1]["id"]}"',
),
),
]
)
scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results
_check_listed_npm_packages(
lister,
chain(npm_full_listing_page1["rows"][:-1], npm_full_listing_page2["rows"]),
scheduler_origins,
)
assert lister.get_state_from_scheduler() == NpmListerState()
def test_npm_lister_incremental(
swh_scheduler,
requests_mock,
mocker,
npm_incremental_listing_page1,
npm_incremental_listing_page2,
):
"""Simulate an incremental listing of four npm packages in two pages"""
page_size = 2
lister = NpmLister(scheduler=swh_scheduler, page_size=page_size, incremental=True)
requests_mock.get(
lister.API_INCREMENTAL_LISTING_URL,
[
{"json": npm_incremental_listing_page1},
{"json": npm_incremental_listing_page2},
{"json": {"results": []}},
],
additional_matcher=_match_request,
)
spy_get = mocker.spy(lister.session, "get")
assert lister.get_state_from_scheduler() == NpmListerState()
stats = lister.run()
assert stats.pages == 2
assert stats.origins == page_size * stats.pages
last_seq = npm_incremental_listing_page2["results"][-1]["seq"]
spy_get.assert_has_calls(
[
mocker.call(
lister.API_INCREMENTAL_LISTING_URL,
params=_url_params(page_size, since="0"),
),
mocker.call(
lister.API_INCREMENTAL_LISTING_URL,
params=_url_params(
page_size,
since=str(npm_incremental_listing_page1["results"][-1]["seq"]),
),
),
mocker.call(
lister.API_INCREMENTAL_LISTING_URL,
params=_url_params(page_size, since=str(last_seq)),
),
]
)
scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results
_check_listed_npm_packages(
lister,
chain(
npm_incremental_listing_page1["results"],
npm_incremental_listing_page2["results"],
),
scheduler_origins,
)
assert lister.get_state_from_scheduler() == NpmListerState(last_seq=last_seq)
def test_npm_lister_incremental_restart(
- swh_scheduler, requests_mock, mocker,
+ swh_scheduler,
+ requests_mock,
+ mocker,
):
"""Check incremental npm listing will restart from saved state"""
page_size = 2
last_seq = 67
lister = NpmLister(scheduler=swh_scheduler, page_size=page_size, incremental=True)
lister.state = NpmListerState(last_seq=last_seq)
requests_mock.get(lister.API_INCREMENTAL_LISTING_URL, json={"results": []})
spy_get = mocker.spy(lister.session, "get")
lister.run()
spy_get.assert_called_with(
lister.API_INCREMENTAL_LISTING_URL,
params=_url_params(page_size, since=str(last_seq)),
)
def test_npm_lister_http_error(
- swh_scheduler, requests_mock, mocker,
+ swh_scheduler,
+ requests_mock,
+ mocker,
):
lister = NpmLister(scheduler=swh_scheduler)
requests_mock.get(lister.API_FULL_LISTING_URL, status_code=500)
with pytest.raises(HTTPError):
lister.run()
diff --git a/swh/lister/opam/lister.py b/swh/lister/opam/lister.py
index 4ad510e..724d198 100644
--- a/swh/lister/opam/lister.py
+++ b/swh/lister/opam/lister.py
@@ -1,141 +1,144 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import io
import logging
import os
from subprocess import PIPE, Popen, call
from typing import Any, Dict, Iterator, Optional
from swh.lister.pattern import StatelessLister
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
from ..pattern import CredentialsType
logger = logging.getLogger(__name__)
PageType = str
class OpamLister(StatelessLister[PageType]):
"""
List all repositories hosted on an opam repository.
On initialisation, we create an opam root, with no ocaml compiler (no switch)
as we won't need it and it's costly. In this opam root, we add a single opam
repository (url) and give it a name (instance). Then, to get pages, we just ask
opam to list all the packages for our opam repository in our opam root.
Args:
url: base URL of an opam repository
(for instance https://opam.ocaml.org)
instance: string identifier for the listed repository
"""
# Part of the lister API, that identifies this lister
LISTER_NAME = "opam"
def __init__(
self,
scheduler: SchedulerInterface,
url: str,
instance: Optional[str] = None,
credentials: CredentialsType = None,
opam_root: str = "/tmp/opam/",
):
super().__init__(
- scheduler=scheduler, credentials=credentials, url=url, instance=instance,
+ scheduler=scheduler,
+ credentials=credentials,
+ url=url,
+ instance=instance,
)
self.env = os.environ.copy()
# Opam root folder is initialized in the :meth:`get_pages` method as no
# side-effect should happen in the constructor to ease instantiation
self.opam_root = opam_root
def get_pages(self) -> Iterator[PageType]:
# Initialize the opam root directory
opam_init(self.opam_root, self.instance, self.url, self.env)
# Actually list opam instance data
proc = Popen(
[
"opam",
"list",
"--all",
"--no-switch",
"--safe",
"--repos",
self.instance,
"--root",
self.opam_root,
"--normalise",
"--short",
],
env=self.env,
stdout=PIPE,
)
if proc.stdout is not None:
for line in io.TextIOWrapper(proc.stdout):
yield line.rstrip("\n")
def get_origins_from_page(self, page: PageType) -> Iterator[ListedOrigin]:
"""Convert a page of OpamLister repositories into a list of ListedOrigins"""
assert self.lister_obj.id is not None
# a page is just a package name
url = f"opam+{self.url}/packages/{page}/"
yield ListedOrigin(
lister_id=self.lister_obj.id,
visit_type="opam",
url=url,
last_update=None,
extra_loader_arguments={
"opam_root": self.opam_root,
"opam_instance": self.instance,
"opam_url": self.url,
"opam_package": page,
},
)
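As a side note, a tiny sketch (hypothetical package name) of the origin URL scheme produced above for opam packages:
base_url = "https://opam.ocaml.org"
package = "ocamlfind"  # hypothetical package name yielded by `opam list`
origin_url = f"opam+{base_url}/packages/{package}/"
assert origin_url == "opam+https://opam.ocaml.org/packages/ocamlfind/"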
def opam_init(opam_root: str, instance: str, url: str, env: Dict[str, Any]) -> None:
"""Initialize an opam_root folder.
Args:
opam_root: The opam root folder to initialize
instance: Name of the opam repository to add or initialize
url: The associated url of the opam repository to add or initialize
env: The global environment to use for the opam command.
Returns:
None.
"""
if not os.path.exists(opam_root) or not os.listdir(opam_root):
command = [
"opam",
"init",
"--reinit",
"--bare",
"--no-setup",
"--root",
opam_root,
instance,
url,
]
else:
# The repository exists and is populated, we just add another instance in the
# repository. If it's already set up, this is a noop
command = [
"opam",
"repository",
"add",
"--root",
opam_root,
instance,
url,
]
# Actually execute the command
call(command, env=env)
diff --git a/swh/lister/opam/tests/test_lister.py b/swh/lister/opam/tests/test_lister.py
index b39c501..26dc753 100644
--- a/swh/lister/opam/tests/test_lister.py
+++ b/swh/lister/opam/tests/test_lister.py
@@ -1,170 +1,170 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import io
import os
from tempfile import mkdtemp
from unittest.mock import MagicMock
import pytest
from swh.lister.opam.lister import OpamLister, opam_init
module_name = "swh.lister.opam.lister"
@pytest.fixture
def mock_opam(mocker):
- """Fixture to bypass the actual opam calls within the test context.
-
- """
+ """Fixture to bypass the actual opam calls within the test context."""
# inhibits the real `subprocess.call` which prepares the required internal opam
# state
mock_init = mocker.patch(f"{module_name}.call", return_value=None)
# replaces the real Popen with a fake one (list origins command)
mocked_popen = MagicMock()
mocked_popen.stdout = io.BytesIO(b"bar\nbaz\nfoo\n")
mock_open = mocker.patch(f"{module_name}.Popen", return_value=mocked_popen)
return mock_init, mock_open
def test_mock_init_repository_init(mock_opam, tmp_path, datadir):
- """Initializing opam root directory with an instance should be ok
-
- """
+ """Initializing opam root directory with an instance should be ok"""
mock_init, mock_popen = mock_opam
instance = "fake"
instance_url = f"file://{datadir}/{instance}"
opam_root = str(tmp_path / "test-opam")
assert not os.path.exists(opam_root)
# This will initialize an opam directory with the instance
opam_init(opam_root, instance, instance_url, {})
assert mock_init.called
def test_mock_init_repository_update(mock_opam, tmp_path, datadir):
- """Updating opam root directory with another instance should be ok
-
- """
+ """Updating opam root directory with another instance should be ok"""
mock_init, mock_popen = mock_opam
instance = "fake_opam_repo"
instance_url = f"file://{datadir}/{instance}"
opam_root = str(tmp_path / "test-opam")
os.makedirs(opam_root, exist_ok=True)
with open(os.path.join(opam_root, "opam"), "w") as f:
f.write("one file to avoid empty folder")
assert os.path.exists(opam_root)
assert os.listdir(opam_root) == ["opam"] # not empty
# This will update the repository opam with another instance
opam_init(opam_root, instance, instance_url, {})
assert mock_init.called
def test_lister_opam_optional_instance(swh_scheduler):
"""Instance name should be optional and default to be built out of the netloc."""
netloc = "opam.ocaml.org"
instance_url = f"https://{netloc}"
- lister = OpamLister(swh_scheduler, url=instance_url,)
+ lister = OpamLister(
+ swh_scheduler,
+ url=instance_url,
+ )
assert lister.instance == netloc
assert lister.opam_root == "/tmp/opam/"
def test_urls(swh_scheduler, mock_opam, tmp_path):
mock_init, mock_popen = mock_opam
instance_url = "https://opam.ocaml.org"
tmp_folder = mkdtemp(dir=tmp_path, prefix="swh_opam_lister")
lister = OpamLister(
- swh_scheduler, url=instance_url, instance="opam", opam_root=tmp_folder,
+ swh_scheduler,
+ url=instance_url,
+ instance="opam",
+ opam_root=tmp_folder,
)
assert lister.instance == "opam"
assert lister.opam_root == tmp_folder
# call the lister and get all listed origins urls
stats = lister.run()
assert mock_init.called
assert mock_popen.called
assert stats.pages == 3
assert stats.origins == 3
scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results
expected_urls = [
f"opam+{instance_url}/packages/bar/",
f"opam+{instance_url}/packages/baz/",
f"opam+{instance_url}/packages/foo/",
]
result_urls = [origin.url for origin in scheduler_origins]
assert expected_urls == result_urls
def test_opam_binary(datadir, swh_scheduler, tmp_path):
instance_url = f"file://{datadir}/fake_opam_repo"
lister = OpamLister(
swh_scheduler,
url=instance_url,
instance="fake",
opam_root=mkdtemp(dir=tmp_path, prefix="swh_opam_lister"),
)
stats = lister.run()
assert stats.pages == 4
assert stats.origins == 4
scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results
expected_urls = [
f"opam+{instance_url}/packages/agrid/",
f"opam+{instance_url}/packages/calculon/",
f"opam+{instance_url}/packages/directories/",
f"opam+{instance_url}/packages/ocb/",
]
result_urls = [origin.url for origin in scheduler_origins]
assert expected_urls == result_urls
def test_opam_multi_instance(datadir, swh_scheduler, tmp_path):
instance_url = f"file://{datadir}/fake_opam_repo"
lister = OpamLister(
swh_scheduler,
url=instance_url,
instance="fake",
opam_root=mkdtemp(dir=tmp_path, prefix="swh_opam_lister"),
)
stats = lister.run()
assert stats.pages == 4
assert stats.origins == 4
scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results
expected_urls = [
f"opam+{instance_url}/packages/agrid/",
f"opam+{instance_url}/packages/calculon/",
f"opam+{instance_url}/packages/directories/",
f"opam+{instance_url}/packages/ocb/",
]
result_urls = [origin.url for origin in scheduler_origins]
assert expected_urls == result_urls
diff --git a/swh/lister/packagist/lister.py b/swh/lister/packagist/lister.py
index 9378691..19b4721 100644
--- a/swh/lister/packagist/lister.py
+++ b/swh/lister/packagist/lister.py
@@ -1,182 +1,184 @@
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from dataclasses import dataclass
from datetime import datetime, timezone
import logging
from typing import Any, Dict, Iterator, List, Optional
import iso8601
import requests
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
from .. import USER_AGENT
from ..pattern import CredentialsType, Lister
logger = logging.getLogger(__name__)
PackagistPageType = List[str]
@dataclass
class PackagistListerState:
"""State of Packagist lister"""
last_listing_date: Optional[datetime] = None
"""Last date when packagist lister was executed"""
class PackagistLister(Lister[PackagistListerState, PackagistPageType]):
"""
List all Packagist projects and send associated origins to scheduler.
The lister queries the Packagist API, whose documentation can be found at
https://packagist.org/apidoc.
For each package, its metadata are retrieved using Packagist API endpoints
whose responses are served from static files, which are guaranteed to be
efficient on the Packagist side (no dynamic queries).
Furthermore, subsequent listings will send the "If-Modified-Since" HTTP
header to only retrieve packages metadata updated since the previous listing
operation in order to save bandwidth and return only origins which might have
new released versions.
"""
LISTER_NAME = "Packagist"
PACKAGIST_PACKAGES_LIST_URL = "https://packagist.org/packages/list.json"
PACKAGIST_REPO_BASE_URL = "https://repo.packagist.org/p"
def __init__(
- self, scheduler: SchedulerInterface, credentials: CredentialsType = None,
+ self,
+ scheduler: SchedulerInterface,
+ credentials: CredentialsType = None,
):
super().__init__(
scheduler=scheduler,
url=self.PACKAGIST_PACKAGES_LIST_URL,
instance="packagist",
credentials=credentials,
)
self.session = requests.Session()
self.session.headers.update(
{"Accept": "application/json", "User-Agent": USER_AGENT}
)
self.listing_date = datetime.now().astimezone(tz=timezone.utc)
def state_from_dict(self, d: Dict[str, Any]) -> PackagistListerState:
last_listing_date = d.get("last_listing_date")
if last_listing_date is not None:
d["last_listing_date"] = iso8601.parse_date(last_listing_date)
return PackagistListerState(**d)
def state_to_dict(self, state: PackagistListerState) -> Dict[str, Any]:
d: Dict[str, Optional[str]] = {"last_listing_date": None}
last_listing_date = state.last_listing_date
if last_listing_date is not None:
d["last_listing_date"] = last_listing_date.isoformat()
return d
def api_request(self, url: str) -> Any:
logger.debug("Fetching URL %s", url)
response = self.session.get(url)
if response.status_code not in (200, 304):
logger.warning(
"Unexpected HTTP status code %s on %s: %s",
response.status_code,
response.url,
response.content,
)
response.raise_for_status()
# response is empty when status code is 304
return response.json() if response.status_code == 200 else {}
def get_pages(self) -> Iterator[PackagistPageType]:
"""
Yield a single page listing all Packagist projects.
"""
yield self.api_request(self.PACKAGIST_PACKAGES_LIST_URL)["packageNames"]
def get_origins_from_page(self, page: PackagistPageType) -> Iterator[ListedOrigin]:
"""
Iterate on all Packagist projects and yield ListedOrigin instances.
"""
assert self.lister_obj.id is not None
# save some bandwidth by only getting packages metadata updated since
# last listing
if self.state.last_listing_date is not None:
if_modified_since = self.state.last_listing_date.strftime(
"%a, %d %b %Y %H:%M:%S GMT"
)
self.session.headers["If-Modified-Since"] = if_modified_since
# to ensure origins will not be listed multiple times
origin_urls = set()
for package_name in page:
try:
metadata = self.api_request(
f"{self.PACKAGIST_REPO_BASE_URL}/{package_name}.json"
)
if not metadata.get("packages", {}):
# package metadata not updated since last listing
continue
if package_name not in metadata["packages"]:
# missing package metadata in response
continue
versions_info = metadata["packages"][package_name].values()
except requests.exceptions.HTTPError:
# error when getting package metadata (usually 404 when a
# package has been removed), skip it and process next package
continue
origin_url = None
visit_type = None
last_update = None
# extract origin url for package, vcs type and latest release date
for version_info in versions_info:
origin_url = version_info.get("source", {}).get("url", "")
if not origin_url:
continue
# can be git, hg or svn
visit_type = version_info.get("source", {}).get("type", "")
dist_time_str = version_info.get("time", "")
if not dist_time_str:
continue
dist_time = iso8601.parse_date(dist_time_str)
if last_update is None or dist_time > last_update:
last_update = dist_time
# skip package with already seen origin url or with missing required info
if visit_type is None or origin_url is None or origin_url in origin_urls:
continue
# bitbucket closed its mercurial hosting service, so those origins can no
# longer be loaded into the archive
if visit_type == "hg" and origin_url.startswith("https://bitbucket.org/"):
continue
origin_urls.add(origin_url)
logger.debug(
"Found package %s last updated on %s", package_name, last_update
)
yield ListedOrigin(
lister_id=self.lister_obj.id,
url=origin_url,
visit_type=visit_type,
last_update=last_update,
)
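For illustration, a standalone sketch (made-up date) of the "If-Modified-Since" value built above from the previous listing date:
from datetime import datetime, timezone

last_listing_date = datetime(2021, 7, 12, 17, 6, 59, tzinfo=timezone.utc)
if_modified_since = last_listing_date.strftime("%a, %d %b %Y %H:%M:%S GMT")
assert if_modified_since == "Mon, 12 Jul 2021 17:06:59 GMT"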
def finalize(self) -> None:
self.state.last_listing_date = self.listing_date
self.updated = True
diff --git a/swh/lister/phabricator/lister.py b/swh/lister/phabricator/lister.py
index 1dbee37..83ddc31 100644
--- a/swh/lister/phabricator/lister.py
+++ b/swh/lister/phabricator/lister.py
@@ -1,183 +1,185 @@
# Copyright (C) 2019-2021 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
import logging
import random
from typing import Any, Dict, Iterator, List, Optional
from urllib.parse import urljoin
import requests
from swh.lister import USER_AGENT
from swh.lister.pattern import CredentialsType, StatelessLister
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
logger = logging.getLogger(__name__)
PageType = List[Dict[str, Any]]
class PhabricatorLister(StatelessLister[PageType]):
"""
List all repositories hosted on a Phabricator instance.
Args:
url: base URL of a phabricator forge
(for instance https://forge.softwareheritage.org)
instance: string identifier for the listed forge,
URL network location will be used if not provided
api_token: authentication token for Conduit API
"""
LISTER_NAME = "phabricator"
API_REPOSITORY_PATH = "/api/diffusion.repository.search"
def __init__(
self,
scheduler: SchedulerInterface,
url: str,
instance: Optional[str] = None,
api_token: Optional[str] = None,
credentials: CredentialsType = None,
):
super().__init__(
scheduler, urljoin(url, self.API_REPOSITORY_PATH), instance, credentials
)
self.session = requests.Session()
self.session.headers.update(
{"Accept": "application/json", "User-Agent": USER_AGENT}
)
if api_token is not None:
self.api_token = api_token
else:
if not self.credentials:
raise ValueError(
f"No credentials found for phabricator instance {self.instance};"
" Please set them in the lister configuration file."
)
self.api_token = random.choice(self.credentials)["password"]
def get_request_params(self, after: Optional[str]) -> Dict[str, str]:
"""Get the query parameters for the request."""
base_params = {
# Stable order
"order": "oldest",
# Add all URIs to the response
"attachments[uris]": "1",
# API token from stored credentials
"api.token": self.api_token,
}
if after is not None:
base_params["after"] = after
return base_params
@staticmethod
def filter_params(params: Dict[str, str]) -> Dict[str, str]:
"""Filter the parameters for debug purposes"""
return {
k: (v if k != "api.token" else "**redacted**") for k, v in params.items()
}
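A quick sketch (hypothetical token value) of the redaction performed by `filter_params` before request parameters are logged:
params = {"order": "oldest", "api.token": "cli-secret"}  # hypothetical token
assert PhabricatorLister.filter_params(params) == {
    "order": "oldest",
    "api.token": "**redacted**",
}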
def get_pages(self) -> Iterator[PageType]:
after: Optional[str] = None
while True:
params = self.get_request_params(after)
logger.debug(
"Retrieving results on URI %s with parameters %s",
self.url,
self.filter_params(params),
)
response = self.session.post(self.url, data=params)
if response.status_code != 200:
logger.warning(
"Unexpected HTTP status code %s on %s: %s",
response.status_code,
response.url,
response.content,
)
response.raise_for_status()
response_data = response.json()
if response_data.get("result") is None:
logger.warning(
- "Got unexpected response on %s: %s", response.url, response_data,
+ "Got unexpected response on %s: %s",
+ response.url,
+ response_data,
)
break
result = response_data["result"]
yield result["data"]
after = None
if "cursor" in result and "after" in result["cursor"]:
after = result["cursor"]["after"]
if not after:
logger.debug("Empty `after` cursor. All done")
break
def get_origins_from_page(self, page: PageType) -> Iterator[ListedOrigin]:
assert self.lister_obj.id is not None
for repo in page:
url = get_repo_url(repo["attachments"]["uris"]["uris"])
if url is None:
short_name: Optional[str] = None
for field in "shortName", "name", "callsign":
short_name = repo["fields"].get(field)
if short_name:
break
logger.warning(
"No valid url for repository [%s] (phid=%s)",
short_name or repo["phid"],
repo["phid"],
)
continue
yield ListedOrigin(
lister_id=self.lister_obj.id,
url=url,
visit_type=repo["fields"]["vcs"],
# The "dateUpdated" field returned by the Phabricator API only refers to
# the repository metadata; we can't use it for our purposes.
last_update=None,
)
def get_repo_url(attachments: List[Dict[str, Any]]) -> Optional[str]:
"""
Return url for a hosted repository from its uris attachments according
to the following priority lists:
* protocol: https > http
* identifier: shortname > callsign > id
"""
processed_urls = defaultdict(dict) # type: Dict[str, Any]
for uri in attachments:
protocol = uri["fields"]["builtin"]["protocol"]
url = uri["fields"]["uri"]["effective"]
identifier = uri["fields"]["builtin"]["identifier"]
if protocol in ("http", "https"):
processed_urls[protocol][identifier] = url
elif protocol is None:
for protocol in ("https", "http"):
if url.startswith(protocol):
processed_urls[protocol]["undefined"] = url
break
for protocol in ["https", "http"]:
for identifier in ["shortname", "callsign", "id", "undefined"]:
if protocol in processed_urls and identifier in processed_urls[protocol]:
return processed_urls[protocol][identifier]
return None
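To illustrate the priority rules documented above, a self-contained sketch (hypothetical forge URLs) where an https/shortname URI wins over an http/id one:
attachments = [
    {"fields": {"uri": {"effective": "http://forge.example.org/diffusion/42/"},
                "builtin": {"protocol": "http", "identifier": "id"}}},
    {"fields": {"uri": {"effective": "https://forge.example.org/source/myrepo.git"},
                "builtin": {"protocol": "https", "identifier": "shortname"}}},
]
assert get_repo_url(attachments) == "https://forge.example.org/source/myrepo.git"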
diff --git a/swh/lister/phabricator/tests/test_lister.py b/swh/lister/phabricator/tests/test_lister.py
index a21d302..a638c40 100644
--- a/swh/lister/phabricator/tests/test_lister.py
+++ b/swh/lister/phabricator/tests/test_lister.py
@@ -1,135 +1,137 @@
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
from pathlib import Path
import pytest
from requests.exceptions import HTTPError
from swh.lister import USER_AGENT
from swh.lister.phabricator.lister import PhabricatorLister, get_repo_url
@pytest.fixture
def phabricator_repositories_page1(datadir):
return json.loads(
Path(datadir, "phabricator_api_repositories_page1.json").read_text()
)
@pytest.fixture
def phabricator_repositories_page2(datadir):
return json.loads(
Path(datadir, "phabricator_api_repositories_page2.json").read_text()
)
def test_get_repo_url(phabricator_repositories_page1):
repos = phabricator_repositories_page1["result"]["data"]
for repo in repos:
expected_name = "https://forge.softwareheritage.org/source/%s.git" % (
repo["fields"]["shortName"]
)
assert get_repo_url(repo["attachments"]["uris"]["uris"]) == expected_name
def test_get_repo_url_undefined_protocol():
undefined_protocol_uris = [
{
"fields": {
"uri": {
"raw": "https://svn.blender.org/svnroot/bf-blender/",
"display": "https://svn.blender.org/svnroot/bf-blender/",
"effective": "https://svn.blender.org/svnroot/bf-blender/",
"normalized": "svn.blender.org/svnroot/bf-blender",
},
"builtin": {"protocol": None, "identifier": None},
},
}
]
expected_name = "https://svn.blender.org/svnroot/bf-blender/"
assert get_repo_url(undefined_protocol_uris) == expected_name
def test_lister_url_param(swh_scheduler):
FORGE_BASE_URL = "https://forge.softwareheritage.org"
API_REPOSITORY_PATH = "/api/diffusion.repository.search"
for url in (
FORGE_BASE_URL,
f"{FORGE_BASE_URL}/",
f"{FORGE_BASE_URL}/{API_REPOSITORY_PATH}",
f"{FORGE_BASE_URL}/{API_REPOSITORY_PATH}/",
):
lister = PhabricatorLister(
scheduler=swh_scheduler, url=FORGE_BASE_URL, instance="swh", api_token="foo"
)
expected_url = f"{FORGE_BASE_URL}{API_REPOSITORY_PATH}"
assert lister.url == expected_url
def test_lister(
swh_scheduler,
requests_mock,
phabricator_repositories_page1,
phabricator_repositories_page2,
):
FORGE_BASE_URL = "https://forge.softwareheritage.org"
API_TOKEN = "foo"
lister = PhabricatorLister(
scheduler=swh_scheduler, url=FORGE_BASE_URL, instance="swh", api_token=API_TOKEN
)
def match_request(request):
return (
request.headers.get("User-Agent") == USER_AGENT
and f"api.token={API_TOKEN}" in request.body
)
requests_mock.post(
f"{FORGE_BASE_URL}{lister.API_REPOSITORY_PATH}",
[
{"json": phabricator_repositories_page1},
{"json": phabricator_repositories_page2},
],
additional_matcher=match_request,
)
stats = lister.run()
expected_nb_origins = len(phabricator_repositories_page1["result"]["data"]) * 2
assert stats.pages == 2
assert stats.origins == expected_nb_origins
scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results
assert len(scheduler_origins) == expected_nb_origins
def test_lister_request_error(
- swh_scheduler, requests_mock, phabricator_repositories_page1,
+ swh_scheduler,
+ requests_mock,
+ phabricator_repositories_page1,
):
FORGE_BASE_URL = "https://forge.softwareheritage.org"
lister = PhabricatorLister(
scheduler=swh_scheduler, url=FORGE_BASE_URL, instance="swh", api_token="foo"
)
requests_mock.post(
f"{FORGE_BASE_URL}{lister.API_REPOSITORY_PATH}",
[
{"status_code": 200, "json": phabricator_repositories_page1},
{"status_code": 500, "reason": "Internal Server Error"},
],
)
with pytest.raises(HTTPError):
lister.run()
diff --git a/swh/lister/pypi/lister.py b/swh/lister/pypi/lister.py
index abc27ec..eefd797 100644
--- a/swh/lister/pypi/lister.py
+++ b/swh/lister/pypi/lister.py
@@ -1,181 +1,177 @@
# Copyright (C) 2018-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
import logging
from time import sleep
from typing import Any, Dict, Iterator, List, Optional, Tuple
from xmlrpc.client import Fault, ServerProxy
from tenacity.before_sleep import before_sleep_log
from swh.lister.utils import throttling_retry
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
from ..pattern import CredentialsType, Lister
logger = logging.getLogger(__name__)
# Type returned by the XML-RPC changelog call:
# package, version, release timestamp, description, serial
ChangelogEntry = Tuple[str, str, int, str, int]
# Manipulated package updated type which is a subset information
# of the ChangelogEntry type: package, max release date
PackageUpdate = Tuple[str, datetime]
# Type returned by listing a page of results
PackageListPage = List[PackageUpdate]
@dataclass
class PyPIListerState:
"""State of PyPI lister"""
last_serial: Optional[int] = None
"""Last seen serial when visiting the pypi instance"""
def _if_rate_limited(retry_state) -> bool:
"""Custom tenacity retry predicate to handle xmlrpc client error:
.. code::
xmlrpc.client.Fault:
"""
attempt = retry_state.outcome
return attempt.failed and isinstance(attempt.exception(), Fault)
def pypi_url(package_name: str) -> str:
- """Build pypi url out of a package name.
-
- """
+ """Build pypi url out of a package name."""
return PyPILister.PACKAGE_URL.format(package_name=package_name)
class PyPILister(Lister[PyPIListerState, PackageListPage]):
- """List origins from PyPI.
-
- """
+ """List origins from PyPI."""
LISTER_NAME = "pypi"
INSTANCE = "pypi" # As of today only the main pypi.org is used
PACKAGE_LIST_URL = "https://pypi.org/pypi" # XML-RPC url
PACKAGE_URL = "https://pypi.org/project/{package_name}/"
def __init__(
self,
scheduler: SchedulerInterface,
credentials: Optional[CredentialsType] = None,
):
super().__init__(
scheduler=scheduler,
url=self.PACKAGE_LIST_URL,
instance=self.INSTANCE,
credentials=credentials,
)
# used as termination condition and if useful, becomes the new state when the
# visit is done
self.last_processed_serial: Optional[int] = None
def state_from_dict(self, d: Dict[str, Any]) -> PyPIListerState:
return PyPIListerState(last_serial=d.get("last_serial"))
def state_to_dict(self, state: PyPIListerState) -> Dict[str, Any]:
return asdict(state)
@throttling_retry(
retry=_if_rate_limited, before_sleep=before_sleep_log(logger, logging.WARNING)
)
def _changelog_last_serial(self, client: ServerProxy) -> int:
"""Internal detail to allow throttling when calling the changelog last entry"""
serial = client.changelog_last_serial()
assert isinstance(serial, int)
return serial
@throttling_retry(
retry=_if_rate_limited, before_sleep=before_sleep_log(logger, logging.WARNING)
)
def _changelog_since_serial(
self, client: ServerProxy, serial: int
) -> List[ChangelogEntry]:
"""Internal detail to allow throttling when calling the changelog listing"""
sleep(1) # to avoid the initial warning about throttling
return client.changelog_since_serial(serial) # type: ignore
def get_pages(self) -> Iterator[PackageListPage]:
"""Iterate other changelog events per package, determine the max release date for that
package and use that max release date as last_update. When the execution is
done, this will also set the self.last_processed_serial attribute so we can
finalize the state of the lister for the next visit.
Yields:
List of Tuple of (package-name, max release-date)
"""
client = ServerProxy(self.url)
last_processed_serial = -1
if self.state.last_serial is not None:
last_processed_serial = self.state.last_serial
upstream_last_serial = self._changelog_last_serial(client)
# Paginate through result of pypi, until we read everything
while last_processed_serial < upstream_last_serial:
updated_packages = defaultdict(list)
for package, _, release_date, _, serial in self._changelog_since_serial(
client, last_processed_serial
):
updated_packages[package].append(release_date)
# Compute the max serial so we can stop when done
last_processed_serial = max(last_processed_serial, serial)
# Returns pages of result to flush regularly
yield [
(
pypi_url(package),
datetime.fromtimestamp(max(release_dates)).replace(
tzinfo=timezone.utc
),
)
for package, release_dates in updated_packages.items()
]
self.last_processed_serial = upstream_last_serial
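A standalone sketch (made-up changelog entries) of the grouping performed above: one entry per package, keeping the newest release timestamp as last_update:
from collections import defaultdict
from datetime import datetime, timezone

entries = [  # (package, version, release timestamp, description, serial)
    ("hello", "1.0", 1600000000, "new release", 10),
    ("hello", "1.1", 1600003600, "new release", 11),
    ("world", "0.1", 1600000500, "new release", 12),
]
updated_packages = defaultdict(list)
for package, _, release_date, _, _ in entries:
    updated_packages[package].append(release_date)
page = [
    (f"https://pypi.org/project/{package}/",
     datetime.fromtimestamp(max(dates)).replace(tzinfo=timezone.utc))
    for package, dates in updated_packages.items()
]
assert page[0][0] == "https://pypi.org/project/hello/"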
def get_origins_from_page(
self, packages: PackageListPage
) -> Iterator[ListedOrigin]:
"""Convert a page of PyPI repositories into a list of ListedOrigins."""
assert self.lister_obj.id is not None
for origin, last_update in packages:
yield ListedOrigin(
lister_id=self.lister_obj.id,
url=origin,
visit_type="pypi",
last_update=last_update,
)
def finalize(self):
"""Finalize the visit state by updating with the new last_serial if updates
- actually happened.
+ actually happened.
"""
self.updated = (
self.state
and self.state.last_serial
and self.last_processed_serial
and self.state.last_serial < self.last_processed_serial
) or (not self.state.last_serial and self.last_processed_serial)
if self.updated:
self.state.last_serial = self.last_processed_serial
diff --git a/swh/lister/sourceforge/lister.py b/swh/lister/sourceforge/lister.py
index 6199240..c95a089 100644
--- a/swh/lister/sourceforge/lister.py
+++ b/swh/lister/sourceforge/lister.py
@@ -1,427 +1,424 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from dataclasses import dataclass, field
import datetime
from enum import Enum
import logging
import re
from typing import Any, Dict, Iterator, List, Optional, Set, Tuple
from xml.etree import ElementTree
from bs4 import BeautifulSoup
import iso8601
import requests
from tenacity.before_sleep import before_sleep_log
from swh.core.api.classes import stream_results
from swh.lister.utils import retry_policy_generic, throttling_retry
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
from .. import USER_AGENT
from ..pattern import CredentialsType, Lister
logger = logging.getLogger(__name__)
class VcsNames(Enum):
"""Used to filter SourceForge tool names for valid VCS types"""
# CVS projects are read-only
CVS = "cvs"
GIT = "git"
SUBVERSION = "svn"
MERCURIAL = "hg"
BAZAAR = "bzr"
VCS_NAMES = set(v.value for v in VcsNames.__members__.values())
@dataclass
class SourceForgeListerEntry:
vcs: VcsNames
url: str
last_modified: datetime.date
SubSitemapNameT = str
ProjectNameT = str
# SourceForge only offers day-level granularity, which is good enough for our purposes
LastModifiedT = datetime.date
@dataclass
class SourceForgeListerState:
- """Current state of the SourceForge lister in incremental runs
- """
+ """Current state of the SourceForge lister in incremental runs"""
"""If the subsitemap does not exist, we assume a full run of this subsitemap
is needed. If the date is the same, we skip the subsitemap, otherwise we
request the subsitemap and look up every project's "last modified" date
to compare against `ListedOrigins` from the database."""
subsitemap_last_modified: Dict[SubSitemapNameT, LastModifiedT] = field(
default_factory=dict
)
"""Some projects (not the majority, but still meaningful) have no VCS for us to
archive. We need to remember a mapping of their API URL to their "last modified"
date so we don't keep querying them needlessly every time."""
empty_projects: Dict[str, LastModifiedT] = field(default_factory=dict)
SourceForgeListerPage = List[SourceForgeListerEntry]
MAIN_SITEMAP_URL = "https://sourceforge.net/allura_sitemap/sitemap.xml"
SITEMAP_XML_NAMESPACE = "{http://www.sitemaps.org/schemas/sitemap/0.9}"
# API resource endpoint for information about the given project.
#
# `namespace`: Project namespace. Very often `p`, but can be something else like
# `adobe`
# `project`: Project name, e.g. `seedai`. Can be a subproject, e.g `backapps/website`.
PROJECT_API_URL_FORMAT = "https://sourceforge.net/rest/{namespace}/{project}"
# Predictable URL for cloning (in the broad sense) a VCS registered for the project.
#
# Warning: does not apply to bzr repos, and Mercurial are http only, see use of this
# constant below.
#
# `vcs`: VCS type, one of `VCS_NAMES`
# `namespace`: Project namespace. Very often `p`, but can be something else like
# `adobe`.
# `project`: Project name, e.g. `seedai`. Can be a subproject, e.g `backapps/website`.
# `mount_point`: url path used by the repo. For example, the Code::Blocks project uses
# `git` (https://git.code.sf.net/p/codeblocks/git).
CLONE_URL_FORMAT = "https://{vcs}.code.sf.net/{namespace}/{project}/{mount_point}"
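A small sketch filling the two templates for the Code::Blocks example given in the comment above (namespace "p", project "codeblocks", git mount point "git"):
api_url = PROJECT_API_URL_FORMAT.format(namespace="p", project="codeblocks")
clone_url = CLONE_URL_FORMAT.format(
    vcs="git", namespace="p", project="codeblocks", mount_point="git"
)
assert api_url == "https://sourceforge.net/rest/p/codeblocks"
assert clone_url == "https://git.code.sf.net/p/codeblocks/git"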
PROJ_URL_RE = re.compile(
    r"^https://sourceforge.net/(?P<namespace>[^/]+)/(?P<project>[^/]+)/(?P<rest>.*)?"
)
# Mapping of `(namespace, project name)` to `last modified` date.
ProjectsLastModifiedCache = Dict[Tuple[str, str], LastModifiedT]
class SourceForgeLister(Lister[SourceForgeListerState, SourceForgeListerPage]):
- """List origins from the "SourceForge" forge.
-
- """
+ """List origins from the "SourceForge" forge."""
# Part of the lister API, that identifies this lister
LISTER_NAME = "sourceforge"
def __init__(
self,
scheduler: SchedulerInterface,
incremental: bool = False,
credentials: Optional[CredentialsType] = None,
):
super().__init__(
scheduler=scheduler,
url="https://sourceforge.net",
instance="main",
credentials=credentials,
)
# Will hold the currently saved "last modified" dates to compare against our
# requests.
self._project_last_modified: Optional[ProjectsLastModifiedCache] = None
self.session = requests.Session()
# Declare the USER_AGENT is more sysadm-friendly for the forge we list
self.session.headers.update(
{"Accept": "application/json", "User-Agent": USER_AGENT}
)
self.incremental = incremental
def state_from_dict(self, d: Dict[str, Dict[str, Any]]) -> SourceForgeListerState:
subsitemaps = {
k: datetime.date.fromisoformat(v)
for k, v in d.get("subsitemap_last_modified", {}).items()
}
empty_projects = {
k: datetime.date.fromisoformat(v)
for k, v in d.get("empty_projects", {}).items()
}
return SourceForgeListerState(
subsitemap_last_modified=subsitemaps, empty_projects=empty_projects
)
def state_to_dict(self, state: SourceForgeListerState) -> Dict[str, Any]:
return {
"subsitemap_last_modified": {
k: v.isoformat() for k, v in state.subsitemap_last_modified.items()
},
"empty_projects": {
k: v.isoformat() for k, v in state.empty_projects.items()
},
}
def projects_last_modified(self) -> ProjectsLastModifiedCache:
if not self.incremental:
# No point in loading the previous results if we're doing a full run
return {}
if self._project_last_modified is not None:
return self._project_last_modified
# We know there will be at least that many origins
stream = stream_results(
self.scheduler.get_listed_origins, self.lister_obj.id, limit=300_000
)
listed_origins = dict()
# Projects can have slashes in them if they're subprojects, but the
# mount point (last component) cannot.
url_match = re.compile(
r".*\.code\.sf\.net/(?P[^/]+)/(?P.+)/.*"
)
bzr_url_match = re.compile(
r"http://(?P[^/]+).bzr.sourceforge.net/bzrroot/([^/]+)"
)
cvs_url_match = re.compile(
r"rsync://a.cvs.sourceforge.net/cvsroot/(?P.+)/([^/]+)"
)
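# Illustrative examples of origin URLs these patterns are meant to recognize,
# and the (namespace, project) key derived from each of them:
#   https://git.code.sf.net/p/codeblocks/git             -> ("p", "codeblocks")
#   http://seedai.bzr.sourceforge.net/bzrroot/seedai     -> ("p", "seedai")
#   rsync://a.cvs.sourceforge.net/cvsroot/seedai/module  -> ("p", "seedai")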
for origin in stream:
url = origin.url
match = url_match.match(url)
if match is None:
# Could be a bzr or cvs special endpoint
bzr_match = bzr_url_match.match(url)
cvs_match = cvs_url_match.match(url)
matches = None
if bzr_match is not None:
matches = bzr_match.groupdict()
elif cvs_match is not None:
matches = cvs_match.groupdict()
assert matches
project = matches["project"]
namespace = "p" # no special namespacing for bzr and cvs projects
else:
matches = match.groupdict()
namespace = matches["namespace"]
project = matches["project"]
# "Last modified" dates are the same across all VCS (tools, even)
# within a project or subproject. An assertion here would be overkill.
last_modified = origin.last_update
assert last_modified is not None
listed_origins[(namespace, project)] = last_modified.date()
self._project_last_modified = listed_origins
return listed_origins
@throttling_retry(
retry=retry_policy_generic,
before_sleep=before_sleep_log(logger, logging.WARNING),
)
def page_request(self, url, params) -> requests.Response:
# Log listed URL to ease debugging
logger.debug("Fetching URL %s with params %s", url, params)
response = self.session.get(url, params=params)
if response.status_code != 200:
# Log response content to ease debugging
logger.warning(
"Unexpected HTTP status code %s for URL %s",
response.status_code,
response.url,
)
# The lister must fail on blocking errors
response.raise_for_status()
return response
def get_pages(self) -> Iterator[SourceForgeListerPage]:
"""
SourceForge has a main XML sitemap that lists its sharded sitemaps for all
projects.
Each XML sub-sitemap lists project pages, which are not unique per project: a
project can have a wiki, a home, a git, an svn, etc.
For each unique project, we query an API endpoint that lists (among
other things) the tools associated with said project, some of which are
the VCSes in use. Subprojects are considered separate projects.
Lastly we use the information of which VCS are used to build the predictable
clone URL for any given VCS.
"""
sitemap_contents = self.page_request(MAIN_SITEMAP_URL, {}).text
tree = ElementTree.fromstring(sitemap_contents)
for subsitemap in tree.iterfind(f"{SITEMAP_XML_NAMESPACE}sitemap"):
last_modified_el = subsitemap.find(f"{SITEMAP_XML_NAMESPACE}lastmod")
assert last_modified_el is not None and last_modified_el.text is not None
last_modified = datetime.date.fromisoformat(last_modified_el.text)
location = subsitemap.find(f"{SITEMAP_XML_NAMESPACE}loc")
assert location is not None and location.text is not None
sub_url = location.text
if self.incremental:
recorded_last_mod = self.state.subsitemap_last_modified.get(sub_url)
if recorded_last_mod == last_modified:
# The entire subsitemap hasn't changed, so none of its projects
# have either, skip it.
continue
self.state.subsitemap_last_modified[sub_url] = last_modified
subsitemap_contents = self.page_request(sub_url, {}).text
subtree = ElementTree.fromstring(subsitemap_contents)
yield from self._get_pages_from_subsitemap(subtree)
def get_origins_from_page(
self, page: SourceForgeListerPage
) -> Iterator[ListedOrigin]:
assert self.lister_obj.id is not None
for hit in page:
last_modified: str = str(hit.last_modified)
last_update: datetime.datetime = iso8601.parse_date(last_modified)
yield ListedOrigin(
lister_id=self.lister_obj.id,
visit_type=hit.vcs.value,
url=hit.url,
last_update=last_update,
)
def _get_pages_from_subsitemap(
self, subtree: ElementTree.Element
) -> Iterator[SourceForgeListerPage]:
projects: Set[ProjectNameT] = set()
for project_block in subtree.iterfind(f"{SITEMAP_XML_NAMESPACE}url"):
last_modified_block = project_block.find(f"{SITEMAP_XML_NAMESPACE}lastmod")
assert last_modified_block is not None
last_modified = last_modified_block.text
location = project_block.find(f"{SITEMAP_XML_NAMESPACE}loc")
assert location is not None
project_url = location.text
assert project_url is not None
match = PROJ_URL_RE.match(project_url)
if match:
matches = match.groupdict()
namespace = matches["namespace"]
if namespace == "projects":
# These have a `p`-namespaced counterpart, use that instead
continue
project = matches["project"]
rest = matches["rest"]
if rest.count("/") > 1:
# This is a subproject. There are no sub-subprojects.
subproject_name = rest.rsplit("/", 2)[0]
project = f"{project}/{subproject_name}"
prev_len = len(projects)
projects.add(project)
if prev_len == len(projects):
# Already seen
continue
pages = self._get_pages_for_project(namespace, project, last_modified)
if pages:
yield pages
else:
logger.debug("Project '%s' does not have any VCS", project)
else:
# Should almost always match, let's log it
# The only ones that don't match are mostly specialized one-off URLs.
msg = "Project URL '%s' does not match expected pattern"
logger.warning(msg, project_url)
def _get_pages_for_project(
self, namespace, project, last_modified
) -> SourceForgeListerPage:
endpoint = PROJECT_API_URL_FORMAT.format(namespace=namespace, project=project)
empty_project_last_modified = self.state.empty_projects.get(endpoint)
if empty_project_last_modified is not None:
if last_modified == empty_project_last_modified.isoformat():
# Project has not changed, so is still empty, meaning it has
# no VCS attached that we can archive.
logger.debug(f"Project {namespace}/{project} is still empty")
return []
if self.incremental:
expected = self.projects_last_modified().get((namespace, project))
if expected is not None:
if expected.isoformat() == last_modified:
# Project has not changed
logger.debug(f"Project {namespace}/{project} has not changed")
return []
else:
logger.debug(f"Project {namespace}/{project} was updated")
else:
msg = "New project during an incremental run: %s/%s"
logger.debug(msg, namespace, project)
try:
res = self.page_request(endpoint, {}).json()
except requests.HTTPError:
# We've already logged in `page_request`
return []
tools = res.get("tools")
if tools is None:
# This rarely happens, on very old URLs
logger.warning("Project '%s' does not have any tools", endpoint)
return []
hits = []
for tool in tools:
tool_name = tool["name"]
if tool_name not in VCS_NAMES:
continue
if tool_name == VcsNames.CVS.value:
# CVS projects are different from other VCS ones, they use the rsync
# protocol, a list of modules needs to be fetched from an info page
# and multiple origin URLs can be produced for the same project.
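# The info page lists checkout commands inside <b> tags; an illustrative
# (unverified) fragment for a project named "seedai" would contain
# "/cvsroot/seedai co -P mymodule", from which the module name is extracted
# by the regular expression below.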
cvs_info_url = f"http://{project}.cvs.sourceforge.net"
try:
response = self.page_request(cvs_info_url, params={})
except requests.HTTPError:
logger.warning(
"CVS info page could not be fetched, skipping project '%s'",
project,
)
continue
else:
bs = BeautifulSoup(response.text, features="html.parser")
cvs_base_url = "rsync://a.cvs.sourceforge.net/cvsroot"
for text in [b.text for b in bs.find_all("b")]:
- match = re.search(fr".*/cvsroot/{project} co -P (.+)", text)
+ match = re.search(rf".*/cvsroot/{project} co -P (.+)", text)
if match is not None:
module = match.group(1)
url = f"{cvs_base_url}/{project}/{module}"
hits.append(
SourceForgeListerEntry(
vcs=VcsNames(tool_name),
url=url,
last_modified=last_modified,
)
)
continue
url = CLONE_URL_FORMAT.format(
vcs=tool_name,
namespace=namespace,
project=project,
mount_point=tool["mount_point"],
)
if tool_name == VcsNames.MERCURIAL.value:
# SourceForge does not yet support anonymous HTTPS cloning for Mercurial
# See https://sourceforge.net/p/forge/feature-requests/727/
url = url.replace("https://", "http://")
if tool_name == VcsNames.BAZAAR.value:
# SourceForge has removed support for bzr and only keeps legacy projects
# around at a separate (also not https) URL. Bzr projects are very rare
# and a lot of them are 404 now.
url = f"http://{project}.bzr.sourceforge.net/bzrroot/{project}"
entry = SourceForgeListerEntry(
vcs=VcsNames(tool_name), url=url, last_modified=last_modified
)
hits.append(entry)
if not hits:
date = datetime.date.fromisoformat(last_modified)
self.state.empty_projects[endpoint] = date
else:
self.state.empty_projects.pop(endpoint, None)
return hits
diff --git a/swh/lister/tests/test_cli.py b/swh/lister/tests/test_cli.py
index e2585df..dfaf72b 100644
--- a/swh/lister/tests/test_cli.py
+++ b/swh/lister/tests/test_cli.py
@@ -1,48 +1,55 @@
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import pytest
from swh.lister.cli import SUPPORTED_LISTERS, get_lister
lister_args = {
- "cgit": {"url": "https://git.eclipse.org/c/",},
+ "cgit": {
+ "url": "https://git.eclipse.org/c/",
+ },
"phabricator": {
"instance": "softwareheritage",
"url": "https://forge.softwareheritage.org/api/diffusion.repository.search",
"api_token": "bogus",
},
- "gitea": {"url": "https://try.gitea.io/api/v1/",},
- "tuleap": {"url": "https://tuleap.net",},
- "gitlab": {"url": "https://gitlab.ow2.org/api/v4", "instance": "ow2",},
+ "gitea": {
+ "url": "https://try.gitea.io/api/v1/",
+ },
+ "tuleap": {
+ "url": "https://tuleap.net",
+ },
+ "gitlab": {
+ "url": "https://gitlab.ow2.org/api/v4",
+ "instance": "ow2",
+ },
"opam": {"url": "https://opam.ocaml.org", "instance": "opam"},
"maven": {
"url": "https://repo1.maven.org/maven2/",
"index_url": "http://indexes/export.fld",
},
}
def test_get_lister_wrong_input():
"""Unsupported lister should raise"""
with pytest.raises(ValueError) as e:
get_lister("unknown", "db-url")
assert "Invalid lister" in str(e.value)
def test_get_lister(swh_scheduler_config):
- """Instantiating a supported lister should be ok
-
- """
+ """Instantiating a supported lister should be ok"""
# Drop launchpad lister from the lister to check, its test setup is more involved
# than the other listers and it's not currently done here
for lister_name in SUPPORTED_LISTERS:
lst = get_lister(
lister_name,
scheduler={"cls": "local", **swh_scheduler_config},
**lister_args.get(lister_name, {}),
)
assert hasattr(lst, "run")
diff --git a/swh/lister/tests/test_pattern.py b/swh/lister/tests/test_pattern.py
index 795b715..192f8f7 100644
--- a/swh/lister/tests/test_pattern.py
+++ b/swh/lister/tests/test_pattern.py
@@ -1,198 +1,200 @@
# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import TYPE_CHECKING, Any, Dict, Iterator, List
import pytest
from swh.lister import pattern
from swh.scheduler.model import ListedOrigin
StateType = Dict[str, str]
OriginType = Dict[str, str]
PageType = List[OriginType]
class InstantiableLister(pattern.Lister[StateType, PageType]):
"""A lister that can only be instantiated, not run."""
LISTER_NAME = "test-pattern-lister"
def state_from_dict(self, d: Dict[str, str]) -> StateType:
return d
def test_instantiation(swh_scheduler):
lister = InstantiableLister(
scheduler=swh_scheduler, url="https://example.com", instance="example.com"
)
# check the lister was registered in the scheduler backend
stored_lister = swh_scheduler.get_or_create_lister(
name="test-pattern-lister", instance_name="example.com"
)
assert stored_lister == lister.lister_obj
with pytest.raises(NotImplementedError):
lister.run()
def test_lister_instance_name(swh_scheduler):
lister = InstantiableLister(
scheduler=swh_scheduler, url="https://example.org", instance="example"
)
assert lister.instance == "example"
lister = InstantiableLister(scheduler=swh_scheduler, url="https://example.org")
assert lister.instance == "example.org"
def test_instantiation_from_configfile(swh_scheduler, mocker):
mock_load_from_envvar = mocker.patch("swh.lister.pattern.load_from_envvar")
mock_get_scheduler = mocker.patch("swh.lister.pattern.get_scheduler")
mock_load_from_envvar.return_value = {
"scheduler": {},
"url": "foo",
"instance": "bar",
}
mock_get_scheduler.return_value = swh_scheduler
lister = InstantiableLister.from_configfile()
assert lister.url == "foo"
assert lister.instance == "bar"
lister = InstantiableLister.from_configfile(url="bar", instance="foo")
assert lister.url == "bar"
assert lister.instance == "foo"
lister = InstantiableLister.from_configfile(url=None, instance="foo")
assert lister.url == "foo"
assert lister.instance == "foo"
if TYPE_CHECKING:
_Base = pattern.Lister[Any, PageType]
else:
_Base = object
class ListerMixin(_Base):
def get_pages(self) -> Iterator[PageType]:
for pageno in range(2):
yield [
{"url": f"https://example.com/{pageno:02d}{i:03d}"} for i in range(10)
]
def get_origins_from_page(self, page: PageType) -> Iterator[ListedOrigin]:
assert self.lister_obj.id is not None
for origin in page:
yield ListedOrigin(
lister_id=self.lister_obj.id, url=origin["url"], visit_type="git"
)
def check_listed_origins(swh_scheduler, lister, stored_lister):
"""Check that the listed origins match the ones in the lister"""
# Gather the origins that are supposed to be listed
lister_urls = sorted(
sum([[o["url"] for o in page] for page in lister.get_pages()], [])
)
# And check the state of origins in the scheduler
ret = swh_scheduler.get_listed_origins()
assert ret.next_page_token is None
assert len(ret.results) == len(lister_urls)
for origin, expected_url in zip(ret.results, lister_urls):
assert origin.url == expected_url
assert origin.lister_id == stored_lister.id
class RunnableLister(ListerMixin, InstantiableLister):
"""A lister that can be run."""
def state_to_dict(self, state: StateType) -> Dict[str, str]:
return state
def finalize(self) -> None:
self.state["updated"] = "yes"
self.updated = True
def test_run(swh_scheduler):
lister = RunnableLister(
scheduler=swh_scheduler, url="https://example.com", instance="example.com"
)
assert "updated" not in lister.state
update_date = lister.lister_obj.updated
run_result = lister.run()
assert run_result.pages == 2
assert run_result.origins == 20
stored_lister = swh_scheduler.get_or_create_lister(
name="test-pattern-lister", instance_name="example.com"
)
# Check that the finalize operation happened
assert stored_lister.updated > update_date
assert stored_lister.current_state["updated"] == "yes"
check_listed_origins(swh_scheduler, lister, stored_lister)
class InstantiableStatelessLister(pattern.StatelessLister[PageType]):
LISTER_NAME = "test-stateless-lister"
def test_stateless_instantiation(swh_scheduler):
lister = InstantiableStatelessLister(
- scheduler=swh_scheduler, url="https://example.com", instance="example.com",
+ scheduler=swh_scheduler,
+ url="https://example.com",
+ instance="example.com",
)
# check the lister was registered in the scheduler backend
stored_lister = swh_scheduler.get_or_create_lister(
name="test-stateless-lister", instance_name="example.com"
)
assert stored_lister == lister.lister_obj
assert stored_lister.current_state == {}
assert lister.state is None
with pytest.raises(NotImplementedError):
lister.run()
class RunnableStatelessLister(ListerMixin, InstantiableStatelessLister):
def finalize(self):
self.updated = True
def test_stateless_run(swh_scheduler):
lister = RunnableStatelessLister(
scheduler=swh_scheduler, url="https://example.com", instance="example.com"
)
update_date = lister.lister_obj.updated
run_result = lister.run()
assert run_result.pages == 2
assert run_result.origins == 20
stored_lister = swh_scheduler.get_or_create_lister(
name="test-stateless-lister", instance_name="example.com"
)
# Check that the finalize operation happened
assert stored_lister.updated > update_date
assert stored_lister.current_state == {}
# And that all origins are stored
check_listed_origins(swh_scheduler, lister, stored_lister)
diff --git a/swh/lister/tests/test_utils.py b/swh/lister/tests/test_utils.py
index acb73fe..6d9b50d 100644
--- a/swh/lister/tests/test_utils.py
+++ b/swh/lister/tests/test_utils.py
@@ -1,113 +1,126 @@
# Copyright (C) 2018-2021 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import pytest
import requests
from requests.status_codes import codes
from tenacity.wait import wait_fixed
from swh.lister.utils import (
MAX_NUMBER_ATTEMPTS,
WAIT_EXP_BASE,
split_range,
throttling_retry,
)
@pytest.mark.parametrize(
"total_pages,nb_pages,expected_ranges",
[
(14, 5, [(0, 4), (5, 9), (10, 14)]),
(19, 10, [(0, 9), (10, 19)]),
(20, 3, [(0, 2), (3, 5), (6, 8), (9, 11), (12, 14), (15, 17), (18, 20)]),
- (21, 3, [(0, 2), (3, 5), (6, 8), (9, 11), (12, 14), (15, 17), (18, 21),],),
+ (
+ 21,
+ 3,
+ [
+ (0, 2),
+ (3, 5),
+ (6, 8),
+ (9, 11),
+ (12, 14),
+ (15, 17),
+ (18, 21),
+ ],
+ ),
],
)
def test_split_range(total_pages, nb_pages, expected_ranges):
actual_ranges = list(split_range(total_pages, nb_pages))
assert actual_ranges == expected_ranges
@pytest.mark.parametrize("total_pages,nb_pages", [(None, 1), (100, None)])
def test_split_range_errors(total_pages, nb_pages):
for total_pages, nb_pages in [(None, 1), (100, None)]:
with pytest.raises(TypeError):
next(split_range(total_pages, nb_pages))
TEST_URL = "https://example.og/api/repositories"
@throttling_retry()
def make_request():
response = requests.get(TEST_URL)
response.raise_for_status()
return response
def assert_sleep_calls(mocker, mock_sleep, sleep_params):
mock_sleep.assert_has_calls([mocker.call(param) for param in sleep_params])
def test_throttling_retry(requests_mock, mocker):
data = {"result": {}}
requests_mock.get(
TEST_URL,
[
{"status_code": codes.too_many_requests},
{"status_code": codes.too_many_requests},
{"status_code": codes.ok, "json": data},
],
)
mock_sleep = mocker.patch.object(make_request.retry, "sleep")
response = make_request()
assert_sleep_calls(mocker, mock_sleep, [1, WAIT_EXP_BASE])
assert response.json() == data
def test_throttling_retry_max_attemps(requests_mock, mocker):
requests_mock.get(
- TEST_URL, [{"status_code": codes.too_many_requests}] * (MAX_NUMBER_ATTEMPTS),
+ TEST_URL,
+ [{"status_code": codes.too_many_requests}] * (MAX_NUMBER_ATTEMPTS),
)
mock_sleep = mocker.patch.object(make_request.retry, "sleep")
with pytest.raises(requests.exceptions.HTTPError) as e:
make_request()
assert e.value.response.status_code == codes.too_many_requests
assert_sleep_calls(
mocker,
mock_sleep,
- [float(WAIT_EXP_BASE ** i) for i in range(MAX_NUMBER_ATTEMPTS - 1)],
+ [float(WAIT_EXP_BASE**i) for i in range(MAX_NUMBER_ATTEMPTS - 1)],
)
@throttling_retry(wait=wait_fixed(WAIT_EXP_BASE))
def make_request_wait_fixed():
response = requests.get(TEST_URL)
response.raise_for_status()
return response
def test_throttling_retry_wait_fixed(requests_mock, mocker):
requests_mock.get(
TEST_URL,
[
{"status_code": codes.too_many_requests},
{"status_code": codes.too_many_requests},
{"status_code": codes.ok},
],
)
mock_sleep = mocker.patch.object(make_request_wait_fixed.retry, "sleep")
make_request_wait_fixed()
assert_sleep_calls(mocker, mock_sleep, [WAIT_EXP_BASE] * 2)
diff --git a/swh/lister/tuleap/lister.py b/swh/lister/tuleap/lister.py
index b630508..179329a 100644
--- a/swh/lister/tuleap/lister.py
+++ b/swh/lister/tuleap/lister.py
@@ -1,146 +1,150 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from typing import Any, Dict, Iterator, List, Optional
from urllib.parse import urljoin
import iso8601
import requests
from tenacity.before_sleep import before_sleep_log
from swh.lister.utils import throttling_retry
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
from .. import USER_AGENT
from ..pattern import CredentialsType, StatelessLister
logger = logging.getLogger(__name__)
RepoPage = Dict[str, Any]
class TuleapLister(StatelessLister[RepoPage]):
"""List origins from Tuleap.
Tuleap provides SVN and Git repositories hosting.
Tuleap API getting started:
https://tuleap.net/doc/en/user-guide/integration/rest.html
Tuleap API reference:
https://tuleap.net/api/explorer/
Using the API we first request a list of projects, and from there request their
associated repositories individually. Everything is paginated; the code applies
throttling at the individual GET call level."""
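# Illustrative request flow, with paths taken from the code below and
# <url>/<id> standing in for the instance URL and a project id:
#   GET <url>/api/projects/          -> paginated list of projects
#   GET <url>/api/projects/<id>/git  -> Git repositories of that project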
LISTER_NAME = "tuleap"
REPO_LIST_PATH = "/api"
REPO_GIT_PATH = "plugins/git/"
REPO_SVN_PATH = "plugins/svn/"
def __init__(
self,
scheduler: SchedulerInterface,
url: str,
instance: Optional[str] = None,
credentials: CredentialsType = None,
):
super().__init__(
- scheduler=scheduler, credentials=credentials, url=url, instance=instance,
+ scheduler=scheduler,
+ credentials=credentials,
+ url=url,
+ instance=instance,
)
self.session = requests.Session()
self.session.headers.update(
- {"Accept": "application/json", "User-Agent": USER_AGENT,}
+ {
+ "Accept": "application/json",
+ "User-Agent": USER_AGENT,
+ }
)
@throttling_retry(before_sleep=before_sleep_log(logger, logging.WARNING))
def page_request(self, url: str, params: Dict[str, Any]) -> requests.Response:
logger.info("Fetching URL %s with params %s", url, params)
response = self.session.get(url, params=params)
if response.status_code != 200:
logger.warning(
"Unexpected HTTP status code %s on %s: %s",
response.status_code,
response.url,
response.content,
)
response.raise_for_status()
return response
@classmethod
def results_simplified(cls, url: str, repo_type: str, repo: RepoPage) -> RepoPage:
if repo_type == "git":
prefix_url = TuleapLister.REPO_GIT_PATH
else:
prefix_url = TuleapLister.REPO_SVN_PATH
rep = {
"project": repo["name"],
"type": repo_type,
"uri": urljoin(url, f"{prefix_url}{repo['path']}"),
"last_update_date": repo["last_update_date"],
}
return rep
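# An illustrative (made-up) simplified result for a Git repository:
#   {"project": "myproject", "type": "git",
#    "uri": "https://tuleap.net/plugins/git/<repo path>",
#    "last_update_date": "2021-05-25T13:00:00+02:00"}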
def _get_repositories(self, url_repo) -> List[Dict[str, Any]]:
ret = self.page_request(url_repo, {})
reps_list = ret.json()["repositories"]
limit = int(ret.headers["X-PAGINATION-LIMIT-MAX"])
offset = int(ret.headers["X-PAGINATION-LIMIT"])
size = int(ret.headers["X-PAGINATION-SIZE"])
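# With hypothetical header values limit=50, offset=50 and size=120, the while
# loop below issues two more requests, at offsets 50 and 100.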
while offset < size:
url_offset = url_repo + "?offset=" + str(offset) + "&limit=" + str(limit)
ret = self.page_request(url_offset, {}).json()
reps_list = reps_list + ret["repositories"]
offset += limit
return reps_list
def get_pages(self) -> Iterator[RepoPage]:
# base with trailing slash, path without leading slash for urljoin
url_api: str = urljoin(self.url, self.REPO_LIST_PATH)
url_projects = url_api + "/projects/"
# Get the list of projects.
response = self.page_request(url_projects, {})
projects_list = response.json()
limit = int(response.headers["X-PAGINATION-LIMIT-MAX"])
offset = int(response.headers["X-PAGINATION-LIMIT"])
size = int(response.headers["X-PAGINATION-SIZE"])
while offset < size:
url_offset = (
url_projects + "?offset=" + str(offset) + "&limit=" + str(limit)
)
ret = self.page_request(url_offset, {}).json()
projects_list = projects_list + ret
offset += limit
# Get list of repositories for each project.
for p in projects_list:
p_id = p["id"]
# Fetch Git repositories for project
url_git = url_projects + str(p_id) + "/git"
repos = self._get_repositories(url_git)
for repo in repos:
yield self.results_simplified(url_api, "git", repo)
def get_origins_from_page(self, page: RepoPage) -> Iterator[ListedOrigin]:
- """Convert a page of Tuleap repositories into a list of ListedOrigins.
-
- """
+ """Convert a page of Tuleap repositories into a list of ListedOrigins."""
assert self.lister_obj.id is not None
yield ListedOrigin(
lister_id=self.lister_obj.id,
url=page["uri"],
visit_type=page["type"],
last_update=iso8601.parse_date(page["last_update_date"]),
)
diff --git a/swh/lister/tuleap/tests/test_tasks.py b/swh/lister/tuleap/tests/test_tasks.py
index a9b3cf2..2f394b9 100644
--- a/swh/lister/tuleap/tests/test_tasks.py
+++ b/swh/lister/tuleap/tests/test_tasks.py
@@ -1,50 +1,55 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from swh.lister.pattern import ListerStats
def test_ping(swh_scheduler_celery_app, swh_scheduler_celery_worker):
res = swh_scheduler_celery_app.send_task("swh.lister.tuleap.tasks.ping")
assert res
res.wait()
assert res.successful()
assert res.result == "OK"
def test_full_listing(swh_scheduler_celery_app, swh_scheduler_celery_worker, mocker):
lister = mocker.patch("swh.lister.tuleap.tasks.TuleapLister")
lister.from_configfile.return_value = lister
lister.run.return_value = ListerStats(pages=10, origins=500)
kwargs = dict(url="https://tuleap.net")
res = swh_scheduler_celery_app.send_task(
- "swh.lister.tuleap.tasks.FullTuleapLister", kwargs=kwargs,
+ "swh.lister.tuleap.tasks.FullTuleapLister",
+ kwargs=kwargs,
)
assert res
res.wait()
assert res.successful()
lister.from_configfile.assert_called_once_with(**kwargs)
lister.run.assert_called_once_with()
def test_full_listing_params(
swh_scheduler_celery_app, swh_scheduler_celery_worker, mocker
):
lister = mocker.patch("swh.lister.tuleap.tasks.TuleapLister")
lister.from_configfile.return_value = lister
lister.run.return_value = ListerStats(pages=10, origins=500)
- kwargs = dict(url="https://tuleap.net", instance="tuleap.net",)
+ kwargs = dict(
+ url="https://tuleap.net",
+ instance="tuleap.net",
+ )
res = swh_scheduler_celery_app.send_task(
- "swh.lister.tuleap.tasks.FullTuleapLister", kwargs=kwargs,
+ "swh.lister.tuleap.tasks.FullTuleapLister",
+ kwargs=kwargs,
)
assert res
res.wait()
assert res.successful()
lister.from_configfile.assert_called_once_with(**kwargs)
lister.run.assert_called_once_with()