diff --git a/swh/lister/golang/lister.py b/swh/lister/golang/lister.py
--- a/swh/lister/golang/lister.py
+++ b/swh/lister/golang/lister.py
@@ -3,6 +3,7 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from dataclasses import dataclass
 from datetime import datetime
 import json
 import logging
@@ -17,14 +18,22 @@
 from swh.scheduler.model import ListedOrigin
 
 from .. import USER_AGENT
-from ..pattern import CredentialsType, StatelessLister
+from ..pattern import CredentialsType, Lister
 
 logger = logging.getLogger(__name__)
 
+
+@dataclass
+class GolangStateType:
+    last_seen: Optional[datetime] = None
+    """Last timestamp of a package version we have saved.
+    Used as a starting point for an incremental listing."""
+
+
 GolangPageType = List[Dict[str, Any]]
 
 
-class GolangLister(StatelessLister[GolangPageType]):
+class GolangLister(Lister[GolangStateType, GolangPageType]):
     """
     List all Golang modules and send associated origins to scheduler.
 
@@ -38,7 +47,10 @@
     LISTER_NAME = "Golang"
 
     def __init__(
-        self, scheduler: SchedulerInterface, credentials: CredentialsType = None,
+        self,
+        scheduler: SchedulerInterface,
+        incremental: bool = False,
+        credentials: CredentialsType = None,
     ):
         super().__init__(
             scheduler=scheduler,
@@ -51,6 +63,32 @@
         self.session.headers.update(
             {"Accept": "application/json", "User-Agent": USER_AGENT}
         )
+        self.incremental = incremental
+
+    def state_from_dict(self, d: Dict[str, Any]) -> GolangStateType:
+        """Build the lister state from its dict representation."""
+        as_string = d.get("last_seen")
+        last_seen = iso8601.parse_date(as_string) if as_string is not None else None
+        return GolangStateType(last_seen=last_seen)
+
+    def state_to_dict(self, state: GolangStateType) -> Dict[str, Any]:
+        """Serialize the lister state, with ``last_seen`` in ISO 8601 format."""
+        return {
+            "last_seen": state.last_seen.isoformat()
+            if state.last_seen is not None
+            else None
+        }
+
+    def finalize(self):
+        """Mark the state as updated if ``last_seen`` moved past the stored one."""
+        if self.state.last_seen is not None:
+            scheduler_state = self.get_state_from_scheduler()
+
+            if (
+                scheduler_state.last_seen is None
+                or self.state.last_seen > scheduler_state.last_seen
+            ):
+                self.updated = True
 
     @throttling_retry(
         retry=retry_policy_generic,
@@ -108,17 +146,27 @@
         return page, since
 
     def get_pages(self) -> Iterator[GolangPageType]:
-        page, since = self.get_single_page()
-        last_since = since
+        """Yield index pages, starting from the saved state when incremental."""
+        since = None
+        if self.incremental:
+            since = self.state.last_seen
+        page, since = self.get_single_page(since=since)
+        if since == self.state.last_seen:
+            # The index returns packages whose timestamps are greater or
+            # equal to the date provided as parameter, which will create
+            # an infinite loop if not stopped here.
+            return
+        if since is not None:
+            self.state.last_seen = since
+
         while page:
             yield page
             page, since = self.get_single_page(since=since)
-            if last_since == since:
-                # The index returns packages whose timestamp are greater or
-                # equal to the date provided as parameter, which will create
-                # an infinite loop if not stopped here.
-                return []
-            last_since = since
+            if since == self.state.last_seen:
+                # Same guard as above: the timestamp did not advance.
+                return
+            if since is not None:
+                self.state.last_seen = since
 
     def get_origins_from_page(self, page: GolangPageType) -> Iterator[ListedOrigin]:
         """
diff --git a/swh/lister/golang/tasks.py b/swh/lister/golang/tasks.py
--- a/swh/lister/golang/tasks.py
+++ b/swh/lister/golang/tasks.py
@@ -13,6 +13,13 @@
     return GolangLister.from_configfile(**lister_args).run().dict()
 
 
+@shared_task(name=__name__ + ".IncrementalGolangLister")
+def list_golang_incremental(**lister_args):
+    """Incremental update of Golang packages"""
+    lister = GolangLister.from_configfile(incremental=True, **lister_args)
+    return lister.run().dict()
+
+
 @shared_task(name=__name__ + ".ping")
 def _ping():
     return "OK"
diff --git a/swh/lister/golang/tests/test_lister.py b/swh/lister/golang/tests/test_lister.py
--- a/swh/lister/golang/tests/test_lister.py
+++ b/swh/lister/golang/tests/test_lister.py
@@ -3,11 +3,12 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import datetime
 from pathlib import Path
 
 import iso8601
 
-from swh.lister.golang.lister import GolangLister
+from swh.lister.golang.lister import GolangLister, GolangStateType
 from swh.lister.tests.test_utils import assert_sleep_calls
 from swh.lister.utils import WAIT_EXP_BASE
 
