diff --git a/swh/lister/maven/lister.py b/swh/lister/maven/lister.py index bc1c2b6..4ccc532 100644 --- a/swh/lister/maven/lister.py +++ b/swh/lister/maven/lister.py @@ -1,390 +1,390 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from dataclasses import asdict, dataclass from datetime import datetime, timezone import logging import re from typing import Any, Dict, Iterator, Optional from urllib.parse import urljoin import requests from tenacity.before_sleep import before_sleep_log import xmltodict from swh.lister.utils import throttling_retry from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import ListedOrigin from .. import USER_AGENT from ..pattern import CredentialsType, Lister logger = logging.getLogger(__name__) RepoPage = Dict[str, Any] @dataclass class MavenListerState: """State of the MavenLister""" last_seen_doc: int = -1 """Last doc ID ingested during an incremental pass """ last_seen_pom: int = -1 """Last doc ID related to a pom and ingested during an incremental pass """ class MavenLister(Lister[MavenListerState, RepoPage]): """List origins from a Maven repository. Maven Central provides artifacts for Java builds. It includes POM files and source archives, which we download to get the source code of artifacts and links to their scm repository. This lister yields origins of types: git/svn/hg or whatever the Artifacts use as repository type, plus maven types for the maven loader (tgz, jar).""" LISTER_NAME = "maven" def __init__( self, scheduler: SchedulerInterface, url: str, index_url: str = None, instance: Optional[str] = None, credentials: CredentialsType = None, incremental: bool = True, ): """Lister class for Maven repositories. Args: url: main URL of the Maven repository, i.e. url of the base index used to fetch maven artifacts. For Maven central use https://repo1.maven.org/maven2/ index_url: the URL to download the exported text indexes from. Would typically be a local host running the export docker image. See README.md in this directory for more information. instance: Name of maven instance. Defaults to url's network location if unset. incremental: bool, defaults to True. Defines if incremental listing is activated or not. """ self.BASE_URL = url self.INDEX_URL = index_url self.incremental = incremental super().__init__( scheduler=scheduler, credentials=credentials, url=url, instance=instance, ) self.session = requests.Session() self.session.headers.update( { "Accept": "application/json", "User-Agent": USER_AGENT, } ) self.jar_origins: Dict[str, ListedOrigin] = {} def state_from_dict(self, d: Dict[str, Any]) -> MavenListerState: return MavenListerState(**d) def state_to_dict(self, state: MavenListerState) -> Dict[str, Any]: return asdict(state) @throttling_retry(before_sleep=before_sleep_log(logger, logging.WARNING)) def page_request(self, url: str, params: Dict[str, Any]) -> requests.Response: logger.info("Fetching URL %s with params %s", url, params) response = self.session.get(url, params=params) if response.status_code != 200: logger.warning( "Unexpected HTTP status code %s on %s: %s", response.status_code, response.url, response.content, ) response.raise_for_status() return response def get_pages(self) -> Iterator[RepoPage]: """Retrieve and parse exported maven indexes to identify all pom files and src archives. """ # Example of returned RepoPage's: # [ # { # "type": "maven", # "url": "https://maven.xwiki.org/..-5.4.2-sources.jar", # "time": 1626109619335, # "gid": "org.xwiki.platform", # "aid": "xwiki-platform-wikistream-events-xwiki", # "version": "5.4.2" # }, # { # "type": "scm", # "url": "scm:git:git://github.com/openengsb/openengsb-framework.git", # "project": "openengsb-framework", # }, # ... # ] # Download the main text index file. logger.info("Downloading computed index from %s.", self.INDEX_URL) assert self.INDEX_URL is not None response = requests.get(self.INDEX_URL, stream=True) if response.status_code != 200: logger.error("Index %s not found, stopping", self.INDEX_URL) response.raise_for_status() # Prepare regexes to parse index exports. # Parse doc id. # Example line: "doc 13" re_doc = re.compile(r"^doc (?P\d+)$") # Parse gid, aid, version, classifier, extension. # Example line: " value al.aldi|sprova4j|0.1.0|sources|jar" re_val = re.compile( r"^\s{4}value (?P[^|]+)\|(?P[^|]+)\|(?P[^|]+)\|" + r"(?P[^|]+)\|(?P[^|]+)$" ) # Parse last modification time. # Example line: " value jar|1626109619335|14316|2|2|0|jar" re_time = re.compile( r"^\s{4}value ([^|]+)\|(?P[^|]+)\|([^|]+)\|([^|]+)\|([^|]+)" + r"\|([^|]+)\|([^|]+)$" ) # Read file line by line and process it out_pom: Dict = {} jar_src: Dict = {} doc_id: int = 0 jar_src["doc"] = None url_src = None iterator = response.iter_lines(chunk_size=1024) for line_bytes in iterator: # Read the index text export and get URLs and SCMs. line = line_bytes.decode(errors="ignore") m_doc = re_doc.match(line) if m_doc is not None: doc_id = int(m_doc.group("doc")) # jar_src["doc"] contains the id of the current document, whatever # its type (scm or jar). jar_src["doc"] = doc_id else: m_val = re_val.match(line) if m_val is not None: (gid, aid, version, classifier, ext) = m_val.groups() ext = ext.strip() path = "/".join(gid.split(".")) if classifier == "NA" and ext.lower() == "pom": # If incremental mode, we don't record any line that is # before our last recorded doc id. if ( self.incremental and self.state and self.state.last_seen_pom and self.state.last_seen_pom >= doc_id ): continue url_path = f"{path}/{aid}/{version}/{aid}-{version}.{ext}" url_pom = urljoin( self.BASE_URL, url_path, ) out_pom[url_pom] = doc_id elif ( classifier.lower() == "sources" or ("src" in classifier) ) and ext.lower() in ("zip", "jar"): url_path = ( f"{path}/{aid}/{version}/{aid}-{version}-{classifier}.{ext}" ) url_src = urljoin(self.BASE_URL, url_path) jar_src["gid"] = gid jar_src["aid"] = aid jar_src["version"] = version else: m_time = re_time.match(line) if m_time is not None and url_src is not None: time = m_time.group("mtime") jar_src["time"] = int(time) artifact_metadata_d = { "type": "maven", "url": url_src, **jar_src, } logger.debug( "* Yielding jar %s: %s", url_src, artifact_metadata_d ) yield artifact_metadata_d url_src = None logger.info("Found %s poms.", len(out_pom)) # Now fetch pom files and scan them for scm info. logger.info("Fetching poms..") for pom in out_pom: try: response = self.page_request(pom, {}) - project = xmltodict.parse(response.content.decode()) + project = xmltodict.parse(response.content) project_d = project.get("project", {}) scm_d = project_d.get("scm") if scm_d is not None: connection = scm_d.get("connection") if connection is not None: artifact_metadata_d = { "type": "scm", "doc": out_pom[pom], "url": connection, } logger.debug("* Yielding pom %s: %s", pom, artifact_metadata_d) yield artifact_metadata_d else: logger.debug("No scm.connection in pom %s", pom) else: logger.debug("No scm in pom %s", pom) except requests.HTTPError: logger.warning( "POM info page could not be fetched, skipping project '%s'", pom, ) except xmltodict.expat.ExpatError as error: logger.info("Could not parse POM %s XML: %s. Next.", pom, error) def get_origins_from_page(self, page: RepoPage) -> Iterator[ListedOrigin]: """Convert a page of Maven repositories into a list of ListedOrigins.""" assert self.lister_obj.id is not None scm_types_ok = ("git", "svn", "hg", "cvs", "bzr") if page["type"] == "scm": # If origin is a scm url: detect scm type and yield. # Note that the official format is: # scm:git:git://github.com/openengsb/openengsb-framework.git # but many, many projects directly put the repo url, so we have to # detect the content to match it properly. m_scm = re.match(r"^scm:(?P[^:]+):(?P.*)$", page["url"]) if m_scm is not None: scm_type = m_scm.group("type") if scm_type in scm_types_ok: scm_url = m_scm.group("url") origin = ListedOrigin( lister_id=self.lister_obj.id, url=scm_url, visit_type=scm_type, ) yield origin else: if page["url"].endswith(".git"): origin = ListedOrigin( lister_id=self.lister_obj.id, url=page["url"], visit_type="git", ) yield origin else: # Origin is gathering source archives: last_update_dt = None last_update_iso = "" try: last_update_seconds = str(page["time"])[:-3] last_update_dt = datetime.fromtimestamp(int(last_update_seconds)) last_update_dt = last_update_dt.astimezone(timezone.utc) except (OverflowError, ValueError): logger.warning("- Failed to convert datetime %s.", last_update_seconds) if last_update_dt: last_update_iso = last_update_dt.isoformat() # Origin URL will target page holding sources for all versions of # an artifactId (package name) inside a groupId (namespace) path = "/".join(page["gid"].split(".")) origin_url = urljoin(self.BASE_URL, f"{path}/{page['aid']}") artifact = { **{k: v for k, v in page.items() if k != "doc"}, "time": last_update_iso, "base_url": self.BASE_URL, } if origin_url not in self.jar_origins: # Create ListedOrigin instance if we did not see that origin yet jar_origin = ListedOrigin( lister_id=self.lister_obj.id, url=origin_url, visit_type=page["type"], last_update=last_update_dt, extra_loader_arguments={"artifacts": [artifact]}, ) self.jar_origins[origin_url] = jar_origin else: # Update list of source artifacts for that origin otherwise jar_origin = self.jar_origins[origin_url] artifacts = jar_origin.extra_loader_arguments["artifacts"] if artifact not in artifacts: artifacts.append(artifact) if ( jar_origin.last_update and last_update_dt and last_update_dt > jar_origin.last_update ): jar_origin.last_update = last_update_dt if not self.incremental or ( self.state and page["doc"] > self.state.last_seen_doc ): # Yield origin with updated source artifacts, multiple instances of # ListedOrigin for the same origin URL but with different artifacts # list will be sent to the scheduler but it will deduplicate them and # take the latest one to upsert in database yield jar_origin def commit_page(self, page: RepoPage) -> None: """Update currently stored state using the latest listed doc. Note: this is a noop for full listing mode """ if self.incremental and self.state: # We need to differentiate the two state counters according # to the type of origin. if page["type"] == "maven" and page["doc"] > self.state.last_seen_doc: self.state.last_seen_doc = page["doc"] elif page["type"] == "scm" and page["doc"] > self.state.last_seen_pom: self.state.last_seen_doc = page["doc"] self.state.last_seen_pom = page["doc"] def finalize(self) -> None: """Finalize the lister state, set update if any progress has been made. Note: this is a noop for full listing mode """ if self.incremental and self.state: last_seen_doc = self.state.last_seen_doc last_seen_pom = self.state.last_seen_pom scheduler_state = self.get_state_from_scheduler() if last_seen_doc and last_seen_pom: if (scheduler_state.last_seen_doc < last_seen_doc) or ( scheduler_state.last_seen_pom < last_seen_pom ): self.updated = True diff --git a/swh/lister/maven/tests/test_lister.py b/swh/lister/maven/tests/test_lister.py index d8e30ab..331461e 100644 --- a/swh/lister/maven/tests/test_lister.py +++ b/swh/lister/maven/tests/test_lister.py @@ -1,319 +1,334 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from pathlib import Path import iso8601 import pytest import requests from swh.lister.maven.lister import MavenLister MVN_URL = "https://repo1.maven.org/maven2/" # main maven repo url INDEX_URL = "http://indexes/export.fld" # index directory url URL_POM_1 = MVN_URL + "al/aldi/sprova4j/0.1.0/sprova4j-0.1.0.pom" URL_POM_2 = MVN_URL + "al/aldi/sprova4j/0.1.1/sprova4j-0.1.1.pom" URL_POM_3 = MVN_URL + "com/arangodb/arangodb-graphql/1.2/arangodb-graphql-1.2.pom" LIST_GIT = ( "git://github.com/aldialimucaj/sprova4j.git", "https://github.com/aldialimucaj/sprova4j.git", ) LIST_GIT_INCR = ("git://github.com/ArangoDB-Community/arangodb-graphql-java.git",) LIST_SRC = (MVN_URL + "al/aldi/sprova4j",) LIST_SRC_DATA = ( { "type": "maven", "url": "https://repo1.maven.org/maven2/al/aldi/sprova4j" + "/0.1.0/sprova4j-0.1.0-sources.jar", "time": "2021-07-12T17:06:59+00:00", "gid": "al.aldi", "aid": "sprova4j", "version": "0.1.0", "base_url": MVN_URL, }, { "type": "maven", "url": "https://repo1.maven.org/maven2/al/aldi/sprova4j" + "/0.1.1/sprova4j-0.1.1-sources.jar", "time": "2021-07-12T17:37:05+00:00", "gid": "al.aldi", "aid": "sprova4j", "version": "0.1.1", "base_url": MVN_URL, }, ) @pytest.fixture -def maven_index_full(datadir) -> str: - return Path(datadir, "http_indexes", "export_full.fld").read_text() +def maven_index_full(datadir) -> bytes: + return Path(datadir, "http_indexes", "export_full.fld").read_bytes() @pytest.fixture -def maven_index_incr_first(datadir) -> str: - return Path(datadir, "http_indexes", "export_incr_first.fld").read_text() +def maven_index_incr_first(datadir) -> bytes: + return Path(datadir, "http_indexes", "export_incr_first.fld").read_bytes() @pytest.fixture -def maven_pom_1(datadir) -> str: - return Path(datadir, "https_maven.org", "sprova4j-0.1.0.pom").read_text() +def maven_pom_1(datadir) -> bytes: + return Path(datadir, "https_maven.org", "sprova4j-0.1.0.pom").read_bytes() @pytest.fixture -def maven_index_null_mtime(datadir) -> str: - return Path(datadir, "http_indexes", "export_null_mtime.fld").read_text() +def maven_index_null_mtime(datadir) -> bytes: + return Path(datadir, "http_indexes", "export_null_mtime.fld").read_bytes() @pytest.fixture -def maven_pom_1_malformed(datadir) -> str: - return Path(datadir, "https_maven.org", "sprova4j-0.1.0.malformed.pom").read_text() +def maven_pom_1_malformed(datadir) -> bytes: + return Path(datadir, "https_maven.org", "sprova4j-0.1.0.malformed.pom").read_bytes() @pytest.fixture -def maven_pom_2(datadir) -> str: - return Path(datadir, "https_maven.org", "sprova4j-0.1.1.pom").read_text() +def maven_pom_2(datadir) -> bytes: + return Path(datadir, "https_maven.org", "sprova4j-0.1.1.pom").read_bytes() @pytest.fixture -def maven_pom_3(datadir) -> str: - return Path(datadir, "https_maven.org", "arangodb-graphql-1.2.pom").read_text() +def maven_pom_3(datadir) -> bytes: + return Path(datadir, "https_maven.org", "arangodb-graphql-1.2.pom").read_bytes() @pytest.fixture(autouse=True) def network_requests_mock( requests_mock, maven_index_full, maven_pom_1, maven_pom_2, maven_pom_3 ): - requests_mock.get(INDEX_URL, text=maven_index_full) - requests_mock.get(URL_POM_1, text=maven_pom_1) - requests_mock.get(URL_POM_2, text=maven_pom_2) - requests_mock.get(URL_POM_3, text=maven_pom_3) + requests_mock.get(INDEX_URL, content=maven_index_full) + requests_mock.get(URL_POM_1, content=maven_pom_1) + requests_mock.get(URL_POM_2, content=maven_pom_2) + requests_mock.get(URL_POM_3, content=maven_pom_3) def test_maven_full_listing(swh_scheduler): """Covers full listing of multiple pages, checking page results and listed origins, statelessness.""" # Run the lister. lister = MavenLister( scheduler=swh_scheduler, url=MVN_URL, instance="maven.org", index_url=INDEX_URL, incremental=False, ) stats = lister.run() # Start test checks. assert stats.pages == 5 scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results origin_urls = [origin.url for origin in scheduler_origins] # 3 git origins + 1 maven origin with 2 releases (one per jar) assert len(origin_urls) == 4 assert sorted(origin_urls) == sorted(LIST_GIT + LIST_GIT_INCR + LIST_SRC) for origin in scheduler_origins: if origin.visit_type == "maven": for src in LIST_SRC_DATA: last_update_src = iso8601.parse_date(src["time"]) assert last_update_src <= origin.last_update assert origin.extra_loader_arguments["artifacts"] == list(LIST_SRC_DATA) scheduler_state = lister.get_state_from_scheduler() assert scheduler_state is not None assert scheduler_state.last_seen_doc == -1 assert scheduler_state.last_seen_pom == -1 def test_maven_full_listing_malformed( swh_scheduler, requests_mock, maven_pom_1_malformed, ): """Covers full listing of multiple pages, checking page results with a malformed scm entry in pom.""" lister = MavenLister( scheduler=swh_scheduler, url=MVN_URL, instance="maven.org", index_url=INDEX_URL, incremental=False, ) # Set up test. - requests_mock.get(URL_POM_1, text=maven_pom_1_malformed) + requests_mock.get(URL_POM_1, content=maven_pom_1_malformed) # Then run the lister. stats = lister.run() # Start test checks. assert stats.pages == 5 scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results origin_urls = [origin.url for origin in scheduler_origins] # 2 git origins + 1 maven origin with 2 releases (one per jar) assert len(origin_urls) == 3 assert sorted(origin_urls) == sorted((LIST_GIT[1],) + LIST_GIT_INCR + LIST_SRC) for origin in scheduler_origins: if origin.visit_type == "maven": for src in LIST_SRC_DATA: last_update_src = iso8601.parse_date(src["time"]) assert last_update_src <= origin.last_update assert origin.extra_loader_arguments["artifacts"] == list(LIST_SRC_DATA) scheduler_state = lister.get_state_from_scheduler() assert scheduler_state is not None assert scheduler_state.last_seen_doc == -1 assert scheduler_state.last_seen_pom == -1 def test_maven_incremental_listing( swh_scheduler, requests_mock, maven_index_full, maven_index_incr_first, ): """Covers full listing of multiple pages, checking page results and listed origins, with a second updated run for statefulness.""" lister = MavenLister( scheduler=swh_scheduler, url=MVN_URL, instance="maven.org", index_url=INDEX_URL, incremental=True, ) # Set up test. - requests_mock.get(INDEX_URL, text=maven_index_incr_first) + requests_mock.get(INDEX_URL, content=maven_index_incr_first) # Then run the lister. stats = lister.run() # Start test checks. assert lister.incremental assert lister.updated assert stats.pages == 2 scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results origin_urls = [origin.url for origin in scheduler_origins] # 1 git origins + 1 maven origin with 1 release (one per jar) assert len(origin_urls) == 2 assert sorted(origin_urls) == sorted((LIST_GIT[0],) + LIST_SRC) for origin in scheduler_origins: if origin.visit_type == "maven": last_update_src = iso8601.parse_date(LIST_SRC_DATA[0]["time"]) assert last_update_src == origin.last_update assert origin.extra_loader_arguments["artifacts"] == [LIST_SRC_DATA[0]] # Second execution of the lister, incremental mode lister = MavenLister( scheduler=swh_scheduler, url=MVN_URL, instance="maven.org", index_url=INDEX_URL, incremental=True, ) scheduler_state = lister.get_state_from_scheduler() assert scheduler_state is not None assert scheduler_state.last_seen_doc == 1 assert scheduler_state.last_seen_pom == 1 # Set up test. - requests_mock.get(INDEX_URL, text=maven_index_full) + requests_mock.get(INDEX_URL, content=maven_index_full) # Then run the lister. stats = lister.run() # Start test checks. assert lister.incremental assert lister.updated assert stats.pages == 4 scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results origin_urls = [origin.url for origin in scheduler_origins] assert sorted(origin_urls) == sorted(LIST_SRC + LIST_GIT + LIST_GIT_INCR) for origin in scheduler_origins: if origin.visit_type == "maven": for src in LIST_SRC_DATA: last_update_src = iso8601.parse_date(src["time"]) assert last_update_src <= origin.last_update assert origin.extra_loader_arguments["artifacts"] == list(LIST_SRC_DATA) scheduler_state = lister.get_state_from_scheduler() assert scheduler_state is not None assert scheduler_state.last_seen_doc == 4 assert scheduler_state.last_seen_pom == 4 @pytest.mark.parametrize("http_code", [400, 404, 500, 502]) def test_maven_list_http_error_on_index_read(swh_scheduler, requests_mock, http_code): """should stop listing if the lister fails to retrieve the main index url.""" lister = MavenLister(scheduler=swh_scheduler, url=MVN_URL, index_url=INDEX_URL) requests_mock.get(INDEX_URL, status_code=http_code) with pytest.raises(requests.HTTPError): # listing cannot continues so stop lister.run() scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results assert len(scheduler_origins) == 0 @pytest.mark.parametrize("http_code", [400, 404, 500, 502]) def test_maven_list_http_error_artifacts( swh_scheduler, requests_mock, http_code, ): """should continue listing when failing to retrieve artifacts.""" # Test failure of artefacts retrieval. requests_mock.get(URL_POM_1, status_code=http_code) lister = MavenLister(scheduler=swh_scheduler, url=MVN_URL, index_url=INDEX_URL) # on artifacts though, that raises but continue listing lister.run() # If the maven_index_full step succeeded but not the get_pom step, # then we get only one maven-jar origin and one git origin. scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results assert len(scheduler_origins) == 3 def test_maven_lister_null_mtime(swh_scheduler, requests_mock, maven_index_null_mtime): - requests_mock.get(INDEX_URL, text=maven_index_null_mtime) + requests_mock.get(INDEX_URL, content=maven_index_null_mtime) # Run the lister. lister = MavenLister( scheduler=swh_scheduler, url=MVN_URL, instance="maven.org", index_url=INDEX_URL, incremental=False, ) stats = lister.run() # Start test checks. assert stats.pages == 1 scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results assert len(scheduler_origins) == 1 assert scheduler_origins[0].last_update is None + + +def test_maven_list_pom_bad_encoding(swh_scheduler, requests_mock, maven_pom_1): + """should continue listing when failing to decode pom file.""" + # Test failure of pom parsing by reencoding a UTF-8 pom file to a not expected one + requests_mock.get(URL_POM_1, content=maven_pom_1.decode("utf-8").encode("utf-32")) + + lister = MavenLister(scheduler=swh_scheduler, url=MVN_URL, index_url=INDEX_URL) + + lister.run() + + # If the maven_index_full step succeeded but not the pom parsing step, + # then we get only one maven-jar origin and one git origin. + scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results + assert len(scheduler_origins) == 3