Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/launchpad/lister.py
- This file was added.
# Copyright (C) 2017-2020 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
from typing import Any | |||||
from swh.lister.core.lister_base import ListerBase | |||||
from .models import LaunchpadModel | |||||
from itertools import count | |||||
from launchpadlib.launchpad import Launchpad # type: ignore | |||||
from lazr.restfulclient.resource import Collection # type: ignore | |||||
class LaunchpadLister(ListerBase):
    """Lister for the git repositories of Launchpad projects."""

    MODEL = LaunchpadModel
    LISTER_NAME = 'launchpad'
    # There is only one instance of such a lister.
    instance = 'launchpad'
    # Index of the first project to list when no lower bound is given.
    default_min_bound: Any = 0
    # run() commits the db session every `flush_packet_db` ingested batches.
    flush_packet_db = 20
    # launchpadlib client; assigned in __init__.
    launchpad: Launchpad
def __init__(self, override_config=None):
    """Initialise the base lister, then open an anonymous Launchpad session.

    Args:
        override_config: passed through unchanged to the parent constructor.
    """
    super().__init__(override_config=override_config)
    # The login arguments are the ones launchpadlib needs to set up a
    # client; see https://help.launchpad.net/API/launchpadlib
    client = Launchpad.login_anonymously(
        'softwareheritage', 'production', version='devel')
    self.launchpad = client
    # Collection of projects that get_git_repos() slices into batches.
    self.lp_projects: Collection = client.projects.latest()
ardumont: What are those parameters?
You might want to use some defined at [1]
[1] https://forge.softwareheritage.org/source/swh-lister/browse/master/swh/lister/__init__.py$13-19
legau: These arguments are what the Launchpad instance needs in order to be set up, as described at https://help.launchpad.net/API/launchpadlib
def get_model_from_repo(self, repo):
    """Build the model dictionary describing one Launchpad git repository.

    Args:
        repo: repository entry exposing ``unique_name``, ``name``,
            ``git_https_url`` and ``web_link`` attributes.

    Returns:
        dict with the keys ``uid``, ``name``, ``full_name``, ``origin_url``,
        ``html_url`` and ``origin_type`` (always ``'git'``).
    """
    return dict(
        uid=repo.unique_name,
        name=repo.name,
        full_name=repo.name,
        origin_url=repo.git_https_url,
        html_url=repo.web_link,
        origin_type='git',
    )
def lib_response_simplified(self, response: list):
    """Convert a list of repository entries into a list of model dicts.

    Args:
        response: repository entries, each accepted by
            ``get_model_from_repo``.

    Returns:
        list holding one ``get_model_from_repo`` result per entry, in the
        same order.
    """
    to_model = self.get_model_from_repo
    return list(map(to_model, response))
def get_git_repos(self, start, size=20):
    """Fetch the git repositories of one batch of Launchpad projects.

    Args:
        start: index of the first project of the batch in
            ``self.lp_projects``.
        size: maximum number of projects in the batch.

    Returns:
        tuple ``(repos, next_start)``: the repositories of every project of
        the batch whose ``vcs`` is ``"Git"``, and the index at which the
        following batch starts (``start`` plus the number of projects
        actually present in this batch).
    """
    get_repos = self.launchpad.git_repositories.getRepositories
    batch = self.lp_projects[start:start + size]
    next_start = start + len(batch)
    # Test the project's vcs *before* asking for its repositories, so that
    # no request is issued for projects that are not git-based.
    repos = [
        repo
        for project in batch
        if project.vcs == "Git"
        for repo in get_repos(target=project)
    ]
    return repos, next_start
def ingest_data(self, identifier, checks=False):
    """Run one fetch / simplify / filter / inject / schedule cycle.

    The batch of repositories starting at ``identifier`` is fetched,
    converted to model dicts and filtered; the remaining models are
    injected into the local db and loader tasks are scheduled for them.

    Args:
        identifier: resource identifier handed to ``get_git_repos``.
        checks (bool): when true, also run ``do_additional_checks`` on the
            filtered models.

    Returns:
        tuple ``(response, injected)``: the raw ``get_git_repos`` result
        and whatever ``inject_repo_data_into_db`` returned, or an empty
        list when no model is left after filtering.
    """
    response = self.get_git_repos(identifier)
    repos = response[0]
    models = self.filter_before_inject(self.lib_response_simplified(repos))
    if checks:
        models = self.do_additional_checks(models)

    if models:
        # inject into local db, then queue workers
        injected = self.inject_repo_data_into_db(models)
        self.schedule_missing_tasks(models, injected)
        return response, injected
    return response, []
def run(self, min_bound=0, max_bound=None):
    """Main entry function: list projects batch by batch.

    Batches are ingested sequentially with ``ingest_data``, each one
    starting at the index returned by the previous one, until a batch does
    not advance the index (nothing left to list) or the next index exceeds
    ``max_bound``.

    Args:
        min_bound: optional index to start from; ``None`` falls back to
            ``self.default_min_bound``.
        max_bound: optional index to stop at; ``None`` means no upper
            bound.

    Returns:
        dict ``{'status': 'eventful'}`` when at least one batch advanced
        the index within bounds, ``{'status': 'uneventful'}`` otherwise.
    """
    status = 'uneventful'
    start = self.default_min_bound if min_bound is None else min_bound

    def ingest_git_repos():
        index = start
        for i in count(1):
            # Resume from the index reached so far, not from the loop
            # counter, so that consecutive batches do not overlap.
            response, injected_repos = self.ingest_data(index)
            if not response and not injected_repos:
                self.logger.info('No response from api server, stopping')
                return
            next_index = response[1]
            # termination conditions
            if next_index == index:
                # The batch was empty: every project has been listed.
                return
            if max_bound is not None and next_index > max_bound:
                return
            index = next_index
            yield i

    for i in ingest_git_repos():
        # Periodically flush pending updates and start a fresh session.
        if i % self.flush_packet_db == 0:
            self.db_session.commit()
            self.db_session = self.mk_session()
        status = 'eventful'

    self.db_session.commit()
    self.db_session = self.mk_session()
    return {'status': status}
What are those parameters?
You might want to use some defined at [1]
[1] https://forge.softwareheritage.org/source/swh-lister/browse/master/swh/lister/__init__.py$13-19