diff --git a/docs/new_lister_template.py b/docs/new_lister_template.py index 41e27a7..a13a83f 100644 --- a/docs/new_lister_template.py +++ b/docs/new_lister_template.py @@ -1,165 +1,130 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from dataclasses import asdict, dataclass import logging from typing import Any, Dict, Iterator, List from urllib.parse import urljoin -import requests -from tenacity.before_sleep import before_sleep_log - -from swh.lister.utils import http_retry from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import ListedOrigin -from .. import USER_AGENT from ..pattern import CredentialsType, Lister logger = logging.getLogger(__name__) # Aliasing the page results returned by `get_pages` method from the lister. NewForgeListerPage = List[Dict[str, Any]] @dataclass class NewForgeListerState: """The NewForgeLister instance state. This is used for incremental listing.""" current: str = "" """Id of the last origin listed on an incremental pass""" # If there is no need to keep state, subclass StatelessLister[NewForgeListerPage] class NewForgeLister(Lister[NewForgeListerState, NewForgeListerPage]): """List origins from the "NewForge" forge.""" # Part of the lister API, that identifies this lister LISTER_NAME = "" # (Optional) CVS type of the origins listed by this lister, if constant VISIT_TYPE = "" # Instance URLs include the hostname and the common path prefix of processed URLs EXAMPLE_BASE_URL = "https://netloc/api/v1/" # Path of a specific resource to process, to join the base URL with EXAMPLE_PATH = "origins/list" def __init__( self, # Required scheduler: SchedulerInterface, # Instance URL, required for multi-instances listers (e.g gitlab, ...) url: str, # Instance name (free form) required for multi-instance listers, # or computed from `url` instance: str, # Required whether lister supports authentication or not credentials: CredentialsType = None, ): super().__init__( scheduler=scheduler, credentials=credentials, url=url, instance=instance, ) - self.session = requests.Session() - # Declare the USER_AGENT is more sysadm-friendly for the forge we list - self.session.headers.update( - {"Accept": "application/json", "User-Agent": USER_AGENT} - ) + self.session.headers.update({"Accept": "application/json"}) def state_from_dict(self, d: Dict[str, Any]) -> NewForgeListerState: return NewForgeListerState(**d) def state_to_dict(self, state: NewForgeListerState) -> Dict[str, Any]: return asdict(state) - @http_retry(before_sleep=before_sleep_log(logger, logging.WARNING)) - def page_request(self, url, params) -> requests.Response: - # Do the network resource request under a retrying decorator - # to handle rate limiting and transient errors up to a limit. - # `http_retry` by default use the `requests` library to check - # only for rate-limit and a base-10 exponential waiting strategy. - # This can be customized by passed waiting, retrying and logging strategies - # as functions. See the `tenacity` library documentation. - - # Log listed URL to ease debugging - logger.debug("Fetching URL %s with params %s", url, params) - response = self.session.get(url, params=params) - - if response.status_code != 200: - # Log response content to ease debugging - logger.warning( - "Unexpected HTTP status code %s on %s: %s", - response.status_code, - response.url, - response.content, - ) - # The lister must fail on blocking errors - response.raise_for_status() - - return response - def get_pages(self) -> Iterator[NewForgeListerPage]: # The algorithm depends on the service, but should request data reliably, # following pagination if relevant and yielding pages in a streaming fashion. # If incremental listing is supported, initialize from saved lister state. # Make use of any next page URL provided. # Simplify the results early to ease testing and debugging. # Initialize from the lister saved state current = "" if self.state.current is not None: current = self.state.current # Construct the URL of a service endpoint, the lister can have others to fetch url = urljoin(self.url, self.EXAMPLE_PATH) while current is not None: # Parametrize the request for incremental listing - body = self.page_request(url, {"current": current}).json() + body = self.http_request(url, params={"current": current}).json() # Simplify the page if possible to only the necessary elements # and yield it yield body # Get the next page parameter or end the loop when there is none current = body.get("next") def get_origins_from_page(self, page: NewForgeListerPage) -> Iterator[ListedOrigin]: """Convert a page of NewForgeLister repositories into a list of ListedOrigins""" assert self.lister_obj.id is not None for element in page: yield ListedOrigin( # Required. Should use this value. lister_id=self.lister_obj.id, # Required. Visit type of the currently processed origin visit_type=self.VISIT_TYPE, # Required. URL corresponding to the origin for loaders to ingest url=..., # Should get it if the service provides it and if it induces no # substantial additional processing cost last_update=..., ) def commit_page(self, page: NewForgeListerPage) -> None: # Update the lister state to the latest `current` current = page[-1]["current"] if current > self.state.current: self.state.current = current def finalize(self) -> None: # Pull fresh lister state from the scheduler backend, in case multiple # listers run concurrently scheduler_state = self.get_state_from_scheduler() # Update the lister state in the backend only if `current` is fresher than # the one stored in the database. if self.state.current > scheduler_state.current: self.updated = True diff --git a/swh/lister/__init__.py b/swh/lister/__init__.py index 6a9b02b..f4448d8 100644 --- a/swh/lister/__init__.py +++ b/swh/lister/__init__.py @@ -1,60 +1,61 @@ -# Copyright (C) 2018-2019 The Software Heritage developers +# Copyright (C) 2018-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import pkg_resources -from swh.lister import pattern - logger = logging.getLogger(__name__) try: __version__ = pkg_resources.get_distribution("swh.lister").version except pkg_resources.DistributionNotFound: __version__ = "devel" USER_AGENT_TEMPLATE = "Software Heritage Lister (%s)" USER_AGENT = USER_AGENT_TEMPLATE % __version__ LISTERS = { entry_point.name.split(".", 1)[1]: entry_point for entry_point in pkg_resources.iter_entry_points("swh.workers") if entry_point.name.split(".", 1)[0] == "lister" } SUPPORTED_LISTERS = list(LISTERS) def get_lister(lister_name, db_url=None, **conf): """Instantiate a lister given its name. Args: lister_name (str): Lister's name conf (dict): Configuration dict (lister db cnx, policy, priority...) Returns: Tuple (instantiated lister, drop_tables function, init schema function, insert minimum data function) """ if lister_name not in LISTERS: raise ValueError( "Invalid lister %s: only supported listers are %s" % (lister_name, SUPPORTED_LISTERS) ) if db_url: conf["lister"] = {"cls": "local", "args": {"db": db_url}} registry_entry = LISTERS[lister_name].load()() lister_cls = registry_entry["lister"] + + from swh.lister import pattern + if issubclass(lister_cls, pattern.Lister): return lister_cls.from_config(**conf) else: # Old-style lister return lister_cls(override_config=conf) diff --git a/swh/lister/arch/lister.py b/swh/lister/arch/lister.py index a933650..af66d7f 100644 --- a/swh/lister/arch/lister.py +++ b/swh/lister/arch/lister.py @@ -1,501 +1,474 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import logging from pathlib import Path import re import tarfile import tempfile from typing import Any, Dict, Iterator, List, Optional from urllib.parse import unquote, urljoin from bs4 import BeautifulSoup -import requests -from tenacity.before_sleep import before_sleep_log -from swh.lister.utils import http_retry from swh.model.hashutil import hash_to_hex from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import ListedOrigin -from .. import USER_AGENT from ..pattern import CredentialsType, StatelessLister logger = logging.getLogger(__name__) # Aliasing the page results returned by `get_pages` method from the lister. ArchListerPage = List[Dict[str, Any]] def size_to_bytes(size: str) -> int: """Convert human readable file size to bytes. Resulting value is an approximation as input value is in most case rounded. Args: size: A string representing a human readable file size (eg: '500K') Returns: A decimal representation of file size Examples:: >>> size_to_bytes("500") 500 >>> size_to_bytes("1K") 1000 """ units = { "K": 1000, "M": 1000**2, "G": 1000**3, "T": 1000**4, "P": 1000**5, "E": 1000**6, "Z": 1000**7, "Y": 1000**8, } if size.endswith(tuple(units)): v, u = (size[:-1], size[-1]) return int(v) * units[u] else: return int(size) class ArchLister(StatelessLister[ArchListerPage]): """List Arch linux origins from 'core', 'extra', and 'community' repositories For 'official' Arch Linux it downloads core.tar.gz, extra.tar.gz and community.tar.gz from https://archive.archlinux.org/repos/last/ extract to a temp directory and then walks through each 'desc' files. Each 'desc' file describe the latest released version of a package and helps to build an origin url from where scrapping artifacts metadata. For 'arm' Arch Linux it follow the same discovery process parsing 'desc' files. The main difference is that we can't get existing versions of an arm package because https://archlinuxarm.org does not have an 'archive' website or api. """ LISTER_NAME = "arch" VISIT_TYPE = "arch" INSTANCE = "arch" ARCH_PACKAGE_URL_PATTERN = "{base_url}/packages/{repo}/{arch}/{pkgname}" ARCH_PACKAGE_VERSIONS_URL_PATTERN = "{base_url}/packages/{pkgname[0]}/{pkgname}" ARCH_PACKAGE_DOWNLOAD_URL_PATTERN = ( "{base_url}/packages/{pkgname[0]}/{pkgname}/{filename}" ) ARCH_API_URL_PATTERN = "{base_url}/packages/{repo}/{arch}/{pkgname}/json" ARM_PACKAGE_URL_PATTERN = "{base_url}/packages/{arch}/{pkgname}" ARM_PACKAGE_DOWNLOAD_URL_PATTERN = "{base_url}/{arch}/{repo}/{filename}" def __init__( self, scheduler: SchedulerInterface, credentials: Optional[CredentialsType] = None, flavours: Dict[str, Any] = { "official": { "archs": ["x86_64"], "repos": ["core", "extra", "community"], "base_info_url": "https://archlinux.org", "base_archive_url": "https://archive.archlinux.org", "base_mirror_url": "", "base_api_url": "https://archlinux.org", }, "arm": { "archs": ["armv7h", "aarch64"], "repos": ["core", "extra", "community"], "base_info_url": "https://archlinuxarm.org", "base_archive_url": "", "base_mirror_url": "https://uk.mirror.archlinuxarm.org", "base_api_url": "", }, }, ): super().__init__( scheduler=scheduler, credentials=credentials, url=flavours["official"]["base_info_url"], instance=self.INSTANCE, ) self.flavours = flavours - self.session = requests.Session() - self.session.headers.update( - { - "User-Agent": USER_AGENT, - } - ) - - @http_retry(before_sleep=before_sleep_log(logger, logging.WARNING)) - def request_get(self, url: str, params: Dict[str, Any]) -> requests.Response: - - logger.debug("Fetching URL %s with params %s", url, params) - - response = self.session.get(url, params=params) - if response.status_code != 200: - logger.warning( - "Unexpected HTTP status code %s on %s: %s", - response.status_code, - response.url, - response.content, - ) - response.raise_for_status() - - return response def scrap_package_versions( self, name: str, repo: str, base_url: str ) -> List[Dict[str, Any]]: """Given a package 'name' and 'repo', make an http call to origin url and parse its content to get package versions artifacts data. That method is suitable only for 'official' Arch Linux, not 'arm'. Args: name: Package name repo: The repository the package belongs to (one of self.repos) Returns: A list of dict of version Example:: [ {"url": "https://archive.archlinux.org/packages/d/dialog/dialog-1:1.3_20190211-1-x86_64.pkg.tar.xz", # noqa: B950 "arch": "x86_64", "repo": "core", "name": "dialog", "version": "1:1.3_20190211-1", "length": 180000, "filename": "dialog-1:1.3_20190211-1-x86_64.pkg.tar.xz", "last_modified": "2019-02-13T08:36:00"}, ] """ url = self.ARCH_PACKAGE_VERSIONS_URL_PATTERN.format( pkgname=name, base_url=base_url ) - response = self.request_get(url=url, params={}) + response = self.http_request(url) soup = BeautifulSoup(response.text, "html.parser") links = soup.find_all("a", href=True) # drop the first line (used to go to up directory) if links[0].attrs["href"] == "../": links.pop(0) versions = [] for link in links: # filename displayed can be cropped if name is too long, get it from href instead filename = unquote(link.attrs["href"]) if filename.endswith((".tar.xz", ".tar.zst")): # Extract arch from filename arch_rex = re.compile( rf"^{re.escape(name)}-(?P.*)-(?Pany|i686|x86_64)" rf"(.pkg.tar.(?:zst|xz))$" ) m = arch_rex.match(filename) if m is None: logger.error( "Can not find a match for architecture in %(filename)s", dict(filename=filename), ) else: arch = m.group("arch") version = m.group("version") # Extract last_modified and an approximate file size raw_text = link.next_sibling raw_text_rex = re.compile( r"^(?P\d+-\w+-\d+ \d\d:\d\d)\s+(?P\w+)$" ) s = raw_text_rex.search(raw_text.strip()) if s is None: logger.error( "Can not find a match for 'last_modified' and/or " "'size' in '%(raw_text)s'", dict(raw_text=raw_text), ) else: assert s.groups() assert len(s.groups()) == 2 last_modified_str, size = s.groups() # format as expected last_modified = datetime.datetime.strptime( last_modified_str, "%d-%b-%Y %H:%M" ).isoformat() length = size_to_bytes(size) # we want bytes # link url is relative, format a canonical one url = self.ARCH_PACKAGE_DOWNLOAD_URL_PATTERN.format( base_url=base_url, pkgname=name, filename=filename ) versions.append( dict( name=name, version=version, repo=repo, arch=arch, filename=filename, url=url, last_modified=last_modified, length=length, ) ) return versions def get_repo_archive(self, url: str, destination_path: Path) -> Path: """Given an url and a destination path, retrieve and extract .tar.gz archive which contains 'desc' file for each package. Each .tar.gz archive corresponds to an Arch Linux repo ('core', 'extra', 'community'). Args: url: url of the .tar.gz archive to download destination_path: the path on disk where to extract archive Returns: a directory Path where the archive has been extracted to. """ - res = self.request_get(url=url, params={}) + res = self.http_request(url) destination_path.parent.mkdir(parents=True, exist_ok=True) destination_path.write_bytes(res.content) extract_to = Path(str(destination_path).split(".tar.gz")[0]) tar = tarfile.open(destination_path) tar.extractall(path=extract_to) tar.close() return extract_to def parse_desc_file( self, path: Path, repo: str, base_url: str, dl_url_fmt: str, ) -> Dict[str, Any]: """Extract package information from a 'desc' file. There are subtle differences between parsing 'official' and 'arm' des files Args: path: A path to a 'desc' file on disk repo: The repo the package belongs to Returns: A dict of metadata Example:: {'api_url': 'https://archlinux.org/packages/core/x86_64/dialog/json', 'arch': 'x86_64', 'base': 'dialog', 'builddate': '1650081535', 'csize': '203028', 'desc': 'A tool to display dialog boxes from shell scripts', 'filename': 'dialog-1:1.3_20220414-1-x86_64.pkg.tar.zst', 'isize': '483988', 'license': 'LGPL2.1', 'md5sum': '06407c0cb11c50d7bf83d600f2e8107c', 'name': 'dialog', 'packager': 'Evangelos Foutras ', 'pgpsig': 'pgpsig content xxx', 'project_url': 'https://invisible-island.net/dialog/', 'provides': 'libdialog.so=15-64', 'repo': 'core', 'sha256sum': 'ef8c8971f591de7db0f455970ef5d81d5aced1ddf139f963f16f6730b1851fa7', 'url': 'https://archive.archlinux.org/packages/.all/dialog-1:1.3_20220414-1-x86_64.pkg.tar.zst', # noqa: B950 'version': '1:1.3_20220414-1'} """ rex = re.compile(r"^\%(?P\w+)\%\n(?P.*)\n$", re.M) with path.open("rb") as content: parsed = rex.findall(content.read().decode()) data = {entry[0].lower(): entry[1] for entry in parsed} if "url" in data.keys(): data["project_url"] = data["url"] assert data["name"] assert data["filename"] assert data["arch"] data["repo"] = repo data["url"] = urljoin( base_url, dl_url_fmt.format( base_url=base_url, pkgname=data["name"], filename=data["filename"], arch=data["arch"], repo=repo, ), ) assert data["md5sum"] assert data["sha256sum"] data["checksums"] = { "md5sum": hash_to_hex(data["md5sum"]), "sha256sum": hash_to_hex(data["sha256sum"]), } return data def get_pages(self) -> Iterator[ArchListerPage]: """Yield an iterator sorted by name in ascending order of pages. Each page is a list of package belonging to a flavour ('official', 'arm'), and a repo ('core', 'extra', 'community') """ for name, flavour in self.flavours.items(): for arch in flavour["archs"]: for repo in flavour["repos"]: yield self._get_repo_page(name, flavour, arch, repo) def _get_repo_page( self, name: str, flavour: Dict[str, Any], arch: str, repo: str ) -> ArchListerPage: with tempfile.TemporaryDirectory() as tmpdir: page = [] if name == "official": prefix = urljoin(flavour["base_archive_url"], "/repos/last/") filename = f"{repo}.files.tar.gz" archive_url = urljoin(prefix, f"{repo}/os/{arch}/{filename}") destination_path = Path(tmpdir, arch, filename) base_url = flavour["base_archive_url"] dl_url_fmt = self.ARCH_PACKAGE_DOWNLOAD_URL_PATTERN base_info_url = flavour["base_info_url"] info_url_fmt = self.ARCH_PACKAGE_URL_PATTERN elif name == "arm": filename = f"{repo}.files.tar.gz" archive_url = urljoin( flavour["base_mirror_url"], f"{arch}/{repo}/{filename}" ) destination_path = Path(tmpdir, arch, filename) base_url = flavour["base_mirror_url"] dl_url_fmt = self.ARM_PACKAGE_DOWNLOAD_URL_PATTERN base_info_url = flavour["base_info_url"] info_url_fmt = self.ARM_PACKAGE_URL_PATTERN archive = self.get_repo_archive( url=archive_url, destination_path=destination_path ) assert archive packages_desc = list(archive.glob("**/desc")) logger.debug( "Processing %(instance)s source packages info from " "%(flavour)s %(arch)s %(repo)s repository, " "(%(qty)s packages).", dict( instance=self.instance, flavour=name, arch=arch, repo=repo, qty=len(packages_desc), ), ) for package_desc in packages_desc: data = self.parse_desc_file( path=package_desc, repo=repo, base_url=base_url, dl_url_fmt=dl_url_fmt, ) assert data["builddate"] last_modified = datetime.datetime.fromtimestamp( float(data["builddate"]), tz=datetime.timezone.utc ) assert data["name"] assert data["filename"] assert data["arch"] url = info_url_fmt.format( base_url=base_info_url, pkgname=data["name"], filename=data["filename"], repo=repo, arch=data["arch"], ) assert data["version"] if name == "official": # find all versions of a package scrapping archive versions = self.scrap_package_versions( name=data["name"], repo=repo, base_url=base_url ) elif name == "arm": # There is no way to get related versions of a package, # but 'data' represents the latest released version, # use it in this case assert data["builddate"] assert data["csize"] assert data["url"] versions = [ dict( name=data["name"], version=data["version"], repo=repo, arch=data["arch"], filename=data["filename"], url=data["url"], last_modified=last_modified.replace(tzinfo=None).isoformat( timespec="seconds" ), length=int(data["csize"]), ) ] package = { "name": data["name"], "version": data["version"], "last_modified": last_modified, "url": url, "versions": versions, "data": data, } page.append(package) return page def get_origins_from_page(self, page: ArchListerPage) -> Iterator[ListedOrigin]: """Iterate on all arch pages and yield ListedOrigin instances.""" assert self.lister_obj.id is not None for origin in page: artifacts = [] arch_metadata = [] for version in origin["versions"]: artifacts.append( { "version": version["version"], "filename": version["filename"], "url": version["url"], "length": version["length"], } ) arch_metadata.append( { "version": version["version"], "name": version["name"], "arch": version["arch"], "repo": version["repo"], "last_modified": version["last_modified"], } ) yield ListedOrigin( lister_id=self.lister_obj.id, visit_type=self.VISIT_TYPE, url=origin["url"], last_update=origin["last_modified"], extra_loader_arguments={ "artifacts": artifacts, "arch_metadata": arch_metadata, }, ) diff --git a/swh/lister/aur/lister.py b/swh/lister/aur/lister.py index 778a848..9bbdf37 100644 --- a/swh/lister/aur/lister.py +++ b/swh/lister/aur/lister.py @@ -1,154 +1,152 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import logging from typing import Any, Dict, Iterator, List, Optional -import requests - from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import ListedOrigin from ..pattern import CredentialsType, StatelessLister logger = logging.getLogger(__name__) # Aliasing the page results returned by `get_pages` method from the lister. AurListerPage = Dict[str, Any] class AurLister(StatelessLister[AurListerPage]): """List Arch User Repository (AUR) origins. Given an url (used as a base url, default is 'https://aur.archlinux.org'), download a 'packages-meta-v1.json.gz' which contains a json file listing all existing packages definitions. Each entry describes the latest released version of a package. The origin url for a package is built using 'pkgname' and corresponds to a git repository. An rpc api exists but it is recommended to save bandwidth so it's not used. See https://lists.archlinux.org/pipermail/aur-general/2021-November/036659.html for more on this. """ LISTER_NAME = "aur" VISIT_TYPE = "aur" INSTANCE = "aur" BASE_URL = "https://aur.archlinux.org" DEFAULT_PACKAGES_INDEX_URL = "{base_url}/packages-meta-v1.json.gz" PACKAGE_VCS_URL_PATTERN = "{base_url}/{pkgname}.git" PACKAGE_SNAPSHOT_URL_PATTERN = "{base_url}/cgit/aur.git/snapshot/{pkgname}.tar.gz" ORIGIN_URL_PATTERN = "{base_url}/packages/{pkgname}" def __init__( self, scheduler: SchedulerInterface, credentials: Optional[CredentialsType] = None, ): super().__init__( scheduler=scheduler, credentials=credentials, instance=self.INSTANCE, url=self.BASE_URL, ) def download_packages_index(self) -> List[Dict[str, Any]]: """Build an url based on self.DEFAULT_PACKAGES_INDEX_URL format string, and download the archive to self.DESTINATION_PATH Returns: a directory Path where the archive has been downloaded to. """ url = self.DEFAULT_PACKAGES_INDEX_URL.format(base_url=self.url) - return requests.get(url).json() + return self.http_request(url).json() def get_pages(self) -> Iterator[AurListerPage]: """Yield an iterator which returns 'page' Each page corresponds to a package with a 'version', an 'url' for a Git repository, a 'project_url' which represents the upstream project url and a canonical 'snapshot_url' from which a tar.gz archive of the package can be downloaded. """ packages = self.download_packages_index() logger.debug("Found %s AUR packages in aur_index", len(packages)) for package in packages: # Exclude lines where Name differs from PackageBase as they represents # split package and they don't have resolvable snapshots url if package["Name"] == package["PackageBase"]: logger.debug("Processing AUR package %s", package["Name"]) pkgname = package["PackageBase"] version = package["Version"] project_url = package["URL"] last_modified = datetime.datetime.fromtimestamp( float(package["LastModified"]), tz=datetime.timezone.utc ).isoformat() yield { "pkgname": pkgname, "version": version, "url": self.ORIGIN_URL_PATTERN.format( base_url=self.BASE_URL, pkgname=pkgname ), "git_url": self.PACKAGE_VCS_URL_PATTERN.format( base_url=self.BASE_URL, pkgname=pkgname ), "snapshot_url": self.PACKAGE_SNAPSHOT_URL_PATTERN.format( base_url=self.BASE_URL, pkgname=pkgname ), "project_url": project_url, "last_modified": last_modified, } def get_origins_from_page(self, origin: AurListerPage) -> Iterator[ListedOrigin]: """Iterate on all pages and yield ListedOrigin instances. It uses the vcs (Git) url as an origin and adds `artifacts` and `aur_metadata` entries to 'extra_loader_arguments'. `artifacts` describe the file to download and `aur_metadata` store some metadata that can be useful for the loader. """ assert self.lister_obj.id is not None last_update = datetime.datetime.fromisoformat(origin["last_modified"]) filename = origin["snapshot_url"].split("/")[-1] artifacts = [ { "filename": filename, "url": origin["snapshot_url"], "version": origin["version"], } ] aur_metadata = [ { "version": origin["version"], "project_url": origin["project_url"], "last_update": origin["last_modified"], "pkgname": origin["pkgname"], } ] yield ListedOrigin( lister_id=self.lister_obj.id, visit_type=self.VISIT_TYPE, url=origin["url"], last_update=last_update, extra_loader_arguments={ "artifacts": artifacts, "aur_metadata": aur_metadata, }, ) yield ListedOrigin( lister_id=self.lister_obj.id, visit_type="git", url=origin["git_url"], last_update=last_update, ) diff --git a/swh/lister/bitbucket/lister.py b/swh/lister/bitbucket/lister.py index 1c195b2..7bcec03 100644 --- a/swh/lister/bitbucket/lister.py +++ b/swh/lister/bitbucket/lister.py @@ -1,198 +1,173 @@ -# Copyright (C) 2017-2021 The Software Heritage developers +# Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from dataclasses import asdict, dataclass from datetime import datetime import logging import random from typing import Any, Dict, Iterator, List, Optional from urllib import parse import iso8601 -import requests -from tenacity.before_sleep import before_sleep_log -from swh.lister.utils import http_retry from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import ListedOrigin -from .. import USER_AGENT from ..pattern import CredentialsType, Lister logger = logging.getLogger(__name__) @dataclass class BitbucketListerState: """State of Bitbucket lister""" last_repo_cdate: Optional[datetime] = None """Creation date and time of the last listed repository during an incremental pass""" class BitbucketLister(Lister[BitbucketListerState, List[Dict[str, Any]]]): """List origins from Bitbucket using its API. Bitbucket API has the following rate-limit configuration: * 60 requests per hour for anonymous users * 1000 requests per hour for authenticated users The lister is working in anonymous mode by default but Bitbucket account credentials can be provided to perform authenticated requests. """ LISTER_NAME = "bitbucket" INSTANCE = "bitbucket" API_URL = "https://api.bitbucket.org/2.0/repositories" def __init__( self, scheduler: SchedulerInterface, page_size: int = 1000, incremental: bool = True, credentials: CredentialsType = None, ): super().__init__( scheduler=scheduler, credentials=credentials, url=self.API_URL, instance=self.INSTANCE, ) self.incremental = incremental self.url_params: Dict[str, Any] = { "pagelen": page_size, # only return needed JSON fields in bitbucket API responses # (also prevent errors 500 when listing) "fields": ( "next,values.links.clone.href,values.scm,values.updated_on," "values.created_on" ), } - self.session = requests.Session() - self.session.headers.update( - {"Accept": "application/json", "User-Agent": USER_AGENT} - ) + self.session.headers.update({"Accept": "application/json"}) if len(self.credentials) > 0: cred = random.choice(self.credentials) logger.warning("Using Bitbucket credentials from user %s", cred["username"]) self.set_credentials(cred["username"], cred["password"]) else: logger.warning("No credentials set in configuration, using anonymous mode") def state_from_dict(self, d: Dict[str, Any]) -> BitbucketListerState: last_repo_cdate = d.get("last_repo_cdate") if last_repo_cdate is not None: d["last_repo_cdate"] = iso8601.parse_date(last_repo_cdate) return BitbucketListerState(**d) def state_to_dict(self, state: BitbucketListerState) -> Dict[str, Any]: d = asdict(state) last_repo_cdate = d.get("last_repo_cdate") if last_repo_cdate is not None: d["last_repo_cdate"] = last_repo_cdate.isoformat() return d def set_credentials(self, username: Optional[str], password: Optional[str]) -> None: """Set basic authentication headers with given credentials.""" if username is not None and password is not None: self.session.auth = (username, password) - @http_retry(before_sleep=before_sleep_log(logger, logging.DEBUG)) - def page_request(self, last_repo_cdate: str) -> requests.Response: - - self.url_params["after"] = last_repo_cdate - logger.debug("Fetching URL %s with params %s", self.url, self.url_params) - - response = self.session.get(self.url, params=self.url_params) - - if response.status_code != 200: - logger.warning( - "Unexpected HTTP status code %s on %s: %s", - response.status_code, - response.url, - response.content, - ) - response.raise_for_status() - - return response - def get_pages(self) -> Iterator[List[Dict[str, Any]]]: last_repo_cdate: str = "1970-01-01" if ( self.incremental and self.state is not None and self.state.last_repo_cdate is not None ): last_repo_cdate = self.state.last_repo_cdate.isoformat() while True: - body = self.page_request(last_repo_cdate).json() + self.url_params["after"] = last_repo_cdate + body = self.http_request(self.url, params=self.url_params).json() yield body["values"] next_page_url = body.get("next") if next_page_url is not None: next_page_url = parse.urlparse(next_page_url) if not next_page_url.query: logger.warning("Failed to parse url %s", next_page_url) break last_repo_cdate = parse.parse_qs(next_page_url.query)["after"][0] else: # last page break def get_origins_from_page( self, page: List[Dict[str, Any]] ) -> Iterator[ListedOrigin]: """Convert a page of Bitbucket repositories into a list of ListedOrigins.""" assert self.lister_obj.id is not None for repo in page: last_update = iso8601.parse_date(repo["updated_on"]) origin_url = repo["links"]["clone"][0]["href"] origin_type = repo["scm"] yield ListedOrigin( lister_id=self.lister_obj.id, url=origin_url, visit_type=origin_type, last_update=last_update, ) def commit_page(self, page: List[Dict[str, Any]]) -> None: """Update the currently stored state using the latest listed page.""" if self.incremental: last_repo = page[-1] last_repo_cdate = iso8601.parse_date(last_repo["created_on"]) if ( self.state.last_repo_cdate is None or last_repo_cdate > self.state.last_repo_cdate ): self.state.last_repo_cdate = last_repo_cdate def finalize(self) -> None: if self.incremental: scheduler_state = self.get_state_from_scheduler() if self.state.last_repo_cdate is None: return # Update the lister state in the backend only if the last seen id of # the current run is higher than that stored in the database. if ( scheduler_state.last_repo_cdate is None or self.state.last_repo_cdate > scheduler_state.last_repo_cdate ): self.updated = True diff --git a/swh/lister/bitbucket/tests/test_lister.py b/swh/lister/bitbucket/tests/test_lister.py index e624e8e..04df324 100644 --- a/swh/lister/bitbucket/tests/test_lister.py +++ b/swh/lister/bitbucket/tests/test_lister.py @@ -1,178 +1,180 @@ -# Copyright (C) 2017-2021 The Software Heritage developers +# Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime import json import os import pytest from swh.lister.bitbucket.lister import BitbucketLister @pytest.fixture def bb_api_repositories_page1(datadir): data_file_path = os.path.join(datadir, "bb_api_repositories_page1.json") with open(data_file_path, "r") as data_file: return json.load(data_file) @pytest.fixture def bb_api_repositories_page2(datadir): data_file_path = os.path.join(datadir, "bb_api_repositories_page2.json") with open(data_file_path, "r") as data_file: return json.load(data_file) def _check_listed_origins(lister_origins, scheduler_origins): """Asserts that the two collections have the same origins from the point of view of the lister""" assert {(lo.url, lo.last_update) for lo in lister_origins} == { (so.url, so.last_update) for so in scheduler_origins } def test_bitbucket_incremental_lister( swh_scheduler, requests_mock, mocker, bb_api_repositories_page1, bb_api_repositories_page2, ): """Simple Bitbucket listing with two pages containing 10 origins""" requests_mock.get( BitbucketLister.API_URL, [ {"json": bb_api_repositories_page1}, {"json": bb_api_repositories_page2}, ], ) lister = BitbucketLister(scheduler=swh_scheduler, page_size=10) # First listing stats = lister.run() scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results assert stats.pages == 2 assert stats.origins == 20 assert len(scheduler_origins) == 20 assert lister.updated lister_state = lister.get_state_from_scheduler() last_repo_cdate = lister_state.last_repo_cdate.isoformat() assert hasattr(lister_state, "last_repo_cdate") assert last_repo_cdate == bb_api_repositories_page2["values"][-1]["created_on"] # Second listing, restarting from last state - lister.session.get = mocker.spy(lister.session, "get") + lister.session.request = mocker.spy(lister.session, "request") lister.run() url_params = lister.url_params url_params["after"] = last_repo_cdate - lister.session.get.assert_called_once_with(lister.API_URL, params=url_params) + lister.session.request.assert_called_once_with( + "GET", lister.API_URL, params=url_params + ) all_origins = ( bb_api_repositories_page1["values"] + bb_api_repositories_page2["values"] ) _check_listed_origins(lister.get_origins_from_page(all_origins), scheduler_origins) def test_bitbucket_lister_rate_limit_hit( swh_scheduler, requests_mock, mocker, bb_api_repositories_page1, bb_api_repositories_page2, ): """Simple Bitbucket listing with two pages containing 10 origins""" requests_mock.get( BitbucketLister.API_URL, [ {"json": bb_api_repositories_page1, "status_code": 200}, {"json": None, "status_code": 429}, {"json": None, "status_code": 429}, {"json": bb_api_repositories_page2, "status_code": 200}, ], ) lister = BitbucketLister(scheduler=swh_scheduler, page_size=10) - mocker.patch.object(lister.page_request.retry, "sleep") + mocker.patch.object(lister.http_request.retry, "sleep") stats = lister.run() assert stats.pages == 2 assert stats.origins == 20 assert len(swh_scheduler.get_listed_origins(lister.lister_obj.id).results) == 20 def test_bitbucket_full_lister( swh_scheduler, requests_mock, mocker, bb_api_repositories_page1, bb_api_repositories_page2, ): """Simple Bitbucket listing with two pages containing 10 origins""" requests_mock.get( BitbucketLister.API_URL, [ {"json": bb_api_repositories_page1}, {"json": bb_api_repositories_page2}, {"json": bb_api_repositories_page1}, {"json": bb_api_repositories_page2}, ], ) credentials = {"bitbucket": {"bitbucket": [{"username": "u", "password": "p"}]}} lister = BitbucketLister( scheduler=swh_scheduler, page_size=10, incremental=True, credentials=credentials ) assert lister.session.auth is not None # First do a incremental run to have an initial lister state stats = lister.run() last_lister_state = lister.get_state_from_scheduler() assert stats.origins == 20 # Then do the full run and verify lister state did not change # Modify last listed repo modification date to check it will be not saved # to lister state after its execution last_page2_repo = bb_api_repositories_page2["values"][-1] last_page2_repo["created_on"] = datetime.now().isoformat() last_page2_repo["updated_on"] = datetime.now().isoformat() lister = BitbucketLister(scheduler=swh_scheduler, page_size=10, incremental=False) assert lister.session.auth is None stats = lister.run() assert stats.pages == 2 assert stats.origins == 20 scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results # 20 because scheduler upserts based on (id, type, url) assert len(scheduler_origins) == 20 # Modification on created_on SHOULD NOT impact lister state assert lister.get_state_from_scheduler() == last_lister_state # Modification on updated_on SHOULD impact lister state all_origins = ( bb_api_repositories_page1["values"] + bb_api_repositories_page2["values"] ) _check_listed_origins(lister.get_origins_from_page(all_origins), scheduler_origins) diff --git a/swh/lister/bower/lister.py b/swh/lister/bower/lister.py index 64921e2..5b488e4 100644 --- a/swh/lister/bower/lister.py +++ b/swh/lister/bower/lister.py @@ -1,91 +1,64 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import logging -from typing import Any, Dict, Iterator, List, Optional -import requests -from tenacity.before_sleep import before_sleep_log +import logging +from typing import Dict, Iterator, List, Optional -from swh.lister.utils import http_retry from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import ListedOrigin -from .. import USER_AGENT from ..pattern import CredentialsType, StatelessLister logger = logging.getLogger(__name__) # Aliasing the page results returned by `get_pages` method from the lister. BowerListerPage = List[Dict[str, str]] class BowerLister(StatelessLister[BowerListerPage]): """List Bower (Javascript package manager) origins.""" LISTER_NAME = "bower" VISIT_TYPE = "git" # Bower origins url are Git repositories INSTANCE = "bower" API_URL = "https://registry.bower.io/packages" def __init__( self, scheduler: SchedulerInterface, credentials: Optional[CredentialsType] = None, ): super().__init__( scheduler=scheduler, credentials=credentials, instance=self.INSTANCE, url=self.API_URL, ) - self.session = requests.Session() - self.session.headers.update( - { - "Accept": "application/json", - "User-Agent": USER_AGENT, - } - ) - - @http_retry(before_sleep=before_sleep_log(logger, logging.WARNING)) - def page_request(self, url: str, params: Dict[str, Any]) -> requests.Response: - - logger.info("Fetching URL %s with params %s", url, params) - - response = self.session.get(url, params=params) - if response.status_code != 200: - logger.warning( - "Unexpected HTTP status code %s on %s: %s", - response.status_code, - response.url, - response.content, - ) - response.raise_for_status() - - return response + self.session.headers.update({"Accept": "application/json"}) def get_pages(self) -> Iterator[BowerListerPage]: """Yield an iterator which returns 'page' It uses the api endpoint provided by `https://registry.bower.io/packages` to get a list of package names with an origin url that corresponds to Git repository. There is only one page that list all origins urls. """ - response = self.page_request(url=self.url, params={}) + response = self.http_request(self.url) yield response.json() def get_origins_from_page(self, page: BowerListerPage) -> Iterator[ListedOrigin]: """Iterate on all pages and yield ListedOrigin instances.""" assert self.lister_obj.id is not None for entry in page: yield ListedOrigin( lister_id=self.lister_obj.id, visit_type=self.VISIT_TYPE, url=entry["url"], last_update=None, ) diff --git a/swh/lister/cgit/lister.py b/swh/lister/cgit/lister.py index 6fcfb54..49458d0 100644 --- a/swh/lister/cgit/lister.py +++ b/swh/lister/cgit/lister.py @@ -1,234 +1,225 @@ -# Copyright (C) 2019-2021 The Software Heritage developers +# Copyright (C) 2019-2022 The Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timezone import logging import re from typing import Any, Dict, Iterator, List, Optional from urllib.parse import urljoin, urlparse from bs4 import BeautifulSoup -import requests from requests.exceptions import HTTPError -from tenacity.before_sleep import before_sleep_log -from swh.lister import USER_AGENT from swh.lister.pattern import CredentialsType, StatelessLister -from swh.lister.utils import http_retry from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import ListedOrigin logger = logging.getLogger(__name__) Repositories = List[Dict[str, Any]] class CGitLister(StatelessLister[Repositories]): """Lister class for CGit repositories. This lister will retrieve the list of published git repositories by parsing the HTML page(s) of the index retrieved at `url`. The lister currently defines 2 listing behaviors: - If the `base_git_url` is provided, the listed origin urls are computed out of the base git url link and the one listed in the main listed page (resulting in less HTTP queries than the 2nd behavior below). This is expected to be the main deployed behavior. - Otherwise (with no `base_git_url`), for each found git repository listed, one extra HTTP query is made at the given url found in the main listing page to gather published "Clone" URLs to be used as origin URL for that git repo. If several "Clone" urls are provided, prefer the http/https one, if any, otherwise fallback to the first one. """ LISTER_NAME = "cgit" def __init__( self, scheduler: SchedulerInterface, url: str, instance: Optional[str] = None, credentials: Optional[CredentialsType] = None, base_git_url: Optional[str] = None, ): """Lister class for CGit repositories. Args: url: main URL of the CGit instance, i.e. url of the index of published git repositories on this instance. instance: Name of cgit instance. Defaults to url's network location if unset. base_git_url: Optional base git url which allows the origin url computations. """ super().__init__( scheduler=scheduler, url=url, instance=instance, credentials=credentials, ) - self.session = requests.Session() - self.session.headers.update( - {"Accept": "application/html", "User-Agent": USER_AGENT} - ) + self.session.headers.update({"Accept": "application/html"}) self.base_git_url = base_git_url - @http_retry(before_sleep=before_sleep_log(logger, logging.DEBUG)) def _get_and_parse(self, url: str) -> BeautifulSoup: """Get the given url and parse the retrieved HTML using BeautifulSoup""" - response = self.session.get(url) - response.raise_for_status() + response = self.http_request(url) return BeautifulSoup(response.text, features="html.parser") def get_pages(self) -> Iterator[Repositories]: """Generate git 'project' URLs found on the current CGit server The last_update date is retrieved on the list of repo page to avoid to compute it on the repository details which only give a date per branch """ next_page: Optional[str] = self.url while next_page: bs_idx = self._get_and_parse(next_page) page_results = [] for tr in bs_idx.find("div", {"class": "content"}).find_all( "tr", {"class": ""} ): repository_link = tr.find("a")["href"] repo_url = None git_url = None base_url = urljoin(self.url, repository_link).strip("/") if self.base_git_url: # mapping provided # computing git url git_url = base_url.replace(self.url, self.base_git_url) else: # we compute the git detailed page url from which we will retrieve # the git url (cf. self.get_origins_from_page) repo_url = base_url span = tr.find("span", {"class": re.compile("age-")}) last_updated_date = span.get("title") if span else None page_results.append( { "url": repo_url, "git_url": git_url, "last_updated_date": last_updated_date, } ) yield page_results try: pager = bs_idx.find("ul", {"class": "pager"}) current_page = pager.find("a", {"class": "current"}) if current_page: next_page = current_page.parent.next_sibling.a["href"] next_page = urljoin(self.url, next_page) except (AttributeError, KeyError): # no pager, or no next page next_page = None def get_origins_from_page( self, repositories: Repositories ) -> Iterator[ListedOrigin]: """Convert a page of cgit repositories into a list of ListedOrigins.""" assert self.lister_obj.id is not None for repo in repositories: origin_url = repo["git_url"] or self._get_origin_from_repository_url( repo["url"] ) if origin_url is None: continue yield ListedOrigin( lister_id=self.lister_obj.id, url=origin_url, visit_type="git", last_update=_parse_last_updated_date(repo), ) def _get_origin_from_repository_url(self, repository_url: str) -> Optional[str]: """Extract the git url from the repository page""" try: bs = self._get_and_parse(repository_url) except HTTPError as e: logger.warning( "Unexpected HTTP status code %s on %s", e.response.status_code, e.response.url, ) return None # check if we are on the summary tab, if not, go to this tab tab = bs.find("table", {"class": "tabs"}) if tab: summary_a = tab.find("a", string="summary") if summary_a: summary_url = urljoin(repository_url, summary_a["href"]).strip("/") if summary_url != repository_url: logger.debug( "%s : Active tab is not the summary, trying to load the summary page", repository_url, ) return self._get_origin_from_repository_url(summary_url) else: logger.debug("No summary tab found on %s", repository_url) # origin urls are listed on the repository page # TODO check if forcing https is better or not ? # # # urls = [x["href"] for x in bs.find_all("a", {"rel": "vcs-git"})] if not urls: logger.debug("No git urls found on %s", repository_url) return None # look for the http/https url, if any, and use it as origin_url for url in urls: if urlparse(url).scheme in ("http", "https"): origin_url = url break else: # otherwise, choose the first one origin_url = urls[0] return origin_url def _parse_last_updated_date(repository: Dict[str, Any]) -> Optional[datetime]: """Parse the last updated date""" date = repository.get("last_updated_date") if not date: return None parsed_date = None for date_format in ("%Y-%m-%d %H:%M:%S %z", "%Y-%m-%d %H:%M:%S (%Z)"): try: parsed_date = datetime.strptime(date, date_format) # force UTC to avoid naive datetime if not parsed_date.tzinfo: parsed_date = parsed_date.replace(tzinfo=timezone.utc) break except Exception: pass if not parsed_date: logger.warning( "Could not parse %s last_updated date: %s", repository["url"], date, ) return parsed_date diff --git a/swh/lister/cgit/tests/test_lister.py b/swh/lister/cgit/tests/test_lister.py index 9b5c0c3..9d0f123 100644 --- a/swh/lister/cgit/tests/test_lister.py +++ b/swh/lister/cgit/tests/test_lister.py @@ -1,280 +1,280 @@ -# Copyright (C) 2019-2021 The Software Heritage developers +# Copyright (C) 2019-2022 The Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import datetime, timedelta, timezone import os from typing import List import pytest from swh.core.pytest_plugin import requests_mock_datadir_factory from swh.lister import __version__ from swh.lister.cgit.lister import CGitLister, _parse_last_updated_date from swh.lister.pattern import ListerStats def test_lister_cgit_get_pages_one_page(requests_mock_datadir, swh_scheduler): url = "https://git.savannah.gnu.org/cgit/" lister_cgit = CGitLister(swh_scheduler, url=url) repos: List[List[str]] = list(lister_cgit.get_pages()) flattened_repos = sum(repos, []) assert len(flattened_repos) == 977 assert flattened_repos[0]["url"] == "https://git.savannah.gnu.org/cgit/elisp-es.git" # note the url below is NOT a subpath of /cgit/ assert ( flattened_repos[-1]["url"] == "https://git.savannah.gnu.org/path/to/yetris.git" ) # noqa # note the url below is NOT on the same server assert flattened_repos[-2]["url"] == "http://example.org/cgit/xstarcastle.git" def test_lister_cgit_get_pages_with_pages(requests_mock_datadir, swh_scheduler): url = "https://git.tizen/cgit/" lister_cgit = CGitLister(swh_scheduler, url=url) repos: List[List[str]] = list(lister_cgit.get_pages()) flattened_repos = sum(repos, []) # we should have 16 repos (listed on 3 pages) assert len(repos) == 3 assert len(flattened_repos) == 16 def test_lister_cgit_run_with_page(requests_mock_datadir, swh_scheduler): """cgit lister supports pagination""" url = "https://git.tizen/cgit/" lister_cgit = CGitLister(swh_scheduler, url=url) stats = lister_cgit.run() expected_nb_origins = 16 assert stats == ListerStats(pages=3, origins=expected_nb_origins) # test page parsing scheduler_origins = swh_scheduler.get_listed_origins( lister_cgit.lister_obj.id ).results assert len(scheduler_origins) == expected_nb_origins # test listed repositories for listed_origin in scheduler_origins: assert listed_origin.visit_type == "git" assert listed_origin.url.startswith("https://git.tizen") # test user agent content assert len(requests_mock_datadir.request_history) != 0 for request in requests_mock_datadir.request_history: assert "User-Agent" in request.headers user_agent = request.headers["User-Agent"] assert "Software Heritage Lister" in user_agent assert __version__ in user_agent def test_lister_cgit_run_populates_last_update(requests_mock_datadir, swh_scheduler): """cgit lister returns last updated date""" url = "https://git.tizen/cgit" urls_without_date = [ f"https://git.tizen.org/cgit/{suffix_url}" for suffix_url in [ "All-Projects", "All-Users", "Lock-Projects", ] ] lister_cgit = CGitLister(swh_scheduler, url=url) stats = lister_cgit.run() expected_nb_origins = 16 assert stats == ListerStats(pages=3, origins=expected_nb_origins) # test page parsing scheduler_origins = swh_scheduler.get_listed_origins( lister_cgit.lister_obj.id ).results assert len(scheduler_origins) == expected_nb_origins # test listed repositories for listed_origin in scheduler_origins: if listed_origin.url in urls_without_date: assert listed_origin.last_update is None else: assert listed_origin.last_update is not None @pytest.mark.parametrize( "date_str,expected_date", [ ({}, None), ("unexpected date", None), ("2020-0140-10 10:10:10 (GMT)", None), ( "2020-01-10 10:10:10 (GMT)", datetime( year=2020, month=1, day=10, hour=10, minute=10, second=10, tzinfo=timezone.utc, ), ), ( "2019-08-04 05:10:41 +0100", datetime( year=2019, month=8, day=4, hour=5, minute=10, second=41, tzinfo=timezone(timedelta(hours=1)), ), ), ], ) def test_lister_cgit_date_parsing(date_str, expected_date): """test cgit lister date parsing""" repository = {"url": "url", "last_updated_date": date_str} assert _parse_last_updated_date(repository) == expected_date requests_mock_datadir_missing_url = requests_mock_datadir_factory( ignore_urls=[ "https://git.tizen/cgit/adaptation/ap_samsung/audio-hal-e4x12", ] ) def test_lister_cgit_get_origin_from_repo_failing( requests_mock_datadir_missing_url, swh_scheduler ): url = "https://git.tizen/cgit/" lister_cgit = CGitLister(swh_scheduler, url=url) stats = lister_cgit.run() expected_nb_origins = 15 assert stats == ListerStats(pages=3, origins=expected_nb_origins) @pytest.mark.parametrize( "credentials, expected_credentials", [ (None, []), ({"key": "value"}, []), ( {"cgit": {"tizen": [{"username": "user", "password": "pass"}]}}, [{"username": "user", "password": "pass"}], ), ], ) def test_lister_cgit_instantiation_with_credentials( credentials, expected_credentials, swh_scheduler ): url = "https://git.tizen/cgit/" lister = CGitLister( swh_scheduler, url=url, instance="tizen", credentials=credentials ) # Credentials are allowed in constructor assert lister.credentials == expected_credentials def test_lister_cgit_from_configfile(swh_scheduler_config, mocker): load_from_envvar = mocker.patch("swh.lister.pattern.load_from_envvar") load_from_envvar.return_value = { "scheduler": {"cls": "local", **swh_scheduler_config}, "url": "https://git.tizen/cgit/", "instance": "tizen", "credentials": {}, } lister = CGitLister.from_configfile() assert lister.scheduler is not None assert lister.credentials is not None @pytest.mark.parametrize( "url,base_git_url,expected_nb_origins", [ ("https://git.eclipse.org/c", "https://eclipse.org/r", 5), ("https://git.baserock.org/cgit/", "https://git.baserock.org/git/", 3), ("https://jff.email/cgit/", "git://jff.email/opt/git/", 6), ], ) def test_lister_cgit_with_base_git_url( url, base_git_url, expected_nb_origins, requests_mock_datadir, swh_scheduler ): """With base git url provided, listed urls should be the computed origin urls""" lister_cgit = CGitLister( swh_scheduler, url=url, base_git_url=base_git_url, ) stats = lister_cgit.run() assert stats == ListerStats(pages=1, origins=expected_nb_origins) # test page parsing scheduler_origins = swh_scheduler.get_listed_origins( lister_cgit.lister_obj.id ).results assert len(scheduler_origins) == expected_nb_origins # test listed repositories for listed_origin in scheduler_origins: assert listed_origin.visit_type == "git" assert listed_origin.url.startswith(base_git_url) assert ( listed_origin.url.startswith(url) is False ), f"url should be mapped to {base_git_url}" def test_lister_cgit_get_pages_with_pages_and_retry( requests_mock_datadir, requests_mock, datadir, mocker, swh_scheduler ): url = "https://git.tizen/cgit/" with open(os.path.join(datadir, "https_git.tizen/cgit,ofs=50"), "rb") as page: requests_mock.get( f"{url}?ofs=50", [ {"content": None, "status_code": 429}, {"content": None, "status_code": 429}, {"content": page.read(), "status_code": 200}, ], ) lister_cgit = CGitLister(swh_scheduler, url=url) - mocker.patch.object(lister_cgit._get_and_parse.retry, "sleep") + mocker.patch.object(lister_cgit.http_request.retry, "sleep") repos: List[List[str]] = list(lister_cgit.get_pages()) flattened_repos = sum(repos, []) # we should have 16 repos (listed on 3 pages) assert len(repos) == 3 assert len(flattened_repos) == 16 def test_lister_cgit_summary_not_default(requests_mock_datadir, swh_scheduler): """cgit lister returns git url when the default repository tab is not the summary""" url = "https://git.acdw.net/cgit" lister_cgit = CGitLister(swh_scheduler, url=url) stats = lister_cgit.run() expected_nb_origins = 1 assert stats == ListerStats(pages=1, origins=expected_nb_origins) diff --git a/swh/lister/debian/lister.py b/swh/lister/debian/lister.py index d31a33d..c37d48f 100644 --- a/swh/lister/debian/lister.py +++ b/swh/lister/debian/lister.py @@ -1,311 +1,308 @@ -# Copyright (C) 2017-2021 The Software Heritage developers +# Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information - import bz2 from collections import defaultdict from dataclasses import dataclass, field from email.utils import parsedate_to_datetime import gzip from itertools import product import logging import lzma import os from typing import Any, Callable, Dict, Iterator, List, Optional, Set, Tuple from urllib.parse import urljoin from debian.deb822 import Sources -import requests +from requests.exceptions import HTTPError from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import ListedOrigin -from .. import USER_AGENT from ..pattern import CredentialsType, Lister logger = logging.getLogger(__name__) decompressors: Dict[str, Callable[[Any], Any]] = { "gz": lambda f: gzip.GzipFile(fileobj=f), "bz2": bz2.BZ2File, "xz": lzma.LZMAFile, } Suite = str Component = str PkgName = str PkgVersion = str DebianOrigin = str DebianPageType = Iterator[Sources] @dataclass class DebianListerState: """State of debian lister""" package_versions: Dict[PkgName, Set[PkgVersion]] = field(default_factory=dict) """Dictionary mapping a package name to all the versions found during last listing""" class DebianLister(Lister[DebianListerState, DebianPageType]): """ List source packages for a given debian or derivative distribution. The lister will create a snapshot for each package name from all its available versions. If a package snapshot is different from the last listing operation, it will be send to the scheduler that will create a loading task to archive newly found source code. Args: scheduler: instance of SchedulerInterface distribution: identifier of listed distribution (e.g. Debian, Ubuntu) mirror_url: debian package archives mirror URL suites: list of distribution suites to process components: list of package components to process """ LISTER_NAME = "debian" def __init__( self, scheduler: SchedulerInterface, distribution: str = "Debian", mirror_url: str = "http://deb.debian.org/debian/", suites: List[Suite] = ["stretch", "buster", "bullseye"], components: List[Component] = ["main", "contrib", "non-free"], credentials: Optional[CredentialsType] = None, ): super().__init__( scheduler=scheduler, url=mirror_url, instance=distribution, credentials=credentials, ) # to ensure urljoin will produce valid Sources URL if not self.url.endswith("/"): self.url += "/" self.distribution = distribution self.suites = suites self.components = components - self.session = requests.Session() - self.session.headers.update({"User-Agent": USER_AGENT}) - # will hold all listed origins info self.listed_origins: Dict[DebianOrigin, ListedOrigin] = {} # will contain origin urls that have already been listed # in a previous page self.sent_origins: Set[DebianOrigin] = set() # will contain already listed package info that need to be sent # to the scheduler for update in the commit_page method self.origins_to_update: Dict[DebianOrigin, ListedOrigin] = {} # will contain the lister state after a call to run self.package_versions: Dict[PkgName, Set[PkgVersion]] = {} def state_from_dict(self, d: Dict[str, Any]) -> DebianListerState: return DebianListerState(package_versions={k: set(v) for k, v in d.items()}) def state_to_dict(self, state: DebianListerState) -> Dict[str, Any]: return {k: list(v) for k, v in state.package_versions.items()} def debian_index_urls( self, suite: Suite, component: Component ) -> Iterator[Tuple[str, str]]: """Return an iterator on possible Sources file URLs as multiple compression formats can be used.""" compression_exts = ("xz", "bz2", "gz") base_urls = [ urljoin(self.url, f"dists/{suite}/{component}/source/Sources"), urljoin(self.url, f"dists/{suite}/updates/{component}/source/Sources"), ] for base_url, ext in product(base_urls, compression_exts): yield (f"{base_url}.{ext}", ext) yield (base_url, "") def page_request(self, suite: Suite, component: Component) -> DebianPageType: """Return parsed package Sources file for a given debian suite and component.""" for url, compression in self.debian_index_urls(suite, component): - response = requests.get(url, stream=True) - logging.debug("Fetched URL: %s, status code: %s", url, response.status_code) - if response.status_code == 200: + try: + response = self.http_request(url, stream=True) + except HTTPError: + pass + else: last_modified = response.headers.get("Last-Modified") self.last_sources_update = ( parsedate_to_datetime(last_modified) if last_modified else None ) decompressor = decompressors.get(compression) if decompressor: data = decompressor(response.raw).readlines() else: data = response.raw.readlines() break else: data = "" logger.debug("Could not retrieve sources index for %s/%s", suite, component) return Sources.iter_paragraphs(data) def get_pages(self) -> Iterator[DebianPageType]: """Return an iterator on parsed debian package Sources files, one per combination of debian suite and component.""" for suite, component in product(self.suites, self.components): logger.debug( "Processing %s %s source packages info for %s component.", self.instance, suite, component, ) self.current_suite = suite self.current_component = component yield self.page_request(suite, component) def origin_url_for_package(self, package_name: PkgName) -> DebianOrigin: """Return the origin url for the given package""" return f"deb://{self.instance}/packages/{package_name}" def get_origins_from_page(self, page: DebianPageType) -> Iterator[ListedOrigin]: """Convert a page of debian package sources into an iterator of ListedOrigin. Please note that the returned origins correspond to packages only listed for the first time in order to get an accurate origins counter in the statistics returned by the run method of the lister. Packages already listed in another page but with different versions will be put in cache by the method and updated ListedOrigin objects will be sent to the scheduler later in the commit_page method. Indeed as multiple debian suites can be processed, a similar set of package names can be listed for two different package source pages, only their version will differ, resulting in origins counted multiple times in lister statistics. """ assert self.lister_obj.id is not None origins_to_send = {} self.origins_to_update = {} # iterate on each package source info for src_pkg in page: # gather package files info that will be used by the debian loader files: Dict[str, Dict[str, Any]] = defaultdict(dict) for field_ in src_pkg._multivalued_fields: if field_.startswith("checksums-"): sum_name = field_[len("checksums-") :] else: sum_name = "md5sum" if field_ in src_pkg: for entry in src_pkg[field_]: name = entry["name"] files[name]["name"] = name files[name]["size"] = int(entry["size"], 10) files[name][sum_name] = entry[sum_name] files[name]["uri"] = os.path.join( self.url, src_pkg["Directory"], name ) # extract package name and version package_name = src_pkg["Package"] package_version = src_pkg["Version"] # build origin url origin_url = self.origin_url_for_package(package_name) # create package version key as expected by the debian loader package_version_key = ( f"{self.current_suite}/{self.current_component}/{package_version}" ) # this is the first time a package is listed if origin_url not in self.listed_origins: # create a ListedOrigin object for it that can be later # updated with new package versions info self.listed_origins[origin_url] = ListedOrigin( lister_id=self.lister_obj.id, url=origin_url, visit_type="deb", extra_loader_arguments={"packages": {}}, last_update=self.last_sources_update, ) # origin will be yielded at the end of that method origins_to_send[origin_url] = self.listed_origins[origin_url] # init set that will contain all listed package versions self.package_versions[package_name] = set() # package has already been listed in a previous page or current page elif origin_url not in origins_to_send: # if package has been listed in a previous page, its new versions # will be added to its ListedOrigin object but the update will # be sent to the scheduler in the commit_page method self.origins_to_update[origin_url] = self.listed_origins[origin_url] # update package versions data in parameter that will be provided # to the debian loader self.listed_origins[origin_url].extra_loader_arguments["packages"].update( { package_version_key: { "name": package_name, "version": package_version, "files": files, } } ) if self.listed_origins[origin_url].last_update is None or ( self.last_sources_update is not None and self.last_sources_update # type: ignore > self.listed_origins[origin_url].last_update ): # update debian package last update if current processed sources index # has a greater modification date self.listed_origins[origin_url].last_update = self.last_sources_update # add package version key to the set of found versions self.package_versions[package_name].add(package_version_key) # package has already been listed during a previous listing process if package_name in self.state.package_versions: new_versions = ( self.package_versions[package_name] - self.state.package_versions[package_name] ) # no new versions so far, no need to send the origin to the scheduler if not new_versions: origins_to_send.pop(origin_url, None) self.origins_to_update.pop(origin_url, None) # new versions found, ensure the origin will be sent to the scheduler elif origin_url not in self.sent_origins: self.origins_to_update.pop(origin_url, None) origins_to_send[origin_url] = self.listed_origins[origin_url] # update already counted origins with changes since last page self.sent_origins.update(origins_to_send.keys()) logger.debug( "Found %s new packages, %s packages with new versions.", len(origins_to_send), len(self.origins_to_update), ) logger.debug( "Current total number of listed packages is equal to %s.", len(self.listed_origins), ) yield from origins_to_send.values() def get_origins_to_update(self) -> Iterator[ListedOrigin]: yield from self.origins_to_update.values() def commit_page(self, page: DebianPageType): """Send to scheduler already listed origins where new versions have been found in current page.""" self.send_origins(self.get_origins_to_update()) def finalize(self): # set mapping between listed package names and versions as lister state self.state.package_versions = self.package_versions self.updated = len(self.sent_origins) > 0 diff --git a/swh/lister/gitlab/lister.py b/swh/lister/gitlab/lister.py index c878788..f57b7e2 100644 --- a/swh/lister/gitlab/lister.py +++ b/swh/lister/gitlab/lister.py @@ -1,265 +1,260 @@ -# Copyright (C) 2018-2021 The Software Heritage developers +# Copyright (C) 2018-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from dataclasses import asdict, dataclass import logging import random from typing import Any, Dict, Iterator, Optional, Tuple from urllib.parse import parse_qs, urlencode, urlparse import iso8601 -import requests from requests.exceptions import HTTPError from requests.status_codes import codes from tenacity.before_sleep import before_sleep_log -from swh.lister import USER_AGENT from swh.lister.pattern import CredentialsType, Lister from swh.lister.utils import http_retry, is_retryable_exception from swh.scheduler.model import ListedOrigin logger = logging.getLogger(__name__) # Some instance provides hg_git type which can be ingested as hg origins VCS_MAPPING = {"hg_git": "hg"} @dataclass class GitLabListerState: """State of the GitLabLister""" last_seen_next_link: Optional[str] = None """Last link header (not visited yet) during an incremental pass """ Repository = Dict[str, Any] @dataclass class PageResult: """Result from a query to a gitlab project api page.""" repositories: Optional[Tuple[Repository, ...]] = None next_page: Optional[str] = None def _if_rate_limited(retry_state) -> bool: """Custom tenacity retry predicate for handling HTTP responses with status code 403 with specific ratelimit header. """ attempt = retry_state.outcome if attempt.failed: exc = attempt.exception() return ( isinstance(exc, HTTPError) and exc.response.status_code == codes.forbidden and int(exc.response.headers.get("RateLimit-Remaining", "0")) == 0 ) or is_retryable_exception(exc) return False def _parse_id_after(url: Optional[str]) -> Optional[int]: """Given an url, extract a return the 'id_after' query parameter associated value or None. This is the the repository id used for pagination purposes. """ if not url: return None # link: https://${project-api}/?...&id_after=2x... query_data = parse_qs(urlparse(url).query) page = query_data.get("id_after") if page and len(page) > 0: return int(page[0]) return None class GitLabLister(Lister[GitLabListerState, PageResult]): """List origins for a gitlab instance. By default, the lister runs in incremental mode: it lists all repositories, starting with the `last_seen_next_link` stored in the scheduler backend. Args: scheduler: a scheduler instance url: the api v4 url of the gitlab instance to visit (e.g. https://gitlab.com/api/v4/) instance: a specific instance name (e.g. gitlab, tor, git-kernel, ...), url network location will be used if not provided incremental: defines if incremental listing is activated or not """ def __init__( self, scheduler, url: str, name: Optional[str] = "gitlab", instance: Optional[str] = None, credentials: Optional[CredentialsType] = None, incremental: bool = False, ): if name is not None: self.LISTER_NAME = name super().__init__( scheduler=scheduler, url=url.rstrip("/"), instance=instance, credentials=credentials, ) self.incremental = incremental self.last_page: Optional[str] = None self.per_page = 100 - self.session = requests.Session() - self.session.headers.update( - {"Accept": "application/json", "User-Agent": USER_AGENT} - ) + self.session.headers.update({"Accept": "application/json"}) if len(self.credentials) > 0: cred = random.choice(self.credentials) logger.info( "Using %s credentials from user %s", self.instance, cred["username"] ) api_token = cred["password"] if api_token: self.session.headers["Authorization"] = f"Bearer {api_token}" def state_from_dict(self, d: Dict[str, Any]) -> GitLabListerState: return GitLabListerState(**d) def state_to_dict(self, state: GitLabListerState) -> Dict[str, Any]: return asdict(state) @http_retry( retry=_if_rate_limited, before_sleep=before_sleep_log(logger, logging.WARNING) ) def get_page_result(self, url: str) -> PageResult: logger.debug("Fetching URL %s", url) response = self.session.get(url) if response.status_code != 200: logger.warning( "Unexpected HTTP status code %s on %s: %s", response.status_code, response.url, response.content, ) # GitLab API can return errors 500 when listing projects. # https://gitlab.com/gitlab-org/gitlab/-/issues/262629 # To avoid ending the listing prematurely, skip buggy URLs and move # to next pages. if response.status_code == 500: id_after = _parse_id_after(url) assert id_after is not None while True: next_id_after = id_after + self.per_page url = url.replace(f"id_after={id_after}", f"id_after={next_id_after}") response = self.session.get(url) if response.status_code == 200: break else: id_after = next_id_after else: response.raise_for_status() repositories: Tuple[Repository, ...] = tuple(response.json()) if hasattr(response, "links") and response.links.get("next"): next_page = response.links["next"]["url"] else: next_page = None return PageResult(repositories, next_page) def page_url(self, id_after: Optional[int] = None) -> str: parameters = { "pagination": "keyset", "order_by": "id", "sort": "asc", "simple": "true", "per_page": f"{self.per_page}", } if id_after is not None: parameters["id_after"] = str(id_after) return f"{self.url}/projects?{urlencode(parameters)}" def get_pages(self) -> Iterator[PageResult]: next_page: Optional[str] if self.incremental and self.state and self.state.last_seen_next_link: next_page = self.state.last_seen_next_link else: next_page = self.page_url() while next_page: self.last_page = next_page page_result = self.get_page_result(next_page) yield page_result next_page = page_result.next_page def get_origins_from_page(self, page_result: PageResult) -> Iterator[ListedOrigin]: assert self.lister_obj.id is not None repositories = page_result.repositories if page_result.repositories else [] for repo in repositories: visit_type = repo.get("vcs_type", "git") visit_type = VCS_MAPPING.get(visit_type, visit_type) yield ListedOrigin( lister_id=self.lister_obj.id, url=repo["http_url_to_repo"], visit_type=visit_type, last_update=iso8601.parse_date(repo["last_activity_at"]), ) def commit_page(self, page_result: PageResult) -> None: """Update currently stored state using the latest listed "next" page if relevant. Relevancy is determined by the next_page link whose 'page' id must be strictly superior to the currently stored one. Note: this is a noop for full listing mode """ if self.incremental: # link: https://${project-api}/?...&page=2x... next_page = page_result.next_page if not next_page and self.last_page: next_page = self.last_page if next_page: id_after = _parse_id_after(next_page) previous_next_page = self.state.last_seen_next_link previous_id_after = _parse_id_after(previous_next_page) if previous_next_page is None or ( previous_id_after and id_after and previous_id_after < id_after ): self.state.last_seen_next_link = next_page def finalize(self) -> None: """finalize the lister state when relevant (see `fn:commit_page` for details) Note: this is a noop for full listing mode """ next_page = self.state.last_seen_next_link if self.incremental and next_page: # link: https://${project-api}/?...&page=2x... next_id_after = _parse_id_after(next_page) scheduler_state = self.get_state_from_scheduler() previous_next_id_after = _parse_id_after( scheduler_state.last_seen_next_link ) if (not previous_next_id_after and next_id_after) or ( previous_next_id_after and next_id_after and previous_next_id_after < next_id_after ): self.updated = True diff --git a/swh/lister/gogs/lister.py b/swh/lister/gogs/lister.py index 17ec17f..f87100d 100644 --- a/swh/lister/gogs/lister.py +++ b/swh/lister/gogs/lister.py @@ -1,220 +1,206 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information + from dataclasses import asdict, dataclass import logging import random from typing import Any, Dict, Iterator, List, Optional, Tuple from urllib.parse import parse_qs, parse_qsl, urlencode, urljoin, urlparse import iso8601 -import requests -from tenacity.before_sleep import before_sleep_log +from requests.exceptions import HTTPError -from swh.lister.utils import http_retry from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import ListedOrigin -from .. import USER_AGENT from ..pattern import CredentialsType, Lister logger = logging.getLogger(__name__) Repo = Dict[str, Any] @dataclass class GogsListerPage: repos: Optional[List[Repo]] = None next_link: Optional[str] = None @dataclass class GogsListerState: last_seen_next_link: Optional[str] = None """Last link header (could be already visited) during an incremental pass.""" last_seen_repo_id: Optional[int] = None """Last repo id seen during an incremental pass.""" def _parse_page_id(url: Optional[str]) -> int: """Parse the page id from a Gogs page url.""" if url is None: return 0 return int(parse_qs(urlparse(url).query)["page"][0]) class GogsLister(Lister[GogsListerState, GogsListerPage]): """List origins from the Gogs Gogs API documentation: https://github.com/gogs/docs-api The API is protected behind authentication so credentials/API tokens are mandatory. It supports pagination and provides next page URL through the 'next' value of the 'Link' header. The default value for page size ('limit') is 10 but the maximum allowed value is 50. """ LISTER_NAME = "gogs" VISIT_TYPE = "git" REPO_LIST_PATH = "repos/search" def __init__( self, scheduler: SchedulerInterface, url: str, instance: Optional[str] = None, api_token: Optional[str] = None, page_size: int = 50, credentials: CredentialsType = None, ): super().__init__( scheduler=scheduler, credentials=credentials, url=url, instance=instance, ) self.query_params = { "limit": page_size, } self.api_token = api_token if self.api_token is None: if len(self.credentials) > 0: cred = random.choice(self.credentials) username = cred.get("username") self.api_token = cred["password"] logger.info("Using authentication credentials from user %s", username) else: # Raises an error on Gogs, or a warning on Gitea self.on_anonymous_mode() - self.session = requests.Session() - self.session.headers.update( - { - "Accept": "application/json", - "User-Agent": USER_AGENT, - } - ) + self.session.headers.update({"Accept": "application/json"}) if self.api_token: self.session.headers["Authorization"] = f"token {self.api_token}" def on_anonymous_mode(self): raise ValueError("No credentials or API token provided") def state_from_dict(self, d: Dict[str, Any]) -> GogsListerState: return GogsListerState(**d) def state_to_dict(self, state: GogsListerState) -> Dict[str, Any]: return asdict(state) - @http_retry(before_sleep=before_sleep_log(logger, logging.WARNING)) def page_request( self, url: str, params: Dict[str, Any] ) -> Tuple[Dict[str, Any], Dict[str, Any]]: logger.debug("Fetching URL %s with params %s", url, params) - response = self.session.get(url, params=params) - - if response.status_code != 200: - logger.warning( - "Unexpected HTTP status code %s on %s: %s", - response.status_code, - response.url, - response.content, - ) - if ( - response.status_code == 500 - ): # Temporary hack for skipping fatal repos (T4423) - url_parts = urlparse(url) - query: Dict[str, Any] = dict(parse_qsl(url_parts.query)) - query.update({"page": _parse_page_id(url) + 1}) - next_page_link = url_parts._replace(query=urlencode(query)).geturl() - body: Dict[str, Any] = {"data": []} - links = {"next": {"url": next_page_link}} - return body, links - else: - response.raise_for_status() + try: + response = self.http_request(url, params=params) + except HTTPError as http_error: + if ( + http_error.response.status_code == 500 + ): # Temporary hack for skipping fatal repos (T4423) + url_parts = urlparse(url) + query: Dict[str, Any] = dict(parse_qsl(url_parts.query)) + query.update({"page": _parse_page_id(url) + 1}) + next_page_link = url_parts._replace(query=urlencode(query)).geturl() + body: Dict[str, Any] = {"data": []} + links = {"next": {"url": next_page_link}} + return body, links + else: + raise return response.json(), response.links @classmethod def extract_repos(cls, body: Dict[str, Any]) -> List[Repo]: fields_filter = ["id", "clone_url", "updated_at"] return [{k: r[k] for k in fields_filter} for r in body["data"]] def get_pages(self) -> Iterator[GogsListerPage]: page_id = 1 if self.state.last_seen_next_link is not None: page_id = _parse_page_id(self.state.last_seen_next_link) # base with trailing slash, path without leading slash for urljoin next_link: Optional[str] = urljoin(self.url, self.REPO_LIST_PATH) + assert next_link is not None body, links = self.page_request( next_link, {**self.query_params, "page": page_id} ) while next_link is not None: repos = self.extract_repos(body) assert len(links) > 0, "API changed: no Link header found" if "next" in links: next_link = links["next"]["url"] else: next_link = None # Happens for the last page yield GogsListerPage(repos=repos, next_link=next_link) if next_link is not None: body, links = self.page_request(next_link, {}) def get_origins_from_page(self, page: GogsListerPage) -> Iterator[ListedOrigin]: """Convert a page of Gogs repositories into a list of ListedOrigins""" assert self.lister_obj.id is not None assert page.repos is not None for r in page.repos: last_update = iso8601.parse_date(r["updated_at"]) yield ListedOrigin( lister_id=self.lister_obj.id, visit_type=self.VISIT_TYPE, url=r["clone_url"], last_update=last_update, ) def commit_page(self, page: GogsListerPage) -> None: last_seen_next_link = page.next_link page_id = _parse_page_id(last_seen_next_link) state_page_id = _parse_page_id(self.state.last_seen_next_link) if page_id > state_page_id: self.state.last_seen_next_link = last_seen_next_link if (page.repos is not None) and len(page.repos) > 0: self.state.last_seen_repo_id = page.repos[-1]["id"] def finalize(self) -> None: scheduler_state = self.get_state_from_scheduler() state_page_id = _parse_page_id(self.state.last_seen_next_link) scheduler_page_id = _parse_page_id(scheduler_state.last_seen_next_link) state_last_repo_id = self.state.last_seen_repo_id or 0 scheduler_last_repo_id = scheduler_state.last_seen_repo_id or 0 if (state_page_id >= scheduler_page_id) and ( state_last_repo_id > scheduler_last_repo_id ): self.updated = True # Marked updated only if it finds new repos diff --git a/swh/lister/gogs/tests/test_lister.py b/swh/lister/gogs/tests/test_lister.py index dfcd991..4f9e370 100644 --- a/swh/lister/gogs/tests/test_lister.py +++ b/swh/lister/gogs/tests/test_lister.py @@ -1,330 +1,330 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json from pathlib import Path from typing import List from unittest.mock import Mock import pytest from requests import HTTPError from swh.lister.gogs.lister import GogsLister, GogsListerPage, _parse_page_id from swh.scheduler.model import ListedOrigin TRY_GOGS_URL = "https://try.gogs.io/api/v1/" def try_gogs_page(n: int): return TRY_GOGS_URL + GogsLister.REPO_LIST_PATH + f"?page={n}&limit=3" P1 = try_gogs_page(1) P2 = try_gogs_page(2) P3 = try_gogs_page(3) P4 = try_gogs_page(4) @pytest.fixture def trygogs_p1(datadir): text = Path(datadir, "https_try.gogs.io", "repos_page1").read_text() headers = {"Link": f'<{P2}>; rel="next"'} page_result = GogsListerPage( repos=GogsLister.extract_repos(json.loads(text)), next_link=P2 ) origin_urls = [r["clone_url"] for r in page_result.repos] return text, headers, page_result, origin_urls @pytest.fixture def trygogs_p2(datadir): text = Path(datadir, "https_try.gogs.io", "repos_page2").read_text() headers = {"Link": f'<{P3}>; rel="next",<{P1}>; rel="prev"'} page_result = GogsListerPage( repos=GogsLister.extract_repos(json.loads(text)), next_link=P3 ) origin_urls = [r["clone_url"] for r in page_result.repos] return text, headers, page_result, origin_urls @pytest.fixture def trygogs_p3(datadir): text = Path(datadir, "https_try.gogs.io", "repos_page3").read_text() headers = {"Link": f'<{P4}>; rel="next",<{P2}>; rel="prev"'} page_result = GogsListerPage( repos=GogsLister.extract_repos(json.loads(text)), next_link=P3 ) origin_urls = [r["clone_url"] for r in page_result.repos] return text, headers, page_result, origin_urls @pytest.fixture def trygogs_p4(datadir): text = Path(datadir, "https_try.gogs.io", "repos_page4").read_text() headers = {"Link": f'<{P3}>; rel="prev"'} page_result = GogsListerPage( repos=GogsLister.extract_repos(json.loads(text)), next_link=P3 ) origin_urls = [r["clone_url"] for r in page_result.repos] return text, headers, page_result, origin_urls @pytest.fixture def trygogs_p3_last(datadir): text = Path(datadir, "https_try.gogs.io", "repos_page3").read_text() headers = {"Link": f'<{P2}>; rel="prev",<{P1}>; rel="first"'} page_result = GogsListerPage( repos=GogsLister.extract_repos(json.loads(text)), next_link=None ) origin_urls = [r["clone_url"] for r in page_result.repos] return text, headers, page_result, origin_urls @pytest.fixture def trygogs_p3_empty(): origins_urls = [] body = {"data": [], "ok": True} headers = {"Link": f'<{P2}>; rel="prev",<{P1}>; rel="first"'} page_result = GogsListerPage(repos=GogsLister.extract_repos(body), next_link=None) text = json.dumps(body) return text, headers, page_result, origins_urls def check_listed_origins(lister_urls: List[str], scheduler_origins: List[ListedOrigin]): """Asserts that the two collections have the same origin URLs. Does not test last_update.""" assert set(lister_urls) == {origin.url for origin in scheduler_origins} def test_gogs_full_listing( swh_scheduler, requests_mock, mocker, trygogs_p1, trygogs_p2, trygogs_p3_last ): kwargs = dict( url=TRY_GOGS_URL, instance="try_gogs", page_size=3, api_token="secret" ) lister = GogsLister(scheduler=swh_scheduler, **kwargs) lister.get_origins_from_page: Mock = mocker.spy(lister, "get_origins_from_page") p1_text, p1_headers, p1_result, p1_origin_urls = trygogs_p1 p2_text, p2_headers, p2_result, p2_origin_urls = trygogs_p2 p3_text, p3_headers, p3_result, p3_origin_urls = trygogs_p3_last requests_mock.get(P1, text=p1_text, headers=p1_headers) requests_mock.get(P2, text=p2_text, headers=p2_headers) requests_mock.get(P3, text=p3_text, headers=p3_headers) stats = lister.run() assert stats.pages == 3 assert stats.origins == 9 calls = map(mocker.call, [p1_result, p2_result, p3_result]) lister.get_origins_from_page.assert_has_calls(list(calls)) scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results check_listed_origins( p1_origin_urls + p2_origin_urls + p3_origin_urls, scheduler_origins ) assert ( lister.get_state_from_scheduler().last_seen_next_link == P3 ) # P3 didn't provide any next link so it remains the last_seen_next_link def test_gogs_auth_instance( swh_scheduler, requests_mock, trygogs_p1, trygogs_p2, trygogs_p3_empty ): """Covers token authentication, token from credentials, instance inference from URL.""" api_token = "secret" instance = "try_gogs" # Test lister initialization without api_token or credentials: with pytest.raises(ValueError, match="No credentials or API token provided"): kwargs1 = dict(url=TRY_GOGS_URL, instance=instance) GogsLister(scheduler=swh_scheduler, **kwargs1) # Test lister initialization using api_token: kwargs2 = dict(url=TRY_GOGS_URL, api_token=api_token, instance=instance) lister = GogsLister(scheduler=swh_scheduler, **kwargs2) assert lister.session.headers["Authorization"].lower() == "token %s" % api_token # Test lister initialization with credentials and run it: creds = {"gogs": {instance: [{"username": "u", "password": api_token}]}} kwargs3 = dict(url=TRY_GOGS_URL, credentials=creds, instance=instance, page_size=3) lister = GogsLister(scheduler=swh_scheduler, **kwargs3) assert lister.session.headers["Authorization"].lower() == "token %s" % api_token assert lister.instance == "try_gogs" # setup requests mocking p1_text, p1_headers, _, _ = trygogs_p1 p2_text, p2_headers, _, _ = trygogs_p2 p3_text, p3_headers, _, _ = trygogs_p3_empty requests_mock.get(P1, text=p1_text, headers=p1_headers) requests_mock.get(P2, text=p2_text, headers=p2_headers) requests_mock.get(P3, text=p3_text, headers=p3_headers) # lister should run without any error and extract the origins stats = lister.run() assert stats.pages == 3 assert stats.origins == 6 @pytest.mark.parametrize("http_code", [400, 500]) def test_gogs_list_http_error( swh_scheduler, requests_mock, http_code, trygogs_p1, trygogs_p3_last ): """Test handling of some HTTP errors commonly encountered""" lister = GogsLister(scheduler=swh_scheduler, url=TRY_GOGS_URL, api_token="secret") p1_text, p1_headers, _, p1_origin_urls = trygogs_p1 p3_text, p3_headers, _, p3_origin_urls = trygogs_p3_last base_url = TRY_GOGS_URL + lister.REPO_LIST_PATH requests_mock.get( base_url, [ {"text": p1_text, "headers": p1_headers, "status_code": 200}, {"status_code": http_code}, {"text": p3_text, "headers": p3_headers, "status_code": 200}, ], ) # pages with fatal repositories should be skipped (no error raised) # See T4423 for more details if http_code == 500: lister.run() else: with pytest.raises(HTTPError): lister.run() # Both P1 and P3 origins should be listed in case of 500 error # While in other cases, only P1 origins should be listed scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results check_listed_origins( (p1_origin_urls + p3_origin_urls) if http_code == 500 else p1_origin_urls, scheduler_origins, ) def test_gogs_incremental_lister( swh_scheduler, requests_mock, mocker, trygogs_p1, trygogs_p2, trygogs_p3, trygogs_p3_last, trygogs_p3_empty, trygogs_p4, ): kwargs = dict( url=TRY_GOGS_URL, instance="try_gogs", page_size=3, api_token="secret" ) lister = GogsLister(scheduler=swh_scheduler, **kwargs) lister.get_origins_from_page: Mock = mocker.spy(lister, "get_origins_from_page") # First listing attempt: P1 and P2 return 3 origins each # while P3 (current last page) is empty. p1_text, p1_headers, p1_result, p1_origin_urls = trygogs_p1 p2_text, p2_headers, p2_result, p2_origin_urls = trygogs_p2 p3_text, p3_headers, p3_result, p3_origin_urls = trygogs_p3_empty requests_mock.get(P1, text=p1_text, headers=p1_headers) requests_mock.get(P2, text=p2_text, headers=p2_headers) requests_mock.get(P3, text=p3_text, headers=p3_headers) attempt1_stats = lister.run() assert attempt1_stats.pages == 3 assert attempt1_stats.origins == 6 scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results lister_state = lister.get_state_from_scheduler() assert lister_state.last_seen_next_link == P3 assert lister_state.last_seen_repo_id == p2_result.repos[-1]["id"] assert lister.updated check_listed_origins(p1_origin_urls + p2_origin_urls, scheduler_origins) lister.updated = False # Reset the flag # Second listing attempt: P3 isn't empty anymore. # The lister should restart from last state and hence revisit P3. p3_text, p3_headers, p3_result, p3_origin_urls = trygogs_p3_last requests_mock.get(P3, text=p3_text, headers=p3_headers) - lister.session.get = mocker.spy(lister.session, "get") + lister.session.request = mocker.spy(lister.session, "request") attempt2_stats = lister.run() assert attempt2_stats.pages == 1 assert attempt2_stats.origins == 3 scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results page_id = _parse_page_id(lister_state.last_seen_next_link) query_params = lister.query_params query_params["page"] = page_id - lister.session.get.assert_called_once_with( - TRY_GOGS_URL + lister.REPO_LIST_PATH, params=query_params + lister.session.request.assert_called_once_with( + "GET", TRY_GOGS_URL + lister.REPO_LIST_PATH, params=query_params ) # All the 9 origins (3 pages) should be passed on to the scheduler: check_listed_origins( p1_origin_urls + p2_origin_urls + p3_origin_urls, scheduler_origins ) lister_state = lister.get_state_from_scheduler() assert lister_state.last_seen_next_link == P3 assert lister_state.last_seen_repo_id == p3_result.repos[-1]["id"] assert lister.updated lister.updated = False # Reset the flag # Third listing attempt: No new origins # The lister should revisit last seen page (P3) attempt3_stats = lister.run() assert attempt3_stats.pages == 1 assert attempt3_stats.origins == 3 lister_state = lister.get_state_from_scheduler() assert lister_state.last_seen_next_link == P3 assert lister_state.last_seen_repo_id == p3_result.repos[-1]["id"] assert lister.updated is False # No new origins so state isn't updated. # Fourth listing attempt: Page 4 is introduced and returns 3 new origins # The lister should revisit last seen page (P3) as well as P4. p3_text, p3_headers, p3_result, p3_origin_urls = trygogs_p3 # new P3 points to P4 p4_text, p4_headers, p4_result, p4_origin_urls = trygogs_p4 requests_mock.get(P3, text=p3_text, headers=p3_headers) requests_mock.get(P4, text=p4_text, headers=p4_headers) attempt4_stats = lister.run() assert attempt4_stats.pages == 2 assert attempt4_stats.origins == 6 lister_state = lister.get_state_from_scheduler() assert lister_state.last_seen_next_link == P4 assert lister_state.last_seen_repo_id == p4_result.repos[-1]["id"] assert lister.updated # All the 12 origins (4 pages) should be passed on to the scheduler: scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results check_listed_origins( p1_origin_urls + p2_origin_urls + p3_origin_urls + p4_origin_urls, scheduler_origins, ) diff --git a/swh/lister/golang/lister.py b/swh/lister/golang/lister.py index 87ea9f8..10e5935 100644 --- a/swh/lister/golang/lister.py +++ b/swh/lister/golang/lister.py @@ -1,187 +1,164 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from dataclasses import dataclass from datetime import datetime import json import logging from typing import Any, Dict, Iterator, List, Optional, Tuple import iso8601 -import requests -from tenacity import before_sleep_log -from swh.lister.utils import http_retry from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import ListedOrigin -from .. import USER_AGENT from ..pattern import CredentialsType, Lister logger = logging.getLogger(__name__) @dataclass class GolangStateType: last_seen: Optional[datetime] = None """Last timestamp of a package version we have saved. Used as a starting point for an incremental listing.""" GolangPageType = List[Dict[str, Any]] class GolangLister(Lister[GolangStateType, GolangPageType]): """ List all Golang modules and send associated origins to scheduler. The lister queries the Golang module index, whose documentation can be found at https://index.golang.org """ GOLANG_MODULES_INDEX_URL = "https://index.golang.org/index" # `limit` seems to be... limited to 2000. GOLANG_MODULES_INDEX_LIMIT = 2000 LISTER_NAME = "golang" def __init__( self, scheduler: SchedulerInterface, incremental: bool = False, credentials: CredentialsType = None, ): super().__init__( scheduler=scheduler, url=self.GOLANG_MODULES_INDEX_URL, instance=self.LISTER_NAME, credentials=credentials, ) - self.session = requests.Session() - self.session.headers.update( - {"Accept": "application/json", "User-Agent": USER_AGENT} - ) + self.session.headers.update({"Accept": "application/json"}) self.incremental = incremental def state_from_dict(self, d: Dict[str, Any]) -> GolangStateType: as_string = d.get("last_seen") last_seen = iso8601.parse_date(as_string) if as_string is not None else None return GolangStateType(last_seen=last_seen) def state_to_dict(self, state: GolangStateType) -> Dict[str, Any]: return { "last_seen": state.last_seen.isoformat() if state.last_seen is not None else None } def finalize(self): if self.incremental and self.state.last_seen is not None: scheduler_state = self.get_state_from_scheduler() if ( scheduler_state.last_seen is None or self.state.last_seen > scheduler_state.last_seen ): self.updated = True - @http_retry( - before_sleep=before_sleep_log(logger, logging.WARNING), - ) def api_request(self, url: str) -> List[str]: - logger.debug("Fetching URL %s", url) - - response = self.session.get(url) - - if response.status_code not in (200, 304): - # Log response content to ease debugging - logger.warning( - "Unexpected HTTP status code %s for URL %s", - response.status_code, - response.url, - ) - - response.raise_for_status() - + response = self.http_request(url) return response.text.split() def get_single_page( self, since: Optional[datetime] = None ) -> Tuple[GolangPageType, Optional[datetime]]: """Return a page from the API and the timestamp of its last entry. Since all entries are sorted by chronological order, the timestamp is useful both for pagination and later for incremental runs.""" url = f"{self.url}?limit={self.GOLANG_MODULES_INDEX_LIMIT}" if since is not None: # The Golang index does not understand `+00:00` for some reason # and expects the "timezone zero" notation instead. This works # because all times are UTC. utc_offset = since.utcoffset() assert ( utc_offset is not None and utc_offset.total_seconds() == 0 ), "Non-UTC datetime" as_date = since.isoformat().replace("+00:00", "Z") url = f"{url}&since={as_date}" entries = self.api_request(url) page: GolangPageType = [] if not entries: return page, since for as_json in entries: entry = json.loads(as_json) timestamp = iso8601.parse_date(entry["Timestamp"]) # We've already parsed it and we'll need the datetime later, save it entry["Timestamp"] = timestamp page.append(entry) # The index is guaranteed to be sorted in chronological order since = timestamp return page, since def get_pages(self) -> Iterator[GolangPageType]: since = None if self.incremental: since = self.state.last_seen page, since = self.get_single_page(since=since) if since == self.state.last_seen: # The index returns packages whose timestamp are greater or # equal to the date provided as parameter, which will create # an infinite loop if not stopped here. return [], since if since is not None: self.state.last_seen = since while page: yield page page, since = self.get_single_page(since=since) if since == self.state.last_seen: return [], since if since is not None: self.state.last_seen = since def get_origins_from_page(self, page: GolangPageType) -> Iterator[ListedOrigin]: """ Iterate on all Golang projects and yield ListedOrigin instances. """ assert self.lister_obj.id is not None for module in page: path = module["Path"] # The loader will be expected to use the golang proxy to do the # actual downloading. We're using `pkg.go.dev` so that the URL points # to somewhere useful for a human instead of an (incomplete) API path. origin_url = f"https://pkg.go.dev/{path}" # Since the Go index lists versions and not just packages, there will # be duplicates. Fortunately, `ListedOrigins` are "upserted" server-side, # so only the last timestamp will be used, with no duplicates. # Performance should not be an issue as they are sent to the db in bulk. yield ListedOrigin( lister_id=self.lister_obj.id, url=origin_url, visit_type="golang", last_update=module["Timestamp"], ) diff --git a/swh/lister/golang/tests/test_lister.py b/swh/lister/golang/tests/test_lister.py index 3cc9c64..e1abe90 100644 --- a/swh/lister/golang/tests/test_lister.py +++ b/swh/lister/golang/tests/test_lister.py @@ -1,241 +1,242 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from pathlib import Path import iso8601 from swh.lister.golang.lister import GolangLister, GolangStateType from swh.lister.tests.test_utils import assert_sleep_calls from swh.lister.utils import WAIT_EXP_BASE # https://pkg.go.dev prefix omitted expected_listed = [ ("collectd.org", "2019-04-11T18:47:25.450546+00:00"), ( "github.com/blang/semver", "2019-04-15T13:54:39.107258+00:00", ), ( "github.com/bmizerany/pat", "2019-04-11T18:47:29.390564+00:00", ), ( "github.com/djherbis/buffer", "2019-04-11T18:47:29.974874+00:00", ), ( "github.com/djherbis/nio", "2019-04-11T18:47:32.283312+00:00", ), ( "github.com/gobuffalo/buffalo-plugins", "2019-04-15T13:54:34.222985+00:00", ), ( "github.com/gobuffalo/buffalo-pop", "2019-04-15T13:54:39.135792+00:00", ), ( "github.com/gobuffalo/clara", "2019-04-15T13:54:40.651916+00:00", ), ( "github.com/gobuffalo/genny", "2019-04-15T13:54:37.841547+00:00", ), ( "github.com/gobuffalo/packr", "2019-04-15T13:54:35.688900+00:00", ), ( "github.com/markbates/refresh", "2019-04-15T13:54:35.250835+00:00", ), ( "github.com/mitchellh/go-homedir", "2019-04-15T13:54:35.678214+00:00", ), ( "github.com/nats-io/nuid", "2019-04-11T18:47:28.102348+00:00", ), ( "github.com/oklog/ulid", "2019-04-11T18:47:23.234198+00:00", ), ( "github.com/pkg/errors", "2019-04-18T02:07:41.336899+00:00", ), ( "golang.org/x/sys", "2019-04-15T13:54:37.555525+00:00", ), ("golang.org/x/text", "2019-04-10T19:08:52.997264+00:00"), # only one x/tools listed even though there are two version, and only the # latest one's timestamp is used. ( "golang.org/x/tools", "2019-04-15T13:54:41.905064+00:00", ), ] def _generate_responses(datadir, requests_mock): responses = [] for file in Path(datadir).glob("page-*.txt"): # Test that throttling and server errors are retries responses.append({"text": "", "status_code": 429}) responses.append({"text": "", "status_code": 500}) # Also test that the lister appropriately gets out of the infinite loop responses.append({"text": file.read_text(), "status_code": 200}) requests_mock.get(GolangLister.GOLANG_MODULES_INDEX_URL, responses) def test_golang_lister(swh_scheduler, mocker, requests_mock, datadir): - # first listing, should return one origin per package - lister = GolangLister(scheduler=swh_scheduler) # Exponential retries take a long time, so stub time.sleep - mocked_sleep = mocker.patch.object(lister.api_request.retry, "sleep") + mocked_sleep = mocker.patch.object(GolangLister.http_request.retry, "sleep") + + # first listing, should return one origin per package + lister = GolangLister(scheduler=swh_scheduler) _generate_responses(datadir, requests_mock) stats = lister.run() assert stats.pages == 3 # The two `golang.org/x/tools` versions are *not* listed as separate origins assert stats.origins == 18 scheduler_origins = sorted( swh_scheduler.get_listed_origins(lister.lister_obj.id).results, key=lambda x: x.url, ) for scheduled, (url, timestamp) in zip(scheduler_origins, expected_listed): assert scheduled.url == f"https://pkg.go.dev/{url}" assert scheduled.last_update == iso8601.parse_date(timestamp) assert scheduled.visit_type == "golang" assert len(scheduler_origins) == len(expected_listed) # Test `time.sleep` is called with exponential retries assert_sleep_calls( mocker, mocked_sleep, [1, WAIT_EXP_BASE, 1, WAIT_EXP_BASE, 1, WAIT_EXP_BASE] ) # doing it all again (without incremental) should give us the same result lister = GolangLister(scheduler=swh_scheduler) - mocked_sleep = mocker.patch.object(lister.api_request.retry, "sleep") + _generate_responses(datadir, requests_mock) stats = lister.run() assert stats.pages == 3 assert stats.origins == 18 def test_golang_lister_incremental(swh_scheduler, requests_mock, datadir, mocker): # first listing, should return one origin per package lister = GolangLister(scheduler=swh_scheduler, incremental=True) mock = mocker.spy(lister, "get_single_page") responses = [ {"text": Path(datadir, "page-1.txt").read_text(), "status_code": 200}, ] requests_mock.get(GolangLister.GOLANG_MODULES_INDEX_URL, responses) stats = lister.run() page1_last_timestamp = datetime.datetime( 2019, 4, 11, 18, 47, 29, 390564, tzinfo=datetime.timezone.utc ) page2_last_timestamp = datetime.datetime( 2019, 4, 15, 13, 54, 35, 250835, tzinfo=datetime.timezone.utc ) page3_last_timestamp = datetime.datetime( 2019, 4, 18, 2, 7, 41, 336899, tzinfo=datetime.timezone.utc ) mock.assert_has_calls( [ # First call has no state mocker.call(since=None), # Second call is the last timestamp in the listed page mocker.call(since=page1_last_timestamp), ] ) assert lister.get_state_from_scheduler() == GolangStateType( last_seen=page1_last_timestamp ) assert stats.pages == 1 assert stats.origins == 5 # Incremental should list nothing lister = GolangLister(scheduler=swh_scheduler, incremental=True) mock = mocker.spy(lister, "get_single_page") stats = lister.run() mock.assert_has_calls([mocker.call(since=page1_last_timestamp)]) assert stats.pages == 0 assert stats.origins == 0 # Add more responses responses = [ {"text": Path(datadir, "page-2.txt").read_text(), "status_code": 200}, ] requests_mock.get(GolangLister.GOLANG_MODULES_INDEX_URL, responses) # Incremental should list new page lister = GolangLister(scheduler=swh_scheduler, incremental=True) mock = mocker.spy(lister, "get_single_page") stats = lister.run() mock.assert_has_calls( [ mocker.call(since=page1_last_timestamp), mocker.call(since=page2_last_timestamp), ] ) assert stats.pages == 1 assert stats.origins == 4 # Incremental should list nothing again lister = GolangLister(scheduler=swh_scheduler, incremental=True) mock = mocker.spy(lister, "get_single_page") stats = lister.run() assert stats.pages == 0 assert stats.origins == 0 mock.assert_has_calls([mocker.call(since=page2_last_timestamp)]) # Add yet more responses responses = [ {"text": Path(datadir, "page-3.txt").read_text(), "status_code": 200}, ] requests_mock.get(GolangLister.GOLANG_MODULES_INDEX_URL, responses) # Incremental should list new page again lister = GolangLister(scheduler=swh_scheduler, incremental=True) mock = mocker.spy(lister, "get_single_page") stats = lister.run() assert stats.pages == 1 assert stats.origins == 9 mock.assert_has_calls( [ mocker.call(since=page2_last_timestamp), mocker.call(since=page3_last_timestamp), ] ) # Incremental should list nothing one last time lister = GolangLister(scheduler=swh_scheduler, incremental=True) mock = mocker.spy(lister, "get_single_page") stats = lister.run() assert stats.pages == 0 assert stats.origins == 0 mock.assert_has_calls([mocker.call(since=page3_last_timestamp)]) diff --git a/swh/lister/maven/lister.py b/swh/lister/maven/lister.py index 563c0a7..b230552 100644 --- a/swh/lister/maven/lister.py +++ b/swh/lister/maven/lister.py @@ -1,428 +1,406 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from dataclasses import asdict, dataclass from datetime import datetime, timezone import logging import re from typing import Any, Dict, Iterator, Optional from urllib.parse import urljoin from bs4 import BeautifulSoup import lxml import requests -from tenacity.before_sleep import before_sleep_log from swh.core.github.utils import GitHubSession -from swh.lister.utils import http_retry from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import ListedOrigin from .. import USER_AGENT from ..pattern import CredentialsType, Lister logger = logging.getLogger(__name__) RepoPage = Dict[str, Any] SUPPORTED_SCM_TYPES = ("git", "svn", "hg", "cvs", "bzr") @dataclass class MavenListerState: """State of the MavenLister""" last_seen_doc: int = -1 """Last doc ID ingested during an incremental pass """ last_seen_pom: int = -1 """Last doc ID related to a pom and ingested during an incremental pass """ class MavenLister(Lister[MavenListerState, RepoPage]): """List origins from a Maven repository. Maven Central provides artifacts for Java builds. It includes POM files and source archives, which we download to get the source code of artifacts and links to their scm repository. This lister yields origins of types: git/svn/hg or whatever the Artifacts use as repository type, plus maven types for the maven loader (tgz, jar).""" LISTER_NAME = "maven" def __init__( self, scheduler: SchedulerInterface, url: str, index_url: str = None, instance: Optional[str] = None, credentials: CredentialsType = None, incremental: bool = True, ): """Lister class for Maven repositories. Args: url: main URL of the Maven repository, i.e. url of the base index used to fetch maven artifacts. For Maven central use https://repo1.maven.org/maven2/ index_url: the URL to download the exported text indexes from. Would typically be a local host running the export docker image. See README.md in this directory for more information. instance: Name of maven instance. Defaults to url's network location if unset. incremental: bool, defaults to True. Defines if incremental listing is activated or not. """ self.BASE_URL = url self.INDEX_URL = index_url self.incremental = incremental super().__init__( scheduler=scheduler, credentials=credentials, url=url, instance=instance, ) - self.session = requests.Session() - self.session.headers.update( - { - "Accept": "application/json", - "User-Agent": USER_AGENT, - } - ) + self.session.headers.update({"Accept": "application/json"}) self.jar_origins: Dict[str, ListedOrigin] = {} self.github_session = GitHubSession( credentials=self.credentials, user_agent=USER_AGENT ) def state_from_dict(self, d: Dict[str, Any]) -> MavenListerState: return MavenListerState(**d) def state_to_dict(self, state: MavenListerState) -> Dict[str, Any]: return asdict(state) - @http_retry(before_sleep=before_sleep_log(logger, logging.WARNING)) - def page_request(self, url: str, params: Dict[str, Any]) -> requests.Response: - - logger.info("Fetching URL %s with params %s", url, params) - - response = self.session.get(url, params=params) - if response.status_code != 200: - logger.warning( - "Unexpected HTTP status code %s on %s: %s", - response.status_code, - response.url, - response.content, - ) - response.raise_for_status() - - return response - def get_pages(self) -> Iterator[RepoPage]: """Retrieve and parse exported maven indexes to identify all pom files and src archives. """ # Example of returned RepoPage's: # [ # { # "type": "maven", # "url": "https://maven.xwiki.org/..-5.4.2-sources.jar", # "time": 1626109619335, # "gid": "org.xwiki.platform", # "aid": "xwiki-platform-wikistream-events-xwiki", # "version": "5.4.2" # }, # { # "type": "scm", # "url": "scm:git:git://github.com/openengsb/openengsb-framework.git", # "project": "openengsb-framework", # }, # ... # ] # Download the main text index file. logger.info("Downloading computed index from %s.", self.INDEX_URL) assert self.INDEX_URL is not None - response = requests.get(self.INDEX_URL, stream=True) - if response.status_code != 200: + try: + response = self.http_request(self.INDEX_URL, stream=True) + except requests.HTTPError: logger.error("Index %s not found, stopping", self.INDEX_URL) - response.raise_for_status() + raise # Prepare regexes to parse index exports. # Parse doc id. # Example line: "doc 13" re_doc = re.compile(r"^doc (?P\d+)$") # Parse gid, aid, version, classifier, extension. # Example line: " value al.aldi|sprova4j|0.1.0|sources|jar" re_val = re.compile( r"^\s{4}value (?P[^|]+)\|(?P[^|]+)\|(?P[^|]+)\|" + r"(?P[^|]+)\|(?P[^|]+)$" ) # Parse last modification time. # Example line: " value jar|1626109619335|14316|2|2|0|jar" re_time = re.compile( r"^\s{4}value ([^|]+)\|(?P[^|]+)\|([^|]+)\|([^|]+)\|([^|]+)" + r"\|([^|]+)\|([^|]+)$" ) # Read file line by line and process it out_pom: Dict = {} jar_src: Dict = {} doc_id: int = 0 jar_src["doc"] = None url_src = None iterator = response.iter_lines(chunk_size=1024) for line_bytes in iterator: # Read the index text export and get URLs and SCMs. line = line_bytes.decode(errors="ignore") m_doc = re_doc.match(line) if m_doc is not None: doc_id = int(m_doc.group("doc")) # jar_src["doc"] contains the id of the current document, whatever # its type (scm or jar). jar_src["doc"] = doc_id else: m_val = re_val.match(line) if m_val is not None: (gid, aid, version, classifier, ext) = m_val.groups() ext = ext.strip() path = "/".join(gid.split(".")) if classifier == "NA" and ext.lower() == "pom": # If incremental mode, we don't record any line that is # before our last recorded doc id. if ( self.incremental and self.state and self.state.last_seen_pom and self.state.last_seen_pom >= doc_id ): continue url_path = f"{path}/{aid}/{version}/{aid}-{version}.{ext}" url_pom = urljoin( self.BASE_URL, url_path, ) out_pom[url_pom] = doc_id elif ( classifier.lower() == "sources" or ("src" in classifier) ) and ext.lower() in ("zip", "jar"): url_path = ( f"{path}/{aid}/{version}/{aid}-{version}-{classifier}.{ext}" ) url_src = urljoin(self.BASE_URL, url_path) jar_src["gid"] = gid jar_src["aid"] = aid jar_src["version"] = version else: m_time = re_time.match(line) if m_time is not None and url_src is not None: time = m_time.group("mtime") jar_src["time"] = int(time) artifact_metadata_d = { "type": "maven", "url": url_src, **jar_src, } logger.debug( "* Yielding jar %s: %s", url_src, artifact_metadata_d ) yield artifact_metadata_d url_src = None logger.info("Found %s poms.", len(out_pom)) # Now fetch pom files and scan them for scm info. logger.info("Fetching poms..") - for pom in out_pom: + for pom_url in out_pom: try: - response = self.page_request(pom, {}) + response = self.http_request(pom_url) parsed_pom = BeautifulSoup(response.content, "xml") project = parsed_pom.find("project") if project is None: continue scm = project.find("scm") if scm is not None: connection = scm.find("connection") if connection is not None: artifact_metadata_d = { "type": "scm", - "doc": out_pom[pom], + "doc": out_pom[pom_url], "url": connection.text, } - logger.debug("* Yielding pom %s: %s", pom, artifact_metadata_d) + logger.debug( + "* Yielding pom %s: %s", pom_url, artifact_metadata_d + ) yield artifact_metadata_d else: - logger.debug("No scm.connection in pom %s", pom) + logger.debug("No scm.connection in pom %s", pom_url) else: - logger.debug("No scm in pom %s", pom) + logger.debug("No scm in pom %s", pom_url) except requests.HTTPError: logger.warning( "POM info page could not be fetched, skipping project '%s'", - pom, + pom_url, ) except lxml.etree.Error as error: - logger.info("Could not parse POM %s XML: %s.", pom, error) + logger.info("Could not parse POM %s XML: %s.", pom_url, error) def get_scm(self, page: RepoPage) -> Optional[ListedOrigin]: """Retrieve scm origin out of the page information. Only called when type of the page is scm. Try and detect an scm/vcs repository. Note that official format is in the form: scm:{type}:git://example.org/{user}/{repo}.git but some projects directly put the repo url (without the "scm:type"), so we have to check against the content to extract the type and url properly. Raises AssertionError when the type of the page is not 'scm' Returns ListedOrigin with proper canonical scm url (for github) if any is found, None otherwise. """ assert page["type"] == "scm" visit_type: Optional[str] = None url: Optional[str] = None m_scm = re.match(r"^scm:(?P[^:]+):(?P.*)$", page["url"]) if m_scm is None: return None scm_type = m_scm.group("type") if scm_type and scm_type in SUPPORTED_SCM_TYPES: url = m_scm.group("url") visit_type = scm_type elif page["url"].endswith(".git"): url = page["url"].lstrip("scm:") visit_type = "git" else: return None if url and visit_type == "git": # Non-github urls will be returned as is, github ones will be canonical ones url = self.github_session.get_canonical_url(url) if not url: return None assert visit_type is not None assert self.lister_obj.id is not None return ListedOrigin( lister_id=self.lister_obj.id, url=url, visit_type=visit_type, ) def get_origins_from_page(self, page: RepoPage) -> Iterator[ListedOrigin]: """Convert a page of Maven repositories into a list of ListedOrigins.""" if page["type"] == "scm": listed_origin = self.get_scm(page) if listed_origin: yield listed_origin else: # Origin is gathering source archives: last_update_dt = None last_update_iso = "" try: last_update_seconds = str(page["time"])[:-3] last_update_dt = datetime.fromtimestamp(int(last_update_seconds)) last_update_dt = last_update_dt.astimezone(timezone.utc) except (OverflowError, ValueError): logger.warning("- Failed to convert datetime %s.", last_update_seconds) if last_update_dt: last_update_iso = last_update_dt.isoformat() # Origin URL will target page holding sources for all versions of # an artifactId (package name) inside a groupId (namespace) path = "/".join(page["gid"].split(".")) origin_url = urljoin(self.BASE_URL, f"{path}/{page['aid']}") artifact = { **{k: v for k, v in page.items() if k != "doc"}, "time": last_update_iso, "base_url": self.BASE_URL, } if origin_url not in self.jar_origins: # Create ListedOrigin instance if we did not see that origin yet assert self.lister_obj.id is not None jar_origin = ListedOrigin( lister_id=self.lister_obj.id, url=origin_url, visit_type=page["type"], last_update=last_update_dt, extra_loader_arguments={"artifacts": [artifact]}, ) self.jar_origins[origin_url] = jar_origin else: # Update list of source artifacts for that origin otherwise jar_origin = self.jar_origins[origin_url] artifacts = jar_origin.extra_loader_arguments["artifacts"] if artifact not in artifacts: artifacts.append(artifact) if ( jar_origin.last_update and last_update_dt and last_update_dt > jar_origin.last_update ): jar_origin.last_update = last_update_dt if not self.incremental or ( self.state and page["doc"] > self.state.last_seen_doc ): # Yield origin with updated source artifacts, multiple instances of # ListedOrigin for the same origin URL but with different artifacts # list will be sent to the scheduler but it will deduplicate them and # take the latest one to upsert in database yield jar_origin def commit_page(self, page: RepoPage) -> None: """Update currently stored state using the latest listed doc. Note: this is a noop for full listing mode """ if self.incremental and self.state: # We need to differentiate the two state counters according # to the type of origin. if page["type"] == "maven" and page["doc"] > self.state.last_seen_doc: self.state.last_seen_doc = page["doc"] elif page["type"] == "scm" and page["doc"] > self.state.last_seen_pom: self.state.last_seen_doc = page["doc"] self.state.last_seen_pom = page["doc"] def finalize(self) -> None: """Finalize the lister state, set update if any progress has been made. Note: this is a noop for full listing mode """ if self.incremental and self.state: last_seen_doc = self.state.last_seen_doc last_seen_pom = self.state.last_seen_pom scheduler_state = self.get_state_from_scheduler() if last_seen_doc and last_seen_pom: if (scheduler_state.last_seen_doc < last_seen_doc) or ( scheduler_state.last_seen_pom < last_seen_pom ): self.updated = True diff --git a/swh/lister/maven/tests/test_lister.py b/swh/lister/maven/tests/test_lister.py index 505afa3..368592a 100644 --- a/swh/lister/maven/tests/test_lister.py +++ b/swh/lister/maven/tests/test_lister.py @@ -1,384 +1,384 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from pathlib import Path import iso8601 import pytest import requests from swh.lister.maven.lister import MavenLister MVN_URL = "https://repo1.maven.org/maven2/" # main maven repo url INDEX_URL = "http://indexes/export.fld" # index directory url URL_POM_1 = MVN_URL + "al/aldi/sprova4j/0.1.0/sprova4j-0.1.0.pom" URL_POM_2 = MVN_URL + "al/aldi/sprova4j/0.1.1/sprova4j-0.1.1.pom" URL_POM_3 = MVN_URL + "com/arangodb/arangodb-graphql/1.2/arangodb-graphql-1.2.pom" USER_REPO0 = "aldialimucaj/sprova4j" GIT_REPO_URL0_HTTPS = f"https://github.com/{USER_REPO0}" GIT_REPO_URL0_API = f"https://api.github.com/repos/{USER_REPO0}" LIST_GIT = (GIT_REPO_URL0_HTTPS,) USER_REPO1 = "ArangoDB-Community/arangodb-graphql-java" GIT_REPO_URL1_HTTPS = f"https://github.com/{USER_REPO1}" GIT_REPO_URL1_GIT = f"git://github.com/{USER_REPO1}.git" GIT_REPO_URL1_API = f"https://api.github.com/repos/{USER_REPO1}" LIST_GIT_INCR = (GIT_REPO_URL1_HTTPS,) USER_REPO2 = "webx/citrus" GIT_REPO_URL2_HTTPS = f"https://github.com/{USER_REPO2}" GIT_REPO_URL2_API = f"https://api.github.com/repos/{USER_REPO2}" LIST_SRC = (MVN_URL + "al/aldi/sprova4j",) LIST_SRC_DATA = ( { "type": "maven", "url": "https://repo1.maven.org/maven2/al/aldi/sprova4j" + "/0.1.0/sprova4j-0.1.0-sources.jar", "time": "2021-07-12T17:06:59+00:00", "gid": "al.aldi", "aid": "sprova4j", "version": "0.1.0", "base_url": MVN_URL, }, { "type": "maven", "url": "https://repo1.maven.org/maven2/al/aldi/sprova4j" + "/0.1.1/sprova4j-0.1.1-sources.jar", "time": "2021-07-12T17:37:05+00:00", "gid": "al.aldi", "aid": "sprova4j", "version": "0.1.1", "base_url": MVN_URL, }, ) @pytest.fixture def maven_index_full(datadir) -> bytes: return Path(datadir, "http_indexes", "export_full.fld").read_bytes() @pytest.fixture def maven_index_incr_first(datadir) -> bytes: return Path(datadir, "http_indexes", "export_incr_first.fld").read_bytes() @pytest.fixture def maven_pom_1(datadir) -> bytes: return Path(datadir, "https_maven.org", "sprova4j-0.1.0.pom").read_bytes() @pytest.fixture def maven_index_null_mtime(datadir) -> bytes: return Path(datadir, "http_indexes", "export_null_mtime.fld").read_bytes() @pytest.fixture def maven_pom_1_malformed(datadir) -> bytes: return Path(datadir, "https_maven.org", "sprova4j-0.1.0.malformed.pom").read_bytes() @pytest.fixture def maven_pom_2(datadir) -> bytes: return Path(datadir, "https_maven.org", "sprova4j-0.1.1.pom").read_bytes() @pytest.fixture def maven_pom_3(datadir) -> bytes: return Path(datadir, "https_maven.org", "arangodb-graphql-1.2.pom").read_bytes() @pytest.fixture def maven_pom_multi_byte_encoding(datadir) -> bytes: return Path(datadir, "https_maven.org", "citrus-parent-3.0.7.pom").read_bytes() @pytest.fixture def requests_mock(requests_mock): """If github api calls for the configured scm repository, returns its canonical url.""" for url_api, url_html in [ (GIT_REPO_URL0_API, GIT_REPO_URL0_HTTPS), (GIT_REPO_URL1_API, GIT_REPO_URL1_HTTPS), (GIT_REPO_URL2_API, GIT_REPO_URL2_HTTPS), ]: requests_mock.get( url_api, json={"html_url": url_html}, ) yield requests_mock @pytest.fixture(autouse=True) def network_requests_mock( requests_mock, maven_index_full, maven_pom_1, maven_pom_2, maven_pom_3 ): requests_mock.get(INDEX_URL, content=maven_index_full) requests_mock.get(URL_POM_1, content=maven_pom_1) requests_mock.get(URL_POM_2, content=maven_pom_2) requests_mock.get(URL_POM_3, content=maven_pom_3) @pytest.fixture(autouse=True) def retry_sleep_mock(mocker): - mocker.patch.object(MavenLister.page_request.retry, "sleep") + mocker.patch.object(MavenLister.http_request.retry, "sleep") def test_maven_full_listing(swh_scheduler): """Covers full listing of multiple pages, checking page results and listed origins, statelessness.""" # Run the lister. lister = MavenLister( scheduler=swh_scheduler, url=MVN_URL, instance="maven.org", index_url=INDEX_URL, incremental=False, ) stats = lister.run() # Start test checks. assert stats.pages == 5 scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results origin_urls = [origin.url for origin in scheduler_origins] # 3 git origins + 1 maven origin with 2 releases (one per jar) assert len(origin_urls) == 3 assert sorted(origin_urls) == sorted(LIST_GIT + LIST_GIT_INCR + LIST_SRC) for origin in scheduler_origins: if origin.visit_type == "maven": for src in LIST_SRC_DATA: last_update_src = iso8601.parse_date(src["time"]) assert last_update_src <= origin.last_update assert origin.extra_loader_arguments["artifacts"] == list(LIST_SRC_DATA) scheduler_state = lister.get_state_from_scheduler() assert scheduler_state is not None assert scheduler_state.last_seen_doc == -1 assert scheduler_state.last_seen_pom == -1 def test_maven_full_listing_malformed( swh_scheduler, requests_mock, maven_pom_1_malformed, ): """Covers full listing of multiple pages, checking page results with a malformed scm entry in pom.""" lister = MavenLister( scheduler=swh_scheduler, url=MVN_URL, instance="maven.org", index_url=INDEX_URL, incremental=False, ) # Set up test. requests_mock.get(URL_POM_1, content=maven_pom_1_malformed) # Then run the lister. stats = lister.run() # Start test checks. assert stats.pages == 5 scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results origin_urls = [origin.url for origin in scheduler_origins] # 2 git origins + 1 maven origin with 2 releases (one per jar) assert len(origin_urls) == 3 assert sorted(origin_urls) == sorted(LIST_GIT + LIST_GIT_INCR + LIST_SRC) for origin in scheduler_origins: if origin.visit_type == "maven": for src in LIST_SRC_DATA: last_update_src = iso8601.parse_date(src["time"]) assert last_update_src <= origin.last_update assert origin.extra_loader_arguments["artifacts"] == list(LIST_SRC_DATA) scheduler_state = lister.get_state_from_scheduler() assert scheduler_state is not None assert scheduler_state.last_seen_doc == -1 assert scheduler_state.last_seen_pom == -1 def test_maven_incremental_listing( swh_scheduler, requests_mock, maven_index_full, maven_index_incr_first, ): """Covers full listing of multiple pages, checking page results and listed origins, with a second updated run for statefulness.""" lister = MavenLister( scheduler=swh_scheduler, url=MVN_URL, instance="maven.org", index_url=INDEX_URL, incremental=True, ) # Set up test. requests_mock.get(INDEX_URL, content=maven_index_incr_first) # Then run the lister. stats = lister.run() # Start test checks. assert lister.incremental assert lister.updated assert stats.pages == 2 scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results origin_urls = [origin.url for origin in scheduler_origins] # 1 git origins + 1 maven origin with 1 release (one per jar) assert len(origin_urls) == 2 assert sorted(origin_urls) == sorted(LIST_GIT + LIST_SRC) for origin in scheduler_origins: if origin.visit_type == "maven": last_update_src = iso8601.parse_date(LIST_SRC_DATA[0]["time"]) assert last_update_src == origin.last_update assert origin.extra_loader_arguments["artifacts"] == [LIST_SRC_DATA[0]] # Second execution of the lister, incremental mode lister = MavenLister( scheduler=swh_scheduler, url=MVN_URL, instance="maven.org", index_url=INDEX_URL, incremental=True, ) scheduler_state = lister.get_state_from_scheduler() assert scheduler_state is not None assert scheduler_state.last_seen_doc == 1 assert scheduler_state.last_seen_pom == 1 # Set up test. requests_mock.get(INDEX_URL, content=maven_index_full) # Then run the lister. stats = lister.run() # Start test checks. assert lister.incremental assert lister.updated assert stats.pages == 4 scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results origin_urls = [origin.url for origin in scheduler_origins] assert sorted(origin_urls) == sorted(LIST_SRC + LIST_GIT + LIST_GIT_INCR) for origin in scheduler_origins: if origin.visit_type == "maven": for src in LIST_SRC_DATA: last_update_src = iso8601.parse_date(src["time"]) assert last_update_src <= origin.last_update assert origin.extra_loader_arguments["artifacts"] == list(LIST_SRC_DATA) scheduler_state = lister.get_state_from_scheduler() assert scheduler_state is not None assert scheduler_state.last_seen_doc == 4 assert scheduler_state.last_seen_pom == 4 @pytest.mark.parametrize("http_code", [400, 404, 500, 502]) def test_maven_list_http_error_on_index_read(swh_scheduler, requests_mock, http_code): """should stop listing if the lister fails to retrieve the main index url.""" lister = MavenLister(scheduler=swh_scheduler, url=MVN_URL, index_url=INDEX_URL) requests_mock.get(INDEX_URL, status_code=http_code) with pytest.raises(requests.HTTPError): # listing cannot continues so stop lister.run() scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results assert len(scheduler_origins) == 0 @pytest.mark.parametrize("http_code", [400, 404, 500, 502]) def test_maven_list_http_error_artifacts( swh_scheduler, requests_mock, http_code, ): """should continue listing when failing to retrieve artifacts.""" # Test failure of artefacts retrieval. requests_mock.get(URL_POM_1, status_code=http_code) lister = MavenLister(scheduler=swh_scheduler, url=MVN_URL, index_url=INDEX_URL) # on artifacts though, that raises but continue listing lister.run() # If the maven_index_full step succeeded but not the get_pom step, # then we get only one maven-jar origin and one git origin. scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results assert len(scheduler_origins) == 2 def test_maven_lister_null_mtime(swh_scheduler, requests_mock, maven_index_null_mtime): requests_mock.get(INDEX_URL, content=maven_index_null_mtime) # Run the lister. lister = MavenLister( scheduler=swh_scheduler, url=MVN_URL, instance="maven.org", index_url=INDEX_URL, incremental=False, ) stats = lister.run() # Start test checks. assert stats.pages == 1 scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results assert len(scheduler_origins) == 1 assert scheduler_origins[0].last_update is None def test_maven_list_pom_bad_encoding(swh_scheduler, requests_mock, maven_pom_1): """should continue listing when failing to decode pom file.""" # Test failure of pom parsing by reencoding a UTF-8 pom file to a not expected one requests_mock.get(URL_POM_1, content=maven_pom_1.decode("utf-8").encode("utf-32")) lister = MavenLister(scheduler=swh_scheduler, url=MVN_URL, index_url=INDEX_URL) lister.run() # If the maven_index_full step succeeded but not the pom parsing step, # then we get only one maven-jar origin and one git origin. scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results assert len(scheduler_origins) == 2 def test_maven_list_pom_multi_byte_encoding( swh_scheduler, requests_mock, maven_pom_multi_byte_encoding ): """should parse POM file with multi-byte encoding.""" # replace pom file with a multi-byte encoding one requests_mock.get(URL_POM_1, content=maven_pom_multi_byte_encoding) lister = MavenLister(scheduler=swh_scheduler, url=MVN_URL, index_url=INDEX_URL) lister.run() scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results assert len(scheduler_origins) == 3 diff --git a/swh/lister/npm/lister.py b/swh/lister/npm/lister.py index 8d48873..b940699 100644 --- a/swh/lister/npm/lister.py +++ b/swh/lister/npm/lister.py @@ -1,190 +1,170 @@ -# Copyright (C) 2018-2021 the Software Heritage developers +# Copyright (C) 2018-2022 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from dataclasses import asdict, dataclass import logging from typing import Any, Dict, Iterator, List, Optional import iso8601 -import requests -from tenacity.before_sleep import before_sleep_log -from swh.lister import USER_AGENT from swh.lister.pattern import CredentialsType, Lister -from swh.lister.utils import http_retry from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import ListedOrigin logger = logging.getLogger(__name__) @dataclass class NpmListerState: """State of npm lister""" last_seq: Optional[int] = None class NpmLister(Lister[NpmListerState, List[Dict[str, Any]]]): """ List all packages hosted on the npm registry. The lister is based on the npm replication API powered by a CouchDB database (https://docs.couchdb.org/en/stable/api/database/). Args: scheduler: a scheduler instance page_size: number of packages info to return per page when querying npm API incremental: defines if incremental listing should be used, in that case only modified or new packages since last incremental listing operation will be returned, otherwise all packages will be listed in lexicographical order """ LISTER_NAME = "npm" INSTANCE = "npm" API_BASE_URL = "https://replicate.npmjs.com" API_INCREMENTAL_LISTING_URL = f"{API_BASE_URL}/_changes" API_FULL_LISTING_URL = f"{API_BASE_URL}/_all_docs" PACKAGE_URL_TEMPLATE = "https://www.npmjs.com/package/{package_name}" def __init__( self, scheduler: SchedulerInterface, page_size: int = 1000, incremental: bool = False, credentials: CredentialsType = None, ): super().__init__( scheduler=scheduler, credentials=credentials, url=self.API_INCREMENTAL_LISTING_URL if incremental else self.API_FULL_LISTING_URL, instance=self.INSTANCE, ) self.page_size = page_size if not incremental: # in full listing mode, first package in each page corresponds to the one # provided as the startkey query parameter value, so we increment the page # size by one to avoid double package processing self.page_size += 1 self.incremental = incremental - self.session = requests.Session() - self.session.headers.update( - {"Accept": "application/json", "User-Agent": USER_AGENT} - ) + self.session.headers.update({"Accept": "application/json"}) def state_from_dict(self, d: Dict[str, Any]) -> NpmListerState: return NpmListerState(**d) def state_to_dict(self, state: NpmListerState) -> Dict[str, Any]: return asdict(state) def request_params(self, last_package_id: str) -> Dict[str, Any]: # include package JSON document to get its last update date params = {"limit": self.page_size, "include_docs": "true"} if self.incremental: params["since"] = last_package_id else: params["startkey"] = last_package_id return params - @http_retry(before_sleep=before_sleep_log(logger, logging.WARNING)) - def page_request(self, last_package_id: str) -> requests.Response: - params = self.request_params(last_package_id) - logger.debug("Fetching URL %s with params %s", self.url, params) - response = self.session.get(self.url, params=params) - if response.status_code != 200: - logger.warning( - "Unexpected HTTP status code %s on %s: %s", - response.status_code, - response.url, - response.content, - ) - response.raise_for_status() - return response - def get_pages(self) -> Iterator[List[Dict[str, Any]]]: last_package_id: str = "0" if self.incremental else '""' if ( self.incremental and self.state is not None and self.state.last_seq is not None ): last_package_id = str(self.state.last_seq) while True: - response = self.page_request(last_package_id) + response = self.http_request( + self.url, params=self.request_params(last_package_id) + ) data = response.json() page = data["results"] if self.incremental else data["rows"] if not page: break if self.incremental or len(page) < self.page_size: yield page else: yield page[:-1] if len(page) < self.page_size: break last_package_id = ( str(page[-1]["seq"]) if self.incremental else f'"{page[-1]["id"]}"' ) def get_origins_from_page( self, page: List[Dict[str, Any]] ) -> Iterator[ListedOrigin]: """Convert a page of Npm repositories into a list of ListedOrigin.""" assert self.lister_obj.id is not None for package in page: # no source code to archive here if not package["doc"].get("versions", {}): continue package_name = package["doc"]["name"] package_latest_version = ( package["doc"].get("dist-tags", {}).get("latest", "") ) last_update = None if package_latest_version in package["doc"].get("time", {}): last_update = iso8601.parse_date( package["doc"]["time"][package_latest_version] ) yield ListedOrigin( lister_id=self.lister_obj.id, url=self.PACKAGE_URL_TEMPLATE.format(package_name=package_name), visit_type="npm", last_update=last_update, ) def commit_page(self, page: List[Dict[str, Any]]): """Update the currently stored state using the latest listed page.""" if self.incremental: last_package = page[-1] last_seq = last_package["seq"] if self.state.last_seq is None or last_seq > self.state.last_seq: self.state.last_seq = last_seq def finalize(self): if self.incremental and self.state.last_seq is not None: scheduler_state = self.get_state_from_scheduler() if ( scheduler_state.last_seq is None or self.state.last_seq > scheduler_state.last_seq ): self.updated = True diff --git a/swh/lister/npm/tests/test_lister.py b/swh/lister/npm/tests/test_lister.py index 2d41b59..e8f8fa8 100644 --- a/swh/lister/npm/tests/test_lister.py +++ b/swh/lister/npm/tests/test_lister.py @@ -1,212 +1,218 @@ -# Copyright (C) 2018-2021 The Software Heritage developers +# Copyright (C) 2018-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from itertools import chain import json from pathlib import Path import iso8601 import pytest from requests.exceptions import HTTPError from swh.lister import USER_AGENT from swh.lister.npm.lister import NpmLister, NpmListerState @pytest.fixture def npm_full_listing_page1(datadir): return json.loads(Path(datadir, "npm_full_page1.json").read_text()) @pytest.fixture def npm_full_listing_page2(datadir): return json.loads(Path(datadir, "npm_full_page2.json").read_text()) @pytest.fixture def npm_incremental_listing_page1(datadir): return json.loads(Path(datadir, "npm_incremental_page1.json").read_text()) @pytest.fixture def npm_incremental_listing_page2(datadir): return json.loads(Path(datadir, "npm_incremental_page2.json").read_text()) @pytest.fixture(autouse=True) def retry_sleep_mock(mocker): - mocker.patch.object(NpmLister.page_request.retry, "sleep") + mocker.patch.object(NpmLister.http_request.retry, "sleep") def _check_listed_npm_packages(lister, packages, scheduler_origins): for package in packages: package_name = package["doc"]["name"] latest_version = package["doc"]["dist-tags"]["latest"] package_last_update = iso8601.parse_date(package["doc"]["time"][latest_version]) origin_url = lister.PACKAGE_URL_TEMPLATE.format(package_name=package_name) scheduler_origin = [o for o in scheduler_origins if o.url == origin_url] assert scheduler_origin assert scheduler_origin[0].last_update == package_last_update def _match_request(request): return request.headers.get("User-Agent") == USER_AGENT def _url_params(page_size, **kwargs): params = {"limit": page_size, "include_docs": "true"} params.update(**kwargs) return params def test_npm_lister_full( swh_scheduler, requests_mock, mocker, npm_full_listing_page1, npm_full_listing_page2 ): """Simulate a full listing of four npm packages in two pages""" page_size = 2 lister = NpmLister(scheduler=swh_scheduler, page_size=page_size, incremental=False) requests_mock.get( lister.API_FULL_LISTING_URL, [ {"json": npm_full_listing_page1}, {"json": npm_full_listing_page2}, ], additional_matcher=_match_request, ) - spy_get = mocker.spy(lister.session, "get") + spy_request = mocker.spy(lister.session, "request") stats = lister.run() assert stats.pages == 2 assert stats.origins == page_size * stats.pages - spy_get.assert_has_calls( + spy_request.assert_has_calls( [ mocker.call( + "GET", lister.API_FULL_LISTING_URL, params=_url_params(page_size + 1, startkey='""'), ), mocker.call( + "GET", lister.API_FULL_LISTING_URL, params=_url_params( page_size + 1, startkey=f'"{npm_full_listing_page1["rows"][-1]["id"]}"', ), ), ] ) scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results _check_listed_npm_packages( lister, chain(npm_full_listing_page1["rows"][:-1], npm_full_listing_page2["rows"]), scheduler_origins, ) assert lister.get_state_from_scheduler() == NpmListerState() def test_npm_lister_incremental( swh_scheduler, requests_mock, mocker, npm_incremental_listing_page1, npm_incremental_listing_page2, ): """Simulate an incremental listing of four npm packages in two pages""" page_size = 2 lister = NpmLister(scheduler=swh_scheduler, page_size=page_size, incremental=True) requests_mock.get( lister.API_INCREMENTAL_LISTING_URL, [ {"json": npm_incremental_listing_page1}, {"json": npm_incremental_listing_page2}, {"json": {"results": []}}, ], additional_matcher=_match_request, ) - spy_get = mocker.spy(lister.session, "get") + spy_request = mocker.spy(lister.session, "request") assert lister.get_state_from_scheduler() == NpmListerState() stats = lister.run() assert stats.pages == 2 assert stats.origins == page_size * stats.pages last_seq = npm_incremental_listing_page2["results"][-1]["seq"] - spy_get.assert_has_calls( + spy_request.assert_has_calls( [ mocker.call( + "GET", lister.API_INCREMENTAL_LISTING_URL, params=_url_params(page_size, since="0"), ), mocker.call( + "GET", lister.API_INCREMENTAL_LISTING_URL, params=_url_params( page_size, since=str(npm_incremental_listing_page1["results"][-1]["seq"]), ), ), mocker.call( + "GET", lister.API_INCREMENTAL_LISTING_URL, params=_url_params(page_size, since=str(last_seq)), ), ] ) scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results _check_listed_npm_packages( lister, chain( npm_incremental_listing_page1["results"], npm_incremental_listing_page2["results"], ), scheduler_origins, ) assert lister.get_state_from_scheduler() == NpmListerState(last_seq=last_seq) def test_npm_lister_incremental_restart( swh_scheduler, requests_mock, mocker, ): """Check incremental npm listing will restart from saved state""" page_size = 2 last_seq = 67 lister = NpmLister(scheduler=swh_scheduler, page_size=page_size, incremental=True) lister.state = NpmListerState(last_seq=last_seq) requests_mock.get(lister.API_INCREMENTAL_LISTING_URL, json={"results": []}) - spy_get = mocker.spy(lister.session, "get") + spy_request = mocker.spy(lister.session, "request") lister.run() - spy_get.assert_called_with( + spy_request.assert_called_with( + "GET", lister.API_INCREMENTAL_LISTING_URL, params=_url_params(page_size, since=str(last_seq)), ) def test_npm_lister_http_error( swh_scheduler, requests_mock, mocker, ): lister = NpmLister(scheduler=swh_scheduler) requests_mock.get(lister.API_FULL_LISTING_URL, status_code=500) with pytest.raises(HTTPError): lister.run() diff --git a/swh/lister/packagist/lister.py b/swh/lister/packagist/lister.py index 19b4721..30d12d9 100644 --- a/swh/lister/packagist/lister.py +++ b/swh/lister/packagist/lister.py @@ -1,184 +1,167 @@ -# Copyright (C) 2019-2021 The Software Heritage developers +# Copyright (C) 2019-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from dataclasses import dataclass from datetime import datetime, timezone import logging from typing import Any, Dict, Iterator, List, Optional import iso8601 import requests from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import ListedOrigin -from .. import USER_AGENT from ..pattern import CredentialsType, Lister logger = logging.getLogger(__name__) PackagistPageType = List[str] @dataclass class PackagistListerState: """State of Packagist lister""" last_listing_date: Optional[datetime] = None """Last date when packagist lister was executed""" class PackagistLister(Lister[PackagistListerState, PackagistPageType]): """ List all Packagist projects and send associated origins to scheduler. The lister queries the Packagist API, whose documentation can be found at https://packagist.org/apidoc. For each package, its metadata are retrieved using Packagist API endpoints whose responses are served from static files, which are guaranteed to be efficient on the Packagist side (no dymamic queries). Furthermore, subsequent listing will send the "If-Modified-Since" HTTP header to only retrieve packages metadata updated since the previous listing operation in order to save bandwidth and return only origins which might have new released versions. """ LISTER_NAME = "Packagist" PACKAGIST_PACKAGES_LIST_URL = "https://packagist.org/packages/list.json" PACKAGIST_REPO_BASE_URL = "https://repo.packagist.org/p" def __init__( self, scheduler: SchedulerInterface, credentials: CredentialsType = None, ): super().__init__( scheduler=scheduler, url=self.PACKAGIST_PACKAGES_LIST_URL, instance="packagist", credentials=credentials, ) - self.session = requests.Session() - self.session.headers.update( - {"Accept": "application/json", "User-Agent": USER_AGENT} - ) + self.session.headers.update({"Accept": "application/json"}) self.listing_date = datetime.now().astimezone(tz=timezone.utc) def state_from_dict(self, d: Dict[str, Any]) -> PackagistListerState: last_listing_date = d.get("last_listing_date") if last_listing_date is not None: d["last_listing_date"] = iso8601.parse_date(last_listing_date) return PackagistListerState(**d) def state_to_dict(self, state: PackagistListerState) -> Dict[str, Any]: d: Dict[str, Optional[str]] = {"last_listing_date": None} last_listing_date = state.last_listing_date if last_listing_date is not None: d["last_listing_date"] = last_listing_date.isoformat() return d def api_request(self, url: str) -> Any: - logger.debug("Fetching URL %s", url) - - response = self.session.get(url) - - if response.status_code not in (200, 304): - logger.warning( - "Unexpected HTTP status code %s on %s: %s", - response.status_code, - response.url, - response.content, - ) - - response.raise_for_status() - + response = self.http_request(url) # response is empty when status code is 304 return response.json() if response.status_code == 200 else {} def get_pages(self) -> Iterator[PackagistPageType]: """ Yield a single page listing all Packagist projects. """ yield self.api_request(self.PACKAGIST_PACKAGES_LIST_URL)["packageNames"] def get_origins_from_page(self, page: PackagistPageType) -> Iterator[ListedOrigin]: """ Iterate on all Packagist projects and yield ListedOrigin instances. """ assert self.lister_obj.id is not None # save some bandwidth by only getting packages metadata updated since # last listing if self.state.last_listing_date is not None: if_modified_since = self.state.last_listing_date.strftime( "%a, %d %b %Y %H:%M:%S GMT" ) self.session.headers["If-Modified-Since"] = if_modified_since # to ensure origins will not be listed multiple times origin_urls = set() for package_name in page: try: metadata = self.api_request( f"{self.PACKAGIST_REPO_BASE_URL}/{package_name}.json" ) if not metadata.get("packages", {}): # package metadata not updated since last listing continue if package_name not in metadata["packages"]: # missing package metadata in response continue versions_info = metadata["packages"][package_name].values() - except requests.exceptions.HTTPError: + except requests.HTTPError: # error when getting package metadata (usually 404 when a # package has been removed), skip it and process next package continue origin_url = None visit_type = None last_update = None # extract origin url for package, vcs type and latest release date for version_info in versions_info: origin_url = version_info.get("source", {}).get("url", "") if not origin_url: continue # can be git, hg or svn visit_type = version_info.get("source", {}).get("type", "") dist_time_str = version_info.get("time", "") if not dist_time_str: continue dist_time = iso8601.parse_date(dist_time_str) if last_update is None or dist_time > last_update: last_update = dist_time # skip package with already seen origin url or with missing required info if visit_type is None or origin_url is None or origin_url in origin_urls: continue # bitbucket closed its mercurial hosting service, those origins can not be # loaded into the archive anymore if visit_type == "hg" and origin_url.startswith("https://bitbucket.org/"): continue origin_urls.add(origin_url) logger.debug( "Found package %s last updated on %s", package_name, last_update ) yield ListedOrigin( lister_id=self.lister_obj.id, url=origin_url, visit_type=visit_type, last_update=last_update, ) def finalize(self) -> None: self.state.last_listing_date = self.listing_date self.updated = True diff --git a/swh/lister/pattern.py b/swh/lister/pattern.py index 63a2fff..5b327e1 100644 --- a/swh/lister/pattern.py +++ b/swh/lister/pattern.py @@ -1,284 +1,314 @@ -# Copyright (C) 2020-2021 The Software Heritage developers +# Copyright (C) 2020-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from dataclasses import dataclass +import logging from typing import Any, Dict, Generic, Iterable, Iterator, List, Optional, TypeVar from urllib.parse import urlparse +import requests +from tenacity.before_sleep import before_sleep_log + from swh.core.config import load_from_envvar from swh.core.utils import grouper from swh.scheduler import get_scheduler, model from swh.scheduler.interface import SchedulerInterface +from . import USER_AGENT +from .utils import http_retry + +logger = logging.getLogger(__name__) + @dataclass class ListerStats: pages: int = 0 origins: int = 0 def __add__(self, other: ListerStats) -> ListerStats: return self.__class__(self.pages + other.pages, self.origins + other.origins) def __iadd__(self, other: ListerStats): self.pages += other.pages self.origins += other.origins def dict(self) -> Dict[str, int]: return {"pages": self.pages, "origins": self.origins} StateType = TypeVar("StateType") PageType = TypeVar("PageType") BackendStateType = Dict[str, Any] CredentialsType = Optional[Dict[str, Dict[str, List[Dict[str, str]]]]] class Lister(Generic[StateType, PageType]): """The base class for a Software Heritage lister. A lister scrapes a page by page list of origins from an upstream (a forge, the API of a package manager, ...), and massages the results of that scrape into a list of origins that are recorded by the scheduler backend. The main loop of the lister, :meth:`run`, basically revolves around the :meth:`get_pages` iterator, which sets up the lister state, then yields the scrape results page by page. The :meth:`get_origins_from_page` method converts the pages into a list of :class:`model.ListedOrigin`, sent to the scheduler at every page. The :meth:`commit_page` method can be used to update the lister state after a page of origins has been recorded in the scheduler backend. The :func:`finalize` method is called at lister teardown (whether the run has been successful or not) to update the local :attr:`state` object before it's sent to the database. This method must set the :attr:`updated` attribute if an updated state needs to be sent to the scheduler backend. This method can call :func:`get_state_from_scheduler` to refresh and merge the lister state from the scheduler before it's finalized (and potentially minimize the risk of race conditions between concurrent runs of the lister). The state of the lister is serialized and deserialized from the dict stored in the scheduler backend, using the :meth:`state_from_dict` and :meth:`state_to_dict` methods. Args: scheduler: the instance of the Scheduler being used to register the origins listed by this lister url: a URL representing this lister, e.g. the API's base URL instance: the instance name, to uniquely identify this lister instance, if not provided the URL network location will be used credentials: dictionary of credentials for all listers. The first level identifies the :attr:`LISTER_NAME`, the second level the lister :attr:`instance`. The final level is a list of dicts containing the expected credentials for the given instance of that lister. Generic types: - *StateType*: concrete lister type; should usually be a :class:`dataclass` for stricter typing - *PageType*: type of scrape results; can usually be a :class:`requests.Response`, or a :class:`dict` """ LISTER_NAME: str = "" def __init__( self, scheduler: SchedulerInterface, url: str, instance: Optional[str] = None, credentials: CredentialsType = None, ): if not self.LISTER_NAME: raise ValueError("Must set the LISTER_NAME attribute on Lister classes") self.url = url if instance is not None: self.instance = instance else: self.instance = urlparse(url).netloc self.scheduler = scheduler if not credentials: credentials = {} self.credentials = list( credentials.get(self.LISTER_NAME, {}).get(self.instance, []) ) # store the initial state of the lister self.state = self.get_state_from_scheduler() self.updated = False + self.session = requests.Session() + # Declare the USER_AGENT is more sysadm-friendly for the forge we list + self.session.headers.update({"User-Agent": USER_AGENT}) + + @http_retry(before_sleep=before_sleep_log(logger, logging.WARNING)) + def http_request(self, url: str, method="GET", **kwargs) -> requests.Response: + + logger.debug("Fetching URL %s with params %s", url, kwargs.get("params")) + + response = self.session.request(method, url, **kwargs) + if response.status_code not in (200, 304): + logger.warning( + "Unexpected HTTP status code %s on %s: %s", + response.status_code, + response.url, + response.content, + ) + response.raise_for_status() + + return response + def run(self) -> ListerStats: """Run the lister. Returns: A counter with the number of pages and origins seen for this run of the lister. """ full_stats = ListerStats() try: for page in self.get_pages(): full_stats.pages += 1 origins = self.get_origins_from_page(page) full_stats.origins += self.send_origins(origins) self.commit_page(page) finally: self.finalize() if self.updated: self.set_state_in_scheduler() return full_stats def get_state_from_scheduler(self) -> StateType: """Update the state in the current instance from the state in the scheduler backend. This updates :attr:`lister_obj`, and returns its (deserialized) current state, to allow for comparison with the local state. Returns: the state retrieved from the scheduler backend """ self.lister_obj = self.scheduler.get_or_create_lister( name=self.LISTER_NAME, instance_name=self.instance ) return self.state_from_dict(self.lister_obj.current_state) def set_state_in_scheduler(self) -> None: """Update the state in the scheduler backend from the state of the current instance. Raises: swh.scheduler.exc.StaleData: in case of a race condition between concurrent listers (from :meth:`swh.scheduler.Scheduler.update_lister`). """ self.lister_obj.current_state = self.state_to_dict(self.state) self.lister_obj = self.scheduler.update_lister(self.lister_obj) # State management to/from the scheduler def state_from_dict(self, d: BackendStateType) -> StateType: """Convert the state stored in the scheduler backend (as a dict), to the concrete StateType for this lister.""" raise NotImplementedError def state_to_dict(self, state: StateType) -> BackendStateType: """Convert the StateType for this lister to its serialization as dict for storage in the scheduler. Values must be JSON-compatible as that's what the backend database expects. """ raise NotImplementedError def finalize(self) -> None: """Custom hook to finalize the lister state before returning from the main loop. This method must set :attr:`updated` if the lister has done some work. If relevant, this method can use :meth`get_state_from_scheduler` to merge the current lister state with the one from the scheduler backend, reducing the risk of race conditions if we're running concurrent listings. This method is called in a `finally` block, which means it will also run when the lister fails. """ pass # Actual listing logic def get_pages(self) -> Iterator[PageType]: """Retrieve a list of pages of listed results. This is the main loop of the lister. Returns: an iterator of raw pages fetched from the platform currently being listed. """ raise NotImplementedError def get_origins_from_page(self, page: PageType) -> Iterator[model.ListedOrigin]: """Extract a list of :class:`model.ListedOrigin` from a raw page of results. Args: page: a single page of results Returns: an iterator for the origins present on the given page of results """ raise NotImplementedError def commit_page(self, page: PageType) -> None: """Custom hook called after the current page has been committed in the scheduler backend. This method can be used to update the state after a page of origins has been successfully recorded in the scheduler backend. If the new state should be recorded at the point the lister completes, the :attr:`updated` attribute must be set. """ pass def send_origins(self, origins: Iterable[model.ListedOrigin]) -> int: """Record a list of :class:`model.ListedOrigin` in the scheduler. Returns: the number of listed origins recorded in the scheduler """ count = 0 for batch_origins in grouper(origins, n=1000): ret = self.scheduler.record_listed_origins(batch_origins) count += len(ret) return count @classmethod def from_config(cls, scheduler: Dict[str, Any], **config: Any): """Instantiate a lister from a configuration dict. This is basically a backwards-compatibility shim for the CLI. Args: scheduler: instantiation config for the scheduler config: the configuration dict for the lister, with the following keys: - credentials (optional): credentials list for the scheduler - any other kwargs passed to the lister. Returns: the instantiated lister """ # Drop the legacy config keys which aren't used for this generation of listers. for legacy_key in ("storage", "lister", "celery"): config.pop(legacy_key, None) # Instantiate the scheduler scheduler_instance = get_scheduler(**scheduler) return cls(scheduler=scheduler_instance, **config) @classmethod def from_configfile(cls, **kwargs: Any): """Instantiate a lister from the configuration loaded from the SWH_CONFIG_FILENAME envvar, with potential extra keyword arguments if their value is not None. Args: kwargs: kwargs passed to the lister instantiation """ config = dict(load_from_envvar()) config.update({k: v for k, v in kwargs.items() if v is not None}) return cls.from_config(**config) class StatelessLister(Lister[None, PageType], Generic[PageType]): def state_from_dict(self, d: BackendStateType) -> None: """Always return empty state""" return None def state_to_dict(self, state: None) -> BackendStateType: """Always set empty state""" return {} diff --git a/swh/lister/phabricator/lister.py b/swh/lister/phabricator/lister.py index 83ddc31..4556178 100644 --- a/swh/lister/phabricator/lister.py +++ b/swh/lister/phabricator/lister.py @@ -1,185 +1,165 @@ -# Copyright (C) 2019-2021 the Software Heritage developers +# Copyright (C) 2019-2022 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information + from collections import defaultdict import logging import random from typing import Any, Dict, Iterator, List, Optional from urllib.parse import urljoin -import requests - -from swh.lister import USER_AGENT from swh.lister.pattern import CredentialsType, StatelessLister from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import ListedOrigin logger = logging.getLogger(__name__) PageType = List[Dict[str, Any]] class PhabricatorLister(StatelessLister[PageType]): """ List all repositories hosted on a Phabricator instance. Args: url: base URL of a phabricator forge (for instance https://forge.softwareheritage.org) instance: string identifier for the listed forge, URL network location will be used if not provided api_token: authentication token for Conduit API """ LISTER_NAME = "phabricator" API_REPOSITORY_PATH = "/api/diffusion.repository.search" def __init__( self, scheduler: SchedulerInterface, url: str, instance: Optional[str] = None, api_token: Optional[str] = None, credentials: CredentialsType = None, ): super().__init__( scheduler, urljoin(url, self.API_REPOSITORY_PATH), instance, credentials ) - self.session = requests.Session() - self.session.headers.update( - {"Accept": "application/json", "User-Agent": USER_AGENT} - ) + self.session.headers.update({"Accept": "application/json"}) if api_token is not None: self.api_token = api_token else: if not self.credentials: raise ValueError( f"No credentials found for phabricator instance {self.instance};" " Please set them in the lister configuration file." ) self.api_token = random.choice(self.credentials)["password"] def get_request_params(self, after: Optional[str]) -> Dict[str, str]: """Get the query parameters for the request.""" base_params = { # Stable order "order": "oldest", # Add all URIs to the response "attachments[uris]": "1", # API token from stored credentials "api.token": self.api_token, } if after is not None: base_params["after"] = after return base_params @staticmethod def filter_params(params: Dict[str, str]) -> Dict[str, str]: """Filter the parameters for debug purposes""" return { k: (v if k != "api.token" else "**redacted**") for k, v in params.items() } def get_pages(self) -> Iterator[PageType]: after: Optional[str] = None while True: params = self.get_request_params(after) - logger.debug( - "Retrieving results on URI %s with parameters %s", - self.url, - self.filter_params(params), - ) - response = self.session.post(self.url, data=params) - - if response.status_code != 200: - logger.warning( - "Unexpected HTTP status code %s on %s: %s", - response.status_code, - response.url, - response.content, - ) - - response.raise_for_status() + response = self.http_request(self.url, method="POST", data=params) response_data = response.json() if response_data.get("result") is None: logger.warning( "Got unexpected response on %s: %s", response.url, response_data, ) break result = response_data["result"] yield result["data"] after = None if "cursor" in result and "after" in result["cursor"]: after = result["cursor"]["after"] if not after: logger.debug("Empty `after` cursor. All done") break def get_origins_from_page(self, page: PageType) -> Iterator[ListedOrigin]: assert self.lister_obj.id is not None for repo in page: url = get_repo_url(repo["attachments"]["uris"]["uris"]) if url is None: short_name: Optional[str] = None for field in "shortName", "name", "callsign": short_name = repo["fields"].get(field) if short_name: break logger.warning( "No valid url for repository [%s] (phid=%s)", short_name or repo["phid"], repo["phid"], ) continue yield ListedOrigin( lister_id=self.lister_obj.id, url=url, visit_type=repo["fields"]["vcs"], # The "dateUpdated" field returned by the Phabricator API only refers to # the repository metadata; We can't use it for our purposes. last_update=None, ) def get_repo_url(attachments: List[Dict[str, Any]]) -> Optional[str]: """ Return url for a hosted repository from its uris attachments according to the following priority lists: * protocol: https > http * identifier: shortname > callsign > id """ processed_urls = defaultdict(dict) # type: Dict[str, Any] for uri in attachments: protocol = uri["fields"]["builtin"]["protocol"] url = uri["fields"]["uri"]["effective"] identifier = uri["fields"]["builtin"]["identifier"] if protocol in ("http", "https"): processed_urls[protocol][identifier] = url elif protocol is None: for protocol in ("https", "http"): if url.startswith(protocol): processed_urls[protocol]["undefined"] = url break for protocol in ["https", "http"]: for identifier in ["shortname", "callsign", "id", "undefined"]: if protocol in processed_urls and identifier in processed_urls[protocol]: return processed_urls[protocol][identifier] return None diff --git a/swh/lister/phabricator/tests/test_lister.py b/swh/lister/phabricator/tests/test_lister.py index a638c40..ed35435 100644 --- a/swh/lister/phabricator/tests/test_lister.py +++ b/swh/lister/phabricator/tests/test_lister.py @@ -1,137 +1,142 @@ -# Copyright (C) 2019-2021 The Software Heritage developers +# Copyright (C) 2019-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json from pathlib import Path import pytest from requests.exceptions import HTTPError from swh.lister import USER_AGENT from swh.lister.phabricator.lister import PhabricatorLister, get_repo_url @pytest.fixture def phabricator_repositories_page1(datadir): return json.loads( Path(datadir, "phabricator_api_repositories_page1.json").read_text() ) @pytest.fixture def phabricator_repositories_page2(datadir): return json.loads( Path(datadir, "phabricator_api_repositories_page2.json").read_text() ) +@pytest.fixture(autouse=True) +def retry_sleep_mock(mocker): + mocker.patch.object(PhabricatorLister.http_request.retry, "sleep") + + def test_get_repo_url(phabricator_repositories_page1): repos = phabricator_repositories_page1["result"]["data"] for repo in repos: expected_name = "https://forge.softwareheritage.org/source/%s.git" % ( repo["fields"]["shortName"] ) assert get_repo_url(repo["attachments"]["uris"]["uris"]) == expected_name def test_get_repo_url_undefined_protocol(): undefined_protocol_uris = [ { "fields": { "uri": { "raw": "https://svn.blender.org/svnroot/bf-blender/", "display": "https://svn.blender.org/svnroot/bf-blender/", "effective": "https://svn.blender.org/svnroot/bf-blender/", "normalized": "svn.blender.org/svnroot/bf-blender", }, "builtin": {"protocol": None, "identifier": None}, }, } ] expected_name = "https://svn.blender.org/svnroot/bf-blender/" assert get_repo_url(undefined_protocol_uris) == expected_name def test_lister_url_param(swh_scheduler): FORGE_BASE_URL = "https://forge.softwareheritage.org" API_REPOSITORY_PATH = "/api/diffusion.repository.search" for url in ( FORGE_BASE_URL, f"{FORGE_BASE_URL}/", f"{FORGE_BASE_URL}/{API_REPOSITORY_PATH}", f"{FORGE_BASE_URL}/{API_REPOSITORY_PATH}/", ): lister = PhabricatorLister( scheduler=swh_scheduler, url=FORGE_BASE_URL, instance="swh", api_token="foo" ) expected_url = f"{FORGE_BASE_URL}{API_REPOSITORY_PATH}" assert lister.url == expected_url def test_lister( swh_scheduler, requests_mock, phabricator_repositories_page1, phabricator_repositories_page2, ): FORGE_BASE_URL = "https://forge.softwareheritage.org" API_TOKEN = "foo" lister = PhabricatorLister( scheduler=swh_scheduler, url=FORGE_BASE_URL, instance="swh", api_token=API_TOKEN ) def match_request(request): return ( request.headers.get("User-Agent") == USER_AGENT and f"api.token={API_TOKEN}" in request.body ) requests_mock.post( f"{FORGE_BASE_URL}{lister.API_REPOSITORY_PATH}", [ {"json": phabricator_repositories_page1}, {"json": phabricator_repositories_page2}, ], additional_matcher=match_request, ) stats = lister.run() expected_nb_origins = len(phabricator_repositories_page1["result"]["data"]) * 2 assert stats.pages == 2 assert stats.origins == expected_nb_origins scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results assert len(scheduler_origins) == expected_nb_origins def test_lister_request_error( swh_scheduler, requests_mock, phabricator_repositories_page1, ): FORGE_BASE_URL = "https://forge.softwareheritage.org" lister = PhabricatorLister( scheduler=swh_scheduler, url=FORGE_BASE_URL, instance="swh", api_token="foo" ) requests_mock.post( f"{FORGE_BASE_URL}{lister.API_REPOSITORY_PATH}", [ {"status_code": 200, "json": phabricator_repositories_page1}, {"status_code": 500, "reason": "Internal Server Error"}, ], ) with pytest.raises(HTTPError): lister.run() diff --git a/swh/lister/pubdev/lister.py b/swh/lister/pubdev/lister.py index 77cb4de..8910f39 100644 --- a/swh/lister/pubdev/lister.py +++ b/swh/lister/pubdev/lister.py @@ -1,125 +1,106 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information + import logging -from typing import Any, Dict, Iterator, List, Optional +from typing import Iterator, List, Optional import iso8601 -import requests from requests.exceptions import HTTPError -from tenacity.before_sleep import before_sleep_log -from swh.lister.utils import http_retry from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import ListedOrigin from .. import __version__ from ..pattern import CredentialsType, StatelessLister # https://github.com/dart-lang/pub/blob/master/doc/repository-spec-v2.md#metadata-headers USER_AGENT = ( f"Software Heritage PubDev Lister v{__version__} " "(+https://www.softwareheritage.org/contact)" ) logger = logging.getLogger(__name__) # Aliasing the page results returned by `get_pages` method from the lister. PubDevListerPage = List[str] class PubDevLister(StatelessLister[PubDevListerPage]): """List pub.dev (Dart, Flutter) origins.""" LISTER_NAME = "pubdev" VISIT_TYPE = "pubdev" INSTANCE = "pubdev" BASE_URL = "https://pub.dev/" PACKAGE_NAMES_URL_PATTERN = "{base_url}api/package-names" PACKAGE_INFO_URL_PATTERN = "{base_url}api/packages/{pkgname}" ORIGIN_URL_PATTERN = "{base_url}packages/{pkgname}" def __init__( self, scheduler: SchedulerInterface, credentials: Optional[CredentialsType] = None, ): super().__init__( scheduler=scheduler, credentials=credentials, instance=self.INSTANCE, url=self.BASE_URL, ) - self.session = requests.Session() + self.session.headers.update( { "Accept": "application/json", "User-Agent": USER_AGENT, } ) - @http_retry(before_sleep=before_sleep_log(logger, logging.WARNING)) - def page_request(self, url: str, params: Dict[str, Any]) -> requests.Response: - - logger.debug("Fetching URL %s with params %s", url, params) - - response = self.session.get(url, params=params) - if response.status_code != 200: - logger.warning( - "Unexpected HTTP status code %s on %s: %s", - response.status_code, - response.url, - response.content, - ) - response.raise_for_status() - - return response - def get_pages(self) -> Iterator[PubDevListerPage]: """Yield an iterator which returns 'page' It uses the api provided by https://pub.dev/api/ to find Dart and Flutter package origins. The http api call get "{base_url}package-names" to retrieve a sorted list of all package names. There is only one page that list all origins url based on "{base_url}packages/{pkgname}" """ - response = self.page_request( - url=self.PACKAGE_NAMES_URL_PATTERN.format(base_url=self.url), params={} + response = self.http_request( + url=self.PACKAGE_NAMES_URL_PATTERN.format(base_url=self.url) ) yield response.json()["packages"] def get_origins_from_page(self, page: PubDevListerPage) -> Iterator[ListedOrigin]: """Iterate on all pages and yield ListedOrigin instances.""" assert self.lister_obj.id is not None for pkgname in page: package_info_url = self.PACKAGE_INFO_URL_PATTERN.format( base_url=self.url, pkgname=pkgname ) try: - response = self.page_request(url=package_info_url, params={}) + response = self.http_request(url=package_info_url) except HTTPError: logger.warning( "Failed to fetch metadata for package %s, skipping it from listing.", pkgname, ) continue package_metadata = response.json() package_versions = package_metadata["versions"] last_published = max( package_version["published"] for package_version in package_versions ) origin_url = self.ORIGIN_URL_PATTERN.format( base_url=self.url, pkgname=pkgname ) yield ListedOrigin( lister_id=self.lister_obj.id, visit_type=self.VISIT_TYPE, url=origin_url, last_update=iso8601.parse_date(last_published), ) diff --git a/swh/lister/sourceforge/lister.py b/swh/lister/sourceforge/lister.py index 8bc56ee..ba8c412 100644 --- a/swh/lister/sourceforge/lister.py +++ b/swh/lister/sourceforge/lister.py @@ -1,456 +1,430 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from dataclasses import dataclass, field import datetime from enum import Enum import logging import re from typing import Any, Dict, Iterator, List, Optional, Set, Tuple from xml.etree import ElementTree from bs4 import BeautifulSoup import iso8601 import lxml import requests -from tenacity.before_sleep import before_sleep_log from swh.core.api.classes import stream_results -from swh.lister.utils import http_retry from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import ListedOrigin -from .. import USER_AGENT from ..pattern import CredentialsType, Lister logger = logging.getLogger(__name__) class VcsNames(Enum): """Used to filter SourceForge tool names for valid VCS types""" # CVS projects are read-only CVS = "cvs" GIT = "git" SUBVERSION = "svn" MERCURIAL = "hg" BAZAAR = "bzr" VCS_NAMES = set(v.value for v in VcsNames.__members__.values()) @dataclass class SourceForgeListerEntry: vcs: VcsNames url: str last_modified: datetime.date SubSitemapNameT = str ProjectNameT = str # SourceForge only offers day-level granularity, which is good enough for our purposes LastModifiedT = datetime.date @dataclass class SourceForgeListerState: """Current state of the SourceForge lister in incremental runs""" """If the subsitemap does not exist, we assume a full run of this subsitemap is needed. If the date is the same, we skip the subsitemap, otherwise we request the subsitemap and look up every project's "last modified" date to compare against `ListedOrigins` from the database.""" subsitemap_last_modified: Dict[SubSitemapNameT, LastModifiedT] = field( default_factory=dict ) """Some projects (not the majority, but still meaningful) have no VCS for us to archive. We need to remember a mapping of their API URL to their "last modified" date so we don't keep querying them needlessly every time.""" empty_projects: Dict[str, LastModifiedT] = field(default_factory=dict) SourceForgeListerPage = List[SourceForgeListerEntry] MAIN_SITEMAP_URL = "https://sourceforge.net/allura_sitemap/sitemap.xml" SITEMAP_XML_NAMESPACE = "{http://www.sitemaps.org/schemas/sitemap/0.9}" # API resource endpoint for information about the given project. # # `namespace`: Project namespace. Very often `p`, but can be something else like # `adobe` # `project`: Project name, e.g. `seedai`. Can be a subproject, e.g `backapps/website`. PROJECT_API_URL_FORMAT = "https://sourceforge.net/rest/{namespace}/{project}" # Predictable URL for cloning (in the broad sense) a VCS registered for the project. # # Warning: does not apply to bzr repos, and Mercurial are http only, see use of this # constant below. # # `vcs`: VCS type, one of `VCS_NAMES` # `namespace`: Project namespace. Very often `p`, but can be something else like # `adobe`. # `project`: Project name, e.g. `seedai`. Can be a subproject, e.g `backapps/website`. # `mount_point`: url path used by the repo. For example, the Code::Blocks project uses # `git` (https://git.code.sf.net/p/codeblocks/git). CLONE_URL_FORMAT = "https://{vcs}.code.sf.net/{namespace}/{project}/{mount_point}" PROJ_URL_RE = re.compile( r"^https://sourceforge.net/(?P[^/]+)/(?P[^/]+)/(?P.*)?" ) # Mapping of `(namespace, project name)` to `last modified` date. ProjectsLastModifiedCache = Dict[Tuple[str, str], LastModifiedT] class SourceForgeLister(Lister[SourceForgeListerState, SourceForgeListerPage]): """List origins from the "SourceForge" forge.""" # Part of the lister API, that identifies this lister LISTER_NAME = "sourceforge" def __init__( self, scheduler: SchedulerInterface, incremental: bool = False, credentials: Optional[CredentialsType] = None, ): super().__init__( scheduler=scheduler, url="https://sourceforge.net", instance="main", credentials=credentials, ) # Will hold the currently saved "last modified" dates to compare against our # requests. self._project_last_modified: Optional[ProjectsLastModifiedCache] = None - self.session = requests.Session() - # Declare the USER_AGENT is more sysadm-friendly for the forge we list - self.session.headers.update( - {"Accept": "application/json", "User-Agent": USER_AGENT} - ) + + self.session.headers.update({"Accept": "application/json"}) self.incremental = incremental def state_from_dict(self, d: Dict[str, Dict[str, Any]]) -> SourceForgeListerState: subsitemaps = { k: datetime.date.fromisoformat(v) for k, v in d.get("subsitemap_last_modified", {}).items() } empty_projects = { k: datetime.date.fromisoformat(v) for k, v in d.get("empty_projects", {}).items() } return SourceForgeListerState( subsitemap_last_modified=subsitemaps, empty_projects=empty_projects ) def state_to_dict(self, state: SourceForgeListerState) -> Dict[str, Any]: return { "subsitemap_last_modified": { k: v.isoformat() for k, v in state.subsitemap_last_modified.items() }, "empty_projects": { k: v.isoformat() for k, v in state.empty_projects.items() }, } def projects_last_modified(self) -> ProjectsLastModifiedCache: if not self.incremental: # No point in loading the previous results if we're doing a full run return {} if self._project_last_modified is not None: return self._project_last_modified # We know there will be at least that many origins stream = stream_results( self.scheduler.get_listed_origins, self.lister_obj.id, limit=300_000 ) listed_origins = dict() # Projects can have slashes in them if they're subprojects, but the # mointpoint (last component) cannot. url_match = re.compile( r".*\.code\.sf\.net/(?P[^/]+)/(?P.+)/.*" ) bzr_url_match = re.compile( r"http://(?P[^/]+).bzr.sourceforge.net/bzr/([^/]+)" ) cvs_url_match = re.compile( r"rsync://a.cvs.sourceforge.net/cvsroot/(?P.+)/([^/]+)" ) for origin in stream: url = origin.url match = url_match.match(url) if match is None: # Could be a bzr or cvs special endpoint bzr_match = bzr_url_match.match(url) cvs_match = cvs_url_match.match(url) matches = None if bzr_match is not None: matches = bzr_match.groupdict() elif cvs_match is not None: matches = cvs_match.groupdict() assert matches project = matches["project"] namespace = "p" # no special namespacing for bzr and cvs projects else: matches = match.groupdict() namespace = matches["namespace"] project = matches["project"] # "Last modified" dates are the same across all VCS (tools, even) # within a project or subproject. An assertion here would be overkill. last_modified = origin.last_update assert last_modified is not None listed_origins[(namespace, project)] = last_modified.date() self._project_last_modified = listed_origins return listed_origins - @http_retry( - before_sleep=before_sleep_log(logger, logging.WARNING), - ) - def page_request(self, url, params) -> requests.Response: - # Log listed URL to ease debugging - logger.debug("Fetching URL %s with params %s", url, params) - response = self.session.get(url, params=params) - - if response.status_code != 200: - # Log response content to ease debugging - logger.warning( - "Unexpected HTTP status code %s for URL %s", - response.status_code, - response.url, - ) - # The lister must fail on blocking errors - response.raise_for_status() - - return response - def get_pages(self) -> Iterator[SourceForgeListerPage]: """ SourceForge has a main XML sitemap that lists its sharded sitemaps for all projects. Each XML sub-sitemap lists project pages, which are not unique per project: a project can have a wiki, a home, a git, an svn, etc. For each unique project, we query an API endpoint that lists (among other things) the tools associated with said project, some of which are the VCS used. Subprojects are considered separate projects. Lastly we use the information of which VCS are used to build the predictable clone URL for any given VCS. """ - sitemap_contents = self.page_request(MAIN_SITEMAP_URL, {}).text + sitemap_contents = self.http_request(MAIN_SITEMAP_URL).text tree = ElementTree.fromstring(sitemap_contents) for subsitemap in tree.iterfind(f"{SITEMAP_XML_NAMESPACE}sitemap"): last_modified_el = subsitemap.find(f"{SITEMAP_XML_NAMESPACE}lastmod") assert last_modified_el is not None and last_modified_el.text is not None last_modified = datetime.date.fromisoformat(last_modified_el.text) location = subsitemap.find(f"{SITEMAP_XML_NAMESPACE}loc") assert location is not None and location.text is not None sub_url = location.text if self.incremental: recorded_last_mod = self.state.subsitemap_last_modified.get(sub_url) if recorded_last_mod == last_modified: # The entire subsitemap hasn't changed, so none of its projects # have either, skip it. continue self.state.subsitemap_last_modified[sub_url] = last_modified - subsitemap_contents = self.page_request(sub_url, {}).text + subsitemap_contents = self.http_request(sub_url).text subtree = ElementTree.fromstring(subsitemap_contents) yield from self._get_pages_from_subsitemap(subtree) def get_origins_from_page( self, page: SourceForgeListerPage ) -> Iterator[ListedOrigin]: assert self.lister_obj.id is not None for hit in page: last_modified: str = str(hit.last_modified) last_update: datetime.datetime = iso8601.parse_date(last_modified) yield ListedOrigin( lister_id=self.lister_obj.id, visit_type=hit.vcs.value, url=hit.url, last_update=last_update, ) def _get_pages_from_subsitemap( self, subtree: ElementTree.Element ) -> Iterator[SourceForgeListerPage]: projects: Set[ProjectNameT] = set() for project_block in subtree.iterfind(f"{SITEMAP_XML_NAMESPACE}url"): last_modified_block = project_block.find(f"{SITEMAP_XML_NAMESPACE}lastmod") assert last_modified_block is not None last_modified = last_modified_block.text location = project_block.find(f"{SITEMAP_XML_NAMESPACE}loc") assert location is not None project_url = location.text assert project_url is not None match = PROJ_URL_RE.match(project_url) if match: matches = match.groupdict() namespace = matches["namespace"] if namespace == "projects": # These have a `p`-namespaced counterpart, use that instead continue project = matches["project"] rest = matches["rest"] if rest.count("/") > 1: # This is a subproject. There exists no sub-subprojects. subproject_name = rest.rsplit("/", 2)[0] project = f"{project}/{subproject_name}" prev_len = len(projects) projects.add(project) if prev_len == len(projects): # Already seen continue pages = self._get_pages_for_project(namespace, project, last_modified) if pages: yield pages else: logger.debug("Project '%s' does not have any VCS", project) else: # Should almost always match, let's log it # The only ones that don't match are mostly specialized one-off URLs. msg = "Project URL '%s' does not match expected pattern" logger.warning(msg, project_url) def _get_pages_for_project( self, namespace, project, last_modified ) -> SourceForgeListerPage: endpoint = PROJECT_API_URL_FORMAT.format(namespace=namespace, project=project) empty_project_last_modified = self.state.empty_projects.get(endpoint) if empty_project_last_modified is not None: if last_modified == empty_project_last_modified.isoformat(): # Project has not changed, so is still empty, meaning it has # no VCS attached that we can archive. logger.debug(f"Project {namespace}/{project} is still empty") return [] if self.incremental: expected = self.projects_last_modified().get((namespace, project)) if expected is not None: if expected.isoformat() == last_modified: # Project has not changed logger.debug(f"Project {namespace}/{project} has not changed") return [] else: logger.debug(f"Project {namespace}/{project} was updated") else: msg = "New project during an incremental run: %s/%s" logger.debug(msg, namespace, project) try: - res = self.page_request(endpoint, {}).json() + res = self.http_request(endpoint).json() except requests.HTTPError: - # We've already logged in `page_request` + # We've already logged in `http_request` return [] tools = res.get("tools") if tools is None: # This rarely happens, on very old URLs logger.warning("Project '%s' does not have any tools", endpoint) return [] hits = [] for tool in tools: tool_name = tool["name"] if tool_name not in VCS_NAMES: continue if tool_name == VcsNames.CVS.value: # CVS projects are different from other VCS ones, they use the rsync # protocol, a list of modules needs to be fetched from an info page # and multiple origin URLs can be produced for a same project. cvs_info_url = f"http://{project}.cvs.sourceforge.net" try: - response = self.page_request(cvs_info_url, params={}) + response = self.http_request(cvs_info_url) except requests.HTTPError: logger.warning( "CVS info page could not be fetched, skipping project '%s'", project, ) continue else: bs = BeautifulSoup(response.text, features="html.parser") cvs_base_url = "rsync://a.cvs.sourceforge.net/cvsroot" for text in [b.text for b in bs.find_all("b")]: match = re.search(rf".*/cvsroot/{project} co -P (.+)", text) if match is not None: module = match.group(1) if module != "Attic": url = f"{cvs_base_url}/{project}/{module}" hits.append( SourceForgeListerEntry( vcs=VcsNames(tool_name), url=url, last_modified=last_modified, ) ) continue url = CLONE_URL_FORMAT.format( vcs=tool_name, namespace=namespace, project=project, mount_point=tool["mount_point"], ) if tool_name == VcsNames.MERCURIAL.value: # SourceForge does not yet support anonymous HTTPS cloning for Mercurial # See https://sourceforge.net/p/forge/feature-requests/727/ url = url.replace("https://", "http://") if tool_name == VcsNames.BAZAAR.value: # SourceForge has removed support for bzr and only keeps legacy projects # around at a separate (also not https) URL. Bzr projects are very rare # and a lot of them are 404 now. url = f"http://{project}.bzr.sourceforge.net/bzr/{project}" try: - response = self.page_request(url, params={}) + response = self.http_request(url) if "To get this branch, use:" not in response.text: # If a bzr project has multiple branches, we need to extract their # names from the repository landing page and create one listed origin # per branch parser = lxml.etree.HTMLParser() tree = lxml.etree.fromstring(response.text, parser) # Get all tds with class 'autcell' tds = tree.xpath(".//td[contains(@class, 'autcell')]") for td in tds: branch = td.findtext("a") # If the td's parent contains Branch and # it has non-empty text: if td.xpath("..//img[@alt='Branch']") and branch: hits.append( SourceForgeListerEntry( vcs=VcsNames(tool_name), url=f"{url}/{branch}", last_modified=last_modified, ) ) continue except requests.HTTPError: logger.warning( "Bazaar repository page could not be fetched, skipping project '%s'", project, ) continue entry = SourceForgeListerEntry( vcs=VcsNames(tool_name), url=url, last_modified=last_modified ) hits.append(entry) if not hits: date = datetime.date.fromisoformat(last_modified) self.state.empty_projects[endpoint] = date else: self.state.empty_projects.pop(endpoint, None) return hits diff --git a/swh/lister/sourceforge/tests/test_lister.py b/swh/lister/sourceforge/tests/test_lister.py index 516f562..d6aabc3 100644 --- a/swh/lister/sourceforge/tests/test_lister.py +++ b/swh/lister/sourceforge/tests/test_lister.py @@ -1,536 +1,537 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information + import datetime import functools import json from pathlib import Path import re from iso8601 import iso8601 import pytest from requests.exceptions import HTTPError from swh.lister import USER_AGENT from swh.lister.sourceforge.lister import ( MAIN_SITEMAP_URL, PROJECT_API_URL_FORMAT, SourceForgeLister, SourceForgeListerState, ) from swh.lister.tests.test_utils import assert_sleep_calls from swh.lister.utils import WAIT_EXP_BASE # Mapping of project name to namespace from swh.scheduler.model import ListedOrigin TEST_PROJECTS = { "aaron": "p", "adobexmp": "adobe", "backapps": "p", "backapps/website": "p", "bzr-repo": "p", "mojunk": "p", "mramm": "p", "os3dmodels": "p", "random-mercurial": "p", "t12eksandbox": "p", "ocaml-lpd": "p", } URLS_MATCHER = { PROJECT_API_URL_FORMAT.format(namespace=namespace, project=project): project for project, namespace in TEST_PROJECTS.items() } def get_main_sitemap(datadir): return Path(datadir, "main-sitemap.xml").read_text() def get_subsitemap_0(datadir): return Path(datadir, "subsitemap-0.xml").read_text() def get_subsitemap_1(datadir): return Path(datadir, "subsitemap-1.xml").read_text() def get_project_json(datadir, request, context): url = request.url project = URLS_MATCHER.get(url) assert project is not None, f"Url '{url}' could not be matched" project = project.replace("/", "-") return json.loads(Path(datadir, f"{project}.json").read_text()) def get_cvs_info_page(datadir): return Path(datadir, "aaron.html").read_text() def get_bzr_repo_page(datadir, repo_name): return Path(datadir, f"{repo_name}.html").read_text() def _check_request_headers(request): return request.headers.get("User-Agent") == USER_AGENT def _check_listed_origins(lister, swh_scheduler): scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results res = {o.url: (o.visit_type, str(o.last_update.date())) for o in scheduler_origins} assert res == { "https://svn.code.sf.net/p/backapps/website/code": ("svn", "2021-02-11"), "https://git.code.sf.net/p/os3dmodels/git": ("git", "2017-03-31"), "https://svn.code.sf.net/p/os3dmodels/svn": ("svn", "2017-03-31"), "https://git.code.sf.net/p/mramm/files": ("git", "2019-04-04"), "https://git.code.sf.net/p/mramm/git": ("git", "2019-04-04"), "https://svn.code.sf.net/p/mramm/svn": ("svn", "2019-04-04"), "https://git.code.sf.net/p/mojunk/git": ("git", "2017-12-31"), "https://git.code.sf.net/p/mojunk/git2": ("git", "2017-12-31"), "https://svn.code.sf.net/p/mojunk/svn": ("svn", "2017-12-31"), "http://hg.code.sf.net/p/random-mercurial/hg": ("hg", "2019-05-02"), "http://t12eksandbox.bzr.sourceforge.net/bzr/t12eksandbox": ( "bzr", "2011-02-09", ), "http://ocaml-lpd.bzr.sourceforge.net/bzr/ocaml-lpd/trunk": ( "bzr", "2011-04-17", ), "rsync://a.cvs.sourceforge.net/cvsroot/aaron/aaron": ("cvs", "2013-03-07"), "rsync://a.cvs.sourceforge.net/cvsroot/aaron/www": ("cvs", "2013-03-07"), } def test_sourceforge_lister_full(swh_scheduler, requests_mock, datadir): """ Simulate a full listing of an artificially restricted sourceforge. There are 5 different projects, spread over two sub-sitemaps, a few of which have multiple VCS listed, one has none, one is outside of the standard `/p/` namespace, some with custom mount points. All non-interesting but related entries have been kept. """ lister = SourceForgeLister(scheduler=swh_scheduler) requests_mock.get( MAIN_SITEMAP_URL, text=get_main_sitemap(datadir), additional_matcher=_check_request_headers, ) requests_mock.get( "https://sourceforge.net/allura_sitemap/sitemap-0.xml", text=get_subsitemap_0(datadir), additional_matcher=_check_request_headers, ) requests_mock.get( "https://sourceforge.net/allura_sitemap/sitemap-1.xml", text=get_subsitemap_1(datadir), additional_matcher=_check_request_headers, ) requests_mock.get( re.compile("https://sourceforge.net/rest/.*"), json=functools.partial(get_project_json, datadir), additional_matcher=_check_request_headers, ) requests_mock.get( re.compile("http://aaron.cvs.sourceforge.net/"), text=get_cvs_info_page(datadir), additional_matcher=_check_request_headers, ) requests_mock.get( re.compile("http://t12eksandbox.bzr.sourceforge.net/bzr/t12eksandbox"), text=get_bzr_repo_page(datadir, "t12eksandbox"), additional_matcher=_check_request_headers, ) requests_mock.get( re.compile("http://ocaml-lpd.bzr.sourceforge.net/bzr/ocaml-lpd"), text=get_bzr_repo_page(datadir, "ocaml-lpd"), additional_matcher=_check_request_headers, ) stats = lister.run() # - os3dmodels (2 repos), # - mramm (3 repos), # - mojunk (3 repos), # - backapps/website (1 repo), # - random-mercurial (1 repo). # - t12eksandbox (1 repo). # - ocaml-lpd (1 repo). # adobe and backapps itself have no repos. assert stats.pages == 8 assert stats.origins == 14 expected_state = { "subsitemap_last_modified": { "https://sourceforge.net/allura_sitemap/sitemap-0.xml": "2021-03-18", "https://sourceforge.net/allura_sitemap/sitemap-1.xml": "2021-03-18", }, "empty_projects": { "https://sourceforge.net/rest/p/backapps": "2021-02-11", "https://sourceforge.net/rest/adobe/adobexmp": "2017-10-17", }, } assert lister.state_to_dict(lister.state) == expected_state _check_listed_origins(lister, swh_scheduler) def test_sourceforge_lister_incremental(swh_scheduler, requests_mock, datadir, mocker): """ Simulate an incremental listing of an artificially restricted sourceforge. Same dataset as the full run, because it's enough to validate the different cases. """ lister = SourceForgeLister(scheduler=swh_scheduler, incremental=True) requests_mock.get( MAIN_SITEMAP_URL, text=get_main_sitemap(datadir), additional_matcher=_check_request_headers, ) def not_called(request, *args, **kwargs): raise AssertionError(f"Should not have been called: '{request.url}'") requests_mock.get( "https://sourceforge.net/allura_sitemap/sitemap-0.xml", text=get_subsitemap_0(datadir), additional_matcher=_check_request_headers, ) requests_mock.get( "https://sourceforge.net/allura_sitemap/sitemap-1.xml", text=not_called, additional_matcher=_check_request_headers, ) def filtered_get_project_json(request, context): # These projects should not be requested again assert URLS_MATCHER[request.url] not in {"adobe", "mojunk"} return get_project_json(datadir, request, context) requests_mock.get( re.compile("https://sourceforge.net/rest/.*"), json=filtered_get_project_json, additional_matcher=_check_request_headers, ) requests_mock.get( re.compile("http://aaron.cvs.sourceforge.net/"), text=get_cvs_info_page(datadir), additional_matcher=_check_request_headers, ) requests_mock.get( re.compile("http://t12eksandbox.bzr.sourceforge.net/bzr/t12eksandbox"), text=get_bzr_repo_page(datadir, "t12eksandbox"), additional_matcher=_check_request_headers, ) requests_mock.get( re.compile("http://ocaml-lpd.bzr.sourceforge.net/bzr/ocaml-lpd"), text=get_bzr_repo_page(datadir, "ocaml-lpd"), additional_matcher=_check_request_headers, ) faked_listed_origins = [ # mramm: changed ListedOrigin( lister_id=lister.lister_obj.id, visit_type="git", url="https://git.code.sf.net/p/mramm/files", last_update=iso8601.parse_date("2019-01-01"), ), ListedOrigin( lister_id=lister.lister_obj.id, visit_type="git", url="https://git.code.sf.net/p/mramm/git", last_update=iso8601.parse_date("2019-01-01"), ), ListedOrigin( lister_id=lister.lister_obj.id, visit_type="svn", url="https://svn.code.sf.net/p/mramm/svn", last_update=iso8601.parse_date("2019-01-01"), ), # stayed the same, even though its subsitemap has changed ListedOrigin( lister_id=lister.lister_obj.id, visit_type="git", url="https://git.code.sf.net/p/os3dmodels/git", last_update=iso8601.parse_date("2017-03-31"), ), ListedOrigin( lister_id=lister.lister_obj.id, visit_type="svn", url="https://svn.code.sf.net/p/os3dmodels/svn", last_update=iso8601.parse_date("2017-03-31"), ), # others: stayed the same, should be skipped ListedOrigin( lister_id=lister.lister_obj.id, visit_type="git", url="https://git.code.sf.net/p/mojunk/git", last_update=iso8601.parse_date("2017-12-31"), ), ListedOrigin( lister_id=lister.lister_obj.id, visit_type="git", url="https://git.code.sf.net/p/mojunk/git2", last_update=iso8601.parse_date("2017-12-31"), ), ListedOrigin( lister_id=lister.lister_obj.id, visit_type="svn", url="https://svn.code.sf.net/p/mojunk/svn", last_update=iso8601.parse_date("2017-12-31"), ), ListedOrigin( lister_id=lister.lister_obj.id, visit_type="svn", url="https://svn.code.sf.net/p/backapps/website/code", last_update=iso8601.parse_date("2021-02-11"), ), ListedOrigin( lister_id=lister.lister_obj.id, visit_type="hg", url="http://hg.code.sf.net/p/random-mercurial/hg", last_update=iso8601.parse_date("2019-05-02"), ), ListedOrigin( lister_id=lister.lister_obj.id, visit_type="bzr", url="http://t12eksandbox.bzr.sourceforge.net/bzr/t12eksandbox", last_update=iso8601.parse_date("2011-02-09"), ), ListedOrigin( lister_id=lister.lister_obj.id, visit_type="bzr", url="http://ocaml-lpd.bzr.sourceforge.net/bzr/ocaml-lpd/trunk", last_update=iso8601.parse_date("2011-04-17"), ), ListedOrigin( lister_id=lister.lister_obj.id, visit_type="cvs", url="rsync://a.cvs.sourceforge.net/cvsroot/aaron/aaron", last_update=iso8601.parse_date("2013-03-07"), ), ListedOrigin( lister_id=lister.lister_obj.id, visit_type="cvs", url="rsync://a.cvs.sourceforge.net/cvsroot/aaron/www", last_update=iso8601.parse_date("2013-03-07"), ), ] swh_scheduler.record_listed_origins(faked_listed_origins) to_date = datetime.date.fromisoformat faked_state = SourceForgeListerState( subsitemap_last_modified={ # changed "https://sourceforge.net/allura_sitemap/sitemap-0.xml": to_date( "2021-02-18" ), # stayed the same "https://sourceforge.net/allura_sitemap/sitemap-1.xml": to_date( "2021-03-18" ), }, empty_projects={ "https://sourceforge.net/rest/p/backapps": to_date("2020-02-11"), "https://sourceforge.net/rest/adobe/adobexmp": to_date("2017-10-17"), }, ) lister.state = faked_state stats = lister.run() # - mramm (3 repos), # changed assert stats.pages == 1 assert stats.origins == 3 expected_state = { "subsitemap_last_modified": { "https://sourceforge.net/allura_sitemap/sitemap-0.xml": "2021-03-18", "https://sourceforge.net/allura_sitemap/sitemap-1.xml": "2021-03-18", }, "empty_projects": { "https://sourceforge.net/rest/p/backapps": "2021-02-11", # changed "https://sourceforge.net/rest/adobe/adobexmp": "2017-10-17", }, } assert lister.state_to_dict(lister.state) == expected_state # origins have been updated _check_listed_origins(lister, swh_scheduler) def test_sourceforge_lister_retry(swh_scheduler, requests_mock, mocker, datadir): lister = SourceForgeLister(scheduler=swh_scheduler) # Exponential retries take a long time, so stub time.sleep - mocked_sleep = mocker.patch.object(lister.page_request.retry, "sleep") + mocked_sleep = mocker.patch.object(lister.http_request.retry, "sleep") requests_mock.get( MAIN_SITEMAP_URL, [ {"status_code": 429}, {"status_code": 429}, {"text": get_main_sitemap(datadir)}, ], additional_matcher=_check_request_headers, ) requests_mock.get( "https://sourceforge.net/allura_sitemap/sitemap-0.xml", [{"status_code": 429}, {"text": get_subsitemap_0(datadir), "status_code": 301}], additional_matcher=_check_request_headers, ) requests_mock.get( "https://sourceforge.net/allura_sitemap/sitemap-1.xml", [{"status_code": 429}, {"text": get_subsitemap_1(datadir)}], additional_matcher=_check_request_headers, ) requests_mock.get( re.compile("https://sourceforge.net/rest/.*"), [{"status_code": 429}, {"json": functools.partial(get_project_json, datadir)}], additional_matcher=_check_request_headers, ) requests_mock.get( re.compile("http://aaron.cvs.sourceforge.net/"), text=get_cvs_info_page(datadir), additional_matcher=_check_request_headers, ) requests_mock.get( re.compile("http://t12eksandbox.bzr.sourceforge.net/bzr/t12eksandbox"), text=get_bzr_repo_page(datadir, "t12eksandbox"), additional_matcher=_check_request_headers, ) requests_mock.get( re.compile("http://ocaml-lpd.bzr.sourceforge.net/bzr/ocaml-lpd"), text=get_bzr_repo_page(datadir, "ocaml-lpd"), additional_matcher=_check_request_headers, ) stats = lister.run() # - os3dmodels (2 repos), # - mramm (3 repos), # - mojunk (3 repos), # - backapps/website (1 repo), # - random-mercurial (1 repo). # - t12eksandbox (1 repo). # - ocaml-lpd (1 repo). # adobe and backapps itself have no repos. assert stats.pages == 8 assert stats.origins == 14 _check_listed_origins(lister, swh_scheduler) # Test `time.sleep` is called with exponential retries assert_sleep_calls(mocker, mocked_sleep, [1, WAIT_EXP_BASE, 1, 1]) @pytest.mark.parametrize("status_code", [500, 503, 504, 403, 404]) def test_sourceforge_lister_http_error( swh_scheduler, requests_mock, status_code, mocker ): lister = SourceForgeLister(scheduler=swh_scheduler) # Exponential retries take a long time, so stub time.sleep - mocked_sleep = mocker.patch.object(lister.page_request.retry, "sleep") + mocked_sleep = mocker.patch.object(lister.http_request.retry, "sleep") requests_mock.get(MAIN_SITEMAP_URL, status_code=status_code) with pytest.raises(HTTPError): lister.run() exp_retries = [] if status_code >= 500: exp_retries = [1.0, 10.0, 100.0, 1000.0] assert_sleep_calls(mocker, mocked_sleep, exp_retries) @pytest.mark.parametrize("status_code", [500, 503, 504, 403, 404]) def test_sourceforge_lister_project_error( datadir, swh_scheduler, requests_mock, status_code, mocker ): lister = SourceForgeLister(scheduler=swh_scheduler) # Exponential retries take a long time, so stub time.sleep - mocker.patch.object(lister.page_request.retry, "sleep") + mocker.patch.object(lister.http_request.retry, "sleep") requests_mock.get( MAIN_SITEMAP_URL, text=get_main_sitemap(datadir), additional_matcher=_check_request_headers, ) requests_mock.get( "https://sourceforge.net/allura_sitemap/sitemap-0.xml", text=get_subsitemap_0(datadir), additional_matcher=_check_request_headers, ) requests_mock.get( "https://sourceforge.net/allura_sitemap/sitemap-1.xml", text=get_subsitemap_1(datadir), additional_matcher=_check_request_headers, ) # Request mocks precedence is LIFO requests_mock.get( re.compile("https://sourceforge.net/rest/.*"), json=functools.partial(get_project_json, datadir), additional_matcher=_check_request_headers, ) requests_mock.get( re.compile("http://t12eksandbox.bzr.sourceforge.net/bzr/t12eksandbox"), text=get_bzr_repo_page(datadir, "t12eksandbox"), additional_matcher=_check_request_headers, ) requests_mock.get( re.compile("http://ocaml-lpd.bzr.sourceforge.net/bzr/ocaml-lpd"), text=get_bzr_repo_page(datadir, "ocaml-lpd"), additional_matcher=_check_request_headers, ) # Make all `mramm` requests fail # `mramm` is in subsitemap 0, which ensures we keep listing after an error. requests_mock.get( re.compile("https://sourceforge.net/rest/p/mramm"), status_code=status_code ) # Make request to CVS info page fail requests_mock.get( re.compile("http://aaron.cvs.sourceforge.net/"), status_code=status_code ) stats = lister.run() # - os3dmodels (2 repos), # - mojunk (3 repos), # - backapps/website (1 repo), # - random-mercurial (1 repo). # - t12eksandbox (1 repo). # - ocaml-lpd (1 repo). # adobe and backapps itself have no repos. # Did *not* list mramm assert stats.pages == 6 assert stats.origins == 9 scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results res = {o.url: (o.visit_type, str(o.last_update.date())) for o in scheduler_origins} # Ensure no `mramm` origins are listed, but all others are. assert res == { "https://svn.code.sf.net/p/backapps/website/code": ("svn", "2021-02-11"), "https://git.code.sf.net/p/os3dmodels/git": ("git", "2017-03-31"), "https://svn.code.sf.net/p/os3dmodels/svn": ("svn", "2017-03-31"), "https://git.code.sf.net/p/mojunk/git": ("git", "2017-12-31"), "https://git.code.sf.net/p/mojunk/git2": ("git", "2017-12-31"), "https://svn.code.sf.net/p/mojunk/svn": ("svn", "2017-12-31"), "http://hg.code.sf.net/p/random-mercurial/hg": ("hg", "2019-05-02"), "http://t12eksandbox.bzr.sourceforge.net/bzr/t12eksandbox": ( "bzr", "2011-02-09", ), "http://ocaml-lpd.bzr.sourceforge.net/bzr/ocaml-lpd/trunk": ( "bzr", "2011-04-17", ), } diff --git a/swh/lister/tuleap/lister.py b/swh/lister/tuleap/lister.py index 7e0b800..4a55499 100644 --- a/swh/lister/tuleap/lister.py +++ b/swh/lister/tuleap/lister.py @@ -1,150 +1,123 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from typing import Any, Dict, Iterator, List, Optional from urllib.parse import urljoin import iso8601 -import requests -from tenacity.before_sleep import before_sleep_log -from swh.lister.utils import http_retry from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import ListedOrigin -from .. import USER_AGENT from ..pattern import CredentialsType, StatelessLister logger = logging.getLogger(__name__) RepoPage = Dict[str, Any] class TuleapLister(StatelessLister[RepoPage]): """List origins from Tuleap. Tuleap provides SVN and Git repositories hosting. Tuleap API getting started: https://tuleap.net/doc/en/user-guide/integration/rest.html Tuleap API reference: https://tuleap.net/api/explorer/ Using the API we first request a list of projects, and from there request their associated repositories individually. Everything is paginated, code uses throttling at the individual GET call level.""" LISTER_NAME = "tuleap" REPO_LIST_PATH = "/api" REPO_GIT_PATH = "plugins/git/" REPO_SVN_PATH = "plugins/svn/" def __init__( self, scheduler: SchedulerInterface, url: str, instance: Optional[str] = None, credentials: CredentialsType = None, ): super().__init__( scheduler=scheduler, credentials=credentials, url=url, instance=instance, ) - self.session = requests.Session() - self.session.headers.update( - { - "Accept": "application/json", - "User-Agent": USER_AGENT, - } - ) - - @http_retry(before_sleep=before_sleep_log(logger, logging.WARNING)) - def page_request(self, url: str, params: Dict[str, Any]) -> requests.Response: - - logger.info("Fetching URL %s with params %s", url, params) - - response = self.session.get(url, params=params) - if response.status_code != 200: - logger.warning( - "Unexpected HTTP status code %s on %s: %s", - response.status_code, - response.url, - response.content, - ) - response.raise_for_status() - - return response + self.session.headers.update({"Accept": "application/json"}) @classmethod def results_simplified(cls, url: str, repo_type: str, repo: RepoPage) -> RepoPage: if repo_type == "git": prefix_url = TuleapLister.REPO_GIT_PATH else: prefix_url = TuleapLister.REPO_SVN_PATH rep = { "project": repo["name"], "type": repo_type, "uri": urljoin(url, f"{prefix_url}{repo['path']}"), "last_update_date": repo["last_update_date"], } return rep def _get_repositories(self, url_repo) -> List[Dict[str, Any]]: - ret = self.page_request(url_repo, {}) + ret = self.http_request(url_repo) reps_list = ret.json()["repositories"] limit = int(ret.headers["X-PAGINATION-LIMIT-MAX"]) offset = int(ret.headers["X-PAGINATION-LIMIT"]) size = int(ret.headers["X-PAGINATION-SIZE"]) while offset < size: url_offset = url_repo + "?offset=" + str(offset) + "&limit=" + str(limit) - ret = self.page_request(url_offset, {}).json() + ret = self.http_request(url_offset).json() reps_list = reps_list + ret["repositories"] offset += limit return reps_list def get_pages(self) -> Iterator[RepoPage]: # base with trailing slash, path without leading slash for urljoin url_api: str = urljoin(self.url, self.REPO_LIST_PATH) url_projects = url_api + "/projects/" # Get the list of projects. - response = self.page_request(url_projects, {}) + response = self.http_request(url_projects) projects_list = response.json() limit = int(response.headers["X-PAGINATION-LIMIT-MAX"]) offset = int(response.headers["X-PAGINATION-LIMIT"]) size = int(response.headers["X-PAGINATION-SIZE"]) while offset < size: url_offset = ( url_projects + "?offset=" + str(offset) + "&limit=" + str(limit) ) - ret = self.page_request(url_offset, {}).json() + ret = self.http_request(url_offset).json() projects_list = projects_list + ret offset += limit # Get list of repositories for each project. for p in projects_list: p_id = p["id"] # Fetch Git repositories for project url_git = url_projects + str(p_id) + "/git" repos = self._get_repositories(url_git) for repo in repos: yield self.results_simplified(url_api, "git", repo) def get_origins_from_page(self, page: RepoPage) -> Iterator[ListedOrigin]: """Convert a page of Tuleap repositories into a list of ListedOrigins.""" assert self.lister_obj.id is not None yield ListedOrigin( lister_id=self.lister_obj.id, url=page["uri"], visit_type=page["type"], last_update=iso8601.parse_date(page["last_update_date"]), ) diff --git a/swh/lister/tuleap/tests/test_lister.py b/swh/lister/tuleap/tests/test_lister.py index d650dc8..722e708 100644 --- a/swh/lister/tuleap/tests/test_lister.py +++ b/swh/lister/tuleap/tests/test_lister.py @@ -1,170 +1,170 @@ -# Copyright (C) 2021 The Software Heritage developers +# Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json from pathlib import Path from typing import Dict, List, Tuple import pytest import requests from swh.lister.tuleap.lister import RepoPage, TuleapLister from swh.scheduler.model import ListedOrigin TULEAP_URL = "https://tuleap.net/" TULEAP_PROJECTS_URL = TULEAP_URL + "api/projects/" TULEAP_REPO_1_URL = TULEAP_URL + "api/projects/685/git" # manjaromemodoc TULEAP_REPO_2_URL = TULEAP_URL + "api/projects/309/git" # myaurora TULEAP_REPO_3_URL = TULEAP_URL + "api/projects/1080/git" # tuleap cleanup module GIT_REPOS = ( "https://tuleap.net/plugins/git/manjaromemodoc/manjaro-memo-documentation.git", "https://tuleap.net/plugins/git/myaurora/myaurora.git", ) @pytest.fixture def tuleap_projects(datadir) -> Tuple[str, Dict[str, str], List[str]]: text = Path(datadir, "https_tuleap.net", "projects").read_text() headers = { "X-PAGINATION-LIMIT-MAX": "50", "X-PAGINATION-LIMIT": "10", "X-PAGINATION-SIZE": "2", } repo_json = json.loads(text) projects = [p["shortname"] for p in repo_json] return text, headers, projects @pytest.fixture def tuleap_repo_1(datadir) -> Tuple[str, Dict[str, str], List[RepoPage], List[str]]: text = Path(datadir, "https_tuleap.net", "repo_1").read_text() headers = { "X-PAGINATION-LIMIT-MAX": "50", "X-PAGINATION-LIMIT": "10", "X-PAGINATION-SIZE": "1", } reps = json.loads(text) page_results = [] for r in reps["repositories"]: page_results.append( TuleapLister.results_simplified(url=TULEAP_URL, repo_type="git", repo=r) ) origin_urls = [r["uri"] for r in page_results] return text, headers, page_results, origin_urls @pytest.fixture def tuleap_repo_2(datadir) -> Tuple[str, Dict[str, str], List[RepoPage], List[str]]: text = Path(datadir, "https_tuleap.net", "repo_2").read_text() headers = { "X-PAGINATION-LIMIT-MAX": "50", "X-PAGINATION-LIMIT": "10", "X-PAGINATION-SIZE": "1", } reps = json.loads(text) page_results = [] for r in reps["repositories"]: page_results.append( TuleapLister.results_simplified(url=TULEAP_URL, repo_type="git", repo=r) ) origin_urls = [r["uri"] for r in page_results] return text, headers, page_results, origin_urls @pytest.fixture def tuleap_repo_3(datadir) -> Tuple[str, Dict[str, str], List[RepoPage], List[str]]: text = Path(datadir, "https_tuleap.net", "repo_3").read_text() headers = { "X-PAGINATION-LIMIT-MAX": "50", "X-PAGINATION-LIMIT": "10", "X-PAGINATION-SIZE": "0", } reps = json.loads(text) page_results = [] for r in reps["repositories"]: page_results.append( TuleapLister.results_simplified(url=TULEAP_URL, repo_type="git", repo=r) ) origin_urls = [r["uri"] for r in page_results] return text, headers, page_results, origin_urls @pytest.fixture(autouse=True) def retry_sleep_mock(mocker): - mocker.patch.object(TuleapLister.page_request.retry, "sleep") + mocker.patch.object(TuleapLister.http_request.retry, "sleep") def check_listed_origins(lister_urls: List[str], scheduler_origins: List[ListedOrigin]): """Asserts that the two collections have the same origin URLs. Does not test last_update.""" assert set(lister_urls) == {origin.url for origin in scheduler_origins} def test_tuleap_full_listing( swh_scheduler, requests_mock, mocker, tuleap_projects, tuleap_repo_1, tuleap_repo_2, tuleap_repo_3, ): """Covers full listing of multiple pages, rate-limit, page size (required for test), checking page results and listed origins, statelessness.""" lister = TuleapLister( scheduler=swh_scheduler, url=TULEAP_URL, instance="tuleap.net" ) p_text, p_headers, p_projects = tuleap_projects r1_text, r1_headers, r1_result, r1_origin_urls = tuleap_repo_1 r2_text, r2_headers, r2_result, r2_origin_urls = tuleap_repo_2 r3_text, r3_headers, r3_result, r3_origin_urls = tuleap_repo_3 requests_mock.get(TULEAP_PROJECTS_URL, text=p_text, headers=p_headers) requests_mock.get(TULEAP_REPO_1_URL, text=r1_text, headers=r1_headers) requests_mock.get( TULEAP_REPO_2_URL, [ {"status_code": requests.codes.too_many_requests}, {"text": r2_text, "headers": r2_headers}, ], ) requests_mock.get(TULEAP_REPO_3_URL, text=r3_text, headers=r3_headers) # end test setup stats = lister.run() # start test checks assert stats.pages == 2 assert stats.origins == 2 scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results check_listed_origins( r1_origin_urls + r2_origin_urls + r3_origin_urls, scheduler_origins ) check_listed_origins(GIT_REPOS, scheduler_origins) assert lister.get_state_from_scheduler() is None @pytest.mark.parametrize("http_code", [400, 500, 502]) def test_tuleap_list_http_error(swh_scheduler, requests_mock, http_code): """Test handling of some HTTP errors commonly encountered""" lister = TuleapLister(scheduler=swh_scheduler, url=TULEAP_URL) requests_mock.get(TULEAP_PROJECTS_URL, status_code=http_code) with pytest.raises(requests.HTTPError): lister.run() scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results assert len(scheduler_origins) == 0