Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/gitlab/lister.py
# Copyright (C) 2018-2021 The Software Heritage developers | # Copyright (C) 2018-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from dataclasses import asdict, dataclass | from dataclasses import asdict, dataclass | ||||
from datetime import datetime, timezone | |||||
import logging | import logging | ||||
import random | import random | ||||
from typing import Any, Dict, Iterator, Optional, Tuple | from typing import Any, Dict, Iterator, Optional, Tuple | ||||
from urllib.parse import parse_qs, urlencode, urlparse | from urllib.parse import parse_qs, urlencode, urlparse | ||||
import iso8601 | import iso8601 | ||||
import requests | import requests | ||||
from requests.exceptions import HTTPError | from requests.exceptions import HTTPError | ||||
Show All 11 Lines | |||||
# Some instances provide an hg_git type, which can be ingested as hg origins | # Some instances provide an hg_git type, which can be ingested as hg origins | ||||
VCS_MAPPING = {"hg_git": "hg"} | VCS_MAPPING = {"hg_git": "hg"} | ||||
@dataclass | @dataclass | ||||
class GitLabListerState: | class GitLabListerState: | ||||
"""State of the GitLabLister""" | """State of the GitLabLister""" | ||||
last_seen_next_link: Optional[str] = None | last_listing_date: Optional[str] = None | ||||
"""Last link header (not visited yet) during an incremental pass | """Date (ISO 8601 string) at which the last incremental listing pass started | ||||
""" | """ | ||||
Repository = Dict[str, Any] | Repository = Dict[str, Any] | ||||
@dataclass | @dataclass | ||||
class PageResult: | class PageResult: | ||||
▲ Show 20 Lines • Show All 67 Lines • ▼ Show 20 Lines | ): | ||||
scheduler=scheduler, | scheduler=scheduler, | ||||
url=url.rstrip("/"), | url=url.rstrip("/"), | ||||
instance=instance, | instance=instance, | ||||
credentials=credentials, | credentials=credentials, | ||||
) | ) | ||||
self.incremental = incremental | self.incremental = incremental | ||||
self.last_page: Optional[str] = None | self.last_page: Optional[str] = None | ||||
self.per_page = 100 | self.per_page = 100 | ||||
self.listing_date = datetime.now(timezone.utc).isoformat() | |||||
self.session = requests.Session() | self.session = requests.Session() | ||||
self.session.headers.update( | self.session.headers.update( | ||||
{"Accept": "application/json", "User-Agent": USER_AGENT} | {"Accept": "application/json", "User-Agent": USER_AGENT} | ||||
) | ) | ||||
if len(self.credentials) > 0: | if len(self.credentials) > 0: | ||||
cred = random.choice(self.credentials) | cred = random.choice(self.credentials) | ||||
logger.info( | logger.info( | ||||
"Using %s credentials from user %s", self.instance, cred["username"] | "Using %s credentials from user %s", self.instance, cred["username"] | ||||
) | ) | ||||
api_token = cred["password"] | api_token = cred["password"] | ||||
if api_token: | if api_token: | ||||
self.session.headers["Authorization"] = f"Bearer {api_token}" | self.session.headers["Authorization"] = f"Bearer {api_token}" | ||||
def state_from_dict(self, d: Dict[str, Any]) -> GitLabListerState: | def state_from_dict(self, d: Dict[str, Any]) -> GitLabListerState: | ||||
return GitLabListerState(**d) | return GitLabListerState(last_listing_date=d.get("last_listing_date")) | ||||
def state_to_dict(self, state: GitLabListerState) -> Dict[str, Any]: | def state_to_dict(self, state: GitLabListerState) -> Dict[str, Any]: | ||||
return asdict(state) | return asdict(state) | ||||
@throttling_retry( | @throttling_retry( | ||||
retry=_if_rate_limited, before_sleep=before_sleep_log(logger, logging.WARNING) | retry=_if_rate_limited, before_sleep=before_sleep_log(logger, logging.WARNING) | ||||
) | ) | ||||
def get_page_result(self, url: str) -> PageResult: | def get_page_result(self, url: str) -> PageResult: | ||||
Show All 38 Lines | def page_url(self, id_after: Optional[int] = None) -> str: | ||||
"pagination": "keyset", | "pagination": "keyset", | ||||
"order_by": "id", | "order_by": "id", | ||||
"sort": "asc", | "sort": "asc", | ||||
"simple": "true", | "simple": "true", | ||||
"per_page": f"{self.per_page}", | "per_page": f"{self.per_page}", | ||||
} | } | ||||
if id_after is not None: | if id_after is not None: | ||||
parameters["id_after"] = str(id_after) | parameters["id_after"] = str(id_after) | ||||
if self.incremental and self.state and self.state.last_listing_date: | |||||
parameters["last_activity_after"] = self.state.last_listing_date | |||||
return f"{self.url}/projects?{urlencode(parameters)}" | return f"{self.url}/projects?{urlencode(parameters)}" | ||||
def get_pages(self) -> Iterator[PageResult]: | def get_pages(self) -> Iterator[PageResult]: | ||||
next_page: Optional[str] | |||||
if self.incremental and self.state and self.state.last_seen_next_link: | |||||
next_page = self.state.last_seen_next_link | |||||
else: | |||||
next_page = self.page_url() | next_page = self.page_url() | ||||
while next_page: | while next_page: | ||||
self.last_page = next_page | self.last_page = next_page | ||||
page_result = self.get_page_result(next_page) | page_result = self.get_page_result(next_page) | ||||
yield page_result | yield page_result | ||||
next_page = page_result.next_page | next_page = page_result.next_page | ||||
def get_origins_from_page(self, page_result: PageResult) -> Iterator[ListedOrigin]: | def get_origins_from_page(self, page_result: PageResult) -> Iterator[ListedOrigin]: | ||||
assert self.lister_obj.id is not None | assert self.lister_obj.id is not None | ||||
repositories = page_result.repositories if page_result.repositories else [] | repositories = page_result.repositories if page_result.repositories else [] | ||||
for repo in repositories: | for repo in repositories: | ||||
visit_type = repo.get("vcs_type", "git") | visit_type = repo.get("vcs_type", "git") | ||||
visit_type = VCS_MAPPING.get(visit_type, visit_type) | visit_type = VCS_MAPPING.get(visit_type, visit_type) | ||||
yield ListedOrigin( | yield ListedOrigin( | ||||
lister_id=self.lister_obj.id, | lister_id=self.lister_obj.id, | ||||
url=repo["http_url_to_repo"], | url=repo["http_url_to_repo"], | ||||
visit_type=visit_type, | visit_type=visit_type, | ||||
last_update=iso8601.parse_date(repo["last_activity_at"]), | last_update=iso8601.parse_date(repo["last_activity_at"]), | ||||
) | ) | ||||
def commit_page(self, page_result: PageResult) -> None: | |||||
"""Update currently stored state using the latest listed "next" page if relevant. | |||||
Relevancy is determined by the next_page link whose 'page' id must be strictly | |||||
greater than the currently stored one. | |||||
Note: this is a noop for full listing mode | |||||
""" | |||||
if self.incremental: | |||||
# link: https://${project-api}/?...&page=2x... | |||||
next_page = page_result.next_page | |||||
if not next_page and self.last_page: | |||||
next_page = self.last_page | |||||
if next_page: | |||||
id_after = _parse_id_after(next_page) | |||||
previous_next_page = self.state.last_seen_next_link | |||||
previous_id_after = _parse_id_after(previous_next_page) | |||||
if previous_next_page is None or ( | |||||
previous_id_after and id_after and previous_id_after < id_after | |||||
): | |||||
self.state.last_seen_next_link = next_page | |||||
def finalize(self) -> None: | def finalize(self) -> None: | ||||
"""finalize the lister state when relevant (see `fn:commit_page` for details) | """Finalize the lister state: store this run's listing date as last_listing_date | ||||
Note: this is a noop for full listing mode | Note: this is a noop for full listing mode | ||||
""" | """ | ||||
next_page = self.state.last_seen_next_link | if self.incremental: | ||||
if self.incremental and next_page: | self.state.last_listing_date = self.listing_date | ||||
# link: https://${project-api}/?...&page=2x... | |||||
next_id_after = _parse_id_after(next_page) | |||||
scheduler_state = self.get_state_from_scheduler() | |||||
previous_next_id_after = _parse_id_after( | |||||
scheduler_state.last_seen_next_link | |||||
) | |||||
if (not previous_next_id_after and next_id_after) or ( | |||||
previous_next_id_after | |||||
and next_id_after | |||||
and previous_next_id_after < next_id_after | |||||
): | |||||
self.updated = True | self.updated = True |