diff --git a/PKG-INFO b/PKG-INFO index 680b02c..16c7266 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.lister.github -Version: 0.0.2 +Version: 0.0.3 Summary: Software Heritage GitHub lister Home-page: https://forge.softwareheritage.org/diffusion/DLSGH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/README b/README index 04baf88..ae7b7ed 100644 --- a/README +++ b/README @@ -1,37 +1,47 @@ Licensing ========= This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. See top-level LICENSE file for the full text of the GNU General Public License along with this program. Dependencies ============ -See requirements.txt +- python3 +- python3-psycopg2 +- python3-requests +- python3-sqlalchemy + Deployment ========== -The github lister can be run standalone by using `python3 -m swh.lister.github.lister`. +1. git clone under $GHLISTER_ROOT (of your choosing) +2. mkdir ~/.config/swh/ ~/.cache/swh/lister-github/ +3. edit $GHLISTER_ROOT/etc/crontab and customize GHLISTER_ROOT +4. crontab $GHLISTER_ROOT/etc/crontab +5. create configuration file ~/.config/swh/lister-github.ini Sample configuration file ------------------------- -cat ~/.config/swh/lister/github.ini +cat ~/.config/swh/lister-github.ini [main] - storage_class = local_storage - storage_args = dbname=softwareheritage-dev, /srv/softwareheritage/objects - queue_file = ~/.cache/swh/lister-github/queue.pickle - credentials = olasd:olasd_github_token, zacchiro:zacchiro_github_token \ No newline at end of file + db_url = postgres:///github + # see http://docs.sqlalchemy.org/en/latest/core/engines.html#database-urls + cache_dir = /home/zack/.cache/swh/lister-github + log_dir = /home/zack/.cache/swh/lister-github + username = foobar # github username + password = quux # github password diff --git a/TODO b/TODO index c10c02f..46a84f3 100644 --- a/TODO +++ b/TODO @@ -1,49 +1,57 @@ # -*- mode: org -*- +* TODO SQL: rework repo_history/repo_creations to use last_seen +* TODO cache dir: split json data from other HTTP info + for easier further processing of additional API data + +* TODO cache dir: split in subdirs + to avoid hitting too hard on the filesystem due to the large amount of files + (200k+) + * TODO network-level traceback Traceback (most recent call last): File "/usr/lib/python3/dist-packages/urllib3/response.py", line 186, in read data = self._fp.read(amt) File "/usr/lib/python3.4/http/client.py", line 500, in read return super(HTTPResponse, self).read(amt) File "/usr/lib/python3.4/http/client.py", line 529, in readinto return self._readinto_chunked(b) File "/usr/lib/python3.4/http/client.py", line 621, in _readinto_chunked n = self._safe_readinto(mvb) File "/usr/lib/python3.4/http/client.py", line 680, in _safe_readinto raise IncompleteRead(bytes(mvb[0:total_bytes]), len(b)) http.client.IncompleteRead: IncompleteRead(3201 bytes read, 10240 more expected) During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/lib/python3/dist-packages/requests/models.py", line 653, in generate for chunk in self.raw.stream(chunk_size, decode_content=True): File "/usr/lib/python3/dist-packages/urllib3/response.py", line 256, in stream data = self.read(amt=amt, decode_content=decode_content) File "/usr/lib/python3/dist-packages/urllib3/response.py", line 214, in read raise ProtocolError('Connection broken: %r' % e, e) urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead(3201 bytes read, 10240 more expected)', IncompleteRead(3201 bytes read, 10240 more expected)) During handling of the above exception, another exception occurred: Traceback (most recent call last): File "bin/ghlister", line 110, in max_id=args.interval[1]) File "/home/zack/dati/projects/github-list-repo/ghlister/lister.py", line 129, in fetch repos_res = gh_api_request('/repositories?since=%d' % since, **cred) File "/home/zack/dati/projects/github-list-repo/ghlister/lister.py", line 55, in gh_api_request r = requests.get(GH_API_URL + path, **params) File "/usr/lib/python3/dist-packages/requests/api.py", line 60, in get return request('get', url, **kwargs) File "/usr/lib/python3/dist-packages/requests/api.py", line 49, in request return session.request(method=method, url=url, **kwargs) File "/usr/lib/python3/dist-packages/requests/sessions.py", line 457, in request resp = self.send(prep, **send_kwargs) File "/usr/lib/python3/dist-packages/requests/sessions.py", line 606, in send r.content File "/usr/lib/python3/dist-packages/requests/models.py", line 724, in content self._content = bytes().join(self.iter_content(CONTENT_CHUNK_SIZE)) or bytes() File "/usr/lib/python3/dist-packages/requests/models.py", line 656, in generate raise ChunkedEncodingError(e) requests.exceptions.ChunkedEncodingError: ('Connection broken: IncompleteRead(3201 bytes read, 10240 more expected)', IncompleteRead(3201 bytes read, 10240 more expected)) diff --git a/bin/batch b/bin/batch new file mode 100755 index 0000000..9796387 --- /dev/null +++ b/bin/batch @@ -0,0 +1,41 @@ +#!/bin/bash + +# Copyright (C) 2015 Stefano Zacchiroli +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +export https_proxy="127.0.0.1:8118" # use Tor +export PYTHONPATH=`pwd` + +DBNAME=github +DBCONN="-p 5433" + +psql="psql $DBCONN --no-psqlrc --pset t --pset format=unaligned ${DBNAME}" + +BATCH_NO="$1" +shift +if [ -z "$BATCH_NO" ] ; then + echo "Usage: batch MILLION_NO [ MIN_ID | continue ]" + exit 2 +fi + +MIN_ID="$1" +shift + +min_id=$[ ($BATCH_NO - 1) * 1000000 + 1 ] +max_id=$[ $BATCH_NO * 1000000 ] + +# allow min_id override on the command line +if [ "$MIN_ID" = "continue" ] ; then + last_id=$(echo "select max(id) from repos where ${min_id} <= id and id <= ${max_id}" | $psql) + if [ "$last_id" -eq "$last_id" ] 2> /dev/null ; then # is an integer? + echo "Continuing from last known id ${last_id}" + min_id=$last_id + fi +elif [ -n "$MIN_ID" ] ; then + min_id=$[ $MIN_ID > $min_id ? $MIN_ID : $min_id ] +fi + +cmd="bin/ghlister list ${min_id}-${max_id}" +echo Running $cmd ... +$cmd diff --git a/bin/ghlister b/bin/ghlister new file mode 100755 index 0000000..594f138 --- /dev/null +++ b/bin/ghlister @@ -0,0 +1,101 @@ +#!/usr/bin/python3 + +# Copyright (C) 2015 Stefano Zacchiroli +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import argparse +import logging +import sys + +from swh.lister.github import models +from swh.lister.github.lister import GitHubLister + +DEFAULT_CONF = { + 'cache_dir': './cache', + 'log_dir': './log', + 'cache_json': 'False', +} + + +def int_interval(s): + """parse an "N-M" string as an interval. + + Return an (N,M) int (or None) pair + + """ + def not_an_interval(): + raise argparse.ArgumentTypeError('not an interval: ' + s) + + def parse_int(s): + if s: + return int(s) + else: + return None + + if '-' not in s: + not_an_interval() + parts = s.split('-') + if len(parts) > 2: + not_an_interval() + return tuple([parse_int(p) for p in parts]) + + +def parse_args(): + cli = argparse.ArgumentParser( + description='list GitHub repositories and load them into a DB') + cli.add_argument('--db-url', '-d', metavar='SQLALCHEMY_URL', + help='SQLAlchemy DB URL (override conffile); see ' + '') # NOQA + cli.add_argument('--verbose', '-v', action='store_true', + help='be verbose') + + subcli = cli.add_subparsers(dest='action') + subcli.add_parser('createdb', help='initialize DB') + subcli.add_parser('dropdb', help='destroy DB') + + list_cli = subcli.add_parser('list', help='list repositories') + list_cli.add_argument('interval', + type=int_interval, + help='interval of repository IDs to list, ' + 'in N-M format; either N or M can be omitted.') + + list_cli = subcli.add_parser('catchup', + help='catchup with new repos since last time') + + args = cli.parse_args() + + if not args.action: + cli.error('no action given') + + return args + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) # XXX + + args = parse_args() + + override_conf = {} + if args.db_url: + override_conf['lister_db_url'] = args.db_url + + lister = GitHubLister(override_conf) + + if args.action == 'createdb': + models.SQLBase.metadata.create_all(lister.db_engine) + elif args.action == 'dropdb': + models.SQLBase.metadata.drop_all(lister.db_engine) + elif args.action == 'list': + lister.fetch(min_id=args.interval[0], + max_id=args.interval[1]) + elif args.action == 'catchup': + last_known_id = lister.last_repo_id() + if last_known_id is not None: + logging.info('catching up from last known repo id: %d' % + last_known_id) + lister.fetch(min_id=last_known_id + 1, + max_id=None) + else: + logging.error('Cannot catchup: no last known id found. Abort.') + sys.exit(2) diff --git a/bin/reset.sh b/bin/reset.sh new file mode 100644 index 0000000..f5bf69b --- /dev/null +++ b/bin/reset.sh @@ -0,0 +1,9 @@ +# Copyright (C) 2015 Stefano Zacchiroli +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +export PYTHONPATH=`pwd` +dropdb github +createdb github +bin/ghlister createdb +rm cache/* diff --git a/bin/status b/bin/status new file mode 100755 index 0000000..8a3105f --- /dev/null +++ b/bin/status @@ -0,0 +1,18 @@ +#!/bin/bash + +# Copyright (C) 2015 Stefano Zacchiroli +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +# intended usage: watch -n 60 bin/status + +DBNAME="github" +DBCONN="-p 5433" + +psql="psql $DBCONN --no-psqlrc" + +ps auxw | grep bin/batch +echo "SELECT COUNT(*), MAX(id) FROM repos" | $psql "$DBNAME" +echo "\\l+ ${DBNAME}" | $psql "$DBNAME" +du -sh cache/ +zgrep -i --color=auto "'X-RateLimit-Remaining'" cache/$(ls -t cache/ | head -n 4 | tail -n 1) diff --git a/debian/control b/debian/control index 768522c..a3b75c8 100644 --- a/debian/control +++ b/debian/control @@ -1,22 +1,24 @@ Source: swh-lister-github Maintainer: Software Heritage developers Section: python Priority: optional Build-Depends: debhelper (>= 9), dh-python, python3-all, python3-dateutil, python3-nose, - python3-qless, - python3-redis, python3-setuptools, python3-swh.core, - python3-swh.storage, + python3-swh.scheduler (>= 0.0.10~), + python3-swh.storage (>= 0.0.69~), python3-vcversioner Standards-Version: 3.9.6 Homepage: https://forge.softwareheritage.org/diffusion/DLSGH/ Package: python3-swh.lister.github Architecture: all -Depends: ${misc:Depends}, ${python3:Depends} +Depends: python3-swh.scheduler (>= 0.0.10~), + python3-swh.storage (>= 0.0.69~), + ${misc:Depends}, + ${python3:Depends} Description: Software Heritage GitHub lister diff --git a/etc/crontab b/etc/crontab new file mode 100644 index 0000000..4ebb2d1 --- /dev/null +++ b/etc/crontab @@ -0,0 +1,5 @@ +SHELL=/bin/bash +GHLISTER_ROOT=/home/zack/src/swh-lister-github + +# m h dom mon dow command + 0 8 * * * PYTHONPATH=$GHLISTER_ROOT $GHLISTER_ROOT/bin/ghlister catchup >> ~/.cache/swh/lister-github/$(date +\%Y\%m\%d).log 2>&1 diff --git a/requirements.txt b/requirements.txt index 66b87b7..5648255 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,6 @@ -python-dateutil -qless-py requests -redis +SQLAlchemy swh.core -swh.storage +swh.storage >= 0.0.69 +swh.scheduler >= 0.0.10 diff --git a/setup.py b/setup.py index 4676037..d9f9573 100644 --- a/setup.py +++ b/setup.py @@ -1,30 +1,30 @@ #!/usr/bin/env python3 from setuptools import setup def parse_requirements(): requirements = [] with open('requirements.txt') as f: for line in f.readlines(): line = line.strip() if not line or line.startswith('#'): continue requirements.append(line) return requirements setup( name='swh.lister.github', description='Software Heritage GitHub lister', author='Software Heritage developers', author_email='swh-devel@inria.fr', url='https://forge.softwareheritage.org/diffusion/DLSGH/', packages=['swh.lister.github'], - scripts=[], + scripts=['bin/ghlister'], install_requires=parse_requirements(), setup_requires=['vcversioner'], vcversioner={}, include_package_data=True, ) diff --git a/sql/crawler.sql b/sql/crawler.sql new file mode 100644 index 0000000..0a30b54 --- /dev/null +++ b/sql/crawler.sql @@ -0,0 +1,106 @@ + +-- -- return a random sample of repos, containing %percent repositories +-- create or replace function repos_random_sample_array(percent real) +-- returns setof repos as $$ +-- declare +-- samples integer; +-- repo repos%rowtype; +-- ids integer[]; +-- begin +-- select floor(count(*) / 100 * percent) into samples from repos; +-- ids := array(select id from repos order by id); +-- for i in 1 .. samples loop +-- select * into repo +-- from repos +-- where id = ids[round(random() * samples)]; +-- return next repo; +-- end loop; +-- return; +-- end +-- $$ +-- language plpgsql; + +-- return a random sample of repositories +create or replace function repos_random_sample(percent real) +returns setof repos as $$ +declare + sample_size integer; +begin + select floor(count(*) / 100 * percent) into sample_size from repos; + return query + select * from repos + order by random() + limit sample_size; + return; +end +$$ +language plpgsql; + +-- -- return a random sample of repositories +-- create or replace function random_sample_sequence(percent real) +-- returns setof repos as $$ +-- declare +-- sample_size integer; +-- seq_size integer; +-- min_id integer; +-- max_id integer; +-- begin +-- select floor(count(*) / 100 * percent) into sample_size from repos; +-- select min(id) into min_id from repos; +-- select max(id) into max_id from repos; +-- seq_size := sample_size * 3; -- IDs are sparse, generate a larger sequence +-- -- to have enough of them +-- return query +-- select * from repos +-- where id in +-- (select floor(random() * (max_id - min_id + 1))::integer +-- + min_id +-- from generate_series(1, seq_size)) +-- order by random() limit sample_size; +-- return; +-- end +-- $$ +-- language plpgsql; + +create or replace function repos_well_known() +returns setof repos as $$ +begin + return query + select * from repos + where full_name like 'apache/%' + or full_name like 'eclipse/%' + or full_name like 'mozilla/%' + or full_name = 'torvalds/linux' + or full_name = 'gcc-mirror/gcc'; + return; +end +$$ +language plpgsql; + +create table crawl_history ( + id bigserial primary key, + repo integer references repos(id), + task_id uuid, -- celery task id + date timestamptz not null, + duration interval, + status boolean, + result json, + stdout text, + stderr text +); + +create index on crawl_history (repo); + +create view missing_orig_repos AS + select * + from orig_repos as repos + where not exists + (select 1 from crawl_history as history + where history.repo = repos.id); + +create view missing_fork_repos AS + select * + from fork_repos as repos + where not exists + (select 1 from crawl_history as history + where history.repo = repos.id); diff --git a/sql/pimp_db.sql b/sql/pimp_db.sql new file mode 100644 index 0000000..2cc9cef --- /dev/null +++ b/sql/pimp_db.sql @@ -0,0 +1,36 @@ + +create view orig_repos as + select id, name, full_name, html_url, description, last_seen + from repos + where not fork; + +create view fork_repos as + select id, name, full_name, html_url, description, last_seen + from repos + where fork + +create extension pg_trgm; + +create index ix_trgm_repos_description on + repos using gin (description gin_trgm_ops); + +create index ix_trgm_repos_full_name on + repos using gin (full_name gin_trgm_ops); + +create table repos_history ( + ts timestamp default current_timestamp, + repos integer not null, + fork_repos integer, + orig_repos integer +); + +create view repo_creations as + select today.ts :: date as date, + today.repos - yesterday.repos as repos, + today.fork_repos - yesterday.fork_repos as fork_repos, + today.orig_repos - yesterday.orig_repos as orig_repos + from repos_history today + join repos_history yesterday on + (yesterday.ts = (select max(ts) + from repos_history + where ts < today.ts)); diff --git a/swh.lister.github.egg-info/PKG-INFO b/swh.lister.github.egg-info/PKG-INFO index 680b02c..16c7266 100644 --- a/swh.lister.github.egg-info/PKG-INFO +++ b/swh.lister.github.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.lister.github -Version: 0.0.2 +Version: 0.0.3 Summary: Software Heritage GitHub lister Home-page: https://forge.softwareheritage.org/diffusion/DLSGH/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh.lister.github.egg-info/SOURCES.txt b/swh.lister.github.egg-info/SOURCES.txt index 0e6c754..365a3d9 100644 --- a/swh.lister.github.egg-info/SOURCES.txt +++ b/swh.lister.github.egg-info/SOURCES.txt @@ -1,30 +1,34 @@ .gitignore ACKNOWLEDGEMENTS LICENSE MANIFEST.in Makefile README TODO requirements.txt setup.py version.txt +bin/batch +bin/ghlister +bin/reset.sh +bin/status debian/changelog debian/compat debian/control debian/copyright debian/rules debian/source/format +etc/crontab +sql/crawler.sql +sql/pimp_db.sql swh.lister.github.egg-info/PKG-INFO swh.lister.github.egg-info/SOURCES.txt swh.lister.github.egg-info/dependency_links.txt swh.lister.github.egg-info/requires.txt swh.lister.github.egg-info/top_level.txt swh/lister/github/__init__.py -swh/lister/github/cache.py -swh/lister/github/constants.py -swh/lister/github/converters.py -swh/lister/github/github_api.py +swh/lister/github/base.py +swh/lister/github/db_utils.py swh/lister/github/lister.py -swh/lister/github/processors.py -swh/lister/github/req_queue.py -swh/lister/github/storage_utils.py \ No newline at end of file +swh/lister/github/models.py +swh/lister/github/tasks.py \ No newline at end of file diff --git a/swh.lister.github.egg-info/requires.txt b/swh.lister.github.egg-info/requires.txt index 7184a44..4cd1a20 100644 --- a/swh.lister.github.egg-info/requires.txt +++ b/swh.lister.github.egg-info/requires.txt @@ -1,6 +1,5 @@ -python-dateutil -qless-py -redis +SQLAlchemy requests swh.core -swh.storage +swh.scheduler>=0.0.10 +swh.storage>=0.0.69 diff --git a/swh/lister/github/base.py b/swh/lister/github/base.py new file mode 100644 index 0000000..fdf99a0 --- /dev/null +++ b/swh/lister/github/base.py @@ -0,0 +1,63 @@ +# Copyright (C) 2016 The Software Heritage developers +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from swh.core import config +from swh.storage import get_storage +from swh.scheduler.backend import SchedulerBackend + + +# TODO: split this into a lister-agnostic module + +class SWHLister(config.SWHConfig): + CONFIG_BASE_FILENAME = None + + DEFAULT_CONFIG = { + 'storage_class': ('str', 'remote_storage'), + 'storage_args': ('list[str]', ['http://localhost:5000/']), + + 'scheduling_db': ('str', 'dbname=softwareheritage-scheduler'), + } + + ADDITIONAL_CONFIG = {} + + def __init__(self): + self.config = self.parse_config_file( + additional_configs=[self.ADDITIONAL_CONFIG]) + + self.storage = get_storage(self.config['storage_class'], + self.config['storage_args']) + + self.scheduler = SchedulerBackend( + scheduling_db=self.config['scheduling_db'], + ) + + def create_origins(self, origins): + """Create the origins listed, and return their ids. + + Args: + origins: a list of origins + Returns: + a list of origin ids + """ + return self.storage.origin_add(origins) + + def create_tasks(self, tasks): + """Create the tasks specified, and return their ids. + + Args: + tasks (list of dict): a list of task specifications: + type (str): the task type + arguments (dict): the arguments for the task runner + args (list of str): arguments + kwargs (dict str -> str): keyword arguments + next_run (datetime.datetime): when to schedule the next run + Returns: + a list of task ids + """ + returned_tasks = self.scheduler.create_tasks(tasks) + return [returned_task['id'] for returned_task in returned_tasks] + + def disable_tasks(self, task_ids): + """Disable the tasks identified by the given ids""" + self.scheduler.disable_tasks(task_ids) diff --git a/swh/lister/github/cache.py b/swh/lister/github/cache.py deleted file mode 100644 index f47df37..0000000 --- a/swh/lister/github/cache.py +++ /dev/null @@ -1,40 +0,0 @@ -# Copyright © 2016 The Software Heritage Developers -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import redis - -cache = None - - -def init_cache(url): - global cache - cache = redis.StrictRedis.from_url(url, decode_responses=True) - - -def _user_key(id): - return 'github:user:%d:uuid' % id - - -def _repo_key(id): - return 'github:repo:%d:uuid' % id - - -def get_user(id): - """Get the cache value for user `id`""" - return cache.get(_user_key(id)) - - -def set_user(id, uuid): - """Set the cache value for user `id`""" - cache.set(_user_key(id), uuid) - - -def get_repo(id): - """Get the cache value for repo `id`""" - return cache.get(_repo_key(id)) - - -def set_repo(id, uuid): - """Set the cache value for repo `id`""" - return cache.set(_repo_key(id), uuid) diff --git a/swh/lister/github/constants.py b/swh/lister/github/constants.py deleted file mode 100644 index 08dcc89..0000000 --- a/swh/lister/github/constants.py +++ /dev/null @@ -1,7 +0,0 @@ -# Copyright © 2016 The Software Heritage Developers -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -GITHUB_ORGS_UUID = '9f7b34d9-aa98-44d4-8907-b332c1036bc3' -GITHUB_USERS_UUID = 'ad6df473-c1d2-4f40-bc58-2b091d4a750e' -GITHUB_LISTER_UUID = '34bd6b1b-463f-43e5-a697-785107f598e4' diff --git a/swh/lister/github/converters.py b/swh/lister/github/converters.py deleted file mode 100644 index 4846f85..0000000 --- a/swh/lister/github/converters.py +++ /dev/null @@ -1,95 +0,0 @@ -# Copyright © 2016 The Software Heritage Developers -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import copy -import datetime -from email.utils import format_datetime - -from dateutil.parser import parse as parse_datetime - -from . import cache, constants - - -def utcnow(): - return datetime.datetime.now(tz=datetime.timezone.utc) - - -def updated_at_to_last_modified(updated_at): - if not updated_at: - return None - - dt = parse_datetime(updated_at).astimezone(datetime.timezone.utc) - return format_datetime(dt, usegmt=True) - - -def repository_to_entity(orig_entity, repo): - """Convert a repository to an entity""" - - entity = copy.deepcopy(orig_entity) - - owner_uuid = cache.get_user(repo['owner']['id']) - if not owner_uuid: - raise ValueError("Owner %s (id=%d) not in cache" % ( - repo['owner']['login'], repo['owner']['id'])) - - entity['parent'] = owner_uuid - entity['name'] = repo['full_name'] - entity['type'] = 'project' - entity['description'] = repo['description'] - if 'homepage' in repo: - entity['homepage'] = repo['homepage'] - entity['active'] = True - entity['generated'] = True - - entity['lister_metadata']['lister'] = constants.GITHUB_LISTER_UUID - entity['lister_metadata']['type'] = 'repository' - entity['lister_metadata']['id'] = repo['id'] - entity['lister_metadata']['fork'] = repo['fork'] - - if 'updated_at' in repo: - entity['lister_metadata']['updated_at'] = repo['updated_at'] - - entity['validity'] = [utcnow()] - - return entity - - -def user_to_entity(orig_entity, user): - """Convert a GitHub user toan entity""" - - entity = copy.deepcopy(orig_entity) - - if user['type'] == 'User': - parent = constants.GITHUB_USERS_UUID - - type = 'person' - elif user['type'] == 'Organization': - parent = constants.GITHUB_ORGS_UUID - type = 'group_of_persons' - else: - raise ValueError("Unknown GitHub user type %s" % user['type']) - - entity['parent'] = parent - - if 'name' in user: - entity['name'] = user['name'] - - if not entity.get('name'): - entity['name'] = user['login'] - - entity['type'] = type - entity['active'] = True - entity['generated'] = True - - entity['lister_metadata']['lister'] = constants.GITHUB_LISTER_UUID - entity['lister_metadata']['type'] = 'user' - entity['lister_metadata']['id'] = user['id'] - entity['lister_metadata']['login'] = user['login'] - - if 'updated_at' in user: - entity['lister_metadata']['updated_at'] = user['updated_at'] - - entity['validity'] = [datetime.datetime.now()] - - return entity diff --git a/swh/lister/github/db_utils.py b/swh/lister/github/db_utils.py new file mode 100644 index 0000000..0563036 --- /dev/null +++ b/swh/lister/github/db_utils.py @@ -0,0 +1,18 @@ +# Copyright (C) 2015 Stefano Zacchiroli +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from contextlib import contextmanager + + +@contextmanager +def session_scope(mk_session): + session = mk_session() + try: + yield session + session.commit() + except: + session.rollback() + raise + finally: + session.close() diff --git a/swh/lister/github/github_api.py b/swh/lister/github/github_api.py deleted file mode 100644 index 0e4807f..0000000 --- a/swh/lister/github/github_api.py +++ /dev/null @@ -1,137 +0,0 @@ -# Copyright © 2016 The Software Heritage Developers -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -# see https://developer.github.com/v3/ for the GitHub API documentation - -import time -from urllib.parse import urljoin - -import requests - - -GITHUB_API_BASE = 'https://api.github.com/' - - -def github_api_request(url, last_modified=None, etag=None, session=None, - credentials=None): - """Make a request to the GitHub API at 'url'. - - Args: - url: the URL of the GitHub API endpoint to request - last_modified: the last time that URL was requested - etag: the etag for the last answer at this URL (overrides - last_modified) - session: a requests session - credentials: a list of dicts for GitHub credentials with keys: - login: the GitHub login for the credential - token: the API token for the credential - x_ratelimit_*: the rate-limit info for the given credential - - Returns: - a dict with the following keys: - credential_used: the login for the credential used - x_ratelimit_*: GitHub rate-limiting information - """ - print("Requesting url %s" % url) - if not session: - session = requests - - headers = { - 'Accept': 'application/vnd.github.v3+json', - } - - if etag: - headers['If-None-Match'] = etag - else: - if last_modified: - headers['If-Modified-Since'] = last_modified - - if not credentials: - credentials = {None: {}} - - reply = None - ret = {} - for login, creds in credentials.items(): - # Handle rate-limiting - remaining = creds.get('x_ratelimit_remaining') - reset = creds.get('x_ratelimit_reset') - if remaining == 0 and reset and reset > time.time(): - continue - - kwargs = {} - if login and creds['token']: - kwargs['auth'] = (login, creds['token']) - - reply = session.get(url, headers=headers, **kwargs) - - ratelimit = { - key.lower().replace('-', '_'): int(value) - for key, value in reply.headers.items() - if key.lower().startswith('x-ratelimit') - } - - ret.update(ratelimit) - creds.update(ratelimit) - - if not(reply.status_code == 403 and - ratelimit.get('x_ratelimit_remaining') == 0): - # Request successful, let's get out of here - break - else: - # we broke out of the loop - raise ValueError('All out of credentials', credentials) - - etag = reply.headers.get('ETag') - if etag and etag.startswith(('w/', 'W/')): - # Strip down reference to "weak" etags - etag = etag[2:] - - ret.update({ - 'url': url, - 'code': reply.status_code, - 'data': reply.json() if reply.status_code != 304 else None, - 'etag': etag, - 'last_modified': reply.headers.get('Last-Modified'), - 'links': reply.links, - 'login': login, - }) - - return ret - - -def repositories(since=0, url=None, session=None, credentials=None): - """Request the list of public repositories with id greater than `since`""" - if not url: - url = urljoin(GITHUB_API_BASE, 'repositories?since=%s' % since) - - req = github_api_request(url, session=session, credentials=credentials) - - return req - - -def repository(id, session=None, credentials=None, last_modified=None): - """Request the information on the repository with the given id""" - url = urljoin(GITHUB_API_BASE, 'repositories/%d' % id) - req = github_api_request(url, session=session, credentials=credentials, - last_modified=last_modified) - - return req - - -def forks(id, page, session=None, credentials=None): - """Request the information on the repository with the given id""" - url = urljoin(GITHUB_API_BASE, - 'repositories/%d/forks?sort=oldest&page=%d' % (id, page)) - req = github_api_request(url, session=session, credentials=credentials) - - return req - - -def user(id, session=None, credentials=None, last_modified=None): - """Request the information on the user with the given id""" - url = urljoin(GITHUB_API_BASE, 'user/%d' % id) - req = github_api_request(url, session=session, credentials=credentials, - last_modified=last_modified) - - return req diff --git a/swh/lister/github/lister.py b/swh/lister/github/lister.py index 1bf12f9..295bc78 100644 --- a/swh/lister/github/lister.py +++ b/swh/lister/github/lister.py @@ -1,53 +1,348 @@ -# Copyright © 2016 The Software Heritage Developers +# Copyright (C) 2015 Stefano Zacchiroli +# Copyright (C) 2016 The Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import os +# see https://developer.github.com/v3/ for GitHub API documentation +import datetime +import gzip +import logging +import os +import random +import re import requests +import time + +from pprint import pformat + +from sqlalchemy import create_engine, func +from sqlalchemy.orm import sessionmaker + +from swh.core import config +from swh.lister.github.base import SWHLister +from swh.lister.github.db_utils import session_scope +from swh.lister.github.models import Repository + + +GH_API_URL = 'https://api.github.com' +GH_REPO_URL_TEMPLATE = 'https://github.com/%s' +MAX_RETRIES = 7 +MAX_SLEEP = 3600 # 1 hour +CONN_SLEEP = 10 + +REPO_API_URL_RE = re.compile(r'^.*/repositories\?since=(\d+)') + + +class FetchError(RuntimeError): + + def __init__(self, response): + self.response = response + + def __str__(self): + return repr(self.response) + + +def save_http_response(r, cache_dir): + def escape_url_path(p): + return p.replace('/', '__') + + fname = os.path.join(cache_dir, + escape_url_path(r.request.path_url) + '.gz') + with gzip.open(fname, 'w') as f: + def emit(s): + f.write(bytes(s, 'UTF-8')) + emit(pformat(r.request.path_url)) + emit('\n#\n') + emit(pformat(r.status_code)) + emit('\n#\n') + emit(pformat(r.headers)) + emit('\n#\n') + emit(pformat(r.json())) + + +def gh_api_request(path, username=None, password=None, session=None, + headers=None): + params = {} + + if headers is None: + headers = {} + + if 'Accept' not in headers: # request version 3 of the API + headers['Accept'] = 'application/vnd.github.v3+json' + + params['headers'] = headers + if username is not None and password is not None: + params['auth'] = (username, password) + + if session is None: + session = requests.Session() + + retries_left = MAX_RETRIES + while retries_left > 0: + logging.debug('sending API request: %s' % path) + try: + r = session.get(GH_API_URL + path, **params) + except requests.exceptions.ConnectionError: + # network-level connection error, try again + logging.warn('connection error upon %s: sleep for %d seconds' % + (path, CONN_SLEEP)) + time.sleep(CONN_SLEEP) + retries_left -= 1 + continue + + if r.ok: # all went well, do not retry + break + + # detect throttling + if r.status_code == 403 and \ + int(r.headers['X-RateLimit-Remaining']) == 0: + delay = int(r.headers['X-RateLimit-Reset']) - time.time() + delay = min(delay, MAX_SLEEP) + logging.warn('rate limited upon %s: sleep for %d seconds' % + (path, int(delay))) + time.sleep(delay) + else: # unexpected error, abort + break + + retries_left -= 1 + + if not retries_left: + logging.warn('giving up on %s: max retries exceed' % path) + + return r + + +class GitHubLister(SWHLister): + CONFIG_BASE_FILENAME = 'lister-github' + ADDITIONAL_CONFIG = { + 'lister_db_url': ('str', 'postgresql:///lister-github'), + 'credentials': ('list[dict]', []), + 'cache_json': ('bool', False), + 'cache_dir': ('str', '~/.cache/swh/lister/github'), + } + + def __init__(self, override_config=None): + super().__init__() + if override_config: + self.config.update(override_config) + + self.config['cache_dir'] = os.path.expanduser(self.config['cache_dir']) + if self.config['cache_json']: + config.prepare_folders(self.config, ['cache_dir']) + + if not self.config['credentials']: + raise ValueError('The GitHub lister needs credentials for API') + + self.db_engine = create_engine(self.config['lister_db_url']) + self.mk_session = sessionmaker(bind=self.db_engine) + + def lookup_repo(self, repo_id, db_session=None): + if not db_session: + with session_scope(self.mk_session) as db_session: + return self.lookup_repo(repo_id, db_session=db_session) + + return db_session.query(Repository) \ + .filter(Repository.id == repo_id) \ + .first() + + def query_range(self, start, end, db_session=None): + if not db_session: + with session_scope(self.mk_session) as db_session: + return self.query_range(start, end, db_session=db_session) + + return db_session.query(Repository) \ + .filter(Repository.id >= start) \ + .filter(Repository.id <= end) + + def lookup_full_names(self, full_names, db_session=None): + if not db_session: + with session_scope(self.mk_session) as db_session: + return self.lookup_full_names(full_names, + db_session=db_session) + + return db_session.query(Repository) \ + .filter(Repository.full_name.in_(full_names)) \ + .all() + + def last_repo_id(self, db_session=None): + if not db_session: + with session_scope(self.mk_session) as db_session: + return self.last_repo_id(db_session=db_session) + + t = db_session.query(func.max(Repository.id)).first() + + if t is not None: + return t[0] + + INJECT_KEYS = ['id', 'name', 'full_name', 'html_url', 'description', + 'fork'] + + def inject_repo(self, repo, db_session=None): + if not db_session: + with session_scope(self.mk_session) as db_session: + return self.inject_repo(repo, db_session=db_session) + + logging.debug('injecting repo %d' % repo['id']) + sql_repo = self.lookup_repo(repo['id'], db_session) + if not sql_repo: + kwargs = {k: repo[k] for k in self.INJECT_KEYS if k in repo} + sql_repo = Repository(**kwargs) + db_session.add(sql_repo) + else: + for k in self.INJECT_KEYS: + if k in repo: + setattr(sql_repo, k, repo[k]) + sql_repo.last_seen = datetime.datetime.now() + + return sql_repo + + @staticmethod + def repo_to_origin(full_name): + return { + 'type': 'git', + 'url': GH_REPO_URL_TEMPLATE % full_name, + } + + @staticmethod + def repo_to_task(full_name): + return { + 'type': 'origin-update-git', + 'arguments': { + 'args': [ + GH_REPO_URL_TEMPLATE % full_name, + ], + 'kwargs': {}, + }, + 'next_run': datetime.datetime.now(), + } + + def fetch(self, min_id=None, max_id=None): + if min_id is None: + min_id = 1 + if max_id is None: + max_id = float('inf') + next_id = min_id + + do_cache = self.config['cache_json'] + cache_dir = self.config['cache_dir'] + + session = requests.Session() + db_session = self.mk_session() + loop_count = 0 + while min_id <= next_id <= max_id: + logging.info('listing repos starting at %d' % next_id) + + # github API ?since=... is '>' strict, not '>=' + since = next_id - 1 + + cred = random.choice(self.config['credentials']) + repos_res = gh_api_request('/repositories?since=%d' % since, + session=session, **cred) + + if do_cache: + save_http_response(repos_res, cache_dir) + + if not repos_res.ok: + raise FetchError(repos_res) + + next_next_id = None + if 'next' in repos_res.links: + next_url = repos_res.links['next']['url'] + m = REPO_API_URL_RE.match(next_url) # parse next_id + next_next_id = int(m.group(1)) + 1 + + repos = repos_res.json() + mapped_repos = {} + tasks = {} + origins = {} + repo_ids = set() + for repo in repos: + if repo['id'] > max_id: # do not overstep max_id + break + repo_ids.add(repo['id']) + full_name = repo['full_name'] + mapped_repos[full_name] = self.inject_repo(repo, db_session) -from swh.core.config import load_named_config -from swh.storage import get_storage + # Retrieve and reset task and origin ids from existing repos + old_repos = self.lookup_full_names(list(mapped_repos.keys()), + db_session=db_session) + for old_repo in old_repos: + full_name = old_repo.full_name + if old_repo.task_id: + tasks[full_name] = old_repo.task_id + old_repo.task_id = None + if old_repo.origin_id: + origins[full_name] = old_repo.origin_id + old_repo.origin_id = None -from . import req_queue, processors, cache + # Create missing origins + missing_origins = [ + full_name for full_name in sorted(mapped_repos) + if full_name not in origins + ] -DEFAULT_CONFIG = { - 'queue_url': ('str', 'redis://localhost'), - 'cache_url': ('str', 'redis://localhost'), - 'storage_class': ('str', 'local_storage'), - 'storage_args': ('list[str]', ['dbname=softwareheritage-dev', - '/srv/softwareheritage/objects']), - 'credentials': ('list[str]', []), + if missing_origins: + new_origins = [ + self.repo_to_origin(full_name) + for full_name in missing_origins + ] + new_origin_ids = self.create_origins(new_origins) + origins.update(zip(missing_origins, new_origin_ids)) -} -CONFIG_NAME = 'lister/github.ini' + for full_name, origin_id in origins.items(): + mapped_repos[full_name].origin_id = origin_id + # Create missing tasks + missing_tasks = [ + full_name for full_name in sorted(mapped_repos) + if full_name not in tasks + ] -def run_from_queue(): - config = load_named_config(CONFIG_NAME, DEFAULT_CONFIG) + if missing_tasks: + new_tasks = [ + self.repo_to_task(full_name) + for full_name in missing_tasks + ] + new_task_ids = self.create_tasks(new_tasks) + tasks.update(zip(missing_tasks, new_task_ids)) - cache.init_cache(config['cache_url']) + for full_name, task_id in tasks.items(): + mapped_repos[full_name].task_id = task_id - queue_url = os.path.expanduser(config['queue_url']) + # Disable tasks for deleted repos + if next_next_id is not None: + start, end = next_id, min(max_id, next_next_id) + else: + start, end = next_id, max_id - credentials = {} - for credential in config['credentials']: - login, token = credential.split(':') - credentials[login] = {'token': token} + deleted_repos = self.query_range(start, end, + db_session=db_session) \ + .filter(~Repository.id.in_(repo_ids)) \ + .all() - queue = req_queue.from_url(queue_url) + if deleted_repos: + tasks_to_disable = [] + for repo in deleted_repos: + if repo.task_id is not None: + tasks_to_disable.append(repo.task_id) + repo.task_id = None - if req_queue.empty(queue): - req_queue.push(queue, {'type': 'repositories', 'url': None}) + if tasks_to_disable: + self.disable_tasks(tasks_to_disable) - session = requests.Session() - storage = get_storage(config['storage_class'], config['storage_args']) + if next_next_id is None: + logging.info('stopping after id %d, no next link found' % + next_id) + break + else: + next_id = next_next_id - while not req_queue.empty(queue): - processors.process_one_item( - queue, session=session, credentials=credentials, - storage=storage - ) + loop_count += 1 + if loop_count == 20: + logging.info('flushing updates') + loop_count = 0 + db_session.commit() + db_session = self.mk_session() -if __name__ == '__main__': - run_from_queue() + db_session.commit() diff --git a/swh/lister/github/models.py b/swh/lister/github/models.py new file mode 100644 index 0000000..e35ca78 --- /dev/null +++ b/swh/lister/github/models.py @@ -0,0 +1,51 @@ +# Copyright (C) 2015 Stefano Zacchiroli +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from datetime import datetime + +from sqlalchemy import Column +from sqlalchemy import Boolean, DateTime, Integer, String +from sqlalchemy.ext.declarative import declarative_base + + +SQLBase = declarative_base() + + +class Repository(SQLBase): + + """a GitHub repository""" + + __tablename__ = 'repos' + + id = Column(Integer, primary_key=True) + + name = Column(String, index=True) + full_name = Column(String, index=True) + html_url = Column(String) + description = Column(String) + fork = Column(Boolean, index=True) + + last_seen = Column(DateTime, nullable=False) + + task_id = Column(Integer) + origin_id = Column(Integer) + + def __init__(self, id, name=None, full_name=None, html_url=None, + description=None, fork=None, task_id=None, origin_id=None): + self.id = id + self.last_seen = datetime.now() + if name is not None: + self.name = name + if full_name is not None: + self.full_name = full_name + if html_url is not None: + self.html_url = html_url + if description is not None: + self.description = description + if fork is not None: + self.fork = fork + if task_id is not None: + self.task_id = task_id + if origin_id is not None: + self.origin_id = origin_id diff --git a/swh/lister/github/processors.py b/swh/lister/github/processors.py deleted file mode 100644 index 84e3a7d..0000000 --- a/swh/lister/github/processors.py +++ /dev/null @@ -1,163 +0,0 @@ -# Copyright © 2016 The Software Heritage Developers -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from math import ceil -import time - -from . import github_api, req_queue, storage_utils - - -class ProcessError(ValueError): - pass - - -def repositories(item, queue, session, credentials, storage): - print('Processing scrolling repositories %s' % item['url']) - repos = github_api.repositories(url=item['url'], session=session, - credentials=credentials) - if not repos['code'] == 200: - raise ProcessError(item) - - if 'next' in repos['links']: - req_queue.push(queue, { - 'type': 'repositories', - 'url': repos['links']['next']['url'], - }) - - users = {} - for repo in repos['data']: - users[repo['owner']['id']] = repo['owner'] - - for id, user in users.items(): - jid = 'user-%d' % id - if not queue.client.jobs[jid]: - req_queue.push(queue, { - 'type': 'user', - 'user_login': user['login'], - 'user_id': id, - }, jid=jid) - - for repo in repos['data']: - if not repo['fork']: - req_queue.push(queue, { - 'type': 'repository', - 'repo_name': repo['full_name'], - 'repo_id': repo['id'], - }) - - storage_utils.update_repo_entities(storage, repos['data']) - - -def repository(item, queue, session, credentials, storage): - print('Processing repository %s (%s)' % (item['repo_name'], - item['repo_id'])) - - last_modified = storage_utils.repo_last_modified(storage, item['repo_id']) - data = github_api.repository(item['repo_id'], session, credentials, - last_modified) - - print(last_modified, '/', data['last_modified']) - if data['code'] == 304: - print('not modified') - # Not modified - # XXX: add validity - return - elif data['code'] == 200: - print('modified') - storage_utils.update_repo_entities(storage, [data['data']]) - if data['data']['forks']: - npages = ceil(data['data']['forks']/30) - for page in range(1, npages + 1): - req_queue.push(queue, { - 'type': 'forks', - 'repo_id': item['repo_id'], - 'repo_name': item['repo_name'], - 'forks_page': page, - 'check_next': page == npages, - }) - return - else: - print('Could not get reply for repository %s' % item['repo_name']) - print(data) - - -def forks(item, queue, session, credentials, storage): - print('Processing forks for repository %s (%s, page %s)' % ( - item['repo_name'], item['repo_id'], item['forks_page'])) - - forks = github_api.forks(item['repo_id'], item['forks_page'], session, - credentials) - - users = {} - for repo in forks['data']: - users[repo['owner']['id']] = repo['owner'] - - for id, user in users.items(): - jid = 'user-%d' % id - if not queue.client.jobs[jid]: - req_queue.push(queue, { - 'type': 'user', - 'user_login': user['login'], - 'user_id': id, - }, jid=jid) - - if item['check_next'] and 'next' in forks['links']: - req_queue.push(queue, { - 'type': 'forks', - 'repo_id': item['repo_id'], - 'repo_name': item['repo_name'], - 'forks_page': item['forks_page'] + 1, - 'check_next': True, - }) - - storage_utils.update_repo_entities(storage, forks['data']) - - -def user(item, queue, session, credentials, storage): - print('Processing user %s (%s)' % (item['user_login'], item['user_id'])) - - last_modified = storage_utils.user_last_modified(storage, item['user_id']) - - data = github_api.user(item['user_id'], session, credentials, - last_modified) - - print(last_modified, '/', data['last_modified']) - if data['code'] == 304: - print('not modified') - # Not modified - # XXX: add validity - return - elif data['code'] == 200: - print('modified') - storage_utils.update_user_entities(storage, [data['data']]) - return - else: - print('Could not get reply for user %s' % item['user_login']) - print(data) - -PROCESSORS = { - 'repositories': repositories, - 'repository': repository, - 'forks': forks, - 'user': user, -} - - -def process_one_item(queue, session, credentials, storage): - - job = None - - while True: - job = req_queue.pop(queue) - if job: - break - time.sleep(0.1) - - try: - PROCESSORS[job.klass_name](job.data, queue, session, credentials, - storage) - except Exception: - raise - else: - job.complete() diff --git a/swh/lister/github/req_queue.py b/swh/lister/github/req_queue.py deleted file mode 100644 index 6c9472f..0000000 --- a/swh/lister/github/req_queue.py +++ /dev/null @@ -1,31 +0,0 @@ -# Copyright © 2016 The Software Heritage Developers -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import qless - - -PRIORITIES = { - 'forks': 100, - 'repository': 75, - 'user': 50, - 'repositories': 40, -} - - -def from_url(url): - return qless.Client(url).queues['github-lister'] - - -def push(queue, item, **kwargs): - print("Push %s to %s" % (item, queue.name)) - type = item.pop('type') - queue.put(type, item, priority=PRIORITIES.get(type, 0), **kwargs) - - -def pop(queue): - return queue.pop() - - -def empty(queue): - return not len(queue) diff --git a/swh/lister/github/storage_utils.py b/swh/lister/github/storage_utils.py deleted file mode 100644 index f625fab..0000000 --- a/swh/lister/github/storage_utils.py +++ /dev/null @@ -1,117 +0,0 @@ -# Copyright © 2016 The Software Heritage Developers -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import uuid - -from . import cache, constants, converters - - -def update_user_entities(storage, users): - """Update entities for several users in storage. Returns the new entities. - """ - - users = list(sorted(users, key=lambda u: u['id'])) - - query = [{ - 'lister': constants.GITHUB_LISTER_UUID, - 'type': 'user', - 'id': user['id'], - } for user in users] - - entities = list(storage.entity_get_from_lister_metadata(query)) - - new_entities = [] - - for user, entity in zip(users, entities): - if not entity['uuid']: - entity = { - 'uuid': uuid.uuid4(), - 'doap': {}, - 'lister_metadata': {}, - } - new_entity = converters.user_to_entity(entity, user) - cache.set_user(user['id'], new_entity['uuid']) - new_entities.append(new_entity) - - storage.entity_add(new_entities) - - return new_entities - - -def update_repo_entities(storage, repos): - """Update entities for several repositories in storage. Returns the new - entities.""" - - repos = list(sorted(repos, key=lambda r: r['id'])) - - users = {} - for repo in repos: - if not cache.get_user(repo['owner']['id']): - users[repo['owner']['id']] = repo['owner'] - - if users: - update_user_entities(storage, users.values()) - - query = [{ - 'lister': constants.GITHUB_LISTER_UUID, - 'type': 'repository', - 'id': repo['id'], - } for repo in repos] - - entities = list(storage.entity_get_from_lister_metadata(query)) - - new_entities = [] - - for repo, entity in zip(repos, entities): - if not entity['uuid']: - entity = { - 'uuid': uuid.uuid4(), - 'doap': {}, - 'lister_metadata': {}, - } - new_entities.append(converters.repository_to_entity(entity, repo)) - - storage.entity_add(new_entities) - - return new_entities - - -def repo_last_modified(storage, id): - entity_id = cache.get_repo(id) - - if entity_id: - entity = storage.entity_get_one(entity_id) - else: - entity = list(storage.entity_get_from_lister_metadata([{ - 'lister': constants.GITHUB_LISTER_UUID, - 'type': 'repository', - 'id': id, - }]))[0] - - if entity['uuid']: - cache.set_repo(id, entity['uuid']) - - updated_at = entity.get('lister_metadata', {}).get('updated_at') - - return converters.updated_at_to_last_modified(updated_at) - - -def user_last_modified(storage, id): - entity_id = cache.get_user(id) - - if entity_id: - entity = storage.entity_get_one(entity_id) - else: - entity = list(storage.entity_get_from_lister_metadata([{ - 'lister': constants.GITHUB_LISTER_UUID, - 'type': 'user', - 'id': id, - }]))[0] - - if entity['uuid']: - cache.set_user(id, entity['uuid']) - - updated_at = entity.get('lister_metadata', {}).get('updated_at') - - return converters.updated_at_to_last_modified(updated_at) diff --git a/swh/lister/github/tasks.py b/swh/lister/github/tasks.py new file mode 100644 index 0000000..dea8523 --- /dev/null +++ b/swh/lister/github/tasks.py @@ -0,0 +1,47 @@ +# Copyright (C) 2016 the Software Heritage developers +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import random + +from celery import group + +from swh.scheduler.task import Task + +from .lister import GitHubLister + +GROUP_SPLIT = 10000 + + +class IncrementalGitHubLister(Task): + task_queue = 'swh_lister_github_incremental' + + def run(self): + lister = GitHubLister() + last_id = lister.last_repo_id() + lister.fetch(min_id=last_id + 1, max_id=None) + + +class RangeGitHubLister(Task): + task_queue = 'swh_lister_github_full' + + def run(self, start, end): + lister = GitHubLister() + lister.fetch(min_id=start, max_id=end) + + +class FullGitHubLister(Task): + task_queue = 'swh_lister_github_full' + + def run(self): + lister = GitHubLister() + last_id = lister.last_repo_id() + ranges = [ + (i, min(last_id, i + GROUP_SPLIT - 1)) + for i in range(1, last_id, GROUP_SPLIT) + ] + + random.shuffle(ranges) + + range_task = RangeGitHubLister() + group(range_task.s(min, max) for min, max in ranges)() diff --git a/version.txt b/version.txt index 990a0fe..6f9b32e 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.2-0-g9550ca3 \ No newline at end of file +v0.0.3-0-g9809dee \ No newline at end of file