swh/lister/core/lister.py
- This file was added.
# Copyright (C) 2017 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import abc
import gzip
import logging
import os
import random
import re
import time
from datetime import datetime
from email.utils import parsedate
from pprint import pformat

import requests
import xmltodict
from sqlalchemy import create_engine, func
from sqlalchemy.orm import sessionmaker

from swh.core import config
from swh.scheduler.backend import SchedulerBackend
from swh.storage import get_storage

from .abstractattribute import AbstractAttribute
from .db_utils import session_scope
class FetchError(RuntimeError):
    def __init__(self, response):
        self.response = response

    def __str__(self):
        return repr(self.response)
class SWHLister(config.SWHConfig):
zack: A brief description of what a "lister" is in the SWH context would be nice here. It's something that we are going to need to write anyhow as a conceptual basis for presenting the API people will need to implement to realize a new lister. I guess it could go here. One short paragraph might be enough. Additionally though, we need here developer-oriented documentation explaining what subclass implementors should implement to realize the various functionalities of a lister (e.g., full v. incremental listing).
    CONFIG_BASE_FILENAME = None

    DEFAULT_CONFIG = {
        'storage': ('dict', {
            'cls': 'remote',
            'args': {
                'url': 'http://localhost:5000/'
            },
        }),
        'scheduling_db': ('str', 'dbname=softwareheritage-scheduler'),
    }

    ADDITIONAL_CONFIG = {}

    def __init__(self):
        self.config = self.parse_config_file(
            base_filename=self.CONFIG_BASE_FILENAME,
            additional_configs=[self.ADDITIONAL_CONFIG])
        self.storage = get_storage(**self.config['storage'])
        self.scheduler = SchedulerBackend(
            scheduling_db=self.config['scheduling_db'],
        )


# must init with (unique?) name and API URL
olasd: Should probably be `PagingLister` or a `PaginatedLister` (a lister that flips through pages, or a lister for a paginated resource). We need a high level description of how the lister works and what the process is in the toplevel docstring for this base class, so we know what methods we need to override in subclasses (abstractmethods give a good hint, but aren't enough without a look through the algorithm).
class PagedLister(abc.ABC, SWHLister):
    LIST_API_TEMPLATE = AbstractAttribute('eg. \'/repositories?after=%s\'')
    Model = AbstractAttribute('subtype of swh.lister.core.models.ModelBase')

    @abc.abstractmethod
    def get_model_from_repo(self, repo):
        """Convert a pythonized repo into a simple form that matches the
        model.

        Args:
            repo (dict): dict form of repo response from API server
        Returns:
            a dict that maps model keys to response entries
        """
        pass

    @abc.abstractmethod
    def get_next_index_from_response(self, response):
        """Find the next page in a paged API response.

        Args:
            response (session response): paged response from the API server
        Returns:
            index of the next page, likely extracted from the next href url
        """
        pass

    @abc.abstractmethod
    def list_response_repos(self, response):
        """Convert a paged API response into a list of pythonized repo
        objects.

        Args:
            response (session response): paged response from the API server.
        Returns:
            list of pythonized repo responses
            eg. [ repo for repo in response.json() ]
        """
        pass

    @abc.abstractmethod
    def repo_origin(self, repo):
        """Extract the origin url from the response for one repo.

        Args:
            repo: component of pythonized API response for one repo
        Returns:
            origin url string
        """
        pass

    @abc.abstractmethod
    def repo_type(self, repo):
        """Extract the origin type from the response for one repo.

        Args:
            repo: component of pythonized API response for one repo
        Returns:
            origin type string
        """
olasd: Those two methods should be merged into a single method returning a `swh.storage`-compatible origin dict, so it's easier to extend in the future.
        pass
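For illustration, a minimal sketch of what a concrete subclass would need to supply. Everything here is hypothetical (`FooLister`, `FooModel`, the response shape, the `after=` link format are all invented), assuming a JSON API that returns a list of `{'id': ..., 'full_name': ..., 'html_url': ...}` objects with a `next` link header:

```python
# Hypothetical sketch -- not part of this diff. Assumes a JSON API returning
# [{'id': ..., 'full_name': ..., 'html_url': ...}, ...] plus a 'next' link
# header, and a FooModel in the style of swh.lister.core.models.ModelBase.
class FooLister(PagedLister):
    LIST_API_TEMPLATE = '/repositories?after=%s'
    Model = FooModel  # assumed model subtype

    def get_model_from_repo(self, repo):
        return {
            'uid': repo['id'],
            'indexable': repo['id'],
            'full_name': repo['full_name'],
            'origin_url': repo['html_url'],
            'origin_type': 'git',
        }

    def get_next_index_from_response(self, response):
        if 'next' in response.links:
            # eg. extract the ?after=<id> value from the next href
            return int(response.links['next']['url'].split('after=')[1])
        return None

    def list_response_repos(self, response):
        return [repo for repo in response.json()]

    def repo_origin(self, repo):
        return repo['html_url']

    def repo_type(self, repo):
        return 'git'
```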
    @property
    def CONFIG_BASE_FILENAME(self):
        return 'lister-%s' % self.LISTER_NAME

    @property
    def ADDITIONAL_CONFIG(self):
        return {
            'lister_db_url':
                ('str', 'postgresql:///lister-%s' % self.LISTER_NAME),
            'credentials':
                ('list[dict]', []),
olasd: Should be renamed `cache_responses` (it's not used in production so that's a free rename).
            'cache_response':
                ('bool', False),
            'cache_dir':
                ('str', '~/.cache/swh/lister/%s' % self.LISTER_NAME),
olasd: Clever hack, although it makes pyflakes yell at the ALLCAPS attribute names. Add "# noqa: N802" to silence this for now, but this might be worth a refactor in swh.core.config.
        }

    INITIAL_BACKOFF = 10
    backoff = INITIAL_BACKOFF
    MAX_RETRIES = 7
    CONN_SLEEP = 10
    db_session = None
    session = None
olasd: The lowercase attributes shouldn't be class attributes, but rather be instantiated in `__init__`.
    LISTER_NAME = None
    def __init__(self, name, api_url, override_config=None):
        super().__init__()
        if override_config:
            self.config.update(override_config)
        self.config['cache_dir'] = os.path.expanduser(
            self.config['cache_dir'])
        if self.config['cache_response']:
            config.prepare_folders(self.config, ['cache_dir'])
        self.db_engine = create_engine(self.config['lister_db_url'])
        self.mk_session = sessionmaker(bind=self.db_engine)
        self.LISTER_NAME = name  # 'github?', 'bitbucket?', 'foo.com?'
        self.API_URL = api_url  # eg. 'https://api.github.com'
        if name is None or api_url is None:
            err = "Subclasses of PagedLister must init with: <name>, <api_url>"
olasd: It's counterintuitive to be overriding ALLCAPS class attributes in the constructor.
            raise NameError(err)
    def save_http_response(self, r, cache_dir):
        """Log the response from an API request to a cache dir.

        Args:
            r: full server response
            cache_dir: system path for cache dir
        Returns:
            nothing
        """
        def escape_url_path(p):
            return p.replace('/', '__')

        fname = os.path.join(cache_dir,
                             escape_url_path(r.request.path_url) + '.gz')
        with gzip.open(fname, 'w') as f:
            def emit(s):
                f.write(bytes(s, 'UTF-8'))
            emit(pformat(r.request.path_url))
            emit('\n#\n')
            emit(pformat(r.status_code))
            emit('\n#\n')
            emit(pformat(r.headers))
            emit('\n#\n')
            try:  # json?
                emit(pformat(r.json()))
            except Exception:  # not json
                try:  # xml?
                    emit(pformat(xmltodict.parse(r.text)))
                except Exception:  # not xml
                    emit(pformat(r.text))
    def api_request_headers(self):
        """Return a dictionary of headers to send with every API request.

        MAY BE OVERRIDDEN if headers are needed.
        """
        return {}
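For instance, a subclass talking to an API that wants a content-negotiation header could override this hook as follows (the header value is illustrative, not from this diff):

```python
# Hypothetical override -- the Accept value is made up for illustration.
def api_request_headers(self):
    return {'Accept': 'application/vnd.foo.v3+json'}
```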
zack (unsubmitted): So, it bothers me a little bit that this HTTP-specific part is in the base lister. Conceptually, to me, the lister should be agnostic w.r.t. the protocol used to list remote forges. We're already taking a big step here, so I don't think this should be blocking anything, but how critical is having this code here? And what would be the alternatives? For instance, would moving this (plus everything HTTP specific) to a new HttpAPISwhLister sub-class, that would be used as base for implementing all listers of forges that do use an HTTP-based API, make sense? Or would it be overkill?
    def reset_backoff(self):
        self.backoff = self.INITIAL_BACKOFF

    def back_off(self):
        ret = self.backoff
        self.backoff *= 10
        return ret
    def rate_limit_check(self, response):
        """Check if we're hitting rate limits.

        MAY BE OVERRIDDEN if using non-standard rate limit alerts.

        Args:
            response (session response): complete API query response
        Returns:
            seconds to delay if hitting request limits (0 if success)
        """
        if response.status_code == 429:  # HTTP too many requests
            retry_after = response.headers.get('Retry-After', self.back_off())
            try:
                # might be seconds
                return float(retry_after)
            except ValueError:
                # might be http-date
                at_date = datetime(*parsedate(retry_after)[:6])
                from_now = (at_date - datetime.today()).total_seconds()
                return max(0, from_now)
        else:  # response ok
            self.reset_backoff()
olasd: This function can return a "no retry" return value even when the server wants you to retry ("Retry-After" in the past or "just now"). In that case api_request will fall through and we'll try to parse the body of the 429 error. Either we need to set a minimum retry cooloff, or distinguish between "request ok" and "retry immediately".
fiendish: I figure that getting a negative or 0 delay time will be due to signal latency. By the time you hear the message about when to retry, it's already ok to retry right away. But I can throw in a minimum time. What should it be?
olasd: I don't think we need a minimal delay if the server tells us to retry right away. We just need to make sure the caller interprets that as a "retry needed now" answer rather than a "go ahead and process" as it seems to be now.
        return 0
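As a sketch of the "non-standard rate limit alerts" case the docstring mentions, an override for an API that signals limits through response headers rather than a 429 status might look like this (the header names are assumptions, invented for the example):

```python
# Hypothetical override -- X-RateLimit-* header names are illustrative only.
def rate_limit_check(self, response):
    remaining = int(response.headers.get('X-RateLimit-Remaining', 1))
    if remaining == 0:
        # the reset header is assumed to carry an epoch timestamp in seconds
        reset_at = int(response.headers['X-RateLimit-Reset'])
        return max(0, reset_at - time.time())
    self.reset_backoff()
    return 0
```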
    def api_request(self, path, username=None, password=None,
                    headers=None, session=None):
        """Make an API request with retries and rate limits.

        Args:
            path (string): server API path to request (after first slash)
            username (string): basic auth credential
            password (string): basic auth credential
            headers (dict): additional optional headers for the request
            session (requests.Session object): pre-made session
        Returns:
            session response object
        """
        if session is None:
            session = self.session
        if session is None:
            session = requests.Session()
        params = {}
        params['headers'] = headers or {}
        if username is not None and password is not None:
            params['auth'] = (username, password)
        retries_left = self.MAX_RETRIES
        while retries_left > 0:
            logging.debug('sending API request: %s' % path)
            try:
                r = session.get(self.API_URL + path, **params)
            except requests.exceptions.ConnectionError:
                # network-level connection error, try again
                logging.warning(
                    'connection error upon %s: sleep for %d seconds' %
                    (path, self.CONN_SLEEP))
                time.sleep(self.CONN_SLEEP)
                retries_left -= 1
                continue
            # detect throttling
            delay = self.rate_limit_check(r)
            if delay > 0:
                logging.warning(
                    'rate limited upon %s: sleep for %f seconds' %
                    (path, delay))
                time.sleep(delay)
            else:  # request ok
                break
            retries_left -= 1
        if not retries_left:
            logging.warning('giving up on %s: max retries exceeded' % path)
        do_cache = self.config['cache_response']
        if do_cache:
            self.save_http_response(r, self.config['cache_dir'])
        if not r.ok:
            raise FetchError(r)
        return r
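At a call site inside a subclass this pairs with `FetchError`, roughly as below; the path and credentials are placeholders, not values from this diff:

```python
# Illustrative call site -- path and credentials are placeholders.
try:
    response = self.api_request('/repositories?after=%s' % index,
                                username='user', password='secret')
except FetchError as e:
    logging.error('fetch failed: %s' % e)
    raise
```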
    def query_uid(self, uid, db_session=None):
        """Look in the db for a repository with the given uid.

        Args:
            uid: uid to look for
            db_session: optional pre-made db session
        Returns:
            sqlalchemy.ext.declarative.declarative_base object
            with the given uid
        """
        if not db_session:
            db_session = self.db_session
        if not db_session:
            with session_scope(self.mk_session) as db_session:
                return self.query_uid(uid, db_session=db_session)
        return db_session.query(self.Model) \
                         .filter(self.Model.uid == uid) \
                         .first()
    def winnow_models(self, mlist, field, to_remove):
        """Given a list of models, remove any whose <field> matches
        some member of a list of values.

        Args:
            mlist (list of model rows): the initial list of models
            field (column): the column to filter on
            to_remove (list): if anything in mlist has column <field> equal
                to one of the values in to_remove, it will be removed from
                the result
        Returns:
            A list of model rows starting from mlist minus any matching rows
        """
        if to_remove:
            return mlist.filter(~field.in_(to_remove)).all()
        else:
            return mlist.all()
    def query_range(self, start, end, db_session=None):
        """Look in the db for a range of repositories with indexable
        values between start and end.

        Args:
            start (model indexable type): start of desired indexable range
            end (model indexable type): end of desired indexable range
            db_session: optional pre-made db session
        Returns:
            a list of sqlalchemy.ext.declarative.declarative_base objects
            with indexable values within the given range
        """
        if not db_session:
            db_session = self.db_session
        if not db_session:
            with session_scope(self.mk_session) as db_session:
                return self.query_range(start, end, db_session=db_session)
        retlist = db_session.query(self.Model)
        if start is not None:
            # Query.filter returns a new query, so reassign to keep the clause
            retlist = retlist.filter(self.Model.indexable >= start)
        if end is not None:
            retlist = retlist.filter(self.Model.indexable <= end)
        return retlist
    def query_full_names(self, full_names, db_session=None):
        """Look in the db for repositories by full name.

        Args:
            full_names: list of full name strings to look for
            db_session: optional pre-made db session
        Returns:
            a list of sqlalchemy.ext.declarative.declarative_base objects
            with full names that are in the given list
        """
        if not db_session:
            db_session = self.db_session
        if not db_session:
            with session_scope(self.mk_session) as db_session:
                return self.query_full_names(full_names,
                                             db_session=db_session)
        return db_session.query(self.Model) \
                         .filter(self.Model.full_name.in_(full_names)) \
                         .all()
    def partition_repo_indices(self, partition_size, db_session=None):
        """Describe an index-space partitioning of the db table into
        equal-sized chunks. This is used to derive min & max bounds for
        parallelizing fetch tasks.

        Args:
            partition_size (int): desired size of each partition
        Returns:
            a list of (begin, end) tuples of indexable values that
            delimit approximately equal-sized ranges of existing repos
        """
        if not db_session:
olasd: Even though it's not a frequent operation (once every few months), I'd really like to avoid doing a count(*) on a table of substantial size (as PostgreSQL will make that a sequential scan of the full table). Maybe split that out into a separate overridable "get_repo_count" method so we can avoid it if possible, e.g. in the case of GitHub with its sequential indices.
            db_session = self.db_session
        if not db_session:
            with session_scope(self.mk_session) as db_session:
                return self.partition_repo_indices(partition_size,
                                                   db_session=db_session)
        # number of rows
        n = db_session.query(func.count('*')).select_from(self.Model).scalar()
        partitions = []
        partition_size = min(partition_size, n)
        prev_index = None
        for i in range(0, n-1, partition_size):
            # indexable column from the ith row; first() returns a
            # single-column result row, so unpack the scalar value
            index = db_session.query(self.Model.indexable) \
                .order_by(self.Model.indexable).offset(i).first()
            if index is not None:
                index = index[0]
            if index is not None and prev_index is not None:
                partitions.append((prev_index, index))
            prev_index = index
        partitions.append((prev_index, self.last_repo_index()))
        return partitions
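A sketch of the `get_repo_count` split olasd suggests above, so listers with cheap-to-estimate index spaces can avoid the count(*). The method name comes from the comment; the bodies are assumptions, not part of this diff:

```python
# Sketch of the suggested refactoring -- not part of this diff.
def get_repo_count(self, db_session):
    """Overridable row-count estimate used by partition_repo_indices."""
    return db_session.query(func.count('*')) \
                     .select_from(self.Model).scalar()

# A lister whose indexable values are dense sequential integers (eg. GitHub
# repo ids) could then override it to skip the sequential scan entirely:
def get_repo_count(self, db_session):
    return self.last_repo_index(db_session) or 0
```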
    def last_repo_index(self, db_session=None):
        """Look in the db for the largest indexable value.

        Args:
            db_session: optional pre-made db session
        Returns:
            the largest indexable value of all repos in the db
        """
        if not db_session:
            db_session = self.db_session
        if not db_session:
            with session_scope(self.mk_session) as db_session:
                return self.last_repo_index(db_session=db_session)
        t = db_session.query(func.max(self.Model.indexable)).first()[0]
        return t
    def inject_repo(self, model_dict, db_session=None):
        """Add or update a repo in the db and mark it last_seen now.

        Args:
            model_dict: dictionary mapping model keys to values
            db_session: optional pre-made db session
        Returns:
            new or updated sqlalchemy.ext.declarative.declarative_base
            object associated with the injection
        """
        if not db_session:
            db_session = self.db_session
        if not db_session:
            with session_scope(self.mk_session) as db_session:
                return self.inject_repo(model_dict, db_session=db_session)
        logging.debug('injecting repo %s' % model_dict['uid'])
        sql_repo = self.query_uid(model_dict['uid'], db_session)
        if not sql_repo:
            sql_repo = self.Model(**model_dict)
            db_session.add(sql_repo)
        else:
            for k in model_dict:
                setattr(sql_repo, k, model_dict[k])
        sql_repo.last_seen = datetime.now()
        return sql_repo
    def origin_dict(self, origin_type, origin_url):
        """Return an origin dict in the format expected by swh.storage.

        Args:
            origin_type (string)
olasd: Should be rendered useless by the repo_type/repo_url refactoring.
fiendish: Can you clarify this annotation?
olasd: If the repo_type/repo_url abstract methods are replaced by one that just returns the `swh.storage`-compatible origin dict, then this origin_dict method is not needed anymore.
            origin_url (string)
        Returns:
            the same information as an origin dict
        """
        return {
            'type': origin_type,
            'url': origin_url,
        }
    def task_dict(self, origin_type, origin_url):
        """Return a task dict in the format expected by the scheduler.

        Args:
            origin_type (string)
            origin_url (string)
        Returns:
            the same information as a task dict
        """
        return {
            'type': 'origin-update-%s' % origin_type,
olasd: Should take the plain origin dict as argument.
fiendish: Can you clarify this annotation?
olasd: If the repo_type/repo_url abstract methods are replaced by one that just returns the `swh.storage`-compatible origin dict, then this task_dict method should be fed that dict directly.
            'arguments': {
                'args': [
                    origin_url,
                ],
                'kwargs': {},
            },
            'next_run': datetime.now(),
        }
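Putting olasd's two comments together, the repo_origin/repo_type pair could collapse into one abstract method returning the swh.storage-compatible origin dict, which task_dict then consumes directly. A sketch of that proposed shape (the method name `get_origin_from_repo` is invented here, not from the diff):

```python
# Sketch of the refactoring proposed in the comments above.
@abc.abstractmethod
def get_origin_from_repo(self, repo):  # hypothetical name
    """Return a swh.storage-compatible origin dict for one repo,
    eg. {'type': 'git', 'url': repo['html_url']}."""
    pass

def task_dict(self, origin):
    """Build a scheduler task dict straight from the origin dict."""
    return {
        'type': 'origin-update-%s' % origin['type'],
        'arguments': {
            'args': [origin['url']],
            'kwargs': {},
        },
        'next_run': datetime.now(),
    }
```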
    def string_pattern_check(self, a, b, c=None):
        """Check that b (and c, if given) follow the same pattern of
        alphanumeric and punctuation characters as a; raise TypeError
        otherwise.
        """
        if isinstance(a, str):
            a_pattern = re.sub('[a-zA-Z0-9]',
                               '[a-zA-Z0-9]',
                               re.escape(a))
            if (isinstance(b, str) and (re.match(a_pattern, b) is None)
                    or isinstance(c, str) and (re.match(a_pattern, c) is None)):
                logging.debug(a_pattern)
                raise TypeError('incomparable string patterns detected')
    def within_bounds(self, inner, lower=None, upper=None):
        """Check if a comparable value is within optional lower/upper bounds.

        MAY BE OVERRIDDEN for non-directly-comparable types.
olasd: Do we want to keep that around?
fiendish: Can you clarify this annotation?
olasd: I don't think the `within_bounds` debug statement is needed?
        Args:
            inner (indexable type): the value being checked
            lower (indexable type): optional lower bound
            upper (indexable type): optional upper bound
        Returns:
            whether inner is within the constraints imposed by the optional
            lower and upper bounds
        """
        try:
            if lower is None and upper is None:
                return True
            elif lower is None:
                ret = inner <= upper
            elif upper is None:
                ret = inner >= lower
            else:
                ret = lower <= inner <= upper
            self.string_pattern_check(inner, lower, upper)
        except Exception as e:
            logging.error(str(e) + ': %s, %s, %s' %
                          (('inner=%s%s' % (type(inner), inner)),
                           ('lower=%s%s' % (type(lower), lower)),
                           ('upper=%s%s' % (type(upper), upper))))
            raise
        return ret
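To make the bounds/pattern checks concrete, a few illustrative calls (the `lister` instance and the values are invented for the example):

```python
# Illustrative behaviour -- values invented for the example.
lister.within_bounds(5, 1, 10)                         # True
lister.within_bounds('2017-06', '2017-01', '2017-12')  # True: same shape
lister.within_bounds('2017-06', 'abc', None)           # raises TypeError:
                                                       # 'abc' doesn't match
                                                       # the '2017-06' shape
```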
    def models_list_from_response(self, response, max_index):
        """Convert the API response into a list of basic data ready to be
        injected into the db.

        Args:
            response: the full api response
            max_index: don't process results beyond this index
        Returns:
            a list of dicts mapping keys in the db model for each repo
        """
        repos_list = self.list_response_repos(response)
        models_list = [
            self.get_model_from_repo(r)
            for r in repos_list
        ]
        models_list = [
            m for m in models_list
            if self.within_bounds(m['indexable'], None, max_index)
        ]
        return models_list
    def inject_repo_data_into_db(self, models_list):
        """Inject data into the db.

        Args:
            models_list: list of dicts mapping keys from the db model
                for each repo to be injected
        Returns:
            the set of uids that were injected, and a dict of
            full_name:sql_repo pairs
        """
        found_ids = set()
        injected_repos = {}
        for m in models_list:
            found_ids.add(m['uid'])
            injected_repos[m['full_name']] = self.inject_repo(m)
        return found_ids, injected_repos
    def create_missing_origins_and_tasks(self, models_list, injected_repos):
        """Find any newly created db entries that don't yet have tasks or
        origin objects assigned, and create them.

        Args:
            models_list: a list of dicts mapping keys in the db model for
                each repo
            injected_repos: dict of full_name:sql_repo pairs that have just
                been created
        Returns:
            Nothing. Modifies injected_repos.
        """
        for m in models_list:
            ir = injected_repos[m['full_name']]
            if not ir.origin_id:
                ir.origin_id = self.storage.origin_add_one(
                    self.origin_dict(m['origin_type'], m['origin_url'])
                )
            if not ir.task_id:
                # create_tasks returns a list of created task dicts
                ir.task_id = self.scheduler.create_tasks(
                    [self.task_dict(m['origin_type'], m['origin_url'])]
                )[0]['id']
    def disable_deleted_repos(self, start, end, keep_these):
        """Disable tasks for repos that no longer exist between start and end.

        Args:
            start: beginning of range to disable
            end: end of range to disable
            keep_these (uid list): do not disable repos with uids in this list
        """
        deleted_repos = self.winnow_models(
            self.query_range(start, end), self.Model.uid, keep_these
        )
        tasks_to_disable = [repo for repo in deleted_repos
                            if repo.task_id is not None]
        if tasks_to_disable:
            self.scheduler.disable_tasks(tasks_to_disable)
        for repo in deleted_repos:
            repo.task_id = None
    def fetch_pages(self, min_index=None, max_index=None):
        """Main loop. Fetch, parse and inject pages of repos, then create
        the related origins and tasks.

        Args:
            min_index (indexable type): optional index to start from
            max_index (indexable type): optional index to stop at
        Returns:
            nothing
        """
        self.session = requests.Session()
        self.db_session = self.mk_session()
        index = min_index or 0
        loop_count = 0
        while self.within_bounds(index, min_index, max_index):
            logging.info('listing repos starting at %s' % index)
            # Request (partial?) list of repositories
            headers = self.api_request_headers()
            creds = self.config['credentials']
            auth = random.choice(creds) if creds else {}
            response = self.api_request(self.LIST_API_TEMPLATE % index,
                                        headers=headers, **auth)
            # parse, inject, task
            models_list = self.models_list_from_response(response, max_index)
            logging.debug('INDEX : ' + str(index) + ' : models_list : '
                          + str(models_list))
            found_ids, injected_repos = self.inject_repo_data_into_db(
                models_list
            )
            self.create_missing_origins_and_tasks(models_list, injected_repos)
            next_index = self.get_next_index_from_response(response)
            # find if any repos were deleted and disable their tasks
            if next_index is not None:
                if self.within_bounds(next_index, None, max_index):
                    start, end = index, next_index
                else:
                    start, end = index, max_index
            else:
                start, end = index, max_index
            self.disable_deleted_repos(start, end, found_ids)
            # end condition
            if (next_index is None) or (next_index == index):
                logging.info('stopping after index %s, no next link found' %
                             index)
                break
            else:
                index = next_index
            loop_count += 1
            if loop_count == 20:
                # flush every 20 pages to keep transactions bounded
                logging.info('flushing updates')
                loop_count = 0
                self.db_session.commit()
                self.db_session = self.mk_session()
        self.db_session.commit()
        self.db_session = None
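End to end, a hypothetical concrete lister (like the FooLister sketch earlier) would be driven roughly as below, with partition_repo_indices feeding parallel range tasks. All names and values are illustrative:

```python
# Illustrative driver -- FooLister and the range sizes are made up.
lister = FooLister(name='foo', api_url='https://api.foo.example')

# Full listing from the beginning of the index space:
lister.fetch_pages()

# Or split the known index space into ~10000-repo chunks and hand each
# (start, end) range to a parallel fetch task:
for start, end in lister.partition_repo_indices(10000):
    lister.fetch_pages(min_index=start, max_index=end)
```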