diff --git a/swh/lister/bitbucket/lister.py b/swh/lister/bitbucket/lister.py
--- a/swh/lister/bitbucket/lister.py
+++ b/swh/lister/bitbucket/lister.py
@@ -3,83 +3,187 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from datetime import datetime, timezone
+from dataclasses import asdict, dataclass
+from datetime import datetime
 import logging
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, Iterator, List, Optional
 from urllib import parse
 
 import iso8601
-from requests import Response
+import requests
 
-from swh.lister.bitbucket.models import BitBucketModel
-from swh.lister.core.indexing_lister import IndexingHttpLister
+from swh.scheduler.interface import SchedulerInterface
+from swh.scheduler.model import ListedOrigin
+
+from .. import USER_AGENT
+from ..pattern import CredentialsType, Lister
 
 logger = logging.getLogger(__name__)
 
 
-class BitBucketLister(IndexingHttpLister):
-    PATH_TEMPLATE = "/repositories?after=%s"
-    MODEL = BitBucketModel
+@dataclass
+class BitbucketListerState:
+    """State of the Bitbucket lister"""
+
+    last_repo_cdate: Optional[datetime] = None
+    """Date and time of the last repository listed on an incremental pass"""
+
+
+class BitbucketLister(Lister[BitbucketListerState, List[Dict[str, Any]]]):
+    """List origins from Bitbucket.
+
+    Repositories are paged through with the ``after`` query parameter of the
+    ``/repositories`` endpoint; the creation date of the last repository
+    listed is kept as the lister state.
+
+    """
     LISTER_NAME = "bitbucket"