@@ -88,3 +89,105 @@
 
     assert stats.pages == 3
     assert stats.origins == 18
+
+
+def test_golang_lister_incremental(swh_scheduler, requests_mock, datadir, mocker):
+    # first listing, should return one origin per package
+    lister = GolangLister(scheduler=swh_scheduler, incremental=True)
+    mock = mocker.spy(lister, "get_single_page")
+
+    responses = [
+        {"text": Path(datadir, "page-1.txt").read_text(), "status_code": 200},
+    ]
+    requests_mock.get(GolangLister.GOLANG_MODULES_INDEX_URL, responses)
+
+    stats = lister.run()
+
+    page1_last_timestamp = datetime.datetime(
+        2019, 4, 11, 18, 47, 29, 390564, tzinfo=datetime.timezone.utc
+    )
+    page2_last_timestamp = datetime.datetime(
+        2019, 4, 15, 13, 54, 35, 250835, tzinfo=datetime.timezone.utc
+    )
+    page3_last_timestamp = datetime.datetime(
+        2019, 4, 18, 2, 7, 41, 336899, tzinfo=datetime.timezone.utc
+    )
+    mock.assert_has_calls(
+        [
+            # First call has no state
+            mocker.call(since=None),
+            # Second call is the last timestamp in the listed page
+            mocker.call(since=page1_last_timestamp),
+        ]
+    )
+
+    assert lister.get_state_from_scheduler() == GolangStateType(
+        last_seen=page1_last_timestamp
+    )
+
+    assert stats.pages == 1
+    assert stats.origins == 5
+
+    # Incremental should list nothing
+    lister = GolangLister(scheduler=swh_scheduler, incremental=True)
+    mock = mocker.spy(lister, "get_single_page")
+    stats = lister.run()
+    mock.assert_has_calls([mocker.call(since=page1_last_timestamp)])
+    assert stats.pages == 0
+    assert stats.origins == 0
+
+    # Add more responses
+    responses = [
+        {"text": Path(datadir, "page-2.txt").read_text(), "status_code": 200},
+    ]
+
+    requests_mock.get(GolangLister.GOLANG_MODULES_INDEX_URL, responses)
+
+    # Incremental should list new page
+    lister = GolangLister(scheduler=swh_scheduler, incremental=True)
+    mock = mocker.spy(lister, "get_single_page")
+    stats = lister.run()
+    mock.assert_has_calls(
+        [
+            mocker.call(since=page1_last_timestamp),
+            mocker.call(since=page2_last_timestamp),
+        ]
+    )
+    assert stats.pages == 1
+    assert stats.origins == 4
+
+    # Incremental should list nothing again
+    lister = GolangLister(scheduler=swh_scheduler, incremental=True)
+    mock = mocker.spy(lister, "get_single_page")
+    stats = lister.run()
+    assert stats.pages == 0
+    assert stats.origins == 0
+    mock.assert_has_calls([mocker.call(since=page2_last_timestamp)])
+
+    # Add yet more responses
+    responses = [
+        {"text": Path(datadir, "page-3.txt").read_text(), "status_code": 200},
+    ]
+
+    requests_mock.get(GolangLister.GOLANG_MODULES_INDEX_URL, responses)
+
+    # Incremental should list new page again
+    lister = GolangLister(scheduler=swh_scheduler, incremental=True)
+    mock = mocker.spy(lister, "get_single_page")
+    stats = lister.run()
+    assert stats.pages == 1
+    assert stats.origins == 9
+    mock.assert_has_calls(
+        [
+            mocker.call(since=page2_last_timestamp),
+            mocker.call(since=page3_last_timestamp),
+        ]
+    )
+
+    # Incremental should list nothing one last time
+    lister = GolangLister(scheduler=swh_scheduler, incremental=True)
+    mock = mocker.spy(lister, "get_single_page")
+    stats = lister.run()
+    assert stats.pages == 0
+    assert stats.origins == 0
+    mock.assert_has_calls([mocker.call(since=page3_last_timestamp)])
diff --git a/swh/lister/golang/tests/test_tasks.py b/swh/lister/golang/tests/test_tasks.py
--- a/swh/lister/golang/tests/test_tasks.py
+++ b/swh/lister/golang/tests/test_tasks.py
@@ -30,3 +30,23 @@
 
     lister.from_configfile.assert_called_once_with()
     lister.run.assert_called_once_with()
+
+
+def test_golang_incremental_listing_task(
+    swh_scheduler_celery_app, swh_scheduler_celery_worker, mocker
+):
+    lister = mocker.patch("swh.lister.golang.tasks.GolangLister")
+    lister.from_configfile.return_value = lister
+    stats = ListerStats(pages=1, origins=28000)
+    lister.run.return_value = stats
+
+    res = swh_scheduler_celery_app.send_task(
+        "swh.lister.golang.tasks.IncrementalGolangLister"
+    )
+    assert res
+    res.wait()
+    assert res.successful()
+    assert res.result == stats.dict()
+
+    lister.from_configfile.assert_called_once_with(incremental=True)
+    lister.run.assert_called_once_with()