diff --git a/swh/lister/core/indexing_lister.py b/swh/lister/core/indexing_lister.py
index f76c4fc..3388077 100644
--- a/swh/lister/core/indexing_lister.py
+++ b/swh/lister/core/indexing_lister.py
@@ -1,210 +1,213 @@
# Copyright (C) 2015-2017 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
import logging
from sqlalchemy import func
from .lister_transports import SWHListerHttpTransport
from .lister_base import SWHListerBase
class SWHIndexingLister(SWHListerBase):
"""Lister* intermediate class for any service that follows the pattern:
- The service must report at least one stable unique identifier, known
herein as the UID value, for every listed repository.
- If the service splits the list of repositories into sublists, it must
report at least one stable and sorted index identifier for every listed
repository, known herein as the indexable value, which can be used as
part of the service endpoint query to request a sublist beginning from
that index. This might be the UID if the UID is monotonic.
- Client sends a request to list repositories starting from a given
index.
- Client receives structured (json/xml/etc) response with information about
a sequential series of repositories starting from that index and, if
necessary/available, some indication of the URL or index for fetching the
next series of repository data.
See :class:`swh.lister.core.lister_base.SWHListerBase` for more details.
This class cannot be instantiated. To create a new Lister for a source
code listing service that follows the model described above, you must
subclass this class and provide the required overrides in addition to
any unmet implementation/override requirements of this class's base.
(see parent class and member docstrings for details)
Required Overrides::
def get_next_target_from_response
"""
@abc.abstractmethod
def get_next_target_from_response(self, response):
"""Find the next server endpoint identifier given the entire response.
Implementation of this method depends on the server API spec
and the shape of the network response object returned by the
transport_request method.
Args:
response (transport response): response page from the server
Returns:
index of next page, possibly extracted from a next href url
"""
pass
# You probably don't need to override anything below this line.
def filter_before_inject(self, models_list):
"""Overrides SWHListerBase.filter_before_inject
Bounds query results by this Lister's set max_index.
"""
models_list = [
m for m in models_list
if self.is_within_bounds(m['indexable'], None, self.max_index)
]
return models_list
def db_query_range(self, start, end):
"""Look in the db for a range of repositories with indexable
values in the range [start, end]
Args:
start (model indexable type): start of desired indexable range
end (model indexable type): end of desired indexable range
Returns:
an sqlalchemy query (not yet evaluated) of MODEL objects
with indexable values within the given range
"""
retlist = self.db_session.query(self.MODEL)
if start is not None:
retlist = retlist.filter(self.MODEL.indexable >= start)
if end is not None:
retlist = retlist.filter(self.MODEL.indexable <= end)
return retlist
def db_partition_indices(self, partition_size):
"""Describe an index-space compartmentalization of the db table
in equal sized chunks. This is used to describe min&max bounds for
parallelizing fetch tasks.
Args:
partition_size (int): desired size to make each partition
Returns:
a list of (begin, end) tuples of indexable values that
declare approximately equal-sized ranges of existing
repos
"""
n = self.db_num_entries()
if not n:
# guard: an empty table would give range() below a zero step
return []
partitions = []
partition_size = min(partition_size, n)
prev_index = None
for i in range(0, n-1, partition_size):
# indexable column from the ith row
index = self.db_session.query(self.MODEL.indexable) \
.order_by(self.MODEL.indexable).offset(i).first()
if index is not None and prev_index is not None:
partitions.append((prev_index, index))
prev_index = index
partitions.append((prev_index, self.db_last_index()))
return partitions
def db_last_index(self):
"""Look in the db for the largest indexable value
Returns:
the largest indexable value of all repos in the db
"""
t = self.db_session.query(func.max(self.MODEL.indexable)).first()
if t:
return t[0]
else:
return None
def disable_deleted_repo_tasks(self, start, end, keep_these):
"""Disable tasks for repos that no longer exist between start and end.
Args:
start: beginning of range to disable
end: end of range to disable
keep_these (uid list): do not disable repos with uids in this list
"""
if end is None:
end = self.db_last_index()
if not self.is_within_bounds(end, None, self.max_index):
end = self.max_index
deleted_repos = self.winnow_models(
self.db_query_range(start, end), self.MODEL.uid, keep_these
)
tasks_to_disable = [repo.task_id for repo in deleted_repos
if repo.task_id is not None]
if tasks_to_disable:
self.scheduler.disable_tasks(tasks_to_disable)
for repo in deleted_repos:
repo.task_id = None
def run(self, min_bound=None, max_bound=None):
"""Main entry function. Sequentially fetches repository data
from the service according to the basic outline in the class
docstring, continually fetching sublists until either there
is no next index reference given or the given next index is greater
than the desired max_bound.
Args:
min_bound (indexable type): optional index to start from
max_bound (indexable type): optional index to stop at
Returns:
nothing
"""
index = min_bound or ''
loop_count = 0
self.min_index = min_bound
self.max_index = max_bound
while self.is_within_bounds(index, self.min_index, self.max_index):
logging.info('listing repos starting at %s' % index)
response, injected_repos = self.ingest_data(index)
+ if not response and not injected_repos:
+ logging.info('No response from api server, stopping')
+ break
next_index = self.get_next_target_from_response(response)
# Determine if any repos were deleted, and disable their tasks.
keep_these = list(injected_repos.keys())
self.disable_deleted_repo_tasks(index, next_index, keep_these)
# termination condition
if (next_index is None) or (next_index == index):
logging.info('stopping after index %s, no next link found' %
index)
break
else:
index = next_index
loop_count += 1
if loop_count == 20:
logging.info('flushing updates')
loop_count = 0
self.db_session.commit()
self.db_session = self.mk_session()
self.db_session.commit()
self.db_session = self.mk_session()
class SWHIndexingHttpLister(SWHListerHttpTransport, SWHIndexingLister):
"""Convenience class for ensuring right lookup and init order
when combining SWHIndexingLister and SWHListerHttpTransport."""
def __init__(self, api_baseurl=None, override_config=None):
SWHListerHttpTransport.__init__(self, api_baseurl=api_baseurl)
SWHIndexingLister.__init__(self, override_config=override_config)
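
To make the single required override concrete, here is a standalone sketch (not part of the patch) of the next-index extraction, assuming a hypothetical JSON API whose responses carry a 'next' URL of the form https://api.example.org/repos?since=<id>. The endpoint shape, the 'since' parameter, and the FakeResponse stand-in are all illustrative assumptions.

import re


class FakeResponse:
    """Stands in for whatever object transport_request returns."""

    def __init__(self, body):
        self._body = body

    def json(self):
        return self._body


def get_next_target_from_response(response):
    # Pull the next index out of the hypothetical 'next' link; returning
    # None is the signal that makes run() stop listing.
    next_url = response.json().get('next')
    if next_url:
        match = re.search(r'since=(\d+)', next_url)
        if match:
            return int(match.group(1))
    return None


assert get_next_target_from_response(
    FakeResponse({'next': 'https://api.example.org/repos?since=42'})) == 42
assert get_next_target_from_response(FakeResponse({})) is None
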
diff --git a/swh/lister/core/lister_base.py b/swh/lister/core/lister_base.py
index b8f22f2..46ad64d 100644
--- a/swh/lister/core/lister_base.py
+++ b/swh/lister/core/lister_base.py
@@ -1,521 +1,524 @@
# Copyright (C) 2015-2018 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
import datetime
import gzip
import logging
import os
import re
import time
from sqlalchemy import create_engine, func
from sqlalchemy.orm import sessionmaker
from swh.core import config
from swh.scheduler import get_scheduler
from swh.storage import get_storage
from .abstractattribute import AbstractAttribute
def utcnow():
return datetime.datetime.now(tz=datetime.timezone.utc)
class FetchError(RuntimeError):
def __init__(self, response):
self.response = response
def __str__(self):
return repr(self.response)
class SWHListerBase(abc.ABC, config.SWHConfig):
"""Lister core base class.
Generally a source code hosting service provides an API endpoint
for listing the set of stored repositories. A Lister is the discovery
service responsible for finding this list, all at once or sequentially
by parts, and queueing local tasks to fetch and ingest the referenced
repositories.
The core method in this class is ingest_data. Subclasses should call
this method one or more times to fetch and ingest data from API
endpoints. See swh.lister.core.indexing_lister.SWHIndexingLister for
example usage.
This class cannot be instantiated. Any instantiable Lister descending
from SWHListerBase must provide at least the required overrides.
(see member docstrings for details):
Required Overrides:
MODEL
def transport_request
def transport_response_to_string
def transport_response_simplified
def transport_quota_check
Optional Overrides:
def filter_before_inject
def is_within_bounds
"""
MODEL = AbstractAttribute('Subclass type (not instance)'
' of swh.lister.core.models.ModelBase'
' customized for a specific service.')
LISTER_NAME = AbstractAttribute("Lister's name")
@abc.abstractmethod
def transport_request(self, identifier):
"""Given a target endpoint identifier to query, try once to request it.
Implementation of this method determines the network request protocol.
Args:
identifier (string): unique identifier for an endpoint query.
e.g. If the service indexes lists of repositories by date and
time of creation, this might be that as a formatted string. Or
it might be an integer UID. Or it might be nothing.
It depends on what the service needs.
Returns:
the entire request response
Raises:
FetchError: in place of any internal transport-dependent
connection exception. Other non-connection exceptions should
propagate unchanged.
"""
pass
@abc.abstractmethod
def transport_response_to_string(self, response):
"""Convert the server response into a formatted string for logging.
Implementation of this method depends on the shape of the network
response object returned by the transport_request method.
Args:
response: the server response
Returns:
a pretty string of the response
"""
pass
@abc.abstractmethod
def transport_response_simplified(self, response):
"""Convert the server response into list of a dict for each repo in the
response, mapping columns in the lister's MODEL class to repo data.
Implementation of this method depends on the server API spec and the
shape of the network response object returned by the transport_request
method.
Args:
response: response object from the server.
Returns:
list of repo MODEL dicts
(e.g. [{'uid': r['id'], ...} for r in response.json()])
"""
pass
@abc.abstractmethod
def transport_quota_check(self, response):
"""Check server response to see if we're hitting request rate limits.
Implementation of this method depends on the server communication
protocol and API spec and the shape of the network response object
returned by the transport_request method.
Args:
response (session response): complete API query response
Returns:
a tuple (must_retry, delay): whether the request must be
retried (True/False) and, if so, how many seconds to wait first
"""
pass
def filter_before_inject(self, models_list):
"""Function run after transport_response_simplified but before
injection into the local db and creation of workers. Can be
used to eliminate some of the results if necessary.
MAY BE OVERRIDDEN if an intermediate Lister class needs to filter
results before injection without requiring every child class to do so.
Args:
models_list: list of dicts returned by
transport_response_simplified.
Returns:
models_list with entries changed according to custom logic.
"""
return models_list
def do_additional_checks(self, models_list):
"""Execute some additional checks on the model list. For example, to
check for existing repositories in the db.
MAY BE OVERRIDDEN if an intermediate Lister class needs to run
further checks on the results before injection.
By default no extra checks are performed and models_list is
returned unchanged.
Args:
models_list: list of dicts returned by
transport_response_simplified.
Returns:
models_list if the checks pass, False otherwise
"""
return models_list
def is_within_bounds(self, inner, lower=None, upper=None):
"""See if a sortable value is inside the range [lower,upper].
MAY BE OVERRIDDEN, for example if the server indexable* key is
technically sortable but not automatically so.
* - ( see: swh.lister.core.indexing_lister.SWHIndexingLister )
Args:
inner (sortable type): the value being checked
lower (sortable type): optional lower bound
upper (sortable type): optional upper bound
Returns:
whether inner is confined by the optional lower and upper bounds
"""
try:
if lower is None and upper is None:
return True
elif lower is None:
ret = inner <= upper
elif upper is None:
ret = inner >= lower
else:
ret = lower <= inner <= upper
self.string_pattern_check(inner, lower, upper)
except Exception as e:
logging.error('%s: inner=%s%s, lower=%s%s, upper=%s%s' % (
e, type(inner), inner, type(lower), lower,
type(upper), upper))
raise
return ret
# You probably don't need to override anything below this line.
DEFAULT_CONFIG = {
'storage': ('dict', {
'cls': 'remote',
'args': {
'url': 'http://localhost:5002/'
},
}),
'scheduler': ('dict', {
'cls': 'remote',
'args': {
'url': 'http://localhost:5008/'
},
})
}
@property
def CONFIG_BASE_FILENAME(self): # noqa: N802
return 'lister-%s' % self.LISTER_NAME
@property
def ADDITIONAL_CONFIG(self): # noqa: N802
return {
'lister_db_url':
('str', 'postgresql:///lister-%s' % self.LISTER_NAME),
'credentials':
('list[dict]', []),
'cache_responses':
('bool', False),
'cache_dir':
('str', '~/.cache/swh/lister/%s' % self.LISTER_NAME),
}
INITIAL_BACKOFF = 10
MAX_RETRIES = 7
CONN_SLEEP = 10
def __init__(self, override_config=None):
self.backoff = self.INITIAL_BACKOFF
self.config = self.parse_config_file(
base_filename=self.CONFIG_BASE_FILENAME,
additional_configs=[self.ADDITIONAL_CONFIG]
)
self.config['cache_dir'] = os.path.expanduser(self.config['cache_dir'])
if self.config['cache_responses']:
config.prepare_folders(self.config, 'cache_dir')
if override_config:
self.config.update(override_config)
self.storage = get_storage(**self.config['storage'])
self.scheduler = get_scheduler(**self.config['scheduler'])
self.db_engine = create_engine(self.config['lister_db_url'])
self.mk_session = sessionmaker(bind=self.db_engine)
self.db_session = self.mk_session()
def reset_backoff(self):
"""Reset exponential backoff timeout to initial level."""
self.backoff = self.INITIAL_BACKOFF
def back_off(self):
"""Get next exponential backoff timeout."""
ret = self.backoff
self.backoff *= 10
return ret
def safely_issue_request(self, identifier):
"""Make network request with retries, rate quotas, and response logs.
Protocol is handled by the implementation of the transport_request
method.
Args:
identifier: resource identifier
Returns:
server response
"""
retries_left = self.MAX_RETRIES
do_cache = self.config['cache_responses']
+ r = None
while retries_left > 0:
try:
r = self.transport_request(identifier)
except FetchError:
# network-level connection error, try again
logging.warn('connection error on %s: sleep for %d seconds' %
(identifier, self.CONN_SLEEP))
time.sleep(self.CONN_SLEEP)
retries_left -= 1
continue
if do_cache:
self.save_response(r)
# detect throttling
must_retry, delay = self.transport_quota_check(r)
if must_retry:
logging.warn('rate limited on %s: sleep for %f seconds' %
(identifier, delay))
time.sleep(delay)
else: # request ok
break
retries_left -= 1
if not retries_left:
logging.warn('giving up on %s: max retries exceeded' % identifier)
return r
def db_query_equal(self, key, value):
"""Look in the db for a row with key == value
Args:
key: column key to look at
value: value to look for in that column
Returns:
sqlalchemy.ext.declarative.declarative_base object
with the given key == value
"""
if isinstance(key, str):
key = self.MODEL.__dict__[key]
return self.db_session.query(self.MODEL) \
.filter(key == value).first()
def winnow_models(self, mlist, key, to_remove):
"""Given a list of models, remove any with <key> matching
some member of a list of values.
Args:
mlist (sqlalchemy query): the initial query of model rows
key (column): the column to filter on
to_remove (list): if anything in mlist has column <key> equal to
one of the values in to_remove, it will be removed from the
result
Returns:
A list of model rows starting from mlist minus any matching rows
"""
if isinstance(key, str):
key = self.MODEL.__dict__[key]
if to_remove:
return mlist.filter(~key.in_(to_remove)).all()
else:
return mlist.all()
def db_num_entries(self):
"""Return the known number of entries in the lister db"""
return self.db_session.query(func.count('*')).select_from(self.MODEL) \
.scalar()
def db_inject_repo(self, model_dict):
"""Add/update a new repo to the db and mark it last_seen now.
Args:
model_dict: dictionary mapping model keys to values
Returns:
new or updated sqlalchemy.ext.declarative.declarative_base
object associated with the injection
"""
sql_repo = self.db_query_equal('uid', model_dict['uid'])
if not sql_repo:
sql_repo = self.MODEL(**model_dict)
self.db_session.add(sql_repo)
else:
for k in model_dict:
setattr(sql_repo, k, model_dict[k])
sql_repo.last_seen = utcnow()
return sql_repo
def origin_dict(self, origin_type, origin_url):
"""Return special dict format for the origins list
Args:
origin_type (string)
origin_url (string)
Returns:
a dict with 'type' and 'url' keys describing the origin
"""
return {
'type': origin_type,
'url': origin_url,
}
def task_dict(self, origin_type, origin_url):
"""Return special dict format for the tasks list
Args:
origin_type (string)
origin_url (string)
Returns:
a dict describing an origin-update task for the scheduler
"""
return {
'type': 'origin-update-%s' % origin_type,
'arguments': {
'args': [
origin_url,
],
'kwargs': {},
},
'next_run': utcnow(),
}
def string_pattern_check(self, a, b, c=None):
"""When comparing indexable types in is_within_bounds, complex strings
may not be allowed to differ in basic structure. If they do, it
could be a sign of not understanding the data well. For instance,
an ISO 8601 time string cannot be compared against its urlencoded
equivalent, but this is an easy mistake to accidentally make. This
method acts as a friendly sanity check.
Args:
a (string): inner component of the is_within_bounds method
b (string): lower component of the is_within_bounds method
c (string): upper component of the is_within_bounds method
Returns:
nothing
Raises:
TypeError if strings a, b, and c don't conform to the same basic
pattern.
"""
if isinstance(a, str):
a_pattern = re.sub('[a-zA-Z0-9]',
'[a-zA-Z0-9]',
re.escape(a))
if (isinstance(b, str) and (re.match(a_pattern, b) is None)
or isinstance(c, str) and (re.match(a_pattern, c) is None)):
logging.debug(a_pattern)
raise TypeError('incomparable string patterns detected')
def inject_repo_data_into_db(self, models_list):
"""Inject data into the db.
Args:
models_list: list of dicts mapping keys from the db model
for each repo to be injected
Returns:
dict of uid:sql_repo pairs
"""
injected_repos = {}
for m in models_list:
injected_repos[m['uid']] = self.db_inject_repo(m)
return injected_repos
def create_missing_origins_and_tasks(self, models_list, injected_repos):
"""Find any newly created db entries that don't yet have tasks or
origin objects assigned.
Args:
models_list: a list of dicts mapping keys in the db model for
each repo
injected_repos: dict of uid:sql_repo pairs that have just
been created
Returns:
Nothing. Modifies injected_repos.
"""
for m in models_list:
ir = injected_repos[m['uid']]
if not ir.origin_id:
ir.origin_id = self.storage.origin_add_one(
self.origin_dict(m['origin_type'], m['origin_url'])
)
if not ir.task_id:
ir.task_id = self.scheduler.create_tasks(
[self.task_dict(m['origin_type'], m['origin_url'])]
)[0]['id']
def ingest_data(self, identifier, checks=False):
"""The core data fetch sequence. Request server endpoint. Simplify and
filter response list of repositories. Inject repo information into
local db. Queue loader tasks for linked repositories.
Args:
identifier: Resource identifier.
checks (bool): whether to run additional checks
(cf. do_additional_checks)
Returns:
a tuple (response, injected_repos): the server response and
the dict of uid:sql_repo pairs injected into the db
"""
# Request (partial?) list of repositories info
response = self.safely_issue_request(identifier)
+ if not response:
+ return response, {}
models_list = self.transport_response_simplified(response)
models_list = self.filter_before_inject(models_list)
if checks:
models_list = self.do_additional_checks(models_list)
if not models_list:
return response, {}
# inject into local db
injected = self.inject_repo_data_into_db(models_list)
# queue workers
self.create_missing_origins_and_tasks(models_list, injected)
return response, injected
def save_response(self, response):
"""Log the response from a server request to a cache dir.
Args:
response: full server response
Returns:
nothing
"""
datepath = utcnow().isoformat()
fname = os.path.join(
self.config['cache_dir'],
datepath + '.gz',
)
with gzip.open(fname, 'w') as f:
f.write(bytes(
self.transport_response_to_string(response),
'UTF-8'
))
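
As an illustration of one of the four required overrides above, here is a standalone sketch (an assumption, not part of the patch) of transport_quota_check for a hypothetical HTTP API that signals throttling with a 429 status code and a Retry-After header. It returns the (must_retry, delay) pair that safely_issue_request expects.

class FakeResponse:
    """Stands in for the transport response object."""

    def __init__(self, status_code, headers=None):
        self.status_code = status_code
        self.headers = headers or {}


def transport_quota_check(response):
    # (must_retry, seconds_to_delay): retry after a delay when rate
    # limited, otherwise proceed immediately.
    if response.status_code == 429:
        return True, float(response.headers.get('Retry-After', 60))
    return False, 0.0


assert transport_quota_check(
    FakeResponse(429, {'Retry-After': '120'})) == (True, 120.0)
assert transport_quota_check(FakeResponse(200)) == (False, 0.0)
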
diff --git a/swh/lister/core/page_by_page_lister.py b/swh/lister/core/page_by_page_lister.py
index 8286b4a..3b9e27e 100644
--- a/swh/lister/core/page_by_page_lister.py
+++ b/swh/lister/core/page_by_page_lister.py
@@ -1,157 +1,160 @@
# Copyright (C) 2015-2018 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
import logging
from .lister_transports import SWHListerHttpTransport
from .lister_base import SWHListerBase
class PageByPageLister(SWHListerBase):
"""Lister* intermediate class for any service that follows the simple
pagination page pattern.
- Client sends a request to list repositories starting from a
given page identifier.
- Client receives structured (json/xml/etc) response with
information about a sequential series of repositories (per page)
starting from a given index. And, if available, some indication
of the next page index for fetching the remaining repository
data.
See :class:`swh.lister.core.lister_base.SWHListerBase` for more
details.
This class cannot be instantiated. To create a new Lister for a
source code listing service that follows the model described
above, you must subclass this class. Then provide the required
overrides in addition to any unmet implementation/override
requirements of this class's base (see parent class and member
docstrings for details).
Required Overrides::
def get_next_target_from_response
"""
@abc.abstractmethod
def get_next_target_from_response(self, response):
"""Find the next server endpoint page given the entire response.
Implementation of this method depends on the server API spec
and the shape of the network response object returned by the
transport_request method.
For example, some APIs use the Link response header to provide
the next page.
Args:
response (transport response): response page from the server
Returns:
index of next page, possibly extracted from a next href url
"""
pass
@abc.abstractmethod
def get_pages_information(self):
"""Find the total number of pages.
Implementation of this method depends on the server API spec
and the shape of the network response object returned by the
transport_request method.
For example, some APIs use dedicated headers:
- x-total-pages to provide the total number of pages
- x-total to provide the total number of repositories
- x-per-page to provide the number of elements per page
Returns:
tuple (total number of repositories, total number of
pages, per_page)
"""
pass
# You probably don't need to override anything below this line.
def do_additional_checks(self, models_list):
"""Potentially check for existence of repositories in models_list.
This will be called only if check_existence is flipped on in
the run method below.
"""
for m in models_list:
sql_repo = self.db_query_equal('uid', m['uid'])
if sql_repo:
return False
return models_list
def run(self, min_bound=None, max_bound=None, check_existence=False):
"""Main entry function. Sequentially fetches repository data from the
service according to the basic outline in the class
docstring, continually fetching sublists until either there
is no next page reference given or the given next page is
greater than the desired max_bound.
Args:
min_bound: optional page to start from
max_bound: optional page to stop at
check_existence (bool): optional existence check (for
incremental lister whose sort
order is inverted)
Returns:
nothing
"""
page = min_bound or 0
loop_count = 0
self.min_page = min_bound
self.max_page = max_bound
while self.is_within_bounds(page, self.min_page, self.max_page):
logging.info('listing repos starting at %s' % page)
response, injected_repos = self.ingest_data(page,
checks=check_existence)
- if not injected_repos:
+ if not response and not injected_repos:
+ logging.info('No response from api server, stopping')
+ break
+ elif not injected_repos:
logging.info('Repositories already seen, stopping')
break
next_page = self.get_next_target_from_response(response)
# termination condition
if (next_page is None) or (next_page == page):
logging.info('stopping after page %s, no next link found' %
page)
break
else:
page = next_page
loop_count += 1
if loop_count == 20:
logging.info('flushing updates')
loop_count = 0
self.db_session.commit()
self.db_session = self.mk_session()
self.db_session.commit()
self.db_session = self.mk_session()
class PageByPageHttpLister(SWHListerHttpTransport, PageByPageLister):
"""Convenience class for ensuring right lookup and init order when
combining PageByPageLister and SWHListerHttpTransport.
"""
def __init__(self, api_baseurl=None, override_config=None):
SWHListerHttpTransport.__init__(self, api_baseurl=api_baseurl)
PageByPageLister.__init__(self, override_config=override_config)
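
Finally, a standalone sketch of get_pages_information against the dedicated headers named in its docstring (x-total, x-total-pages, x-per-page). In a real lister the response would come from a request issued through safely_issue_request; FakeResponse here is only a stand-in for that transport response.

class FakeResponse:
    """Stands in for the transport response object."""

    def __init__(self, headers):
        self.headers = headers


def get_pages_information(response):
    # (total repositories, total pages, per_page), the tuple required
    # by PageByPageLister.get_pages_information.
    headers = response.headers
    return (int(headers['x-total']),
            int(headers['x-total-pages']),
            int(headers['x-per-page']))


assert get_pages_information(FakeResponse(
    {'x-total': '250', 'x-total-pages': '3', 'x-per-page': '100'}
)) == (250, 3, 100)
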