diff --git a/docs/tutorial.rst b/docs/tutorial.rst
index 8d91e86..c92325d 100644
--- a/docs/tutorial.rst
+++ b/docs/tutorial.rst
@@ -1,367 +1,367 @@
:orphan:
.. _lister-tutorial:
Tutorial: list the content of your favorite forge in just a few steps
=====================================================================
(the `original version
`_
of this article appeared on the Software Heritage blog)
Back in November 2016, Nicolas Dandrimont wrote about structural code changes
`leading to a massive (+15 million!) upswing in the number of repositories
archived by Software Heritage
`_
through a combination of automatic linkage between the listing and loading
scheduler, new understanding of how to deal with extremely large repository
hosts like `GitHub `_, and activating a new set of
repositories that had previously been skipped over.
In the post, Nicolas outlined the three major phases of work in Software
Heritage's preservation process (listing, scheduling updates, loading) and
highlighted that the ability to preserve the world's free software heritage
depends on our ability to find and list the repositories.
At the time, Software Heritage was only able to list projects on
GitHub. Focusing early on GitHub, one of the largest and most active forges in
the world, allowed for a high value-to-effort ratio and a rapid launch for the
archive. As the old Italian proverb goes, "Il meglio è nemico del bene," or in
modern English parlance, "Perfect is the enemy of good," right? Right. So the
plan from the beginning was to implement a lister for GitHub, then maybe
implement another one, and then take a few giant steps backward and squint our
eyes.
Why? Because source code hosting services don't behave according to a unified
standard. Each new service requires dedicated development time to implement a
new scraping client for the non-transferable requirements and intricacies of
that service's API. At the time, doing it in an extensible and adaptable way
required a level of exposure to the myriad differences between these services
that we just didn't think we had yet.
Nicolas' post closed by saying "We haven't carved out a stable API yet that
allows you to just fill in the blanks, as we only have the GitHub lister
currently, and a proven API will emerge organically only once we have some
diversity."
That has since changed. As of March 6, 2017, the Software Heritage **lister
code has been aggressively restructured, abstracted, and commented** to make
creating new listers significantly easier. There may yet be a few kinks to iron
out, but **now making a new lister is practically like filling in the blanks**.
Fundamentally, a basic lister must follow these steps:
1. Issue a network request for a service endpoint.
2. Convert the response into a canonical format.
3. Populate a work queue for fetching and ingesting source repositories.
Steps 1 and 3 are generic problems, so they can get generic solutions hidden
away in the base code, most of which never needs to change. That leaves us to
implement step 2, which can now be done trivially for services with a clean web
API.
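In very reduced form, those three steps amount to a fetch/convert/enqueue loop. The sketch below illustrates the flow only; the function and queue names here are hypothetical, not the actual lister API:

```python
# Illustrative sketch of the three lister phases. The callables and the
# 'next' key are stand-ins, not the real swh.lister interfaces.

def list_forge(fetch_page, simplify, enqueue, first_index=None):
    """Run the generic listing loop for one forge."""
    index = first_index
    while True:
        response = fetch_page(index)   # 1. issue a network request
        repos = simplify(response)     # 2. convert to a canonical format
        for repo in repos:             # 3. populate the work queue
            enqueue(repo)
        index = response.get('next')   # follow pagination, if any
        if index is None:
            break
```

Only the `simplify` step varies much between forges; the other two are the generic plumbing the base code hides away.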
In the new code, we've tried to hide away as much generic functionality as
possible, turning it into set-and-forget plumbing between a few simple
customized elements. Different hosting services might use different network
protocols, rate-limit messages, or pagination schemes, but, as long as there is
some way to get a list of the hosted repositories, we think that the new base
code will make getting those repositories much easier.
First, let me give you the 30,000-foot view…
The old GitHub-specific lister code looked like this (265 lines of Python):
.. figure:: images/old_github_lister.png
By contrast, the new GitHub-specific code looks like this (34 lines of Python):
.. figure:: images/new_github_lister.png
And the new BitBucket-specific code is even shorter and looks like this (24 lines of Python):
.. figure:: images/new_bitbucket_lister.png
And now this is common shared code in a few abstract base classes, with some new features and loads of docstring comments (in red):
.. figure:: images/new_base.png
So how does the lister code work now, and **how might a contributing developer
go about making a new one**?
The first thing to know is that we now have a generic lister base class and ORM
model. A subclass of the lister base should already be able to do almost
everything needed to complete a listing task for a single service
request/response cycle with the following implementation requirements:
1. A member variable must be declared called ``MODEL``, which is equal to a
subclass (Note: type, not instance) of the base ORM model. The reason for
using a subclass is mostly that different services use different,
incompatible primary identifiers for their repositories. The model
subclasses typically add only one or two variable declarations.
2. A method called ``transport_request`` must be implemented, which takes the
complete target identifier (e.g., a URL) and tries to request it one time
using whatever transport protocol is required for interacting with the
service. It should not attempt to retry on timeouts or do anything else with
the response (that is already done for you). It should just either return
the response or raise a ``FetchError`` exception.
3. A method called ``transport_response_to_string`` must be implemented, which
takes the entire response of the request in (1) and converts it to a string
for logging purposes.
4. A method called ``transport_quota_check`` must be implemented, which takes
the entire response of the request in (1) and checks to see if the process
has run afoul of any query quotas or rate limits. If the service says to
wait before making more requests, the method should return ``True`` along with
the number of seconds to wait; otherwise it returns ``False``.
5. A method called ``transport_response_simplified`` must be implemented, which
also takes the entire response of the request in (1) and converts it to a
Python list of dicts (one dict for each repository) with keys given
according to the aforementioned ``MODEL`` class members.
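Put together, a minimal subclass satisfying those five requirements might be shaped like this. This is a hypothetical skeleton: the "FakeForge" service, its model, and the standalone class are stand-ins used to show the shape of each override; real code would subclass the lister base and the base ORM model from swh.lister:

```python
import json
import urllib.request


class FetchError(RuntimeError):
    """Raised by transport_request on connection failure (as in the base)."""


class FakeForgeModel:
    """Stand-in for a subclass of the base ORM model (a type, not an instance)."""
    uid = None        # service-specific primary identifier
    indexable = None  # service-specific sortable index


class FakeForgeLister:
    # 1. MODEL is the model *type* itself, not an instance of it.
    MODEL = FakeForgeModel

    def transport_request(self, identifier):
        # 2. One attempt only; retries are the base code's job.
        try:
            return urllib.request.urlopen(identifier)
        except OSError as e:
            raise FetchError(e)

    def transport_response_to_string(self, response):
        # 3. Any readable rendering of the response, for logging.
        return repr(response)

    def transport_quota_check(self, response):
        # 4. Return (must_retry, seconds_to_wait). The 60s delay is an
        # arbitrary illustration, not a documented default.
        if getattr(response, 'status', 200) == 429:
            return True, 60
        return False, 0

    def transport_response_simplified(self, response):
        # 5. One dict per repository, keyed by the MODEL's members.
        return [{'uid': r['id'], 'indexable': r['id']}
                for r in json.loads(response.read())]
```

In practice most of these overrides come for free from the HTTP mix-in described next, so a real lister is considerably shorter than even this sketch.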
Because 2, 3, and 4 are basically dependent only on the chosen network
protocol, we also have an HTTP mix-in module, which supplements the lister base
and provides default implementations for those methods along with optional
request header injection using the Python Requests library. The
``transport_quota_check`` method as provided follows the IETF standard for
communicating rate limits with `HTTP code 429
`_, which some hosting services
have chosen not to follow, so it's possible that a specific lister will need to
override it.
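A quota check in the spirit of that default might look like the following sketch, written against a requests-style response object. The 10-second fallback delay is an assumption for illustration; the mix-in's actual implementation may differ:

```python
def transport_quota_check(response):
    """Return (must_retry, seconds_to_wait) based on HTTP 429 (RFC 6585)."""
    if response.status_code == 429:
        # Honor the Retry-After header when the server sends one;
        # otherwise fall back to an arbitrary 10-second delay.
        delay = int(response.headers.get('Retry-After', 10))
        return True, delay
    return False, 0
```

A service that signals rate limits some other way (GitHub's custom headers, for example) would override this method, as shown later in this tutorial.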
On top of all of that, we also provide another layer over the base lister class
which adds support for sequentially looping over indices. What are indices?
Well, some services (`BitBucket `_ and GitHub for
example) don't send you the entire list of all of their repositories at once,
because that server response would be unwieldy. Instead they paginate their
results, and they also allow you to query their APIs like this:
``https://server_address.tld/query_type?start_listing_from_id=foo``. Changing
the value of 'foo' lets you fetch a set of repositories starting from there. We
call 'foo' an index, and we call a service that works this way an indexing
service. GitHub uses the repository unique identifier and BitBucket uses the
repository creation time, but a service can really use anything as long as the
values monotonically increase with new repositories. A good indexing service
also includes the URL of the next page with a later 'foo' in its responses. For
these indexing services we provide another intermediate lister called the
-indexing lister. Instead of inheriting from :class:`SWHListerBase
-`, the lister class would inherit
-from :class:`SWHIndexingLister
-`. Along with the
+indexing lister. Instead of inheriting from :class:`ListerBase
+`, the lister class would inherit
+from :class:`IndexingLister
+`. Along with the
requirements of the lister base, the indexing lister base adds one extra
requirement:
1. A method called ``get_next_target_from_response`` must be defined, which
takes a complete request response and returns the index ('foo' above) of the
next page.
So those are all the basic requirements. There are, of course, a few other
little bits and pieces (covered for now in the code's docstring comments), but
for the most part that's it. It sounds like a lot of information to absorb and
implement, but remember that most of the implementation requirements mentioned
above are already provided for 99% of services by the HTTP mix-in module. It
looks much simpler when we look at the actual implementations of the two
new-style indexing listers we currently have…
When developing a new lister, it's important to test it. To do so, add tests
(see ``swh/lister/*/tests/``) and register the Celery tasks in the main
conftest.py (``swh/lister/core/tests/conftest.py``).
Another important step is to actually run it within the
docker-dev (:ref:`run-lister-tutorial`).
This is the entire source code for the BitBucket repository lister::
# Copyright (C) 2017 the Software Heritage developers
# License: GNU General Public License version 3 or later
# See top-level LICENSE file for more information
from urllib import parse
from swh.lister.bitbucket.models import BitBucketModel
- from swh.lister.core.indexing_lister import SWHIndexingHttpLister
+ from swh.lister.core.indexing_lister import IndexingHttpLister
- class BitBucketLister(SWHIndexingHttpLister):
+ class BitBucketLister(IndexingHttpLister):
PATH_TEMPLATE = '/repositories?after=%s'
MODEL = BitBucketModel
def get_model_from_repo(self, repo):
return {'uid': repo['uuid'],
'indexable': repo['created_on'],
'name': repo['name'],
'full_name': repo['full_name'],
'html_url': repo['links']['html']['href'],
'origin_url': repo['links']['clone'][0]['href'],
'origin_type': repo['scm'],
'description': repo['description']}
def get_next_target_from_response(self, response):
body = response.json()
if 'next' in body:
return parse.unquote(body['next'].split('after=')[1])
else:
return None
def transport_response_simplified(self, response):
repos = response.json()['values']
return [self.get_model_from_repo(repo) for repo in repos]
And this is the entire source code for the GitHub repository lister::
# Copyright (C) 2017 the Software Heritage developers
# License: GNU General Public License version 3 or later
# See top-level LICENSE file for more information
import time
- from swh.lister.core.indexing_lister import SWHIndexingHttpLister
+ from swh.lister.core.indexing_lister import IndexingHttpLister
from swh.lister.github.models import GitHubModel
- class GitHubLister(SWHIndexingHttpLister):
+ class GitHubLister(IndexingHttpLister):
PATH_TEMPLATE = '/repositories?since=%d'
MODEL = GitHubModel
def get_model_from_repo(self, repo):
return {'uid': repo['id'],
'indexable': repo['id'],
'name': repo['name'],
'full_name': repo['full_name'],
'html_url': repo['html_url'],
'origin_url': repo['html_url'],
'origin_type': 'git',
'description': repo['description']}
def get_next_target_from_response(self, response):
if 'next' in response.links:
next_url = response.links['next']['url']
return int(next_url.split('since=')[1])
else:
return None
def transport_response_simplified(self, response):
repos = response.json()
return [self.get_model_from_repo(repo) for repo in repos]
def request_headers(self):
return {'Accept': 'application/vnd.github.v3+json'}
def transport_quota_check(self, response):
remain = int(response.headers['X-RateLimit-Remaining'])
if response.status_code == 403 and remain == 0:
reset_at = int(response.headers['X-RateLimit-Reset'])
delay = min(reset_at - time.time(), 3600)
return True, delay
else:
return False, 0
We can see that there are some common elements:
-* Both use the HTTP transport mixin (:class:`SWHIndexingHttpLister
- `) just combines
- :class:`SWHListerHttpTransport
- ` and
- :class:`SWHIndexingLister
- `) to get most of the
+* Both use the HTTP transport mixin (:class:`IndexingHttpLister
+ `) just combines
+ :class:`ListerHttpTransport
+ ` and
+ :class:`IndexingLister
+ `) to get most of the
network request functionality for free.
* Both also define ``MODEL`` and ``PATH_TEMPLATE`` variables. It should be
clear to developers that ``PATH_TEMPLATE``, when combined with the base
service URL (e.g., ``https://some_service.com``) and a value (the 'foo'
index described earlier), results in a complete identifier for making
API requests to these services. It is required by our HTTP module.
* Both services respond using JSON, so both implementations of
``transport_response_simplified`` are similar and quite short.
We can also see that there are a few differences:
* GitHub sends the next URL as part of the response header, while BitBucket
sends it in the response body.
* GitHub differentiates API versions with a request header (our HTTP
transport mix-in will automatically use any headers provided by an
optional request_headers method that we implement here), while
BitBucket has it as part of their base service URL. BitBucket uses
the IETF standard HTTP 429 response code for their rate limit
notifications (the HTTP transport mix-in automatically handles
that), while GitHub uses their own custom response headers that need
special treatment.
* But look at them! 58 lines of Python code, combined, to absorb all
repositories from two of the largest and most influential source code hosting
services.
Ok, so what is going on behind the scenes?
To trace the operation of the code, let's start with a sample instantiation and
progress from there to see which methods get called when. What follows will be
a series of extremely reductionist pseudocode methods. This is not what the
code actually looks like (it's not even real code), but it does have the same
basic flow. Bear with me while I try to lay out lister operation in a
quasi-linear way…::
# main task
ghl = GitHubLister(lister_name='github.com',
api_baseurl='https://github.com')
ghl.run()
-⇓ (SWHIndexingLister.run)::
+⇓ (IndexingLister.run)::
- # SWHIndexingLister.run
+ # IndexingLister.run
identifier = None
do
- response, repos = SWHListerBase.ingest_data(identifier)
+ response, repos = ListerBase.ingest_data(identifier)
identifier = GitHubLister.get_next_target_from_response(response)
while(identifier)
-⇓ (SWHListerBase.ingest_data)::
+⇓ (ListerBase.ingest_data)::
- # SWHListerBase.ingest_data
+ # ListerBase.ingest_data
- response = SWHListerBase.safely_issue_request(identifier)
+ response = ListerBase.safely_issue_request(identifier)
repos = GitHubLister.transport_response_simplified(response)
- injected = SWHListerBase.inject_repo_data_into_db(repos)
+ injected = ListerBase.inject_repo_data_into_db(repos)
return response, injected
-⇓ (SWHListerBase.safely_issue_request)::
+⇓ (ListerBase.safely_issue_request)::
- # SWHListerBase.safely_issue_request
+ # ListerBase.safely_issue_request
repeat:
- resp = SWHListerHttpTransport.transport_request(identifier)
- retry, delay = SWHListerHttpTransport.transport_quota_check(resp)
+ resp = ListerHttpTransport.transport_request(identifier)
+ retry, delay = ListerHttpTransport.transport_quota_check(resp)
if retry:
sleep(delay)
until((not retry) or too_many_retries)
return resp
-⇓ (SWHListerHttpTransport.transport_request)::
+⇓ (ListerHttpTransport.transport_request)::
- # SWHListerHttpTransport.transport_request
+ # ListerHttpTransport.transport_request
- path = SWHListerBase.api_baseurl
- + SWHListerHttpTransport.PATH_TEMPLATE % identifier
- headers = SWHListerHttpTransport.request_headers()
+ path = ListerBase.api_baseurl
+ + ListerHttpTransport.PATH_TEMPLATE % identifier
+ headers = ListerHttpTransport.request_headers()
return http.get(path, headers)
(Oh look, there's our ``PATH_TEMPLATE``)
-⇓ (SWHListerHttpTransport.request_headers)::
+⇓ (ListerHttpTransport.request_headers)::
- # SWHListerHttpTransport.request_headers
+ # ListerHttpTransport.request_headers
override → GitHubLister.request_headers
-↑↑ (SWHListerBase.safely_issue_request)
+↑↑ (ListerBase.safely_issue_request)
-⇓ (SWHListerHttpTransport.transport_quota_check)::
+⇓ (ListerHttpTransport.transport_quota_check)::
- # SWHListerHttpTransport.transport_quota_check
+ # ListerHttpTransport.transport_quota_check
override → GitHubLister.transport_quota_check
And then we're done. From start to finish, I hope this helps you understand how
the few customized pieces fit into the new shared plumbing.
Now you can go and write up a lister for a code hosting site we don't have yet!
diff --git a/swh/lister/bitbucket/lister.py b/swh/lister/bitbucket/lister.py
index e674cc4..a51fea7 100644
--- a/swh/lister/bitbucket/lister.py
+++ b/swh/lister/bitbucket/lister.py
@@ -1,71 +1,69 @@
# Copyright (C) 2017-2019 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from urllib import parse
import logging
import iso8601
from swh.lister.bitbucket.models import BitBucketModel
-from swh.lister.core.indexing_lister import SWHIndexingHttpLister
+from swh.lister.core.indexing_lister import IndexingHttpLister
logger = logging.getLogger(__name__)
-class BitBucketLister(SWHIndexingHttpLister):
+class BitBucketLister(IndexingHttpLister):
PATH_TEMPLATE = '/repositories?after=%s'
MODEL = BitBucketModel
LISTER_NAME = 'bitbucket'
instance = 'bitbucket'
def get_model_from_repo(self, repo):
return {
'uid': repo['uuid'],
'indexable': repo['created_on'],
'name': repo['name'],
'full_name': repo['full_name'],
'html_url': repo['links']['html']['href'],
'origin_url': repo['links']['clone'][0]['href'],
'origin_type': repo['scm'],
}
def get_next_target_from_response(self, response):
body = response.json()
if 'next' in body:
return parse.unquote(body['next'].split('after=')[1])
- else:
- return None
def transport_response_simplified(self, response):
repos = response.json()['values']
return [self.get_model_from_repo(repo) for repo in repos]
def request_uri(self, identifier):
return super().request_uri(identifier or '1970-01-01')
def is_within_bounds(self, inner, lower=None, upper=None):
# values are expected to be str dates
try:
inner = iso8601.parse_date(inner)
if lower:
lower = iso8601.parse_date(lower)
if upper:
upper = iso8601.parse_date(upper)
if lower is None and upper is None:
return True
elif lower is None:
ret = inner <= upper
elif upper is None:
ret = inner >= lower
else:
ret = lower <= inner <= upper
except Exception as e:
logger.error(str(e) + ': %s, %s, %s' %
(('inner=%s%s' % (type(inner), inner)),
('lower=%s%s' % (type(lower), lower)),
('upper=%s%s' % (type(upper), upper)))
)
raise
return ret
diff --git a/swh/lister/core/indexing_lister.py b/swh/lister/core/indexing_lister.py
index b98e737..3cc545f 100644
--- a/swh/lister/core/indexing_lister.py
+++ b/swh/lister/core/indexing_lister.py
@@ -1,242 +1,242 @@
# Copyright (C) 2015-2017 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
import logging
from itertools import count
import dateutil
from sqlalchemy import func
-from .lister_transports import SWHListerHttpTransport
-from .lister_base import SWHListerBase
+from .lister_transports import ListerHttpTransport
+from .lister_base import ListerBase
logger = logging.getLogger(__name__)
-class SWHIndexingLister(SWHListerBase):
+class IndexingLister(ListerBase):
"""Lister* intermediate class for any service that follows the pattern:
- The service must report at least one stable unique identifier, known
herein as the UID value, for every listed repository.
- If the service splits the list of repositories into sublists, it must
report at least one stable and sorted index identifier for every listed
repository, known herein as the indexable value, which can be used as
part of the service endpoint query to request a sublist beginning from
that index. This might be the UID if the UID is monotonic.
- Client sends a request to list repositories starting from a given
index.
- Client receives structured (json/xml/etc) response with information about
a sequential series of repositories starting from that index and, if
necessary/available, some indication of the URL or index for fetching the
next series of repository data.
- See :class:`swh.lister.core.lister_base.SWHListerBase` for more details.
+ See :class:`swh.lister.core.lister_base.ListerBase` for more details.
This class cannot be instantiated. To create a new Lister for a source
code listing service that follows the model described above, you must
subclass this class and provide the required overrides in addition to
any unmet implementation/override requirements of this class's base.
(see parent class and member docstrings for details)
Required Overrides::
def get_next_target_from_response
"""
@abc.abstractmethod
def get_next_target_from_response(self, response):
"""Find the next server endpoint identifier given the entire response.
Implementation of this method depends on the server API spec
and the shape of the network response object returned by the
transport_request method.
Args:
response (transport response): response page from the server
Returns:
index of next page, possibly extracted from a next href url
"""
pass
# You probably don't need to override anything below this line.
def filter_before_inject(self, models_list):
- """Overrides SWHListerBase.filter_before_inject
+ """Overrides ListerBase.filter_before_inject
Bounds query results by this Lister's set max_index.
"""
models_list = [
m for m in models_list
if self.is_within_bounds(m['indexable'], None, self.max_index)
]
return models_list
def db_query_range(self, start, end):
"""Look in the db for a range of repositories with indexable
values in the range [start, end]
Args:
start (model indexable type): start of desired indexable range
end (model indexable type): end of desired indexable range
Returns:
a list of sqlalchemy.ext.declarative.declarative_base objects
with indexable values within the given range
"""
retlist = self.db_session.query(self.MODEL)
if start is not None:
retlist = retlist.filter(self.MODEL.indexable >= start)
if end is not None:
retlist = retlist.filter(self.MODEL.indexable <= end)
return retlist
def db_partition_indices(self, partition_size):
"""Describe an index-space compartmentalization of the db table
in equal-sized chunks. This is used to describe min/max bounds for
parallelizing fetch tasks.
Args:
partition_size (int): desired size to make each partition
Returns:
a list of tuples (begin, end) of indexable value that
declare approximately equal-sized ranges of existing
repos
"""
n = max(self.db_num_entries(), 10)
partition_size = min(partition_size, n)
n_partitions = n // partition_size
min_index = self.db_first_index()
max_index = self.db_last_index()
if not min_index or not max_index:
raise ValueError("Can't partition an empty range")
if isinstance(min_index, str):
def format_bound(bound):
return bound.isoformat()
min_index = dateutil.parser.parse(min_index)
max_index = dateutil.parser.parse(max_index)
else:
def format_bound(bound):
return bound
partition_width = (max_index - min_index) / n_partitions
partitions = [
[
format_bound(min_index + i * partition_width),
format_bound(min_index + (i+1) * partition_width),
] for i in range(n_partitions)
]
# Remove bounds for lowest and highest partition
partitions[0][0] = None
partitions[-1][1] = None
return [tuple(partition) for partition in partitions]
def db_first_index(self):
"""Look in the db for the smallest indexable value
Returns:
the smallest indexable value of all repos in the db
"""
t = self.db_session.query(func.min(self.MODEL.indexable)).first()
if t:
return t[0]
else:
return None
def db_last_index(self):
"""Look in the db for the largest indexable value
Returns:
the largest indexable value of all repos in the db
"""
t = self.db_session.query(func.max(self.MODEL.indexable)).first()
if t:
return t[0]
else:
return None
def disable_deleted_repo_tasks(self, start, end, keep_these):
"""Disable tasks for repos that no longer exist between start and end.
Args:
start: beginning of range to disable
end: end of range to disable
keep_these (uid list): do not disable repos with uids in this list
"""
if end is None:
end = self.db_last_index()
if not self.is_within_bounds(end, None, self.max_index):
end = self.max_index
deleted_repos = self.winnow_models(
self.db_query_range(start, end), self.MODEL.uid, keep_these
)
tasks_to_disable = [repo.task_id for repo in deleted_repos
if repo.task_id is not None]
if tasks_to_disable:
self.scheduler.disable_tasks(tasks_to_disable)
for repo in deleted_repos:
repo.task_id = None
def run(self, min_bound=None, max_bound=None):
"""Main entry function. Sequentially fetches repository data
from the service according to the basic outline in the class
docstring, continually fetching sublists until either there
is no next index reference given or the given next index is greater
than the desired max_bound.
Args:
min_bound (indexable type): optional index to start from
max_bound (indexable type): optional index to stop at
Returns:
nothing
"""
self.min_index = min_bound
self.max_index = max_bound
def ingest_indexes():
index = min_bound or ''
for i in count(1):
response, injected_repos = self.ingest_data(index)
if not response and not injected_repos:
logger.info('No response from api server, stopping')
return
next_index = self.get_next_target_from_response(response)
# Determine if any repos were deleted, and disable their tasks.
keep_these = list(injected_repos.keys())
self.disable_deleted_repo_tasks(index, next_index, keep_these)
# termination condition
if next_index is None or next_index == index:
logger.info('stopping after index %s, no next link found' %
index)
return
index = next_index
yield i
for i in ingest_indexes():
if (i % 20) == 0:
logger.info('flushing updates')
self.db_session.commit()
self.db_session = self.mk_session()
self.db_session.commit()
self.db_session = self.mk_session()
-class SWHIndexingHttpLister(SWHListerHttpTransport, SWHIndexingLister):
+class IndexingHttpLister(ListerHttpTransport, IndexingLister):
"""Convenience class for ensuring right lookup and init order
- when combining SWHIndexingLister and SWHListerHttpTransport."""
+ when combining IndexingLister and ListerHttpTransport."""
def __init__(self, api_baseurl=None, override_config=None):
- SWHListerHttpTransport.__init__(self, api_baseurl=api_baseurl)
- SWHIndexingLister.__init__(self, override_config=override_config)
+ ListerHttpTransport.__init__(self, api_baseurl=api_baseurl)
+ IndexingLister.__init__(self, override_config=override_config)
diff --git a/swh/lister/core/lister_base.py b/swh/lister/core/lister_base.py
index e21003c..66bfd0e 100644
--- a/swh/lister/core/lister_base.py
+++ b/swh/lister/core/lister_base.py
@@ -1,517 +1,517 @@
# Copyright (C) 2015-2018 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
import datetime
import gzip
import json
import logging
import os
import re
import time
from sqlalchemy import create_engine, func
from sqlalchemy.orm import sessionmaker
from swh.core import config
from swh.scheduler import get_scheduler, utils
from swh.storage import get_storage
from .abstractattribute import AbstractAttribute
logger = logging.getLogger(__name__)
def utcnow():
return datetime.datetime.now(tz=datetime.timezone.utc)
class FetchError(RuntimeError):
def __init__(self, response):
self.response = response
def __str__(self):
return repr(self.response)
-class SWHListerBase(abc.ABC, config.SWHConfig):
+class ListerBase(abc.ABC, config.SWHConfig):
"""Lister core base class.
Generally a source code hosting service provides an API endpoint
for listing the set of stored repositories. A Lister is the discovery
service responsible for finding this list, all at once or sequentially
by parts, and queueing local tasks to fetch and ingest the referenced
repositories.
The core method in this class is ingest_data. Any subclasses should be
calling this method one or more times to fetch and ingest data from API
- endpoints. See swh.lister.core.lister_base.SWHIndexingLister for
+ endpoints. See swh.lister.core.indexing_lister.IndexingLister for
example usage.
This class cannot be instantiated. Any instantiable Lister descending
- from SWHListerBase must provide at least the required overrides.
+ from ListerBase must provide at least the required overrides.
(see member docstrings for details):
Required Overrides:
MODEL
def transport_request
def transport_response_to_string
def transport_response_simplified
def transport_quota_check
Optional Overrides:
def filter_before_inject
def is_within_bounds
"""
MODEL = AbstractAttribute('Subclass type (not instance)'
' of swh.lister.core.models.ModelBase'
' customized for a specific service.')
LISTER_NAME = AbstractAttribute("Lister's name")
def transport_request(self, identifier):
"""Given a target endpoint identifier to query, try once to request it.
Implementation of this method determines the network request protocol.
Args:
identifier (string): unique identifier for an endpoint query.
e.g. If the service indexes lists of repositories by date and
time of creation, this might be that as a formatted string. Or
it might be an integer UID. Or it might be nothing.
It depends on what the service needs.
Returns:
the entire request response
Raises:
Will catch internal transport-dependent connection exceptions and
raise swh.lister.core.lister_base.FetchError instead. Other
non-connection exceptions should propagate unchanged.
"""
pass
def transport_response_to_string(self, response):
"""Convert the server response into a formatted string for logging.
Implementation of this method depends on the shape of the network
response object returned by the transport_request method.
Args:
response: the server response
Returns:
a pretty string of the response
"""
pass
def transport_response_simplified(self, response):
"""Convert the server response into list of a dict for each repo in the
response, mapping columns in the lister's MODEL class to repo data.
Implementation of this method depends on the server API spec and the
shape of the network response object returned by the transport_request
method.
Args:
response: response object from the server.
Returns:
list of repo MODEL dicts
( eg. [{'uid': r['id'], etc.} for r in response.json()] )
"""
pass
def transport_quota_check(self, response):
"""Check server response to see if we're hitting request rate limits.
Implementation of this method depends on the server communication
protocol and API spec and the shape of the network response object
returned by the transport_request method.
Args:
response (session response): complete API query response
Returns:
1) must retry request? True/False
2) seconds to delay if True
"""
pass
def filter_before_inject(self, models_list):
"""Filter models_list entries prior to injection in the db.
This is run directly after `transport_response_simplified`.
Default implementation is to have no filtering.
Args:
models_list: list of dicts returned by
transport_response_simplified.
Returns:
models_list with entries changed according to custom logic.
"""
return models_list
def do_additional_checks(self, models_list):
"""Execute some additional checks on the model list (after the
filtering).
Default implementation is to run no check at all and to return
the input as is.
Args:
models_list: list of dicts returned by
transport_response_simplified.
Returns:
models_list with entries if checks ok, False otherwise
"""
return models_list
def is_within_bounds(self, inner, lower=None, upper=None):
"""See if a sortable value is inside the range [lower,upper].
MAY BE OVERRIDDEN, for example if the server indexable* key is
technically sortable but not automatically so.
- * - ( see: swh.lister.core.indexing_lister.SWHIndexingLister )
+ * - ( see: swh.lister.core.indexing_lister.IndexingLister )
Args:
inner (sortable type): the value being checked
lower (sortable type): optional lower bound
upper (sortable type): optional upper bound
Returns:
whether inner is confined by the optional lower and upper bounds
"""
try:
if lower is None and upper is None:
return True
elif lower is None:
ret = inner <= upper
elif upper is None:
ret = inner >= lower
else:
ret = lower <= inner <= upper
self.string_pattern_check(inner, lower, upper)
except Exception as e:
logger.error(str(e) + ': %s, %s, %s' %
(('inner=%s%s' % (type(inner), inner)),
('lower=%s%s' % (type(lower), lower)),
('upper=%s%s' % (type(upper), upper)))
)
raise
return ret
# You probably don't need to override anything below this line.
DEFAULT_CONFIG = {
'storage': ('dict', {
'cls': 'remote',
'args': {
'url': 'http://localhost:5002/'
},
}),
'scheduler': ('dict', {
'cls': 'remote',
'args': {
'url': 'http://localhost:5008/'
},
}),
'lister': ('dict', {
'cls': 'local',
'args': {
'db': 'postgresql:///lister',
},
}),
}
@property
def CONFIG_BASE_FILENAME(self): # noqa: N802
return 'lister_%s' % self.LISTER_NAME
@property
def ADDITIONAL_CONFIG(self): # noqa: N802
return {
'credentials': ('dict', {}),
'cache_responses': ('bool', False),
'cache_dir': ('str', '~/.cache/swh/lister/%s' % self.LISTER_NAME),
}
INITIAL_BACKOFF = 10
MAX_RETRIES = 7
CONN_SLEEP = 10
def __init__(self, override_config=None):
self.backoff = self.INITIAL_BACKOFF
logger.debug('Loading config from %s' % self.CONFIG_BASE_FILENAME)
self.config = self.parse_config_file(
base_filename=self.CONFIG_BASE_FILENAME,
additional_configs=[self.ADDITIONAL_CONFIG]
)
self.config['cache_dir'] = os.path.expanduser(self.config['cache_dir'])
if self.config['cache_responses']:
config.prepare_folders(self.config, 'cache_dir')
if override_config:
self.config.update(override_config)
logger.debug('%s CONFIG=%s' % (self, self.config))
self.storage = get_storage(**self.config['storage'])
self.scheduler = get_scheduler(**self.config['scheduler'])
self.db_engine = create_engine(self.config['lister']['args']['db'])
self.mk_session = sessionmaker(bind=self.db_engine)
self.db_session = self.mk_session()
def reset_backoff(self):
"""Reset exponential backoff timeout to initial level."""
self.backoff = self.INITIAL_BACKOFF
def back_off(self):
"""Get next exponential backoff timeout."""
ret = self.backoff
self.backoff *= 10
return ret
def safely_issue_request(self, identifier):
"""Make network request with retries, rate quotas, and response logs.
Protocol is handled by the implementation of the transport_request
method.
Args:
identifier: resource identifier
Returns:
server response
"""
retries_left = self.MAX_RETRIES
do_cache = self.config['cache_responses']
r = None
while retries_left > 0:
try:
r = self.transport_request(identifier)
except FetchError:
# network-level connection error, try again
logger.warning(
'connection error on %s: sleep for %d seconds' %
(identifier, self.CONN_SLEEP))
time.sleep(self.CONN_SLEEP)
retries_left -= 1
continue
if do_cache:
self.save_response(r)
# detect throttling
must_retry, delay = self.transport_quota_check(r)
if must_retry:
logger.warning(
'rate limited on %s: sleep for %f seconds' %
(identifier, delay))
time.sleep(delay)
else: # request ok
break
retries_left -= 1
if not retries_left:
logger.warning(
'giving up on %s: max retries exceeded' % identifier)
return r
def db_query_equal(self, key, value):
"""Look in the db for a row with key == value
Args:
key: column key to look at
value: value to look for in that column
Returns:
sqlalchemy.ext.declarative.declarative_base object
with the given key == value
"""
if isinstance(key, str):
key = self.MODEL.__dict__[key]
return self.db_session.query(self.MODEL) \
.filter(key == value).first()
def winnow_models(self, mlist, key, to_remove):
"""Given a list of models, remove any with matching
some member of a list of values.
Args:
mlist (list of model rows): the initial list of models
key (column): the column to filter on
to_remove (list): if anything in mlist has column equal to
one of the values in to_remove, it will be removed from the
result
Returns:
A list of model rows starting from mlist minus any matching rows
"""
if isinstance(key, str):
key = self.MODEL.__dict__[key]
if to_remove:
return mlist.filter(~key.in_(to_remove)).all()
else:
return mlist.all()
def db_num_entries(self):
"""Return the known number of entries in the lister db"""
return self.db_session.query(func.count('*')).select_from(self.MODEL) \
.scalar()
def db_inject_repo(self, model_dict):
"""Add/update a new repo to the db and mark it last_seen now.
Args:
model_dict: dictionary mapping model keys to values
Returns:
new or updated sqlalchemy.ext.declarative.declarative_base
object associated with the injection
"""
sql_repo = self.db_query_equal('uid', model_dict['uid'])
if not sql_repo:
sql_repo = self.MODEL(**model_dict)
self.db_session.add(sql_repo)
else:
for k in model_dict:
setattr(sql_repo, k, model_dict[k])
sql_repo.last_seen = utcnow()
return sql_repo
def task_dict(self, origin_type, origin_url, **kwargs):
"""Return special dict format for the tasks list
Args:
origin_type (string)
origin_url (string)
Returns:
a task dict ready for scheduling, wrapping the origin information
"""
_type = 'load-%s' % origin_type
_policy = 'recurring'
return utils.create_task_dict(_type, _policy, origin_url)
def string_pattern_check(self, a, b, c=None):
"""When comparing indexable types in is_within_bounds, complex strings
may not be allowed to differ in basic structure. If they do, it
could be a sign of not understanding the data well. For instance,
an ISO 8601 time string cannot be compared against its urlencoded
equivalent, but this is an easy mistake to accidentally make. This
method acts as a friendly sanity check.
Args:
a (string): inner component of the is_within_bounds method
b (string): lower component of the is_within_bounds method
c (string): upper component of the is_within_bounds method
Returns:
nothing
Raises:
TypeError if strings a, b, and c don't conform to the same basic
pattern.
"""
if isinstance(a, str):
a_pattern = re.sub('[a-zA-Z0-9]',
'[a-zA-Z0-9]',
re.escape(a))
if (isinstance(b, str) and (re.match(a_pattern, b) is None)
or isinstance(c, str) and (re.match(a_pattern, c) is None)):
logger.debug(a_pattern)
raise TypeError('incomparable string patterns detected')
def inject_repo_data_into_db(self, models_list):
"""Inject data into the db.
Args:
models_list: list of dicts mapping keys from the db model
for each repo to be injected
Returns:
dict of uid:sql_repo pairs
"""
injected_repos = {}
for m in models_list:
injected_repos[m['uid']] = self.db_inject_repo(m)
return injected_repos
def schedule_missing_tasks(self, models_list, injected_repos):
"""Find any newly created db entries that do not have been scheduled
yet.
Args:
models_list ([Model]): List of dicts mapping keys in the db model
for each repo
injected_repos ([dict]): Dict of uid:sql_repo pairs that have just
been created
Returns:
Nothing. Modifies injected_repos.
"""
tasks = {}
def _task_key(m):
return '%s-%s' % (
m['type'],
json.dumps(m['arguments'], sort_keys=True)
)
for m in models_list:
ir = injected_repos[m['uid']]
if not ir.task_id:
task_dict = self.task_dict(**m)
tasks[_task_key(task_dict)] = (ir, m, task_dict)
new_tasks = self.scheduler.create_tasks(
(task_dicts for (_, _, task_dicts) in tasks.values()))
for task in new_tasks:
ir, m, _ = tasks[_task_key(task)]
ir.task_id = task['id']
def ingest_data(self, identifier, checks=False):
"""The core data fetch sequence. Request server endpoint. Simplify and
filter response list of repositories. Inject repo information into
local db. Queue loader tasks for linked repositories.
Args:
identifier: Resource identifier.
checks (bool): Additional checks required
"""
# Request (partial?) list of repositories info
response = self.safely_issue_request(identifier)
if not response:
return response, []
models_list = self.transport_response_simplified(response)
models_list = self.filter_before_inject(models_list)
if checks:
models_list = self.do_additional_checks(models_list)
if not models_list:
return response, []
# inject into local db
injected = self.inject_repo_data_into_db(models_list)
# queue workers
self.schedule_missing_tasks(models_list, injected)
return response, injected
def save_response(self, response):
"""Log the response from a server request to a cache dir.
Args:
response: full server response
Returns:
nothing
"""
datepath = utcnow().isoformat()
fname = os.path.join(
self.config['cache_dir'],
datepath + '.gz',
)
with gzip.open(fname, 'w') as f:
f.write(bytes(
self.transport_response_to_string(response),
'UTF-8'
))
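For context on the contract above: a minimal, self-contained sketch of what a concrete `transport_response_simplified` typically does, mapping a JSON API response to one MODEL dict per repo. `FakeResponse` and the field names (`id`, `name`, `full_name`) are invented for illustration; a real forge API will differ.

```python
class FakeResponse:
    """Stand-in for a requests.Response exposing a json() method."""
    def __init__(self, data):
        self._data = data

    def json(self):
        return self._data


def transport_response_simplified(response):
    # One dict per repository, keyed by the lister MODEL's columns.
    return [
        {
            'uid': r['id'],
            'name': r['name'],
            'full_name': r['full_name'],
        }
        for r in response.json()
    ]


response = FakeResponse([
    {'id': 1, 'name': 'repo-a', 'full_name': 'alice/repo-a'},
    {'id': 2, 'name': 'repo-b', 'full_name': 'bob/repo-b'},
])
models = transport_response_simplified(response)
```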
diff --git a/swh/lister/core/lister_transports.py b/swh/lister/core/lister_transports.py
index a2202bd..fac3b7e 100644
--- a/swh/lister/core/lister_transports.py
+++ b/swh/lister/core/lister_transports.py
@@ -1,222 +1,222 @@
# Copyright (C) 2017-2018 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
import random
from datetime import datetime
from email.utils import parsedate
from pprint import pformat
import logging
import requests
import xmltodict
try:
from swh.lister._version import __version__
except ImportError:
__version__ = 'devel'
from .abstractattribute import AbstractAttribute
from .lister_base import FetchError
logger = logging.getLogger(__name__)
-class SWHListerHttpTransport(abc.ABC):
+class ListerHttpTransport(abc.ABC):
"""Use the Requests library for making Lister endpoint requests.
- To be used in conjunction with SWHListerBase or a subclass of it.
+ To be used in conjunction with ListerBase or a subclass of it.
"""
PATH_TEMPLATE = AbstractAttribute('string containing a python string'
' format pattern that produces the API'
' endpoint path for listing stored'
' repositories when given an index.'
' eg. "/repositories?after=%s".'
' To be implemented in the API-specific'
' class inheriting this.')
EXPECTED_STATUS_CODES = (200, 429, 403, 404)
def request_headers(self):
"""Returns dictionary of any request headers needed by the server.
MAY BE OVERRIDDEN if request headers are needed.
"""
return {
'User-Agent': 'Software Heritage lister (%s)' % self.lister_version
}
def request_instance_credentials(self):
"""Returns dictionary of any credentials configuration needed by the
forge instance to list.
Returns:
dict of credentials per instance lister or {} if none.
"""
all_creds = self.config.get('credentials')
if not all_creds:
return {}
lister_creds = all_creds.get(self.LISTER_NAME, {})
creds = lister_creds.get(self.instance, {})
return creds
def request_uri(self, identifier):
"""Get the full request URI given the transport_request identifier.
MAY BE OVERRIDDEN if something more complex than the PATH_TEMPLATE is
required.
"""
path = self.PATH_TEMPLATE % identifier
return self.api_baseurl + path
def request_params(self, identifier):
"""Get the full parameters passed to requests given the
transport_request identifier.
This uses credentials if any are provided. The 'credentials'
configuration is expected to be a dict of multiple levels. The first
level is the lister's name, the second is the lister's instance name.
For example:
credentials:
github: # github lister
github: # has only one instance (so far)
- username: some
password: somekey
- username: one
password: onekey
- ...
gitlab: # gitlab lister
riseup: # has many instances
- username: someone
password: ...
- ...
gitlab:
- username: someone
password: ...
- ...
MAY BE OVERRIDDEN if something more complex than the request headers
is needed.
"""
params = {}
params['headers'] = self.request_headers() or {}
creds = self.request_instance_credentials()
if not creds:
return params
auth = random.choice(creds) if creds else None
if auth:
params['auth'] = (auth['username'], auth['password'])
return params
def transport_quota_check(self, response):
- """Implements SWHListerBase.transport_quota_check with standard 429
+ """Implements ListerBase.transport_quota_check with standard 429
code check for HTTP with Requests library.
MAY BE OVERRIDDEN if the server notifies about rate limits in a
non-standard way that doesn't use HTTP 429 and the Retry-After
response header. ( https://tools.ietf.org/html/rfc6585#section-4 )
"""
if response.status_code == 429: # HTTP too many requests
retry_after = response.headers.get('Retry-After', self.back_off())
try:
# might be seconds
return True, float(retry_after)
except Exception:
# might be http-date
at_date = datetime(*parsedate(retry_after)[:6])
from_now = (at_date - datetime.today()).total_seconds() + 5
return True, max(0, from_now)
else: # response ok
self.reset_backoff()
return False, 0
def __init__(self, api_baseurl=None):
if not api_baseurl:
raise NameError('HTTP Lister Transport requires api_baseurl.')
self.api_baseurl = api_baseurl # eg. 'https://api.github.com'
self.session = requests.Session()
self.lister_version = __version__
def _transport_action(self, identifier, method='get'):
"""Permit to ask information to the api prior to actually executing
query.
"""
path = self.request_uri(identifier)
params = self.request_params(identifier)
try:
if method == 'head':
response = self.session.head(path, **params)
else:
response = self.session.get(path, **params)
except requests.exceptions.ConnectionError as e:
logger.warning('Failed to fetch %s: %s', path, e)
raise FetchError(e)
else:
if response.status_code not in self.EXPECTED_STATUS_CODES:
raise FetchError(response)
return response
def transport_head(self, identifier):
"""Retrieve head information on api.
"""
return self._transport_action(identifier, method='head')
def transport_request(self, identifier):
- """Implements SWHListerBase.transport_request for HTTP using Requests.
+ """Implements ListerBase.transport_request for HTTP using Requests.
Issue a GET request against the API.
"""
return self._transport_action(identifier)
def transport_response_to_string(self, response):
- """Implements SWHListerBase.transport_response_to_string for HTTP given
+ """Implements ListerBase.transport_response_to_string for HTTP given
Requests responses.
"""
s = pformat(response.request.path_url)
s += '\n#\n' + pformat(response.request.headers)
s += '\n#\n' + pformat(response.status_code)
s += '\n#\n' + pformat(response.headers)
s += '\n#\n'
try: # json?
s += pformat(response.json())
except Exception: # not json
try: # xml?
s += pformat(xmltodict.parse(response.text))
except Exception: # not xml
s += pformat(response.text)
return s
-class ListerOnePageApiTransport(SWHListerHttpTransport):
+class ListerOnePageApiTransport(ListerHttpTransport):
"""Leverage requests library to retrieve basic html page and parse
result.
- To be used in conjunction with SWHListerBase or a subclass of it.
+ To be used in conjunction with ListerBase or a subclass of it.
"""
PAGE = AbstractAttribute("The server api's unique page to retrieve and "
"parse for information")
PATH_TEMPLATE = None # we do not use it
def __init__(self, api_baseurl=None):
self.session = requests.Session()
self.lister_version = __version__
def request_uri(self, _):
"""Get the full request URI given the transport_request identifier.
"""
return self.PAGE
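A sketch of the Retry-After handling used by `transport_quota_check` above: per RFC 6585 the header may carry either a number of seconds or an HTTP-date, so we try `float()` first and fall back to parsing a date. This re-creates the logic standalone; it is not the class method itself.

```python
from datetime import datetime
from email.utils import parsedate


def delay_from_retry_after(retry_after):
    """Return the number of seconds to wait given a Retry-After value."""
    try:
        # The header might be a plain number of seconds.
        return float(retry_after)
    except (TypeError, ValueError):
        # Otherwise it is an HTTP-date; wait until that moment.
        at_date = datetime(*parsedate(retry_after)[:6])
        return max(0, (at_date - datetime.today()).total_seconds())


seconds_form = delay_from_retry_after('120')
# A date in the past yields no extra delay.
date_form = delay_from_retry_after('Wed, 21 Oct 2015 07:28:00 GMT')
```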
diff --git a/swh/lister/core/page_by_page_lister.py b/swh/lister/core/page_by_page_lister.py
index 3b9e27e..f05b3a5 100644
--- a/swh/lister/core/page_by_page_lister.py
+++ b/swh/lister/core/page_by_page_lister.py
@@ -1,160 +1,160 @@
# Copyright (C) 2015-2018 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
import logging
-from .lister_transports import SWHListerHttpTransport
-from .lister_base import SWHListerBase
+from .lister_transports import ListerHttpTransport
+from .lister_base import ListerBase
-class PageByPageLister(SWHListerBase):
+class PageByPageLister(ListerBase):
"""Lister* intermediate class for any service that follows the simple
pagination page pattern.
- Client sends a request to list repositories starting from a
given page identifier.
- Client receives structured (json/xml/etc) response with
information about a sequential series of repositories (per page)
starting from a given index. And, if available, some indication
of the next page index for fetching the remaining repository
data.
- See :class:`swh.lister.core.lister_base.SWHListerBase` for more
+ See :class:`swh.lister.core.lister_base.ListerBase` for more
details.
This class cannot be instantiated. To create a new Lister for a
source code listing service that follows the model described
above, you must subclass this class. Then provide the required
overrides in addition to any unmet implementation/override
requirements of this class's base (see parent class and member
docstrings for details).
Required Overrides::
def get_next_target_from_response
"""
@abc.abstractmethod
def get_next_target_from_response(self, response):
"""Find the next server endpoint page given the entire response.
Implementation of this method depends on the server API spec
and the shape of the network response object returned by the
transport_request method.
For example, some APIs use Link headers to provide the
next page.
Args:
response (transport response): response page from the server
Returns:
index of next page, possibly extracted from a next href url
"""
pass
@abc.abstractmethod
def get_pages_information(self):
"""Find the total number of pages.
Implementation of this method depends on the server API spec
and the shape of the network response object returned by the
transport_request method.
For example, some APIs use dedicated headers:
- x-total-pages to provide the total number of pages
- x-total to provide the total number of repositories
- x-per-page to provide the number of elements per page
Returns:
tuple (total number of repositories, total number of
pages, per_page)
"""
pass
# You probably don't need to override anything below this line.
def do_additional_checks(self, models_list):
"""Potentially check for existence of repositories in models_list.
This will be called only if check_existence is flipped on in
the run method below.
"""
for m in models_list:
sql_repo = self.db_query_equal('uid', m['uid'])
if sql_repo:
return False
return models_list
def run(self, min_bound=None, max_bound=None, check_existence=False):
"""Main entry function. Sequentially fetches repository data from the
service according to the basic outline in the class
docstring, continually fetching sublists until either no next page
reference is given or the next page exceeds the desired max_page.
Args:
min_bound: optional page to start from
max_bound: optional page to stop at
check_existence (bool): optional existence check (for
incremental lister whose sort
order is inverted)
Returns:
nothing
"""
page = min_bound or 0
loop_count = 0
self.min_page = min_bound
self.max_page = max_bound
while self.is_within_bounds(page, self.min_page, self.max_page):
logging.info('listing repos starting at %s' % page)
response, injected_repos = self.ingest_data(page,
checks=check_existence)
if not response and not injected_repos:
logging.info('No response from api server, stopping')
break
elif not injected_repos:
logging.info('Repositories already seen, stopping')
break
next_page = self.get_next_target_from_response(response)
# termination condition
if (next_page is None) or (next_page == page):
logging.info('stopping after page %s, no next link found' %
page)
break
else:
page = next_page
loop_count += 1
if loop_count == 20:
logging.info('flushing updates')
loop_count = 0
self.db_session.commit()
self.db_session = self.mk_session()
self.db_session.commit()
self.db_session = self.mk_session()
-class PageByPageHttpLister(SWHListerHttpTransport, PageByPageLister):
+class PageByPageHttpLister(ListerHttpTransport, PageByPageLister):
"""Convenience class for ensuring right lookup and init order when
- combining PageByPageLister and SWHListerHttpTransport.
+ combining PageByPageLister and ListerHttpTransport.
"""
def __init__(self, api_baseurl=None, override_config=None):
- SWHListerHttpTransport.__init__(self, api_baseurl=api_baseurl)
+ ListerHttpTransport.__init__(self, api_baseurl=api_baseurl)
PageByPageLister.__init__(self, override_config=override_config)
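The page-by-page loop in `run` above can be sketched with faked data: keep requesting pages until no next page is returned (or the next page equals the current one). The `pages` table and repo names are invented for illustration.

```python
# page -> (repos on that page, next page index or None)
pages = {
    0: (['r1', 'r2'], 1),
    1: (['r3'], None),
}


def fetch(page):
    """Stand-in for ingest_data + get_next_target_from_response."""
    return pages[page]


collected = []
page = 0
while page is not None:
    repos, next_page = fetch(page)
    collected.extend(repos)
    # Termination condition mirroring PageByPageLister.run.
    if next_page is None or next_page == page:
        break
    page = next_page
```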
diff --git a/swh/lister/core/simple_lister.py b/swh/lister/core/simple_lister.py
index 40c47b2..32b95d4 100644
--- a/swh/lister/core/simple_lister.py
+++ b/swh/lister/core/simple_lister.py
@@ -1,74 +1,74 @@
# Copyright (C) 2018 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from swh.core import utils
-from .lister_base import SWHListerBase
+from .lister_base import ListerBase
-class SimpleLister(SWHListerBase):
+class SimpleLister(ListerBase):
"""Lister* intermediate class for any service that follows the simple,
'list all information in one shot' pattern.
- Client sends a request to list repositories in oneshot
- Client receives structured (json/xml/etc) response with
information and stores those in db
"""
def list_packages(self, *args):
"""Listing packages method.
"""
pass
def ingest_data(self, identifier, checks=False):
"""Rework the base ingest_data.
Request server endpoint which gives all in one go.
Simplify and filter response list of repositories. Inject
repo information into local db. Queue loader tasks for
linked repositories.
Args:
identifier: Resource identifier (unused)
checks (bool): Additional checks required (unused)
"""
response = self.safely_issue_request(identifier)
response = self.list_packages(response)
if not response:
return response, []
models_list = self.transport_response_simplified(response)
models_list = self.filter_before_inject(models_list)
all_injected = []
for models in utils.grouper(models_list, n=10000):
models = list(models)
logging.debug('models: %s' % len(models))
# inject into local db
injected = self.inject_repo_data_into_db(models)
# queue workers
self.schedule_missing_tasks(models, injected)
all_injected.append(injected)
# flush
self.db_session.commit()
self.db_session = self.mk_session()
return response, all_injected
def run(self):
"""Query the server which answers in one query. Stores the
information, dropping actual redundant information we
already have.
Returns:
nothing
"""
dump_not_used_identifier = 0
response, injected_repos = self.ingest_data(dump_not_used_identifier)
if not response and not injected_repos:
logging.info('No response from api server, stopping')
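`SimpleLister.ingest_data` above batches the model dicts through `swh.core.utils.grouper` before injecting and committing. A self-contained re-creation of that chunking pattern using only the standard library (the real `grouper` lives in swh.core and may differ in detail):

```python
import itertools


def grouper(iterable, n):
    """Yield successive lists of at most n items from iterable."""
    it = iter(iterable)
    while True:
        chunk = list(itertools.islice(it, n))
        if not chunk:
            return
        yield chunk


# 25 fake model dicts, processed in batches of 10 as ingest_data does.
models_list = [{'uid': i} for i in range(25)]
batch_sizes = [len(chunk) for chunk in grouper(models_list, 10)]
```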
diff --git a/swh/lister/core/tests/test_lister.py b/swh/lister/core/tests/test_lister.py
index a63260f..9dadae7 100644
--- a/swh/lister/core/tests/test_lister.py
+++ b/swh/lister/core/tests/test_lister.py
@@ -1,234 +1,234 @@
# Copyright (C) 2017-2018 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
import time
from unittest import TestCase
from unittest.mock import Mock, patch
import requests_mock
from sqlalchemy import create_engine
from testing.postgresql import Postgresql
from swh.lister.core.abstractattribute import AbstractAttribute
def noop(*args, **kwargs):
pass
@requests_mock.Mocker()
class HttpListerTesterBase(abc.ABC):
"""Base testing class for subclasses of
- swh.lister.core.indexing_lister.SWHIndexingHttpLister.
+ swh.lister.core.indexing_lister.IndexingHttpLister.
swh.lister.core.page_by_page_lister.PageByPageHttpLister
See swh.lister.github.tests.test_gh_lister for an example of how
to customize for a specific listing service.
"""
Lister = AbstractAttribute('The lister class to test')
test_re = AbstractAttribute('Compiled regex matching the server url. Must'
' capture the index value.')
lister_subdir = AbstractAttribute('bitbucket, github, etc.')
good_api_response_file = AbstractAttribute('Example good response body')
bad_api_response_file = AbstractAttribute('Example bad response body')
first_index = AbstractAttribute('First index in good_api_response')
entries_per_page = AbstractAttribute('Number of results in good response')
LISTER_NAME = 'fake-lister'
# May need to override this if the headers are used for something
def response_headers(self, request):
return {}
# May need to override this if the server uses non-standard rate limiting
# method.
# Please keep the requested retry delay reasonably low.
def mock_rate_quota(self, n, request, context):
self.rate_limit += 1
context.status_code = 429
context.headers['Retry-After'] = '1'
return '{"error":"dummy"}'
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.rate_limit = 1
self.response = None
self.fl = None
self.helper = None
if self.__class__ != HttpListerTesterBase:
self.run = TestCase.run.__get__(self, self.__class__)
else:
self.run = noop
def request_index(self, request):
m = self.test_re.search(request.path_url)
if m and (len(m.groups()) > 0):
return m.group(1)
else:
return None
def mock_response(self, request, context):
self.fl.reset_backoff()
self.rate_limit = 1
context.status_code = 200
custom_headers = self.response_headers(request)
context.headers.update(custom_headers)
if self.request_index(request) == str(self.first_index):
with open('swh/lister/%s/tests/%s' % (self.lister_subdir,
self.good_api_response_file),
'r', encoding='utf-8') as r:
return r.read()
else:
with open('swh/lister/%s/tests/%s' % (self.lister_subdir,
self.bad_api_response_file),
'r', encoding='utf-8') as r:
return r.read()
def mock_limit_n_response(self, n, request, context):
self.fl.reset_backoff()
if self.rate_limit <= n:
return self.mock_rate_quota(n, request, context)
else:
return self.mock_response(request, context)
def mock_limit_once_response(self, request, context):
return self.mock_limit_n_response(1, request, context)
def mock_limit_twice_response(self, request, context):
return self.mock_limit_n_response(2, request, context)
def get_fl(self, override_config=None):
"""Retrieve an instance of fake lister (fl).
"""
if override_config or self.fl is None:
self.fl = self.Lister(api_baseurl='https://fakeurl',
override_config=override_config)
self.fl.INITIAL_BACKOFF = 1
self.fl.reset_backoff()
return self.fl
def get_api_response(self):
fl = self.get_fl()
if self.response is None:
self.response = fl.safely_issue_request(self.first_index)
return self.response
def test_is_within_bounds(self, http_mocker):
fl = self.get_fl()
self.assertFalse(fl.is_within_bounds(1, 2, 3))
self.assertTrue(fl.is_within_bounds(2, 1, 3))
self.assertTrue(fl.is_within_bounds(1, 1, 1))
self.assertTrue(fl.is_within_bounds(1, None, None))
self.assertTrue(fl.is_within_bounds(1, None, 2))
self.assertTrue(fl.is_within_bounds(1, 0, None))
self.assertTrue(fl.is_within_bounds("b", "a", "c"))
self.assertFalse(fl.is_within_bounds("a", "b", "c"))
self.assertTrue(fl.is_within_bounds("a", None, "c"))
self.assertTrue(fl.is_within_bounds("a", None, None))
self.assertTrue(fl.is_within_bounds("b", "a", None))
self.assertFalse(fl.is_within_bounds("a", "b", None))
self.assertTrue(fl.is_within_bounds("aa:02", "aa:01", "aa:03"))
self.assertFalse(fl.is_within_bounds("aa:12", None, "aa:03"))
with self.assertRaises(TypeError):
fl.is_within_bounds(1.0, "b", None)
with self.assertRaises(TypeError):
fl.is_within_bounds("A:B", "A::B", None)
def test_api_request(self, http_mocker):
http_mocker.get(self.test_re, text=self.mock_limit_twice_response)
with patch.object(time, 'sleep', wraps=time.sleep) as sleepmock:
self.get_api_response()
self.assertEqual(sleepmock.call_count, 2)
def test_repos_list(self, http_mocker):
http_mocker.get(self.test_re, text=self.mock_response)
li = self.get_fl().transport_response_simplified(
self.get_api_response()
)
self.assertIsInstance(li, list)
self.assertEqual(len(li), self.entries_per_page)
def test_model_map(self, http_mocker):
http_mocker.get(self.test_re, text=self.mock_response)
fl = self.get_fl()
li = fl.transport_response_simplified(self.get_api_response())
di = li[0]
self.assertIsInstance(di, dict)
pubs = [k for k in vars(fl.MODEL).keys() if not k.startswith('_')]
for k in pubs:
if k not in ['last_seen', 'task_id', 'id']:
self.assertIn(k, di)
def disable_scheduler(self, fl):
fl.schedule_missing_tasks = Mock(return_value=None)
def disable_db(self, fl):
fl.winnow_models = Mock(return_value=[])
fl.db_inject_repo = Mock(return_value=fl.MODEL())
fl.disable_deleted_repo_tasks = Mock(return_value=None)
def test_fetch_none_nodb(self, http_mocker):
http_mocker.get(self.test_re, text=self.mock_response)
fl = self.get_fl()
self.disable_scheduler(fl)
self.disable_db(fl)
fl.run(min_bound=1, max_bound=1) # stores no results
def test_fetch_one_nodb(self, http_mocker):
http_mocker.get(self.test_re, text=self.mock_response)
fl = self.get_fl()
self.disable_scheduler(fl)
self.disable_db(fl)
fl.run(min_bound=self.first_index, max_bound=self.first_index)
def test_fetch_multiple_pages_nodb(self, http_mocker):
http_mocker.get(self.test_re, text=self.mock_response)
fl = self.get_fl()
self.disable_scheduler(fl)
self.disable_db(fl)
fl.run(min_bound=self.first_index)
def init_db(self, db, model):
engine = create_engine(db.url())
model.metadata.create_all(engine)
class HttpListerTester(HttpListerTesterBase, abc.ABC):
last_index = AbstractAttribute('Last index in good_api_response')
@requests_mock.Mocker()
def test_fetch_multiple_pages_yesdb(self, http_mocker):
http_mocker.get(self.test_re, text=self.mock_response)
initdb_args = Postgresql.DEFAULT_SETTINGS['initdb_args']
initdb_args = ' '.join([initdb_args, '-E UTF-8'])
db = Postgresql(initdb_args=initdb_args)
fl = self.get_fl(override_config={
'lister': {
'cls': 'local',
'args': {'db': db.url()}
}
})
self.init_db(db, fl.MODEL)
self.disable_scheduler(fl)
fl.run(min_bound=self.first_index)
self.assertEqual(fl.db_last_index(), self.last_index)
partitions = fl.db_partition_indices(5)
self.assertGreater(len(partitions), 0)
for k in partitions:
self.assertLessEqual(len(k), 5)
self.assertGreater(len(k), 0)
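The TypeError cases exercised in `test_is_within_bounds` above (e.g. "A:B" vs "A::B") come from `ListerBase.string_pattern_check`. A self-contained re-creation of its core idea: two strings are only comparable if they share the same basic alphanumeric/punctuation structure.

```python
import re


def same_basic_pattern(a, b):
    """True if b follows the same character-class skeleton as a."""
    # Replace every alphanumeric with a generic class, keeping
    # punctuation literal, as string_pattern_check does.
    a_pattern = re.sub('[a-zA-Z0-9]', '[a-zA-Z0-9]', re.escape(a))
    return re.match(a_pattern, b) is not None


comparable = same_basic_pattern('aa:01', 'aa:03')
incomparable = same_basic_pattern('A:B', 'A::B')
```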
diff --git a/swh/lister/debian/lister.py b/swh/lister/debian/lister.py
index a0dc11a..44b766e 100644
--- a/swh/lister/debian/lister.py
+++ b/swh/lister/debian/lister.py
@@ -1,237 +1,237 @@
# Copyright (C) 2017 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import bz2
from collections import defaultdict
import datetime
import gzip
import lzma
import logging
from debian.deb822 import Sources
from sqlalchemy.orm import joinedload, load_only
from sqlalchemy.schema import CreateTable, DropTable
from swh.storage.schemata.distribution import (
AreaSnapshot, Distribution, DistributionSnapshot, Package,
TempPackage,
)
-from swh.lister.core.lister_base import SWHListerBase, FetchError
-from swh.lister.core.lister_transports import SWHListerHttpTransport
+from swh.lister.core.lister_base import ListerBase, FetchError
+from swh.lister.core.lister_transports import ListerHttpTransport
decompressors = {
'gz': lambda f: gzip.GzipFile(fileobj=f),
'bz2': bz2.BZ2File,
'xz': lzma.LZMAFile,
}
-class DebianLister(SWHListerHttpTransport, SWHListerBase):
+class DebianLister(ListerHttpTransport, ListerBase):
MODEL = Package
PATH_TEMPLATE = None
LISTER_NAME = 'debian'
instance = 'debian'
def __init__(self, override_config=None):
- SWHListerHttpTransport.__init__(self, api_baseurl="bogus")
- SWHListerBase.__init__(self, override_config=override_config)
+ ListerHttpTransport.__init__(self, api_baseurl="bogus")
+ ListerBase.__init__(self, override_config=override_config)
def transport_request(self, identifier):
- """Subvert SWHListerHttpTransport.transport_request, to try several
+ """Subvert ListerHttpTransport.transport_request, to try several
index URIs in turn.
The Debian repository format supports several compression algorithms
across the ages, so we try several URIs.
Once we have found a working URI, we break and set `self.decompressor`
to the one that matched.
Returns:
a requests Response object.
Raises:
FetchError: when all the URIs failed to be retrieved.
"""
response = None
compression = None
for uri, compression in self.area.index_uris():
response = super().transport_request(uri)
if response.status_code == 200:
break
else:
raise FetchError(
"Could not retrieve index for %s" % self.area
)
self.decompressor = decompressors.get(compression)
return response
def request_uri(self, identifier):
# In the overridden transport_request, we pass
- # SWHListerBase.transport_request() the full URI as identifier, so we
+ # ListerBase.transport_request() the full URI as identifier, so we
# need to return it here.
return identifier
def request_params(self, identifier):
# Enable streaming to allow wrapping the response in the decompressor
# in transport_response_simplified.
params = super().request_params(identifier)
params['stream'] = True
return params
def transport_response_simplified(self, response):
"""Decompress and parse the package index fetched in `transport_request`.
For each package, we "pivot" the file list entries (Files,
Checksums-Sha1, Checksums-Sha256), to return a files dict mapping
filenames to their checksums.
"""
if self.decompressor:
data = self.decompressor(response.raw)
else:
data = response.raw
for src_pkg in Sources.iter_paragraphs(data.readlines()):
files = defaultdict(dict)
for field in src_pkg._multivalued_fields:
if field.startswith('checksums-'):
sum_name = field[len('checksums-'):]
else:
sum_name = 'md5sum'
if field in src_pkg:
for entry in src_pkg[field]:
name = entry['name']
files[name]['name'] = entry['name']
files[name]['size'] = int(entry['size'], 10)
files[name][sum_name] = entry[sum_name]
yield {
'name': src_pkg['Package'],
'version': src_pkg['Version'],
'directory': src_pkg['Directory'],
'files': files,
}
def inject_repo_data_into_db(self, models_list):
"""Generate the Package entries that didn't previously exist.
- Contrary to SWHListerBase, we don't actually insert the data in
+ Contrary to ListerBase, we don't actually insert the data in
database. `schedule_missing_tasks` does it once we have the
origin and task identifiers.
"""
by_name_version = {}
temp_packages = []
area_id = self.area.id
for model in models_list:
name = model['name']
version = model['version']
temp_packages.append({
'area_id': area_id,
'name': name,
'version': version,
})
by_name_version[name, version] = model
# Add all the listed packages to a temporary table
self.db_session.execute(CreateTable(TempPackage.__table__))
self.db_session.bulk_insert_mappings(TempPackage, temp_packages)
def exists_tmp_pkg(db_session, model):
return (
db_session.query(model)
.filter(Package.area_id == TempPackage.area_id)
.filter(Package.name == TempPackage.name)
.filter(Package.version == TempPackage.version)
.exists()
)
# Filter out the packages that already exist in the main Package table
new_packages = self.db_session\
.query(TempPackage)\
.options(load_only('name', 'version'))\
.filter(~exists_tmp_pkg(self.db_session, Package))\
.all()
self.old_area_packages = self.db_session.query(Package).filter(
exists_tmp_pkg(self.db_session, TempPackage)
).all()
self.db_session.execute(DropTable(TempPackage.__table__))
added_packages = []
for package in new_packages:
model = by_name_version[package.name, package.version]
added_packages.append(Package(area=self.area,
**model))
self.db_session.add_all(added_packages)
return added_packages
def schedule_missing_tasks(self, models_list, added_packages):
"""We create tasks at the end of the full snapshot processing"""
return
def create_tasks_for_snapshot(self, snapshot):
tasks = [
snapshot.task_for_package(name, versions)
for name, versions in snapshot.get_packages().items()
]
return self.scheduler.create_tasks(tasks)
def run(self, distribution, date=None):
"""Run the lister for a given (distribution, area) tuple.
Args:
distribution (str): name of the distribution (e.g. "Debian")
date (datetime.datetime): date the snapshot is taken (defaults to
now)
"""
        # keep the distribution name around: the query below rebinds
        # `distribution` to the matching Distribution object (or None)
        distribution_name = distribution
        distribution = self.db_session\
            .query(Distribution)\
            .options(joinedload(Distribution.areas))\
            .filter(Distribution.name == distribution_name)\
            .one_or_none()
        if not distribution:
            raise ValueError("Distribution %s is not registered" %
                             distribution_name)
        if distribution.type != 'deb':
            raise ValueError("Distribution %s is not a Debian derivative" %
                             distribution_name)
date = date or datetime.datetime.now(tz=datetime.timezone.utc)
logging.debug('Creating snapshot for distribution %s on date %s' %
(distribution, date))
snapshot = DistributionSnapshot(date=date, distribution=distribution)
self.db_session.add(snapshot)
for area in distribution.areas:
if not area.active:
continue
self.area = area
logging.debug('Processing area %s' % area)
_, new_area_packages = self.ingest_data(None)
area_snapshot = AreaSnapshot(snapshot=snapshot, area=area)
self.db_session.add(area_snapshot)
area_snapshot.packages.extend(new_area_packages)
area_snapshot.packages.extend(self.old_area_packages)
self.create_tasks_for_snapshot(snapshot)
self.db_session.commit()
return True
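Review note: the checksum "pivot" done in `transport_response_simplified` above can be exercised in isolation. Plain dicts stand in for the `debian.deb822` Sources paragraphs here, and the sample filenames and digests are illustrative only:

```python
from collections import defaultdict

def pivot_files(src_pkg, multivalued_fields):
    # For each multivalued field (Files, Checksums-Sha1, Checksums-Sha256...),
    # gather every checksum of a given file under a single per-file dict.
    files = defaultdict(dict)
    for field in multivalued_fields:
        if field.startswith('checksums-'):
            sum_name = field[len('checksums-'):]
        else:
            sum_name = 'md5sum'  # the plain 'files' field carries md5sums
        for entry in src_pkg.get(field, []):
            name = entry['name']
            files[name]['name'] = name
            files[name]['size'] = int(entry['size'], 10)
            files[name][sum_name] = entry[sum_name]
    return dict(files)

# illustrative stand-in for one parsed Sources paragraph
pkg = {
    'files': [{'name': 'pkg_1.0.tar.gz', 'size': '42', 'md5sum': 'md5xxx'}],
    'checksums-sha256': [{'name': 'pkg_1.0.tar.gz', 'size': '42',
                          'sha256': 'sha256xxx'}],
}
files = pivot_files(pkg, ['files', 'checksums-sha256'])
# files['pkg_1.0.tar.gz'] now carries name, size, md5sum and sha256 together
```

The pivot is what lets the loader download one file once and verify it against every checksum the index advertises.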
diff --git a/swh/lister/github/lister.py b/swh/lister/github/lister.py
index 866d0e5..b37a9fc 100644
--- a/swh/lister/github/lister.py
+++ b/swh/lister/github/lister.py
@@ -1,52 +1,52 @@
# Copyright (C) 2017-2019 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import re
import time
-from swh.lister.core.indexing_lister import SWHIndexingHttpLister
+from swh.lister.core.indexing_lister import IndexingHttpLister
from swh.lister.github.models import GitHubModel
-class GitHubLister(SWHIndexingHttpLister):
+class GitHubLister(IndexingHttpLister):
PATH_TEMPLATE = '/repositories?since=%d'
MODEL = GitHubModel
API_URL_INDEX_RE = re.compile(r'^.*/repositories\?since=(\d+)')
LISTER_NAME = 'github'
instance = 'github' # There is only 1 instance of such lister
def get_model_from_repo(self, repo):
return {
'uid': repo['id'],
'indexable': repo['id'],
'name': repo['name'],
'full_name': repo['full_name'],
'html_url': repo['html_url'],
'origin_url': repo['html_url'],
'origin_type': 'git',
'fork': repo['fork'],
}
def transport_quota_check(self, response):
reqs_remaining = int(response.headers['X-RateLimit-Remaining'])
if response.status_code == 403 and reqs_remaining == 0:
reset_at = int(response.headers['X-RateLimit-Reset'])
delay = min(reset_at - time.time(), 3600)
return True, delay
else:
return False, 0
def get_next_target_from_response(self, response):
if 'next' in response.links:
next_url = response.links['next']['url']
return int(self.API_URL_INDEX_RE.match(next_url).group(1))
else:
return None
def transport_response_simplified(self, response):
repos = response.json()
return [self.get_model_from_repo(repo) for repo in repos]
def request_headers(self):
return {'Accept': 'application/vnd.github.v3+json'}
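Review note: the back-off computed by `transport_quota_check` can be checked without a live response object; `quota_check` below is a hypothetical standalone twin that takes the status code and headers directly:

```python
import time

def quota_check(status_code, headers, now=None):
    # On a 403 with an exhausted quota, wait until the advertised reset
    # time, capped at one hour; otherwise no wait is needed.
    now = time.time() if now is None else now
    reqs_remaining = int(headers.get('X-RateLimit-Remaining', 1))
    if status_code == 403 and reqs_remaining == 0:
        reset_at = int(headers['X-RateLimit-Reset'])
        return True, min(reset_at - now, 3600)
    return False, 0

# rate limited, with a reset advertised 100 seconds from "now"
throttled, delay = quota_check(
    403, {'X-RateLimit-Remaining': '0', 'X-RateLimit-Reset': '1000'},
    now=900.0)
```

Capping the delay at 3600 seconds guards against a bogus or far-future `X-RateLimit-Reset` stalling the lister for hours.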
diff --git a/swh/lister/npm/lister.py b/swh/lister/npm/lister.py
index bd8c4a6..c7e9d29 100644
--- a/swh/lister/npm/lister.py
+++ b/swh/lister/npm/lister.py
@@ -1,157 +1,157 @@
# Copyright (C) 2018-2019 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from urllib.parse import quote
-from swh.lister.core.indexing_lister import SWHIndexingHttpLister
+from swh.lister.core.indexing_lister import IndexingHttpLister
from swh.lister.npm.models import NpmModel
from swh.scheduler.utils import create_task_dict
-class NpmListerBase(SWHIndexingHttpLister):
+class NpmListerBase(IndexingHttpLister):
"""List packages available in the npm registry in a paginated way
"""
MODEL = NpmModel
LISTER_NAME = 'npm'
instance = 'npm'
def __init__(self, api_baseurl='https://replicate.npmjs.com',
per_page=1000, override_config=None):
super().__init__(api_baseurl=api_baseurl,
override_config=override_config)
self.per_page = per_page + 1
self.PATH_TEMPLATE += '&limit=%s' % self.per_page
@property
def ADDITIONAL_CONFIG(self):
"""(Override) Add extra configuration
"""
default_config = super().ADDITIONAL_CONFIG
default_config['loading_task_policy'] = ('str', 'recurring')
return default_config
def get_model_from_repo(self, repo_name):
"""(Override) Transform from npm package name to model
"""
package_url, package_metadata_url = self._compute_urls(repo_name)
return {
'uid': repo_name,
'indexable': repo_name,
'name': repo_name,
'full_name': repo_name,
'html_url': package_metadata_url,
'origin_url': package_url,
'origin_type': 'npm',
}
def task_dict(self, origin_type, origin_url, **kwargs):
"""(Override) Return task dict for loading a npm package into the archive
This is overridden from the lister_base as more information is
needed for the ingestion task creation.
"""
task_type = 'load-%s' % origin_type
task_policy = self.config['loading_task_policy']
package_name = kwargs.get('name')
package_metadata_url = kwargs.get('html_url')
return create_task_dict(task_type, task_policy,
package_name, origin_url,
package_metadata_url=package_metadata_url)
def request_headers(self):
"""(Override) Set requests headers to send when querying the npm registry
"""
return {'User-Agent': 'Software Heritage npm lister',
'Accept': 'application/json'}
def _compute_urls(self, repo_name):
"""Return a tuple (package_url, package_metadata_url)
"""
return (
'https://www.npmjs.com/package/%s' % repo_name,
# package metadata url needs to be escaped otherwise some requests
# may fail (for instance when a package name contains '/')
'%s/%s' % (self.api_baseurl, quote(repo_name, safe=''))
)
def string_pattern_check(self, inner, lower, upper=None):
""" (Override) Inhibit the effect of that method as packages indices
correspond to package names and thus do not respect any kind
of fixed length string pattern
"""
pass
class NpmLister(NpmListerBase):
"""List all packages available in the npm registry in a paginated way
"""
PATH_TEMPLATE = '/_all_docs?startkey="%s"'
def get_next_target_from_response(self, response):
"""(Override) Get next npm package name to continue the listing
"""
repos = response.json()['rows']
return repos[-1]['id'] if len(repos) == self.per_page else None
def transport_response_simplified(self, response):
"""(Override) Transform npm registry response to list for model manipulation
"""
repos = response.json()['rows']
if len(repos) == self.per_page:
repos = repos[:-1]
return [self.get_model_from_repo(repo['id']) for repo in repos]
class NpmIncrementalLister(NpmListerBase):
"""List packages in the npm registry, updated since a specific
update_seq value of the underlying CouchDB database, in a paginated way
"""
PATH_TEMPLATE = '/_changes?since=%s'
@property
def CONFIG_BASE_FILENAME(self): # noqa: N802
return 'lister_npm_incremental'
def get_next_target_from_response(self, response):
"""(Override) Get next npm package name to continue the listing
"""
repos = response.json()['results']
return repos[-1]['seq'] if len(repos) == self.per_page else None
def transport_response_simplified(self, response):
"""(Override) Transform npm registry response to list for model manipulation
"""
repos = response.json()['results']
if len(repos) == self.per_page:
repos = repos[:-1]
return [self.get_model_from_repo(repo['id']) for repo in repos]
def filter_before_inject(self, models_list):
"""(Override) Filter out documents in the CouchDB database
not related to a npm package
"""
models_filtered = []
for model in models_list:
package_name = model['name']
# document related to CouchDB internals
if package_name.startswith('_design/'):
continue
models_filtered.append(model)
return models_filtered
def disable_deleted_repo_tasks(self, start, end, keep_these):
"""(Override) Disable the processing performed by that method
as it is not relevant in this incremental lister context
        and it raises an exception due to a different index type
(int instead of str)
"""
pass
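Review note: both npm listers rely on the `per_page + 1` trick set up in `NpmListerBase.__init__`: each request asks for one extra row, whose presence signals another page and whose id seeds the next (inclusive) `startkey`. A hypothetical in-memory sketch:

```python
def paginate(fetch_page, per_page):
    # Ask for per_page + 1 rows. A full answer means more pages exist: the
    # extra last row is dropped here and, since CouchDB's startkey is
    # inclusive, it comes back as the first row of the next page.
    limit = per_page + 1
    startkey, listed = '', []
    while True:
        rows = fetch_page(startkey, limit)
        if len(rows) == limit:
            startkey = rows[-1]['id']
            listed.extend(rows[:-1])
        else:
            listed.extend(rows)
            return listed

# illustrative in-memory registry standing in for /_all_docs
registry = [{'id': 'pkg-%02d' % i} for i in range(5)]

def fetch_page(startkey, limit):
    ids = [row['id'] for row in registry]
    start = ids.index(startkey) if startkey else 0
    return registry[start:start + limit]

listed = paginate(fetch_page, per_page=2)
```

With `per_page=2` this walks the five fake packages in three requests, each page overlapping the previous one by exactly the dropped sentinel row, so nothing is listed twice or skipped.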
diff --git a/swh/lister/phabricator/lister.py b/swh/lister/phabricator/lister.py
index e5efc4c..81825e1 100644
--- a/swh/lister/phabricator/lister.py
+++ b/swh/lister/phabricator/lister.py
@@ -1,178 +1,178 @@
# Copyright (C) 2019 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
import urllib.parse
-from swh.lister.core.indexing_lister import SWHIndexingHttpLister
+from swh.lister.core.indexing_lister import IndexingHttpLister
from swh.lister.phabricator.models import PhabricatorModel
from collections import defaultdict
logger = logging.getLogger(__name__)
-class PhabricatorLister(SWHIndexingHttpLister):
+class PhabricatorLister(IndexingHttpLister):
PATH_TEMPLATE = '?order=oldest&attachments[uris]=1&after=%s'
MODEL = PhabricatorModel
LISTER_NAME = 'phabricator'
def __init__(self, forge_url, instance=None, api_token=None,
override_config=None):
if forge_url.endswith("/"):
forge_url = forge_url[:-1]
self.forge_url = forge_url
api_baseurl = '%s/api/diffusion.repository.search' % forge_url
self.api_token = api_token
if not instance:
instance = urllib.parse.urlparse(forge_url).hostname
self.instance = instance
super().__init__(api_baseurl=api_baseurl,
override_config=override_config)
def _build_query_params(self, params, api_token):
"""Build query params to include the forge's api token
Returns:
updated params dict with 'params' entry.
"""
params.update({'params': {'api.token': api_token}})
return params
def request_params(self, identifier):
"""Override the default params behavior to retrieve the api token
Credentials are stored as:
        credentials:
          phabricator:
            <instance>:
              - username: <account>
                password: <api_token>
"""
params = {}
params['headers'] = self.request_headers() or {}
if self.api_token:
return self._build_query_params(params, self.api_token)
instance_creds = self.request_instance_credentials()
        if not instance_creds:
            raise ValueError(
                'Phabricator forge needs authentication credentials to list.')
api_token = instance_creds[0]['password']
return self._build_query_params(params, api_token)
def request_headers(self):
"""
(Override) Set requests headers to send when querying the
Phabricator API
"""
return {'User-Agent': 'Software Heritage phabricator lister',
'Accept': 'application/json'}
def get_model_from_repo(self, repo):
url = get_repo_url(repo['attachments']['uris']['uris'])
if url is None:
return None
return {
'uid': self.forge_url + str(repo['id']),
'indexable': repo['id'],
'name': repo['fields']['shortName'],
'full_name': repo['fields']['name'],
'html_url': url,
'origin_url': url,
'origin_type': repo['fields']['vcs'],
'instance': self.instance,
}
    def get_next_target_from_response(self, response):
        # the cursor's `after` field is JSON null (decoded to None) once
        # the listing is done
        body = response.json()['result']['cursor']
        if body['after'] is not None:
            return body['after']
        else:
            return None
def transport_response_simplified(self, response):
repos = response.json()
if repos['result'] is None:
raise ValueError(
'Problem during information fetch: %s' % repos['error_code'])
repos = repos['result']['data']
return [self.get_model_from_repo(repo) for repo in repos]
def filter_before_inject(self, models_list):
"""
- (Overrides) SWHIndexingLister.filter_before_inject
+ (Overrides) IndexingLister.filter_before_inject
Bounds query results by this Lister's set max_index.
"""
models_list = [m for m in models_list if m is not None]
return super().filter_before_inject(models_list)
def _bootstrap_repositories_listing(self):
"""
        Method called when no min_bound value has been provided
        to the lister. Its purpose is to:
            1. get the data of the first repository hosted on the
               Phabricator instance
            2. inject it into the lister database
            3. return the index of that first repository, so the listing
               can start right after it
Returns:
int: The first repository index
"""
params = '&order=oldest&limit=1'
response = self.safely_issue_request(params)
models_list = self.transport_response_simplified(response)
self.max_index = models_list[0]['indexable']
models_list = self.filter_before_inject(models_list)
injected = self.inject_repo_data_into_db(models_list)
self.schedule_missing_tasks(models_list, injected)
return self.max_index
def run(self, min_bound=None, max_bound=None):
"""
(Override) Run the lister on the specified Phabricator instance
Args:
min_bound (int): Optional repository index to start the listing
after it
max_bound (int): Optional repository index to stop the listing
after it
"""
# initial call to the lister, we need to bootstrap it in that case
if min_bound is None:
min_bound = self._bootstrap_repositories_listing()
super().run(min_bound, max_bound)
def get_repo_url(attachments):
"""
    Return the url of a hosted repository from its uris attachments,
    according to the following priority lists:
* protocol: https > http
* identifier: shortname > callsign > id
"""
processed_urls = defaultdict(dict)
for uri in attachments:
protocol = uri['fields']['builtin']['protocol']
url = uri['fields']['uri']['effective']
identifier = uri['fields']['builtin']['identifier']
if protocol in ('http', 'https'):
processed_urls[protocol][identifier] = url
elif protocol is None:
for protocol in ('https', 'http'):
if url.startswith(protocol):
processed_urls[protocol]['undefined'] = url
break
for protocol in ['https', 'http']:
for identifier in ['shortname', 'callsign', 'id', 'undefined']:
if (protocol in processed_urls and
identifier in processed_urls[protocol]):
return processed_urls[protocol][identifier]
return None
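Review note: `get_repo_url`'s double priority can be checked against hand-made data; `pick_url` below mirrors its selection on a flattened (hypothetical) uri shape, skipping the nested `fields` layout of the real Conduit payload:

```python
from collections import defaultdict

def pick_url(uris):
    # https beats http, then shortname > callsign > id for the identifier.
    processed = defaultdict(dict)
    for uri in uris:
        if uri['protocol'] in ('http', 'https'):
            processed[uri['protocol']][uri['identifier']] = uri['url']
    for protocol in ('https', 'http'):
        for identifier in ('shortname', 'callsign', 'id', 'undefined'):
            if identifier in processed[protocol]:
                return processed[protocol][identifier]
    return None

uris = [
    {'protocol': 'http', 'identifier': 'shortname',
     'url': 'http://forge.example/source/proj.git'},
    {'protocol': 'https', 'identifier': 'callsign',
     'url': 'https://forge.example/diffusion/PROJ/proj.git'},
]
best = pick_url(uris)  # the https url wins even against an http shortname
```

Protocol outranks identifier: an https callsign url is preferred over an http shortname one, which is exactly what the nested loops above encode.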