-    DEFAULT_URL = "https://api.bitbucket.org/2.0"
-    instance = "bitbucket"
-    default_min_bound = datetime.fromtimestamp(0, timezone.utc)  # type: Any
+    INSTANCE = "bitbucket"
+
+    API_URL = "https://api.bitbucket.org/2.0"
+    QUERY_TEMPLATE = "/repositories?pagelen={}&after={{}}"
+    PAGE_SIZE = 100
 
     def __init__(
-        self, url: str = None, override_config=None, per_page: int = 100
-    ) -> None:
-        super().__init__(url=url, override_config=override_config)
-        per_page = self.config.get("per_page", per_page)
-
-        self.PATH_TEMPLATE = "%s&pagelen=%s" % (self.PATH_TEMPLATE, per_page)
-
-    def get_model_from_repo(self, repo: Dict) -> Dict[str, Any]:
-        return {
-            "uid": repo["uuid"],
-            "indexable": iso8601.parse_date(repo["created_on"]),
-            "name": repo["name"],
-            "full_name": repo["full_name"],
-            "html_url": repo["links"]["html"]["href"],
-            "origin_url": repo["links"]["clone"][0]["href"],
-            "origin_type": repo["scm"],
-        }
-
-    def get_next_target_from_response(self, response: Response) -> Optional[datetime]:
-        """This will read the 'next' link from the api response if any
-        and return it as a datetime.
-
-        Args:
-            response (Response): requests' response from api call
-
-        Returns:
-            next date as a datetime
+        self,
+        scheduler: SchedulerInterface,
+        per_page: int = 100,
+        credentials: CredentialsType = None,
+    ):
+        """Prepare the page URL template and the HTTP session.
+
+        Args:
+            scheduler: handed over to the base lister
+            per_page: value of the ``pagelen`` query parameter
+            credentials: handed over to the base lister
+
+        """
+        super().__init__(
+            scheduler=scheduler,
+            credentials=credentials,
+            url=self.API_URL,
+            instance=self.INSTANCE,
+        )
+
+        # format() fills in ``pagelen`` and turns the escaped ``{{}}`` into the
+        # ``{}`` slot that get_pages() fills with the ``after`` value.
+        self.url_template = f"{self.API_URL}{self.QUERY_TEMPLATE}".format(per_page)
+
+        self.session = requests.Session()
+        self.session.headers.update(
+            {"Accept": "application/json", "User-Agent": USER_AGENT}
+        )
+
+    def state_from_dict(self, d: Dict[str, Any]) -> BitbucketListerState:
+        """Build the lister state from its dict form.
+
+        A missing or ``None`` date is kept as ``None`` instead of being parsed.
+
+        """
+        last_repo_cdate = d.get("last_repo_cdate")
+        if last_repo_cdate is not None:
+            d["last_repo_cdate"] = iso8601.parse_date(last_repo_cdate)
+        return BitbucketListerState(**d)
+
+    def state_to_dict(self, state: BitbucketListerState) -> Dict[str, Any]:
+        """Convert the lister state to a dict, with the date in ISO 8601 form."""
+        d = asdict(state)
+        if d["last_repo_cdate"] is not None:
+            d["last_repo_cdate"] = d["last_repo_cdate"].isoformat()
+        return d
+
+    def get_pages(self) -> Iterator[List[Dict[str, Any]]]:
+        """Yield the ``values`` list of each API page, following ``next`` links.
+
+        Listing starts after the creation date held in the state, or after
+        1970-01-01 when there is none, and stops on the first non-200 response
+        or when a response carries no usable ``next`` link.
+
+        """
+        last_repo_cdate = "1970-01-01"
+        if self.state is not None and self.state.last_repo_cdate is not None:
+            last_repo_cdate = self.state.last_repo_cdate.isoformat()
+
+        while True:
+            # parse_qs() below hands back the decoded value, so quote at this
+            # single point; a raw "+" (UTC offset) would be read as a space.
+            url = self.url_template.format(parse.quote(last_repo_cdate))
+            logger.debug("Page URL: %s", url)
+
+            response = self.session.get(url)
+
+            # handle HTTP errors
+            if response.status_code != 200:
+                logger.warning(
+                    "Got unexpected status_code %s: %s",
+                    response.status_code,
+                    response.content,
+                )
+                break
+
+            body = response.json()
+            # Yield first, so that this page is still listed when its ``next``
+            # link turns out to be unusable.
+            yield body["values"]
+
+            next_page_url = body.get("next")
+            if next_page_url is None:
+                break
+            next_page_url = parse.urlparse(next_page_url)
+            if not next_page_url.query:
+                logger.warning("Failed to parse url %s", next_page_url)
+                break
+            last_repo_cdate = parse.parse_qs(next_page_url.query)["after"][0]
+
+    def get_origins_from_page(
+        self, page: List[Dict[str, Any]]
+    ) -> Iterator[ListedOrigin]:
+        """Convert a page of Bitbucket repositories into a list of
+        ListedOrigins.
 
         """
-        body = response.json()
-        next_ = body.get("next")
-        if next_ is not None:
-            next_ = parse.urlparse(next_)
-            return iso8601.parse_date(parse.parse_qs(next_.query)["after"][0])
-        return None
-
-    def transport_response_simplified(self, response: Response) -> List[Dict[str, Any]]:
-        repos = response.json()["values"]
-        return [self.get_model_from_repo(repo) for repo in repos]
-
-    def request_uri(self, identifier: datetime) -> str:  # type: ignore
-        identifier_str = parse.quote(identifier.isoformat())
-        return super().request_uri(identifier_str or "1970-01-01")
-
-    def is_within_bounds(
-        self, inner: int, lower: Optional[int] = None, upper: Optional[int] = None
-    ) -> bool:
-        # values are expected to be datetimes
-        if lower is None and upper is None:
-            ret = True
-        elif lower is None:
-            ret = inner <= upper  # type: ignore
-        elif upper is None:
-            ret = inner >= lower
-        else:
-            ret = lower <= inner <= upper
-        return ret
+        assert self.lister_obj.id is not None
+
+        for repo in page:
+            last_update = iso8601.parse_date(repo["updated_on"])
+            origin_url = repo["links"]["clone"][0]["href"]
+            origin_type = repo["scm"]
+
+            yield ListedOrigin(
+                lister_id=self.lister_obj.id,
+                url=origin_url,
+                visit_type=origin_type,
+                last_update=last_update,
+            )
+
+    def commit_page(self, page: List[Dict[str, Any]]):
+        """Update the currently stored state using the latest listed page"""
+        if not page:
+            return
+
+        last_repo = page[-1]
+        last_repo_cdate = iso8601.parse_date(last_repo["created_on"])
+        # The stored date defaults to None, which cannot be ordered against a
+        # datetime.
+        if (
+            self.state.last_repo_cdate is None
+            or last_repo_cdate > self.state.last_repo_cdate
+        ):
+            self.state.last_repo_cdate = last_repo_cdate
+
+    def finalize(self):
+        """Flag the state as updated when this run moved the date forward."""
+        if self.state.last_repo_cdate is None:
+            # No date recorded, hence nothing to store.
+            return
+
+        scheduler_state = self.get_state_from_scheduler()
+
+        # Update the lister state in the backend only if the last repository
+        # creation date seen in the current run is more recent than the one
+        # stored in the database, or if none is stored there.
+        if (
+            scheduler_state.last_repo_cdate is None
+            or self.state.last_repo_cdate > scheduler_state.last_repo_cdate
+        ):
+            self.updated = True
diff --git a/swh/lister/bitbucket/tasks.py b/swh/lister/bitbucket/tasks.py
--- a/swh/lister/bitbucket/tasks.py
+++ b/swh/lister/bitbucket/tasks.py
@@ -6,7 +6,7 @@
 
 from celery import group, shared_task
 
-from .lister import BitBucketLister
+from .lister import BitbucketLister
 
 GROUP_SPLIT = 10000
 
@@ -14,13 +14,13 @@
 @shared_task(name=__name__ + ".IncrementalBitBucketLister")
 def list_bitbucket_incremental(**lister_args):
     """Incremental update of the BitBucket forge"""
-    lister = BitBucketLister(**lister_args)
-    return lister.run(min_bound=lister.db_last_index(), max_bound=None)
+    lister = BitbucketLister(**lister_args)
+    return lister.run().dict()
 
 
 @shared_task(name=__name__ + ".RangeBitBucketLister")
 def _range_bitbucket_lister(start, end, **lister_args):
-    lister = BitBucketLister(**lister_args)
+    lister = BitbucketLister(**lister_args)
     return lister.run(min_bound=start, max_bound=end)
 
 
@@ -31,7 +31,7 @@
     It's not to be called for an initial listing.
 
     """
