Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/launchpad/lister.py
# Copyright (C) 2017-2020 The Software Heritage developers | # Copyright (C) 2020-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from datetime import datetime, timedelta | from dataclasses import asdict, dataclass | ||||
from itertools import count | from datetime import datetime | ||||
from typing import Any, Dict, List, Optional, Tuple, Union | import logging | ||||
from typing import Any, Dict, Iterator, Optional | |||||
from launchpadlib.launchpad import Launchpad # type: ignore | import iso8601 | ||||
from lazr.restfulclient.resource import Collection, Entry # type: ignore | from launchpadlib.launchpad import Launchpad | ||||
from sqlalchemy import func | from lazr.restfulclient.resource import Collection | ||||
from swh.lister.core.lister_base import ListerBase | from swh.scheduler.interface import SchedulerInterface | ||||
from swh.scheduler.model import ListedOrigin | |||||
from .models import LaunchpadModel | from ..pattern import CredentialsType, Lister | ||||
logger = logging.getLogger(__name__) | |||||
LaunchpadPageType = Iterator[Collection] | |||||
@dataclass | |||||
class LaunchpadListerState: | |||||
"""State of Launchpad lister""" | |||||
date_last_modified: Optional[datetime] = None | |||||
"""modification date of last updated repository since last listing""" | |||||
class LaunchpadLister(Lister[LaunchpadListerState, LaunchpadPageType]): | |||||
""" | |||||
List git repositories from Launchpad. | |||||
Args: | |||||
scheduler: instance of SchedulerInterface | |||||
incremental: defines if incremental listing should be used, in that case | |||||
only modified or new repositories since last incremental listing operation | |||||
will be returned | |||||
""" | |||||
class LaunchpadLister(ListerBase): | |||||
MODEL = LaunchpadModel | |||||
LISTER_NAME = "launchpad" | LISTER_NAME = "launchpad" | ||||
instance = "launchpad" | |||||
flush_packet_db = 20 | |||||
def __init__(self, override_config=None): | def __init__( | ||||
super().__init__(override_config=override_config) | self, | ||||
launchpad = Launchpad.login_anonymously( | scheduler: SchedulerInterface, | ||||
"softwareheritage", "production", version="devel" | incremental: bool = False, | ||||
credentials: CredentialsType = None, | |||||
): | |||||
super().__init__( | |||||
scheduler=scheduler, | |||||
url="https://launchpad.net/", | |||||
instance="launchpad", | |||||
credentials=credentials, | |||||
) | ) | ||||
self.get_repos = launchpad.git_repositories.getRepositories | self.incremental = incremental | ||||
self.date_last_modified = None | |||||
def get_model_from_repo(self, repo: Entry) -> Dict[str, Union[str, datetime]]: | def state_from_dict(self, d: Dict[str, Any]) -> LaunchpadListerState: | ||||
return { | date_last_modified = d.get("date_last_modified") | ||||
"uid": repo.unique_name, | if date_last_modified is not None: | ||||
"name": repo.name, | d["date_last_modified"] = iso8601.parse_date(date_last_modified) | ||||
"full_name": repo.name, | return LaunchpadListerState(**d) | ||||
"origin_url": repo.git_https_url, | |||||
"html_url": repo.web_link, | def state_to_dict(self, state: LaunchpadListerState) -> Dict[str, Any]: | ||||
"origin_type": "git", | d = asdict(state) | ||||
"date_last_modified": repo.date_last_modified, | date_last_modified = d.get("date_last_modified") | ||||
} | if date_last_modified is not None: | ||||
ardumont: Should the computation for updating/restoring the state be within those methods?
What's the… | |||||
Done Inline Actions: Those methods are only there to serialize/deserialize the lister state in order to be able to send it through RPC. anlambert: Those methods are only there to serialize/deserialize the lister state in order to be able to… | |||||
Not Done Inline Actions: Indeed, I mixed up the two date notions we have: the one used for the incremental nature of the lister (the pagination "index" date used to store the state, which is what we are using) and the "last_update" for one repo to list. Moving along ;) Thanks. ardumont: Indeed, I mixed up the two date notions we have.
The one used for the incremental nature of the… | |||||
d["date_last_modified"] = date_last_modified.isoformat() | |||||
def lib_response_simplified( | return d | ||||
self, response: Collection | |||||
) -> List[Dict[str, Union[str, datetime]]]: | |||||
return [ | |||||
self.get_model_from_repo(repo) for repo in response[: len(response.entries)] | |||||
] | |||||
def db_last_threshold(self) -> Optional[datetime]: | |||||
t = self.db_session.query(func.max(self.MODEL.date_last_modified)).first() | |||||
if t: | |||||
return t[0] | |||||
else: | |||||
return None | |||||
def ingest_data_lp( | |||||
self, identifier: Optional[datetime], checks: bool = False | |||||
) -> Tuple[Collection, dict]: | |||||
""" The core data fetch sequence. Request launchpadlib endpoint. Simplify and | |||||
filter response list of repositories. Inject repo information into | |||||
local db. Queue loader tasks for linked repositories. | |||||
Args: | def get_pages(self) -> Iterator[LaunchpadPageType]: | ||||
identifier: Resource identifier. | """ | ||||
checks: Additional checks required | Yields an iterator on all git repositories hosted on Launchpad sorted | ||||
by last modification date in ascending order. | |||||
""" | """ | ||||
response = self.get_repos( | launchpad = Launchpad.login_anonymously( | ||||
order_by="most neglected first", modified_since_date=identifier | "softwareheritage", "production", version="devel" | ||||
) | |||||
date_last_modified = None | |||||
if self.incremental: | |||||
date_last_modified = self.state.date_last_modified | |||||
get_repos = launchpad.git_repositories.getRepositories | |||||
yield get_repos( | |||||
order_by="most neglected first", modified_since_date=date_last_modified | |||||
) | ) | ||||
models_list = self.lib_response_simplified(response) | |||||
models_list = self.filter_before_inject(models_list) | |||||
if checks: | |||||
models_list = self.do_additional_checks(models_list) | |||||
if not models_list: | |||||
return response, {} | |||||
# inject into local db | |||||
injected = self.inject_repo_data_into_db(models_list) | |||||
# queue workers | |||||
self.schedule_missing_tasks(models_list, injected) | |||||
return response, injected | |||||
def run(self, max_bound: Optional[datetime] = None) -> Dict[str, Any]: | |||||
""" Main entry function. Sequentially fetches repository data | |||||
from the service according to the basic outline in the class | |||||
docstring, continually fetching sublists until either there | |||||
is no next index reference given or the given next index is greater | |||||
than the desired max_bound. | |||||
Args: | def get_origins_from_page(self, page: LaunchpadPageType) -> Iterator[ListedOrigin]: | ||||
max_bound : optional date to start at | """ | ||||
Returns: | Iterate on all git repositories and yield ListedOrigin instances. | ||||
Dict containing listing status | """ | ||||
""" | assert self.lister_obj.id is not None | ||||
status = "uneventful" | |||||
def ingest_git_repos(): | |||||
threshold = max_bound | |||||
for i in count(1): | |||||
response, injected_repos = self.ingest_data_lp(threshold) | |||||
if not response and not injected_repos: | |||||
return | |||||
# batch is empty | for repo in page: | ||||
if len(response.entries) == 0: | |||||
return | origin_url = repo.git_https_url | ||||
first: datetime = response[0].date_last_modified | # filter out origins with invalid URL | ||||
last: datetime = response[len(response.entries) - 1].date_last_modified | if not origin_url.startswith("https://"): | ||||
continue | |||||
last_update = repo.date_last_modified | |||||
self.date_last_modified = last_update | |||||
Not Done Inline Actions: Why not constrain the iso8601 conversion back and forth here? ardumont: Why not constrain the iso8601 conversion back and forth here? | |||||
Done Inline Actions: Because we want to manipulate `datetime` objects while the lister runs. anlambert: Because we want to manipulate `datetime` objects while the lister runs. | |||||
Not Done Inline Actions: Right, I misread that part. ardumont: Right, I misread that part. | |||||
logger.debug("Found origin %s last updated on %s", origin_url, last_update) | |||||
yield ListedOrigin( | |||||
lister_id=self.lister_obj.id, | |||||
visit_type="git", | |||||
url=origin_url, | |||||
last_update=last_update, | |||||
) | |||||
def finalize(self) -> None: | |||||
if self.date_last_modified is None: | |||||
return | |||||
next_date = last - timedelta(seconds=15) | if self.incremental and ( | ||||
self.state.date_last_modified is None | |||||
or self.date_last_modified > self.state.date_last_modified | |||||
): | |||||
self.state.date_last_modified = self.date_last_modified | |||||
if next_date <= first: | self.updated = True | ||||
delta = last - first | |||||
next_date = last - delta / 2 | |||||
threshold = next_date | |||||
yield i | |||||
for i in ingest_git_repos(): | |||||
if (i % self.flush_packet_db) == 0: | |||||
self.db_session.commit() | |||||
self.db_session = self.mk_session() | |||||
status = "eventful" | |||||
self.db_session.commit() | |||||
self.db_session = self.mk_session() | |||||
return {"status": status} |
Should the computation for updating/restoring the state be within those methods?
What's the point of the finalize method then?
Also, shouldn't there be a comparison between dates to keep only the most recent modification date?