diff --git a/swh/lister/golang/lister.py b/swh/lister/golang/lister.py index e0fb2db..0d6b2b9 100644 --- a/swh/lister/golang/lister.py +++ b/swh/lister/golang/lister.py @@ -1,145 +1,188 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from dataclasses import dataclass from datetime import datetime import json import logging from typing import Any, Dict, Iterator, List, Optional, Tuple import iso8601 import requests from tenacity import before_sleep_log from swh.lister.utils import retry_policy_generic, throttling_retry from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import ListedOrigin from .. import USER_AGENT -from ..pattern import CredentialsType, StatelessLister +from ..pattern import CredentialsType, Lister logger = logging.getLogger(__name__) + +@dataclass +class GolangStateType: + last_seen: Optional[datetime] = None + """Last timestamp of a package version we have saved. + Used as a starting point for an incremental listing.""" + + GolangPageType = List[Dict[str, Any]] -class GolangLister(StatelessLister[GolangPageType]): +class GolangLister(Lister[GolangStateType, GolangPageType]): """ List all Golang modules and send associated origins to scheduler. The lister queries the Golang module index, whose documentation can be found at https://index.golang.org """ GOLANG_MODULES_INDEX_URL = "https://index.golang.org/index" # `limit` seems to be... limited to 2000. GOLANG_MODULES_INDEX_LIMIT = 2000 LISTER_NAME = "Golang" def __init__( - self, scheduler: SchedulerInterface, credentials: CredentialsType = None, + self, + scheduler: SchedulerInterface, + incremental: bool = False, + credentials: CredentialsType = None, ): super().__init__( scheduler=scheduler, url=self.GOLANG_MODULES_INDEX_URL, instance="Golang", credentials=credentials, ) self.session = requests.Session() self.session.headers.update( {"Accept": "application/json", "User-Agent": USER_AGENT} ) + self.incremental = incremental + + def state_from_dict(self, d: Dict[str, Any]) -> GolangStateType: + as_string = d.get("last_seen") + last_seen = iso8601.parse_date(as_string) if as_string is not None else None + return GolangStateType(last_seen=last_seen) + + def state_to_dict(self, state: GolangStateType) -> Dict[str, Any]: + return { + "last_seen": state.last_seen.isoformat() + if state.last_seen is not None + else None + } + + def finalize(self): + if self.incremental and self.state.last_seen is not None: + scheduler_state = self.get_state_from_scheduler() + + if ( + scheduler_state.last_seen is None + or self.state.last_seen > scheduler_state.last_seen + ): + self.updated = True @throttling_retry( retry=retry_policy_generic, before_sleep=before_sleep_log(logger, logging.WARNING), ) def api_request(self, url: str) -> List[str]: logger.debug("Fetching URL %s", url) response = self.session.get(url) if response.status_code not in (200, 304): # Log response content to ease debugging logger.warning( "Unexpected HTTP status code %s for URL %s", response.status_code, response.url, ) response.raise_for_status() return response.text.split() def get_single_page( self, since: Optional[datetime] = None ) -> Tuple[GolangPageType, Optional[datetime]]: """Return a page from the API and the timestamp of its last entry. Since all entries are sorted by chronological order, the timestamp is useful both for pagination and later for incremental runs.""" url = f"{self.url}?limit={self.GOLANG_MODULES_INDEX_LIMIT}" if since is not None: # The Golang index does not understand `+00:00` for some reason # and expects the "timezone zero" notation instead. This works # because all times are UTC. utc_offset = since.utcoffset() assert ( utc_offset is not None and utc_offset.total_seconds() == 0 ), "Non-UTC datetime" as_date = since.isoformat().replace("+00:00", "Z") url = f"{url}&since={as_date}" entries = self.api_request(url) page: GolangPageType = [] if not entries: return page, since for as_json in entries: entry = json.loads(as_json) timestamp = iso8601.parse_date(entry["Timestamp"]) # We've already parsed it and we'll need the datetime later, save it entry["Timestamp"] = timestamp page.append(entry) # The index is guaranteed to be sorted in chronological order since = timestamp return page, since def get_pages(self) -> Iterator[GolangPageType]: - page, since = self.get_single_page() - last_since = since + since = None + if self.incremental: + since = self.state.last_seen + page, since = self.get_single_page(since=since) + if since == self.state.last_seen: + # The index returns packages whose timestamp are greater or + # equal to the date provided as parameter, which will create + # an infinite loop if not stopped here. + return [], since + if since is not None: + self.state.last_seen = since + while page: yield page page, since = self.get_single_page(since=since) - if last_since == since: - # The index returns packages whose timestamp are greater or - # equal to the date provided as parameter, which will create - # an infinite loop if not stopped here. - return [] - last_since = since + if since == self.state.last_seen: + return [], since + if since is not None: + self.state.last_seen = since def get_origins_from_page(self, page: GolangPageType) -> Iterator[ListedOrigin]: """ Iterate on all Golang projects and yield ListedOrigin instances. """ assert self.lister_obj.id is not None for module in page: path = module["Path"] # The loader will be expected to use the golang proxy to do the # actual downloading. We're using `pkg.go.dev` so that the URL points # to somewhere useful for a human instead of an (incomplete) API path. origin_url = f"https://pkg.go.dev/{path}" # Since the Go index lists versions and not just packages, there will # be duplicates. Fortunately, `ListedOrigins` are "upserted" server-side, # so only the last timestamp will be used, with no duplicates. # Performance should not be an issue as they are sent to the db in bulk. yield ListedOrigin( lister_id=self.lister_obj.id, url=origin_url, visit_type="golang", last_update=module["Timestamp"], ) diff --git a/swh/lister/golang/tasks.py b/swh/lister/golang/tasks.py index bc7b895..3bbba0d 100644 --- a/swh/lister/golang/tasks.py +++ b/swh/lister/golang/tasks.py @@ -1,18 +1,25 @@ # Copyright (C) 2022 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from celery import shared_task from .lister import GolangLister @shared_task(name=__name__ + ".FullGolangLister") def list_golang(**lister_args): "List the Golang module registry" return GolangLister.from_configfile(**lister_args).run().dict() +@shared_task(name=__name__ + ".IncrementalGolangLister") +def list_golang_incremental(**lister_args): + """Incremental update of Golang packages""" + lister = GolangLister.from_configfile(incremental=True, **lister_args) + return lister.run().dict() + + @shared_task(name=__name__ + ".ping") def _ping(): return "OK" diff --git a/swh/lister/golang/tests/test_lister.py b/swh/lister/golang/tests/test_lister.py index 9e9096a..4ebe3a5 100644 --- a/swh/lister/golang/tests/test_lister.py +++ b/swh/lister/golang/tests/test_lister.py @@ -1,90 +1,193 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import datetime from pathlib import Path import iso8601 -from swh.lister.golang.lister import GolangLister +from swh.lister.golang.lister import GolangLister, GolangStateType from swh.lister.tests.test_utils import assert_sleep_calls from swh.lister.utils import WAIT_EXP_BASE # https://pkg.go.dev prefix omitted expected_listed = [ ("collectd.org", "2019-04-11T18:47:25.450546+00:00"), ("github.com/blang/semver", "2019-04-15T13:54:39.107258+00:00",), ("github.com/bmizerany/pat", "2019-04-11T18:47:29.390564+00:00",), ("github.com/djherbis/buffer", "2019-04-11T18:47:29.974874+00:00",), ("github.com/djherbis/nio", "2019-04-11T18:47:32.283312+00:00",), ("github.com/gobuffalo/buffalo-plugins", "2019-04-15T13:54:34.222985+00:00",), ("github.com/gobuffalo/buffalo-pop", "2019-04-15T13:54:39.135792+00:00",), ("github.com/gobuffalo/clara", "2019-04-15T13:54:40.651916+00:00",), ("github.com/gobuffalo/genny", "2019-04-15T13:54:37.841547+00:00",), ("github.com/gobuffalo/packr", "2019-04-15T13:54:35.688900+00:00",), ("github.com/markbates/refresh", "2019-04-15T13:54:35.250835+00:00",), ("github.com/mitchellh/go-homedir", "2019-04-15T13:54:35.678214+00:00",), ("github.com/nats-io/nuid", "2019-04-11T18:47:28.102348+00:00",), ("github.com/oklog/ulid", "2019-04-11T18:47:23.234198+00:00",), ("github.com/pkg/errors", "2019-04-18T02:07:41.336899+00:00",), ("golang.org/x/sys", "2019-04-15T13:54:37.555525+00:00",), ("golang.org/x/text", "2019-04-10T19:08:52.997264+00:00"), # only one x/tools listed even though there are two version, and only the # latest one's timestamp is used. ("golang.org/x/tools", "2019-04-15T13:54:41.905064+00:00",), ] def _generate_responses(datadir, requests_mock): responses = [] for file in Path(datadir).glob("page-*.txt"): # Test that throttling and server errors are retries responses.append({"text": "", "status_code": 429}) responses.append({"text": "", "status_code": 500}) # Also test that the lister appropriately gets out of the infinite loop responses.append({"text": file.read_text(), "status_code": 200}) requests_mock.get(GolangLister.GOLANG_MODULES_INDEX_URL, responses) def test_golang_lister(swh_scheduler, mocker, requests_mock, datadir): # first listing, should return one origin per package lister = GolangLister(scheduler=swh_scheduler) # Exponential retries take a long time, so stub time.sleep mocked_sleep = mocker.patch.object(lister.api_request.retry, "sleep") _generate_responses(datadir, requests_mock) stats = lister.run() assert stats.pages == 3 # The two `golang.org/x/tools` versions are *not* listed as separate origins assert stats.origins == 18 scheduler_origins = sorted( swh_scheduler.get_listed_origins(lister.lister_obj.id).results, key=lambda x: x.url, ) for scheduled, (url, timestamp) in zip(scheduler_origins, expected_listed): assert scheduled.url == f"https://pkg.go.dev/{url}" assert scheduled.last_update == iso8601.parse_date(timestamp) assert scheduled.visit_type == "golang" assert len(scheduler_origins) == len(expected_listed) # Test `time.sleep` is called with exponential retries assert_sleep_calls( mocker, mocked_sleep, [1, WAIT_EXP_BASE, 1, WAIT_EXP_BASE, 1, WAIT_EXP_BASE] ) # doing it all again (without incremental) should give us the same result lister = GolangLister(scheduler=swh_scheduler) mocked_sleep = mocker.patch.object(lister.api_request.retry, "sleep") _generate_responses(datadir, requests_mock) stats = lister.run() assert stats.pages == 3 assert stats.origins == 18 + + +def test_golang_lister_incremental(swh_scheduler, requests_mock, datadir, mocker): + # first listing, should return one origin per package + lister = GolangLister(scheduler=swh_scheduler, incremental=True) + mock = mocker.spy(lister, "get_single_page") + + responses = [ + {"text": Path(datadir, "page-1.txt").read_text(), "status_code": 200}, + ] + requests_mock.get(GolangLister.GOLANG_MODULES_INDEX_URL, responses) + + stats = lister.run() + + page1_last_timestamp = datetime.datetime( + 2019, 4, 11, 18, 47, 29, 390564, tzinfo=datetime.timezone.utc + ) + page2_last_timestamp = datetime.datetime( + 2019, 4, 15, 13, 54, 35, 250835, tzinfo=datetime.timezone.utc + ) + page3_last_timestamp = datetime.datetime( + 2019, 4, 18, 2, 7, 41, 336899, tzinfo=datetime.timezone.utc + ) + mock.assert_has_calls( + [ + # First call has no state + mocker.call(since=None), + # Second call is the last timestamp in the listed page + mocker.call(since=page1_last_timestamp), + ] + ) + + assert lister.get_state_from_scheduler() == GolangStateType( + last_seen=page1_last_timestamp + ) + + assert stats.pages == 1 + assert stats.origins == 5 + + # Incremental should list nothing + lister = GolangLister(scheduler=swh_scheduler, incremental=True) + mock = mocker.spy(lister, "get_single_page") + stats = lister.run() + mock.assert_has_calls([mocker.call(since=page1_last_timestamp)]) + assert stats.pages == 0 + assert stats.origins == 0 + + # Add more responses + responses = [ + {"text": Path(datadir, "page-2.txt").read_text(), "status_code": 200}, + ] + + requests_mock.get(GolangLister.GOLANG_MODULES_INDEX_URL, responses) + + # Incremental should list new page + lister = GolangLister(scheduler=swh_scheduler, incremental=True) + mock = mocker.spy(lister, "get_single_page") + stats = lister.run() + mock.assert_has_calls( + [ + mocker.call(since=page1_last_timestamp), + mocker.call(since=page2_last_timestamp), + ] + ) + assert stats.pages == 1 + assert stats.origins == 4 + + # Incremental should list nothing again + lister = GolangLister(scheduler=swh_scheduler, incremental=True) + mock = mocker.spy(lister, "get_single_page") + stats = lister.run() + assert stats.pages == 0 + assert stats.origins == 0 + mock.assert_has_calls([mocker.call(since=page2_last_timestamp)]) + + # Add yet more responses + responses = [ + {"text": Path(datadir, "page-3.txt").read_text(), "status_code": 200}, + ] + + requests_mock.get(GolangLister.GOLANG_MODULES_INDEX_URL, responses) + + # Incremental should list new page again + lister = GolangLister(scheduler=swh_scheduler, incremental=True) + mock = mocker.spy(lister, "get_single_page") + stats = lister.run() + assert stats.pages == 1 + assert stats.origins == 9 + mock.assert_has_calls( + [ + mocker.call(since=page2_last_timestamp), + mocker.call(since=page3_last_timestamp), + ] + ) + + # Incremental should list nothing one last time + lister = GolangLister(scheduler=swh_scheduler, incremental=True) + mock = mocker.spy(lister, "get_single_page") + stats = lister.run() + assert stats.pages == 0 + assert stats.origins == 0 + mock.assert_has_calls([mocker.call(since=page3_last_timestamp)]) diff --git a/swh/lister/golang/tests/test_tasks.py b/swh/lister/golang/tests/test_tasks.py index 414cce4..92458cc 100644 --- a/swh/lister/golang/tests/test_tasks.py +++ b/swh/lister/golang/tests/test_tasks.py @@ -1,32 +1,52 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.lister.pattern import ListerStats def test_ping(swh_scheduler_celery_app, swh_scheduler_celery_worker): res = swh_scheduler_celery_app.send_task("swh.lister.golang.tasks.ping") assert res res.wait() assert res.successful() assert res.result == "OK" def test_golang_full_listing_task( swh_scheduler_celery_app, swh_scheduler_celery_worker, mocker ): lister = mocker.patch("swh.lister.golang.tasks.GolangLister") lister.from_configfile.return_value = lister stats = ListerStats(pages=1, origins=28000) lister.run.return_value = stats res = swh_scheduler_celery_app.send_task("swh.lister.golang.tasks.FullGolangLister") assert res res.wait() assert res.successful() assert res.result == stats.dict() lister.from_configfile.assert_called_once_with() lister.run.assert_called_once_with() + + +def test_golang_incremental_listing_task( + swh_scheduler_celery_app, swh_scheduler_celery_worker, mocker +): + lister = mocker.patch("swh.lister.golang.tasks.GolangLister") + lister.from_configfile.return_value = lister + stats = ListerStats(pages=1, origins=28000) + lister.run.return_value = stats + + res = swh_scheduler_celery_app.send_task( + "swh.lister.golang.tasks.IncrementalGolangLister" + ) + assert res + res.wait() + assert res.successful() + assert res.result == stats.dict() + + lister.from_configfile.assert_called_once_with(incremental=True) + lister.run.assert_called_once_with()