-    lister = BitBucketLister(**lister_args)
+    lister = BitbucketLister(**lister_args)
     ranges = lister.db_partition_indices(split or GROUP_SPLIT)
     if not ranges:
         self.log.info("Nothing to list")
@@ -46,7 +46,6 @@
         promise.save()  # so that we can restore the GroupResult in tests
     except (NotImplementedError, AttributeError):
         self.log.info("Unable to call save_group with current result backend.")
-    # FIXME: what to do in terms of return here?
     return promise.id
 
 
diff --git a/swh/lister/bitbucket/tests/test_lister.py b/swh/lister/bitbucket/tests/test_lister.py
--- a/swh/lister/bitbucket/tests/test_lister.py
+++ b/swh/lister/bitbucket/tests/test_lister.py
@@ -11,7 +11,7 @@
 import iso8601
 import requests_mock
 
-from swh.lister.bitbucket.lister import BitBucketLister
+from swh.lister.bitbucket.lister import BitbucketLister
 from swh.lister.core.tests.test_lister import HttpListerTester
 
 
@@ -23,8 +23,8 @@
     return iso8601.parse_date(unquote(req_index))
 
 
-class BitBucketListerTester(HttpListerTester, unittest.TestCase):
-    Lister = BitBucketLister
+class BitbucketListerTester(HttpListerTester, unittest.TestCase):
+    Lister = BitbucketLister
     test_re = re.compile(r"/repositories\?after=([^?&]+)")
     lister_subdir = "bitbucket"
     good_api_response_file = "data/https_api.bitbucket.org/response.json"