diff --git a/requirements.txt b/requirements.txt index b5e90dd..cea9e6b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,7 @@ dateutil +qless requests redis swh.core swh.storage diff --git a/swh/lister/github/lister.py b/swh/lister/github/lister.py index 6ee474d..1bf12f9 100644 --- a/swh/lister/github/lister.py +++ b/swh/lister/github/lister.py @@ -1,58 +1,53 @@ # Copyright © 2016 The Software Heritage Developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import requests -from swh.core.config import load_named_config, prepare_folders +from swh.core.config import load_named_config from swh.storage import get_storage from . import req_queue, processors, cache DEFAULT_CONFIG = { - 'queue_file': ('str', '~/.cache/swh/lister-github/queue.pickle'), + 'queue_url': ('str', 'redis://localhost'), 'cache_url': ('str', 'redis://localhost'), 'storage_class': ('str', 'local_storage'), 'storage_args': ('list[str]', ['dbname=softwareheritage-dev', '/srv/softwareheritage/objects']), 'credentials': ('list[str]', []), } CONFIG_NAME = 'lister/github.ini' def run_from_queue(): config = load_named_config(CONFIG_NAME, DEFAULT_CONFIG) cache.init_cache(config['cache_url']) - queue_file = os.path.expanduser(config['queue_file']) - prepare_folders(os.path.dirname(queue_file)) + queue_url = os.path.expanduser(config['queue_url']) credentials = {} for credential in config['credentials']: login, token = credential.split(':') credentials[login] = {'token': token} - queue = req_queue.restore_from_file(queue_file) + queue = req_queue.from_url(queue_url) if req_queue.empty(queue): req_queue.push(queue, {'type': 'repositories', 'url': None}) session = requests.Session() storage = get_storage(config['storage_class'], config['storage_args']) - try: - while not req_queue.empty(queue): - processors.process_one_item( - queue, session=session, credentials=credentials, - storage=storage - ) - - finally: - req_queue.dump_to_file(queue, queue_file) + while not req_queue.empty(queue): + processors.process_one_item( + queue, session=session, credentials=credentials, + storage=storage + ) if __name__ == '__main__': run_from_queue() diff --git a/swh/lister/github/processors.py b/swh/lister/github/processors.py index b3a6d9f..4ab6abc 100644 --- a/swh/lister/github/processors.py +++ b/swh/lister/github/processors.py @@ -1,126 +1,137 @@ # Copyright © 2016 The Software Heritage Developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from math import ceil +import time from . import github_api, req_queue, storage_utils class ProcessError(ValueError): pass def repositories(item, queue, session, credentials, storage): print('Processing scrolling repositories %s' % item['url']) repos = github_api.repositories(url=item['url'], session=session, credentials=credentials) if not repos['code'] == 200: raise ProcessError(item) if 'next' in repos['links']: req_queue.push(queue, { 'type': 'repositories', 'url': repos['links']['next']['url'], }) storage_utils.update_repo_entities(storage, repos['data']) for repo in repos['data']: if not repo['fork']: req_queue.push(queue, { 'type': 'repository', 'repo_name': repo['full_name'], 'repo_id': repo['id'], }) def repository(item, queue, session, credentials, storage): print('Processing repository %s (%s)' % (item['repo_name'], item['repo_id'])) last_modified = storage_utils.repo_last_modified(storage, item['repo_id']) data = github_api.repository(item['repo_id'], session, credentials, last_modified) print(last_modified, '/', data['last_modified']) if data['code'] == 304: print('not modified') # Not modified # XXX: add validity return elif data['code'] == 200: print('modified') storage_utils.update_repo_entities(storage, [data['data']]) if data['data']['forks']: npages = ceil(data['data']['forks']/30) for page in range(1, npages + 1): req_queue.push(queue, { 'type': 'forks', 'repo_id': item['repo_id'], 'repo_name': item['repo_name'], 'forks_page': page, 'check_next': page == npages, }) return else: print('Could not get reply for repository %s' % item['repo_name']) print(data) def forks(item, queue, session, credentials, storage): print('Processing forks for repository %s (%s, page %s)' % ( item['repo_name'], item['repo_id'], item['forks_page'])) forks = github_api.forks(item['repo_id'], item['forks_page'], session, credentials) storage_utils.update_repo_entities(storage, forks['data']) if item['check_next'] and 'next' in forks['links']: req_queue.push(queue, { 'type': 'forks', 'repo_id': item['repo_id'], 'repo_name': item['repo_name'], 'forks_page': item['forks_page'] + 1, 'check_next': True, }) def user(item, queue, session, credentials, storage): print('Processing user %s (%s)' % (item['user_login'], item['user_id'])) last_modified = storage_utils.user_last_modified(storage, item['user_id']) data = github_api.user(item['user_id'], session, credentials, last_modified) print(last_modified, '/', data['last_modified']) if data['code'] == 304: print('not modified') # Not modified # XXX: add validity return elif data['code'] == 200: print('modified') storage_utils.update_user_entities(storage, [data['data']]) return else: print('Could not get reply for user %s' % item['user_login']) print(data) PROCESSORS = { 'repositories': repositories, 'repository': repository, 'forks': forks, 'user': user, } def process_one_item(queue, session, credentials, storage): - item = req_queue.pop(queue) + + job = None + + while True: + job = req_queue.pop(queue) + if job: + break + time.sleep(0.1) + try: - PROCESSORS[item['type']](item, queue, session, credentials, storage) + PROCESSORS[job.klass_name](job.data, queue, session, credentials, + storage) except Exception: - req_queue.push_front(queue, item) raise + else: + job.complete() diff --git a/swh/lister/github/req_queue.py b/swh/lister/github/req_queue.py index cc32c3b..6c9472f 100644 --- a/swh/lister/github/req_queue.py +++ b/swh/lister/github/req_queue.py @@ -1,52 +1,31 @@ # Copyright © 2016 The Software Heritage Developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from collections import defaultdict, deque -import os -import pickle -import tempfile +import qless + PRIORITIES = { - 'forks': 10, - 'user': 15, - 'repository': 20, - 'repositories': 30, + 'forks': 100, + 'repository': 75, + 'user': 50, + 'repositories': 40, } -def restore_from_file(file): - if not os.path.exists(file): - return defaultdict(deque) - - with open(file, 'rb') as f: - return pickle.load(f) - - -def dump_to_file(queue, file): - fd, filename = tempfile.mkstemp(dir=os.path.dirname(file)) - with open(fd, 'wb') as f: - pickle.dump(queue, f) - f.flush() - os.fsync(fd) - os.rename(filename, file) +def from_url(url): + return qless.Client(url).queues['github-lister'] -def push(queue, item): - queue[item['type']].append(item) - - -def push_front(queue, item): - queue[item['type']].appendleft(item) +def push(queue, item, **kwargs): + print("Push %s to %s" % (item, queue.name)) + type = item.pop('type') + queue.put(type, item, priority=PRIORITIES.get(type, 0), **kwargs) def pop(queue): - for type in sorted(queue, key=lambda t: PRIORITIES.get(t, 1000)): - if queue[type]: - return queue[type].popleft() - - raise IndexError("No items to pop") + return queue.pop() def empty(queue): - return not queue + return not len(queue)