diff --git a/README b/README
index ae7b7ed..04baf88 100644
--- a/README
+++ b/README
@@ -1,47 +1,37 @@
 Licensing
 =========

 This program is free software: you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
 the Free Software Foundation, either version 3 of the License, or
 (at your option) any later version.

 This program is distributed in the hope that it will be useful, but
 WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 General Public License for more details.

 See the top-level LICENSE file for the full text of the GNU General
 Public License along with this program.

 Dependencies
 ============

-- python3
-- python3-psycopg2
-- python3-requests
-- python3-sqlalchemy
-
+See requirements.txt

 Deployment
 ==========

-1. git clone under $GHLISTER_ROOT (of your choosing)
-2. mkdir ~/.config/swh/ ~/.cache/swh/lister-github/
-3. edit $GHLISTER_ROOT/etc/crontab and customize GHLISTER_ROOT
-4. crontab $GHLISTER_ROOT/etc/crontab
-5. create configuration file ~/.config/swh/lister-github.ini
+The GitHub lister can be run standalone with `python3 -m swh.lister.github.lister`.

 Sample configuration file
 -------------------------

-cat ~/.config/swh/lister-github.ini
+cat ~/.config/swh/lister/github.ini

 [main]
-db_url = postgres:///github
-# see http://docs.sqlalchemy.org/en/latest/core/engines.html#database-urls
-cache_dir = /home/zack/.cache/swh/lister-github
-log_dir = /home/zack/.cache/swh/lister-github
-username = foobar # github username
-password = quux # github password
+storage_class = local_storage
+storage_args = dbname=softwareheritage-dev, /srv/softwareheritage/objects
+queue_file = ~/.cache/swh/lister-github/queue.pickle
+credentials = olasd:olasd_github_token, zacchiro:zacchiro_github_token
\ No newline at end of file
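Note on the new `credentials` entry above: it is a comma-separated list of `login:token` pairs. A minimal sketch of how such a value maps onto the per-login dict the lister builds internally (this mirrors `run_from_queue` in swh/lister/github/lister.py later in this diff; swh.core's config loader is assumed to have already split the ini value into a list of strings):

    # Hypothetical input, matching the sample configuration above.
    credentials_config = ['olasd:olasd_github_token',
                          'zacchiro:zacchiro_github_token']

    credentials = {}
    for credential in credentials_config:
        login, token = credential.split(':')
        credentials[login] = {'token': token}

    # credentials == {'olasd': {'token': 'olasd_github_token'},
    #                 'zacchiro': {'token': 'zacchiro_github_token'}}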
diff --git a/TODO b/TODO
index 46a84f3..c10c02f 100644
--- a/TODO
+++ b/TODO
@@ -1,57 +1,49 @@
 # -*- mode: org -*-

-* TODO SQL: rework repo_history/repo_creations to use last_seen
-* TODO cache dir: split json data from other HTTP info
-  for easier further processing of additional API data
-
-* TODO cache dir: split in subdirs
-  to avoid hitting too hard on the filesystem due to the large amount of files
-  (200k+)
-
 * TODO network-level traceback

 Traceback (most recent call last):
   File "/usr/lib/python3/dist-packages/urllib3/response.py", line 186, in read
     data = self._fp.read(amt)
   File "/usr/lib/python3.4/http/client.py", line 500, in read
     return super(HTTPResponse, self).read(amt)
   File "/usr/lib/python3.4/http/client.py", line 529, in readinto
     return self._readinto_chunked(b)
   File "/usr/lib/python3.4/http/client.py", line 621, in _readinto_chunked
     n = self._safe_readinto(mvb)
   File "/usr/lib/python3.4/http/client.py", line 680, in _safe_readinto
     raise IncompleteRead(bytes(mvb[0:total_bytes]), len(b))
 http.client.IncompleteRead: IncompleteRead(3201 bytes read, 10240 more expected)

 During handling of the above exception, another exception occurred:

 Traceback (most recent call last):
   File "/usr/lib/python3/dist-packages/requests/models.py", line 653, in generate
     for chunk in self.raw.stream(chunk_size, decode_content=True):
   File "/usr/lib/python3/dist-packages/urllib3/response.py", line 256, in stream
     data = self.read(amt=amt, decode_content=decode_content)
   File "/usr/lib/python3/dist-packages/urllib3/response.py", line 214, in read
     raise ProtocolError('Connection broken: %r' % e, e)
 urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead(3201 bytes read, 10240 more expected)', IncompleteRead(3201 bytes read, 10240 more expected))

 During handling of the above exception, another exception occurred:

 Traceback (most recent call last):
   File "bin/ghlister", line 110, in <module>
     max_id=args.interval[1])
   File "/home/zack/dati/projects/github-list-repo/ghlister/lister.py", line 129, in fetch
     repos_res = gh_api_request('/repositories?since=%d' % since, **cred)
   File "/home/zack/dati/projects/github-list-repo/ghlister/lister.py", line 55, in gh_api_request
     r = requests.get(GH_API_URL + path, **params)
   File "/usr/lib/python3/dist-packages/requests/api.py", line 60, in get
     return request('get', url, **kwargs)
   File "/usr/lib/python3/dist-packages/requests/api.py", line 49, in request
     return session.request(method=method, url=url, **kwargs)
   File "/usr/lib/python3/dist-packages/requests/sessions.py", line 457, in request
     resp = self.send(prep, **send_kwargs)
   File "/usr/lib/python3/dist-packages/requests/sessions.py", line 606, in send
     r.content
   File "/usr/lib/python3/dist-packages/requests/models.py", line 724, in content
     self._content = bytes().join(self.iter_content(CONTENT_CHUNK_SIZE)) or bytes()
   File "/usr/lib/python3/dist-packages/requests/models.py", line 656, in generate
     raise ChunkedEncodingError(e)
 requests.exceptions.ChunkedEncodingError: ('Connection broken: IncompleteRead(3201 bytes read, 10240 more expected)', IncompleteRead(3201 bytes read, 10240 more expected))
diff --git a/bin/batch b/bin/batch
deleted file mode 100755
index 9796387..0000000
--- a/bin/batch
+++ /dev/null
@@ -1,41 +0,0 @@
-#!/bin/bash
-
-# Copyright (C) 2015 Stefano Zacchiroli
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-export https_proxy="127.0.0.1:8118"  # use Tor
-export PYTHONPATH=`pwd`
-
-DBNAME=github
-DBCONN="-p 5433"
-
-psql="psql $DBCONN --no-psqlrc --pset t --pset format=unaligned ${DBNAME}"
-
-BATCH_NO="$1"
-shift
-if [ -z "$BATCH_NO" ] ; then
-    echo "Usage: batch MILLION_NO [ MIN_ID | continue ]"
-    exit 2
-fi
-
-MIN_ID="$1"
-shift
-
-min_id=$[ ($BATCH_NO - 1) * 1000000 + 1 ]
-max_id=$[ $BATCH_NO * 1000000 ]
-
-# allow min_id override on the command line
-if [ "$MIN_ID" = "continue" ] ; then
-    last_id=$(echo "select max(id) from repos where ${min_id} <= id and id <= ${max_id}" | $psql)
-    if [ "$last_id" -eq "$last_id" ] 2> /dev/null ; then  # is an integer?
-        echo "Continuing from last known id ${last_id}"
-        min_id=$last_id
-    fi
-elif [ -n "$MIN_ID" ] ; then
-    min_id=$[ $MIN_ID > $min_id ? $MIN_ID : $min_id ]
-fi
-
-cmd="bin/ghlister list ${min_id}-${max_id}"
-echo Running $cmd ...
-$cmd
diff --git a/bin/ghlister b/bin/ghlister
deleted file mode 100755
index da4c5d9..0000000
--- a/bin/ghlister
+++ /dev/null
@@ -1,135 +0,0 @@
-#!/usr/bin/python3
-
-# Copyright (C) 2015 Stefano Zacchiroli
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import argparse
-import configparser
-import logging
-import os
-import sys
-
-from sqlalchemy import create_engine
-from sqlalchemy.orm import sessionmaker
-
-from swh.lister.github import lister, models
-from swh.lister.github.db_utils import session_scope
-
-
-DEFAULT_CONF = {
-    'cache_dir': './cache',
-    'log_dir': './log',
-    'cache_json': 'False',
-}
-
-
-def db_connect(db_url):
-    engine = create_engine(db_url)
-    session = sessionmaker(bind=engine)
-
-    return (engine, session)
-
-
-def int_interval(s):
-    """parse an "N-M" string as an interval.
-
-    Return an (N,M) int (or None) pair
-
-    """
-    def not_an_interval():
-        raise argparse.ArgumentTypeError('not an interval: ' + s)
-
-    def parse_int(s):
-        if s:
-            return int(s)
-        else:
-            return None
-
-    if '-' not in s:
-        not_an_interval()
-    parts = s.split('-')
-    if len(parts) > 2:
-        not_an_interval()
-    return tuple([parse_int(p) for p in parts])
-
-
-def parse_args():
-    cli = argparse.ArgumentParser(
-        description='list GitHub repositories and load them into a DB')
-    cli.add_argument('--db-url', '-d', metavar='SQLALCHEMY_URL',
-                     help='SQLAlchemy DB URL (override conffile); see '
-                          '<http://docs.sqlalchemy.org/en/latest/core/engines.html#database-urls>')  # NOQA
-    cli.add_argument('--verbose', '-v', action='store_true',
-                     help='be verbose')
-
-    subcli = cli.add_subparsers(dest='action')
-    subcli.add_parser('createdb', help='initialize DB')
-    subcli.add_parser('dropdb', help='destroy DB')
-
-    list_cli = subcli.add_parser('list', help='list repositories')
-    list_cli.add_argument('interval',
-                          type=int_interval,
-                          help='interval of repository IDs to list, '
-                               'in N-M format; either N or M can be omitted.')
-
-    list_cli = subcli.add_parser('catchup',
-                                 help='catchup with new repos since last time')
-
-    args = cli.parse_args()
-
-    if not args.action:
-        cli.error('no action given')
-
-    return args
-
-
-def read_conf(args):
-    config = configparser.ConfigParser(defaults=DEFAULT_CONF)
-    config.read(os.path.expanduser('~/.config/swh/lister-github.ini'))
-
-    conf = config._sections['main']
-
-    # overrides
-    if args.db_url:
-        conf['db_url'] = args.db_url
-
-    # typing
-    if 'cache_json' in conf and conf['cache_json'].lower() == 'true':
-        conf['cache_json'] = True
-    else:
-        conf['cache_json'] = False
-
-    return conf
-
-
-if __name__ == '__main__':
-    logging.basicConfig(level=logging.INFO)  # XXX
-
-    args = parse_args()
-    conf = read_conf(args)
-
-    db_engine, mk_session = db_connect(conf['db_url'])
-
-    if args.action == 'createdb':
-        models.SQLBase.metadata.create_all(db_engine)
-    elif args.action == 'dropdb':
-        models.SQLBase.metadata.drop_all(db_engine)
-    elif args.action == 'list':
-        lister.fetch(conf,
-                     mk_session,
-                     min_id=args.interval[0],
-                     max_id=args.interval[1])
-    elif args.action == 'catchup':
-        with session_scope(mk_session) as db_session:
-            last_known_id = lister.last_repo_id(db_session)
-            if last_known_id is not None:
-                logging.info('catching up from last known repo id: %d' %
-                             last_known_id)
-                lister.fetch(conf,
-                             mk_session,
-                             min_id=last_known_id + 1,
-                             max_id=None)
-            else:
-                logging.error('Cannot catchup: no last known id found. Abort.')
-                sys.exit(2)
diff --git a/bin/reset.sh b/bin/reset.sh
deleted file mode 100644
index f5bf69b..0000000
--- a/bin/reset.sh
+++ /dev/null
@@ -1,9 +0,0 @@
-# Copyright (C) 2015 Stefano Zacchiroli
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-export PYTHONPATH=`pwd`
-dropdb github
-createdb github
-bin/ghlister createdb
-rm cache/*
diff --git a/bin/status b/bin/status
deleted file mode 100755
index 8a3105f..0000000
--- a/bin/status
+++ /dev/null
@@ -1,18 +0,0 @@
-#!/bin/bash
-
-# Copyright (C) 2015 Stefano Zacchiroli
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-# intended usage: watch -n 60 bin/status
-
-DBNAME="github"
-DBCONN="-p 5433"
-
-psql="psql $DBCONN --no-psqlrc"
-
-ps auxw | grep bin/batch
-echo "SELECT COUNT(*), MAX(id) FROM repos" | $psql "$DBNAME"
-echo "\\l+ ${DBNAME}" | $psql "$DBNAME"
-du -sh cache/
-zgrep -i --color=auto "'X-RateLimit-Remaining'" cache/$(ls -t cache/ | head -n 4 | tail -n 1)
diff --git a/etc/crontab b/etc/crontab
deleted file mode 100644
index 4ebb2d1..0000000
--- a/etc/crontab
+++ /dev/null
@@ -1,5 +0,0 @@
-SHELL=/bin/bash
-GHLISTER_ROOT=/home/zack/src/swh-lister-github
-
-# m h  dom mon dow  command
-0 8 * * *  PYTHONPATH=$GHLISTER_ROOT $GHLISTER_ROOT/bin/ghlister catchup >> ~/.cache/swh/lister-github/$(date +\%Y\%m\%d).log 2>&1
diff --git a/requirements.txt b/requirements.txt
index 833d5e7..851b9e8 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,5 @@
+dateutil
 requests
-SQLAlchemy
+
+swh.core
+swh.storage
diff --git a/sql/crawler.sql b/sql/crawler.sql
deleted file mode 100644
index 0a30b54..0000000
--- a/sql/crawler.sql
+++ /dev/null
@@ -1,106 +0,0 @@
-
--- -- return a random sample of repos, containing %percent repositories
--- create or replace function repos_random_sample_array(percent real)
---     returns setof repos as $$
---     declare
---         samples integer;
---         repo repos%rowtype;
---         ids integer[];
---     begin
---         select floor(count(*) / 100 * percent) into samples from repos;
---         ids := array(select id from repos order by id);
---         for i in 1 .. samples loop
---             select * into repo
---             from repos
---             where id = ids[round(random() * samples)];
---             return next repo;
---         end loop;
---         return;
---     end
--- $$
--- language plpgsql;
-
--- return a random sample of repositories
-create or replace function repos_random_sample(percent real)
-    returns setof repos as $$
-declare
-    sample_size integer;
-begin
-    select floor(count(*) / 100 * percent) into sample_size from repos;
-    return query
-        select * from repos
-        order by random()
-        limit sample_size;
-    return;
-end
-$$
-language plpgsql;
-
--- -- return a random sample of repositories
--- create or replace function random_sample_sequence(percent real)
---     returns setof repos as $$
---     declare
---         sample_size integer;
---         seq_size integer;
---         min_id integer;
---         max_id integer;
---     begin
---         select floor(count(*) / 100 * percent) into sample_size from repos;
---         select min(id) into min_id from repos;
---         select max(id) into max_id from repos;
---         seq_size := sample_size * 3;  -- IDs are sparse, generate a larger sequence
---                                       -- to have enough of them
---         return query
---             select * from repos
---             where id in
---                 (select floor(random() * (max_id - min_id + 1))::integer
---                         + min_id
---                  from generate_series(1, seq_size))
---             order by random() limit sample_size;
---         return;
---     end
--- $$
--- language plpgsql;
-
-create or replace function repos_well_known()
-    returns setof repos as $$
-begin
-    return query
-        select * from repos
-        where full_name like 'apache/%'
-           or full_name like 'eclipse/%'
-           or full_name like 'mozilla/%'
-           or full_name = 'torvalds/linux'
-           or full_name = 'gcc-mirror/gcc';
-    return;
-end
-$$
-language plpgsql;
-
-create table crawl_history (
-    id bigserial primary key,
-    repo integer references repos(id),
-    task_id uuid,  -- celery task id
-    date timestamptz not null,
-    duration interval,
-    status boolean,
-    result json,
-    stdout text,
-    stderr text
-);
-
-create index on crawl_history (repo);
-
-create view missing_orig_repos AS
-    select *
-    from orig_repos as repos
-    where not exists
-        (select 1 from crawl_history as history
-         where history.repo = repos.id);
-
-create view missing_fork_repos AS
-    select *
-    from fork_repos as repos
-    where not exists
-        (select 1 from crawl_history as history
-         where history.repo = repos.id);
diff --git a/sql/pimp_db.sql b/sql/pimp_db.sql
deleted file mode 100644
index 2cc9cef..0000000
--- a/sql/pimp_db.sql
+++ /dev/null
@@ -1,36 +0,0 @@
-
-create view orig_repos as
-    select id, name, full_name, html_url, description, last_seen
-    from repos
-    where not fork;
-
-create view fork_repos as
-    select id, name, full_name, html_url, description, last_seen
-    from repos
-    where fork
-
-create extension pg_trgm;
-
-create index ix_trgm_repos_description on
-    repos using gin (description gin_trgm_ops);
-
-create index ix_trgm_repos_full_name on
-    repos using gin (full_name gin_trgm_ops);
-
-create table repos_history (
-    ts timestamp default current_timestamp,
-    repos integer not null,
-    fork_repos integer,
-    orig_repos integer
-);
-
-create view repo_creations as
-    select today.ts :: date as date,
-           today.repos - yesterday.repos as repos,
-           today.fork_repos - yesterday.fork_repos as fork_repos,
-           today.orig_repos - yesterday.orig_repos as orig_repos
-    from repos_history today
-    join repos_history yesterday on
-        (yesterday.ts = (select max(ts)
-                         from repos_history
-                         where ts < today.ts));
diff --git a/swh/lister/github/cache.py b/swh/lister/github/cache.py
new file mode 100644
index 0000000..ac58e8b
--- /dev/null
+++ b/swh/lister/github/cache.py
@@ -0,0 +1,27 @@
+# Copyright © 2016 The Software Heritage Developers
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+GITHUB_USER_UUID_CACHE = {}
+GITHUB_REPO_UUID_CACHE = {}
+
+
+def get_user(id):
+    """Get the cache value for user `id`"""
+    return GITHUB_USER_UUID_CACHE.get(id)
+
+
+def set_user(id, uuid):
+    """Set the cache value for user `id`"""
+    GITHUB_USER_UUID_CACHE[id] = uuid
+
+
+def get_repo(id):
+    """Get the cache value for repo `id`"""
+    return GITHUB_REPO_UUID_CACHE.get(id)
+
+
+def set_repo(id, uuid):
+    """Set the cache value for repo `id`"""
+    GITHUB_REPO_UUID_CACHE[id] = uuid
diff --git a/swh/lister/github/converters.py b/swh/lister/github/converters.py
new file mode 100644
index 0000000..e630a64
--- /dev/null
+++ b/swh/lister/github/converters.py
@@ -0,0 +1,94 @@
+# Copyright © 2016 The Software Heritage Developers
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import copy
+import datetime
+from email.utils import format_datetime
+
+from dateutil.parser import parse as parse_datetime
+
+from . import cache, storage_utils
+
+
+def utcnow():
+    return datetime.datetime.now(tz=datetime.timezone.utc)
+
+
+def updated_at_to_last_modified(updated_at):
+    if not updated_at:
+        return None
+
+    dt = parse_datetime(updated_at).astimezone(datetime.timezone.utc)
+    return format_datetime(dt, usegmt=True)
+
+
+def repository_to_entity(orig_entity, repo):
+    """Convert a repository to an entity"""
+
+    entity = copy.deepcopy(orig_entity)
+
+    owner_uuid = cache.get_user(repo['owner']['id'])
+    if not owner_uuid:
+        raise ValueError("Owner %s (id=%d) not in cache" % (
+            repo['owner']['login'], repo['owner']['id']))
+
+    entity['parent'] = owner_uuid
+    entity['name'] = repo['full_name']
+    entity['type'] = 'project'
+    entity['description'] = repo['description']
+    if 'homepage' in repo:
+        entity['homepage'] = repo['homepage']
+    entity['active'] = True
+    entity['generated'] = True
+
+    entity['lister_metadata']['lister'] = storage_utils.GITHUB_LISTER_UUID
+    entity['lister_metadata']['type'] = 'repository'
+    entity['lister_metadata']['id'] = repo['id']
+    entity['lister_metadata']['fork'] = repo['fork']
+
+    if 'updated_at' in repo:
+        entity['lister_metadata']['updated_at'] = repo['updated_at']
+
+    entity['validity'] = [utcnow()]
+
+    return entity
+
+
+def user_to_entity(orig_entity, user):
+    """Convert a GitHub user to an entity"""
+
+    entity = copy.deepcopy(orig_entity)
+
+    if user['type'] == 'User':
+        parent = storage_utils.GITHUB_USERS_UUID
+        type = 'person'
+    elif user['type'] == 'Organization':
+        parent = storage_utils.GITHUB_ORGS_UUID
+        type = 'group_of_persons'
+    else:
+        raise ValueError("Unknown GitHub user type %s" % user['type'])
+
+    entity['parent'] = parent
+
+    if 'name' in user:
+        entity['name'] = user['name']
+
+    if not entity.get('name'):
+        entity['name'] = user['login']
+
+    entity['type'] = type
+    entity['active'] = True
+    entity['generated'] = True
+
+    entity['lister_metadata']['lister'] = storage_utils.GITHUB_LISTER_UUID
+    entity['lister_metadata']['type'] = 'user'
+    entity['lister_metadata']['id'] = user['id']
+    entity['lister_metadata']['login'] = user['login']
+
+    if 'updated_at' in user:
+        entity['lister_metadata']['updated_at'] = user['updated_at']
+
+    entity['validity'] = [utcnow()]  # timezone-aware, like repository_to_entity
+
+    return entity
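A quick sanity check of the Last-Modified conversion defined above, with a hypothetical timestamp: `parse_datetime` accepts GitHub's ISO 8601 `updated_at` strings, and `format_datetime(..., usegmt=True)` emits the HTTP date format usable in an If-Modified-Since header:

    from swh.lister.github.converters import updated_at_to_last_modified

    assert (updated_at_to_last_modified('2016-02-01T12:00:00Z')
            == 'Mon, 01 Feb 2016 12:00:00 GMT')
    assert updated_at_to_last_modified(None) is None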
diff --git a/swh/lister/github/db_utils.py b/swh/lister/github/db_utils.py
deleted file mode 100644
index 0563036..0000000
--- a/swh/lister/github/db_utils.py
+++ /dev/null
@@ -1,18 +0,0 @@
-# Copyright (C) 2015 Stefano Zacchiroli
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-from contextlib import contextmanager
-
-
-@contextmanager
-def session_scope(mk_session):
-    session = mk_session()
-    try:
-        yield session
-        session.commit()
-    except:
-        session.rollback()
-        raise
-    finally:
-        session.close()
diff --git a/swh/lister/github/generate_tasks.py b/swh/lister/github/generate_tasks.py
deleted file mode 100644
index 1184b73..0000000
--- a/swh/lister/github/generate_tasks.py
+++ /dev/null
@@ -1,111 +0,0 @@
-# Copyright (C) 2015 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import psycopg2
-import pickle
-
-
-def list_imported_repos(swh_db):
-    """List all the repositories that have been successfully imported in
-    Software Heritage.
-    """
-    query = """
-        select o.url
-        from origin o
-        left join fetch_history fh
-            on o.id = fh.origin
-        where
-            fh.status = true and
-            o.url ~~ 'https://github.com/%'
-    """
-
-    cur = swh_db.cursor()
-    cur.execute(query)
-    res = cur.fetchall()
-    cur.close()
-    return set('/'.join(repo.rsplit('/', 2)[-2:]) for (repo,) in res)
-
-
-def list_fetched_repos(ghlister_db):
-    """List all the repositories that have been successfully fetched from
-    GitHub.
-    """
-    query = """
-        select r.full_name
-        from crawl_history ch
-        left join repos r
-            on ch.repo = r.id
-        where
-            ch.status = true and
-            r.fork = false
-    """
-
-    cur = ghlister_db.cursor()
-    cur.execute(query)
-    res = cur.fetchall()
-    cur.close()
-    return set(repo for (repo,) in res)
-
-
-def list_missing_repos():
-    """List all the repositories that have not yet been imported
-    successfully."""
-    swh_db = psycopg2.connect('service=softwareheritage')
-    imported_repos = list_imported_repos(swh_db)
-    swh_db.close()
-
-    ghlister_db = psycopg2.connect('service=lister-github')
-    fetched_repos = list_fetched_repos(ghlister_db)
-    ghlister_db.close()
-
-    return fetched_repos - imported_repos
-
-
-def generate_tasks(checkpoint_file='repos', checkpoint_every=100000):
-    """Generate the Celery tasks to fetch all the missing repositories.
-
-    Checkpoint the missing repositories every checkpoint_every tasks sent, in
-    a pickle file called checkpoint_file.
-
-    If the checkpoint file exists, we do not call the database again but load
-    from the file.
- """ - import swh.loader.git.tasks - from swh.core.worker import app # flake8: noqa for side effects - - def checkpoint_repos(repos, checkpoint=checkpoint_file): - tmp = '.%s.tmp' % checkpoint - with open(tmp, 'wb') as f: - pickle.dump(repos, f) - - os.rename(tmp, checkpoint) - - def fetch_checkpoint_repos(checkpoint=checkpoint_file): - with open(checkpoint, 'rb') as f: - return pickle.load(f) - - repos = set() - - if not os.path.exists(checkpoint_file): - repos = list_missing_repos() - checkpoint_repos(repos) - else: - repos = fetch_checkpoint_repos() - - task = app.tasks['swh.loader.git.tasks.LoadGitHubRepository'] - - ctr = 0 - while True: - try: - repo = repos.pop() - except KeyError: - break - - task.delay(repo) - - ctr += 1 - if ctr >= checkpoint_every: - ctr = 0 - checkpoint_repos(repos) - - os.unlink(checkpoint) diff --git a/swh/lister/github/github_api.py b/swh/lister/github/github_api.py new file mode 100644 index 0000000..0e4807f --- /dev/null +++ b/swh/lister/github/github_api.py @@ -0,0 +1,137 @@ +# Copyright © 2016 The Software Heritage Developers +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +# see https://developer.github.com/v3/ for the GitHub API documentation + +import time +from urllib.parse import urljoin + +import requests + + +GITHUB_API_BASE = 'https://api.github.com/' + + +def github_api_request(url, last_modified=None, etag=None, session=None, + credentials=None): + """Make a request to the GitHub API at 'url'. + + Args: + url: the URL of the GitHub API endpoint to request + last_modified: the last time that URL was requested + etag: the etag for the last answer at this URL (overrides + last_modified) + session: a requests session + credentials: a list of dicts for GitHub credentials with keys: + login: the GitHub login for the credential + token: the API token for the credential + x_ratelimit_*: the rate-limit info for the given credential + + Returns: + a dict with the following keys: + credential_used: the login for the credential used + x_ratelimit_*: GitHub rate-limiting information + """ + print("Requesting url %s" % url) + if not session: + session = requests + + headers = { + 'Accept': 'application/vnd.github.v3+json', + } + + if etag: + headers['If-None-Match'] = etag + else: + if last_modified: + headers['If-Modified-Since'] = last_modified + + if not credentials: + credentials = {None: {}} + + reply = None + ret = {} + for login, creds in credentials.items(): + # Handle rate-limiting + remaining = creds.get('x_ratelimit_remaining') + reset = creds.get('x_ratelimit_reset') + if remaining == 0 and reset and reset > time.time(): + continue + + kwargs = {} + if login and creds['token']: + kwargs['auth'] = (login, creds['token']) + + reply = session.get(url, headers=headers, **kwargs) + + ratelimit = { + key.lower().replace('-', '_'): int(value) + for key, value in reply.headers.items() + if key.lower().startswith('x-ratelimit') + } + + ret.update(ratelimit) + creds.update(ratelimit) + + if not(reply.status_code == 403 and + ratelimit.get('x_ratelimit_remaining') == 0): + # Request successful, let's get out of here + break + else: + # we broke out of the loop + raise ValueError('All out of credentials', credentials) + + etag = reply.headers.get('ETag') + if etag and etag.startswith(('w/', 'W/')): + # Strip down reference to "weak" etags + etag = etag[2:] + + ret.update({ + 'url': url, + 'code': reply.status_code, + 'data': reply.json() if reply.status_code != 304 else None, + 
+        'etag': etag,
+        'last_modified': reply.headers.get('Last-Modified'),
+        'links': reply.links,
+        'login': login,
+    })
+
+    return ret
+
+
+def repositories(since=0, url=None, session=None, credentials=None):
+    """Request the list of public repositories with id greater than `since`"""
+    if not url:
+        url = urljoin(GITHUB_API_BASE, 'repositories?since=%s' % since)
+
+    req = github_api_request(url, session=session, credentials=credentials)
+
+    return req
+
+
+def repository(id, session=None, credentials=None, last_modified=None):
+    """Request the information on the repository with the given id"""
+    url = urljoin(GITHUB_API_BASE, 'repositories/%d' % id)
+    req = github_api_request(url, session=session, credentials=credentials,
+                             last_modified=last_modified)
+
+    return req
+
+
+def forks(id, page, session=None, credentials=None):
+    """Request one page of the forks of the repository with the given id"""
+    url = urljoin(GITHUB_API_BASE,
+                  'repositories/%d/forks?sort=oldest&page=%d' % (id, page))
+    req = github_api_request(url, session=session, credentials=credentials)
+
+    return req
+
+
+def user(id, session=None, credentials=None, last_modified=None):
+    """Request the information on the user with the given id"""
+    url = urljoin(GITHUB_API_BASE, 'user/%d' % id)
+    req = github_api_request(url, session=session, credentials=credentials,
+                             last_modified=last_modified)
+
+    return req
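A hedged usage sketch for the client above: list one page of repositories, then replay the request with the returned ETag, so an unchanged page is answered with a 304 (which, per GitHub's documentation, is not charged against the rate limit):

    import requests

    from swh.lister.github import github_api

    session = requests.Session()

    # Unauthenticated here for simplicity; pass a credentials dict of the
    # form {'login': {'token': '...'}} to raise the rate limit.
    first = github_api.repositories(since=0, session=session)
    assert first['code'] == 200

    # Conditional replay with the returned ETag: a 304 reply has no body,
    # so its 'data' key is None.
    again = github_api.github_api_request(first['url'], etag=first['etag'],
                                          session=session)
    if again['code'] == 304:
        print('listing unchanged since last poll')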
diff --git a/swh/lister/github/lister.py b/swh/lister/github/lister.py
index 9bbde6a..a0151ab 100644
--- a/swh/lister/github/lister.py
+++ b/swh/lister/github/lister.py
@@ -1,161 +1,52 @@
-# Copyright (C) 2015 Stefano Zacchiroli
+# Copyright © 2016 The Software Heritage Developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

-# see https://developer.github.com/v3/ for GitHub API documentation
-
-import gzip
-import logging
 import os
-import re
-import requests
-import time
-
-from pprint import pformat
-from sqlalchemy import func
-
-from swh.lister.github.db_utils import session_scope
-from swh.lister.github.models import Repository
-
-
-GH_API_URL = 'https://api.github.com'
-MAX_RETRIES = 7
-MAX_SLEEP = 3600  # 1 hour
-CONN_SLEEP = 10
-
-REPO_API_URL_RE = re.compile(r'^.*/repositories\?since=(\d+)')
-
-
-def save_http_response(r, cache_dir):
-    def escape_url_path(p):
-        return p.replace('/', '__')
-
-    fname = os.path.join(cache_dir,
-                         escape_url_path(r.request.path_url) + '.gz')
-    with gzip.open(fname, 'w') as f:
-        def emit(s):
-            f.write(bytes(s, 'UTF-8'))
-        emit(pformat(r.request.path_url))
-        emit('\n#\n')
-        emit(pformat(r.status_code))
-        emit('\n#\n')
-        emit(pformat(r.headers))
-        emit('\n#\n')
-        emit(pformat(r.json()))
-
-
-def gh_api_request(path, username=None, password=None, headers={}):
-    params = {}
-    if 'Accept' not in headers:  # request version 3 of the API
-        headers['Accept'] = 'application/vnd.github.v3+json'
-    params['headers'] = headers
-    if username is not None and password is not None:
-        params['auth'] = (username, password)
-    retries_left = MAX_RETRIES
-    while retries_left > 0:
-        logging.debug('sending API request: %s' % path)
-        try:
-            r = requests.get(GH_API_URL + path, **params)
-        except requests.exceptions.ConnectionError:
-            # network-level connection error, try again
-            logging.warn('connection error upon %s: sleep for %d seconds' %
-                         (path, CONN_SLEEP))
-            time.sleep(CONN_SLEEP)
-            retries_left -= 1
-            continue
-
-        if r.ok:  # all went well, do not retry
-            break
-
-        # detect throttling
-        if r.status_code == 403 and \
-           int(r.headers['X-RateLimit-Remaining']) == 0:
-            delay = int(r.headers['X-RateLimit-Reset']) - time.time()
-            delay = min(delay, MAX_SLEEP)
-            logging.warn('rate limited upon %s: sleep for %d seconds' %
-                         (path, int(delay)))
-            time.sleep(delay)
-        else:  # unexpected error, abort
-            break
-
-        retries_left -= 1
-
-    if not retries_left:
-        logging.warn('giving up on %s: max retries exceed' % path)
-
-    return r
-
-
-def lookup_repo(db_session, repo_id):
-    return db_session.query(Repository) \
-                     .filter(Repository.id == repo_id) \
-                     .first()
-
-
-def last_repo_id(db_session):
-    t = db_session.query(func.max(Repository.id)) \
-                  .first()
-    if t is not None:
-        return t[0]
-    # else: return None
-
-
-INJECT_KEYS = ['id', 'name', 'full_name', 'html_url', 'description', 'fork']
+import requests

+from swh.core.config import load_named_config, prepare_folders
+from swh.storage import get_storage

-def inject_repo(db_session, repo):
-    logging.debug('injecting repo %d' % repo['id'])
-    if lookup_repo(db_session, repo['id']):
-        logging.info('not injecting already present repo %d' % repo['id'])
-        return
-    kwargs = {k: repo[k] for k in INJECT_KEYS if k in repo}
-    sql_repo = Repository(**kwargs)
-    db_session.add(sql_repo)
+from . import req_queue, processors

+DEFAULT_CONFIG = {
+    'queue_file': ('str', '~/.cache/swh/lister-github/queue.pickle'),
+    'storage_class': ('str', 'local_storage'),
+    'storage_args': ('list[str]', ['dbname=softwareheritage-dev',
+                                   '/srv/softwareheritage/objects']),
+    'credentials': ('list[str]', []),

-class FetchError(RuntimeError):
+}

-    def __init__(self, response):
-        self.response = response
+CONFIG_NAME = 'lister/github.ini'

-    def __str__(self):
-        return repr(self.response)

+def run_from_queue():
+    config = load_named_config(CONFIG_NAME, DEFAULT_CONFIG)

-def fetch(conf, mk_session, min_id=None, max_id=None):
-    if min_id is None:
-        min_id = 1
-    if max_id is None:
-        max_id = float('inf')
-    next_id = min_id
+    queue_file = os.path.expanduser(config['queue_file'])
+    prepare_folders(os.path.dirname(queue_file))

-    cred = {}
-    for key in ['username', 'password']:
-        if key in conf:
-            cred[key] = conf[key]
+    credentials = {}
+    for credential in config['credentials']:
+        login, token = credential.split(':')
+        credentials[login] = {'token': token}

-    while min_id <= next_id <= max_id:
-        logging.info('listing repos starting at %d' % next_id)
-        since = next_id - 1  # github API ?since=... is '>' strict, not '>='
-        repos_res = gh_api_request('/repositories?since=%d' % since, **cred)
+    queue = req_queue.restore_from_file(queue_file)

-        if 'cache_dir' in conf and conf['cache_json']:
-            save_http_response(repos_res, conf['cache_dir'])
-        if not repos_res.ok:
-            raise FetchError(repos_res)
+    if req_queue.empty(queue):
+        req_queue.push(queue, {'type': 'repositories', 'url': None})

-        repos = repos_res.json()
-        for repo in repos:
-            if repo['id'] > max_id:  # do not overstep max_id
-                break
-            with session_scope(mk_session) as db_session:
-                inject_repo(db_session, repo)
+    session = requests.Session()
+    storage = get_storage(config['storage_class'], config['storage_args'])

-        if 'next' in repos_res.links:
-            next_url = repos_res.links['next']['url']
-            m = REPO_API_URL_RE.match(next_url)  # parse next_id
-            next_id = int(m.group(1)) + 1
-        else:
-            logging.info('stopping after id %d, no next link found' % next_id)
-            break
+    try:
+        while not req_queue.empty(queue):
+            processors.process_one_item(
+                queue, session=session, credentials=credentials,
+                storage=storage
+            )
+
+    finally:
+        req_queue.dump_to_file(queue, queue_file)
diff --git a/swh/lister/github/models.py b/swh/lister/github/models.py
deleted file mode 100644
index c4cb027..0000000
--- a/swh/lister/github/models.py
+++ /dev/null
@@ -1,44 +0,0 @@
-# Copyright (C) 2015 Stefano Zacchiroli
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-from datetime import datetime
-
-from sqlalchemy import Column
-from sqlalchemy import Boolean, DateTime, Integer, String
-from sqlalchemy.ext.declarative import declarative_base
-
-
-SQLBase = declarative_base()
-
-
-class Repository(SQLBase):
-
-    """a GitHub repository"""
-
-    __tablename__ = 'repos'
-
-    id = Column(Integer, primary_key=True)
-
-    name = Column(String, index=True)
-    full_name = Column(String, index=True)
-    html_url = Column(String)
-    description = Column(String)
-    fork = Column(Boolean, index=True)
-
-    last_seen = Column(DateTime, nullable=False)
-
-    def __init__(self, id, name=None, full_name=None, html_url=None,
-                 description=None, fork=None):
-        self.id = id
-        self.last_seen = datetime.now()
-        if name is not None:
-            self.name = name
-        if full_name is not None:
-            self.full_name = full_name
-        if html_url is not None:
-            self.html_url = html_url
-        if description is not None:
-            self.description = description
-        if fork is not None:
-            self.fork = fork
diff --git a/swh/lister/github/processors.py b/swh/lister/github/processors.py
new file mode 100644
index 0000000..9d764de
--- /dev/null
+++ b/swh/lister/github/processors.py
@@ -0,0 +1,121 @@
+# Copyright © 2016 The Software Heritage Developers
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+from . import github_api, req_queue, storage_utils
+
+
+class ProcessError(ValueError):
+    pass
+
+
+def repositories(item, queue, session, credentials, storage):
+    print('Processing scrolling repositories %s' % item['url'])
+    repos = github_api.repositories(url=item['url'], session=session,
+                                    credentials=credentials)
+    if not repos['code'] == 200:
+        raise ProcessError(item)
+
+    storage_utils.update_repo_entities(storage, repos['data'])
+
+    for repo in repos['data']:
+        if not repo['fork']:
+            req_queue.push(queue, {
+                'type': 'repository',
+                'repo_name': repo['full_name'],
+                'repo_id': repo['id'],
+            })
+
+    if 'next' in repos['links']:
+        req_queue.push(queue, {
+            'type': 'repositories',
+            'url': repos['links']['next']['url'],
+        })
+
+
+def repository(item, queue, session, credentials, storage):
+    print('Processing repository %s (%s)' % (item['repo_name'],
+                                             item['repo_id']))
+
+    last_modified = storage_utils.repo_last_modified(storage, item['repo_id'])
+    data = github_api.repository(item['repo_id'], session, credentials,
+                                 last_modified)
+
+    print(last_modified, '/', data['last_modified'])
+    if data['code'] == 304:
+        print('not modified')
+        # Not modified
+        # XXX: add validity
+        return
+    elif data['code'] == 200:
+        print('modified')
+        storage_utils.update_repo_entities(storage, [data['data']])
+        if data['data']['forks']:
+            req_queue.push(queue, {
+                'type': 'forks',
+                'repo_id': item['repo_id'],
+                'repo_name': item['repo_name'],
+                'forks_page': 1,
+            })
+        return
+    else:
+        print('Could not get reply for repository %s' % item['repo_name'])
+        print(data)
+
+
+def forks(item, queue, session, credentials, storage):
+    print('Processing forks for repository %s (%s, page %s)' % (
+        item['repo_name'], item['repo_id'], item['forks_page']))
+
+    forks = github_api.forks(item['repo_id'], item['forks_page'], session,
+                             credentials)
+
+    storage_utils.update_repo_entities(storage, forks['data'])
+
+    if 'next' in forks['links']:
+        req_queue.push(queue, {
+            'type': 'forks',
+            'repo_id': item['repo_id'],
+            'repo_name': item['repo_name'],
+            'forks_page': item['forks_page'] + 1,
+        })
+
+
+def user(item, queue, session, credentials, storage):
+    print('Processing user %s (%s)' % (item['user_login'], item['user_id']))
+
+    last_modified = storage_utils.user_last_modified(storage, item['user_id'])
+
+    data = github_api.user(item['user_id'], session, credentials,
+                           last_modified)
+
+    print(last_modified, '/', data['last_modified'])
+    if data['code'] == 304:
+        print('not modified')
+        # Not modified
+        # XXX: add validity
+        return
+    elif data['code'] == 200:
+        print('modified')
+        storage_utils.update_user_entities(storage, [data['data']])
+        return
+    else:
+        print('Could not get reply for user %s' % item['user_login'])
+        print(data)
+
+
+PROCESSORS = {
+    'repositories': repositories,
+    'repository': repository,
+    'forks': forks,
+    'user': user,
+}
+
+
+def process_one_item(queue, session, credentials, storage):
+    item = req_queue.pop(queue)
+    try:
+        PROCESSORS[item['type']](item, queue, session, credentials, storage)
+    except Exception:
+        req_queue.push_front(queue, item)
+        raise
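Design note on `process_one_item` above: a failed item is pushed back at the front of its deque before the exception propagates, so no work is lost across crashes. A small illustration using the queue primitives (the item payload and the simulated failure are made up for the demo):

    from collections import defaultdict, deque

    from swh.lister.github import req_queue

    queue = defaultdict(deque)
    req_queue.push(queue, {'type': 'repositories', 'url': None})

    item = req_queue.pop(queue)      # dequeued for processing
    try:
        raise RuntimeError('simulated network failure')
    except RuntimeError:
        req_queue.push_front(queue, item)   # requeued at the head

    assert req_queue.pop(queue) is item     # nothing was lost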
diff --git a/swh/lister/github/req_queue.py b/swh/lister/github/req_queue.py
new file mode 100644
index 0000000..cc32c3b
--- /dev/null
+++ b/swh/lister/github/req_queue.py
@@ -0,0 +1,52 @@
+# Copyright © 2016 The Software Heritage Developers
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from collections import defaultdict, deque
+import os
+import pickle
+import tempfile
+
+
+PRIORITIES = {
+    'forks': 10,
+    'user': 15,
+    'repository': 20,
+    'repositories': 30,
+}
+
+
+def restore_from_file(file):
+    if not os.path.exists(file):
+        return defaultdict(deque)
+
+    with open(file, 'rb') as f:
+        return pickle.load(f)
+
+
+def dump_to_file(queue, file):
+    fd, filename = tempfile.mkstemp(dir=os.path.dirname(file))
+    with open(fd, 'wb') as f:
+        pickle.dump(queue, f)
+        f.flush()
+        os.fsync(fd)
+
+    os.rename(filename, file)
+
+
+def push(queue, item):
+    queue[item['type']].append(item)
+
+
+def push_front(queue, item):
+    queue[item['type']].appendleft(item)
+
+
+def pop(queue):
+    for type in sorted(queue, key=lambda t: PRIORITIES.get(t, 1000)):
+        if queue[type]:
+            return queue[type].popleft()
+
+    raise IndexError("No items to pop")
+
+
+def empty(queue):
+    # all deques must be empty; `not queue` would wrongly report a
+    # non-empty queue once a type key exists with an exhausted deque
+    return not any(queue.values())
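The PRIORITIES table above drives `pop`: lower numbers are served first, so pending fork and user lookups drain before the broad `repositories` scroll continues. A short demonstration (item payloads trimmed to their type, which is all `push` and `pop` read):

    from collections import defaultdict, deque

    from swh.lister.github import req_queue

    q = defaultdict(deque)
    req_queue.push(q, {'type': 'repositories'})
    req_queue.push(q, {'type': 'user'})
    req_queue.push(q, {'type': 'forks'})

    assert req_queue.pop(q)['type'] == 'forks'         # priority 10
    assert req_queue.pop(q)['type'] == 'user'          # priority 15
    assert req_queue.pop(q)['type'] == 'repositories'  # priority 30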
diff --git a/swh/lister/github/storage_utils.py b/swh/lister/github/storage_utils.py
new file mode 100644
index 0000000..8b4a53e
--- /dev/null
+++ b/swh/lister/github/storage_utils.py
@@ -0,0 +1,122 @@
+# Copyright © 2016 The Software Heritage Developers
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import uuid
+
+from . import cache, converters
+
+
+GITHUB_ORGS_UUID = '9f7b34d9-aa98-44d4-8907-b332c1036bc3'
+GITHUB_USERS_UUID = 'ad6df473-c1d2-4f40-bc58-2b091d4a750e'
+GITHUB_LISTER_UUID = '34bd6b1b-463f-43e5-a697-785107f598e4'
+
+
+def update_user_entities(storage, users):
+    """Update entities for several users in storage. Returns the new
+    entities.
+    """
+
+    users = list(sorted(users, key=lambda u: u['id']))
+
+    query = [{
+        'lister': GITHUB_LISTER_UUID,
+        'type': 'user',
+        'id': user['id'],
+    } for user in users]
+
+    entities = list(storage.entity_get_from_lister_metadata(query))
+
+    new_entities = []
+
+    for user, entity in zip(users, entities):
+        if not entity['uuid']:
+            entity = {
+                'uuid': uuid.uuid4(),
+                'doap': {},
+                'lister_metadata': {},
+            }
+        new_entity = converters.user_to_entity(entity, user)
+        cache.set_user(user['id'], new_entity['uuid'])
+        new_entities.append(new_entity)
+
+    storage.entity_add(new_entities)
+
+    return new_entities
+
+
+def update_repo_entities(storage, repos):
+    """Update entities for several repositories in storage. Returns the new
+    entities."""
+
+    repos = list(sorted(repos, key=lambda r: r['id']))
+
+    users = {}
+    for repo in repos:
+        if not cache.get_user(repo['owner']['id']):
+            users[repo['owner']['id']] = repo['owner']
+
+    if users:
+        update_user_entities(storage, users.values())
+
+    query = [{
+        'lister': GITHUB_LISTER_UUID,
+        'type': 'repository',
+        'id': repo['id'],
+    } for repo in repos]
+
+    entities = list(storage.entity_get_from_lister_metadata(query))
+
+    new_entities = []
+
+    for repo, entity in zip(repos, entities):
+        if not entity['uuid']:
+            entity = {
+                'uuid': uuid.uuid4(),
+                'doap': {},
+                'lister_metadata': {},
+            }
+        new_entities.append(converters.repository_to_entity(entity, repo))
+
+    storage.entity_add(new_entities)
+
+    return new_entities
+
+
+def repo_last_modified(storage, id):
+    entity_id = cache.get_repo(id)
+
+    if entity_id:
+        entity = storage.entity_get_one(entity_id)
+    else:
+        entity = list(storage.entity_get_from_lister_metadata([{
+            'lister': GITHUB_LISTER_UUID,
+            'type': 'repository',
+            'id': id,
+        }]))[0]
+
+    if entity['uuid']:
+        cache.set_repo(id, entity['uuid'])
+
+    updated_at = entity.get('lister_metadata', {}).get('updated_at')
+
+    return converters.updated_at_to_last_modified(updated_at)
+
+
+def user_last_modified(storage, id):
+    entity_id = cache.get_user(id)
+
+    if entity_id:
+        entity = storage.entity_get_one(entity_id)
+    else:
+        entity = list(storage.entity_get_from_lister_metadata([{
+            'lister': GITHUB_LISTER_UUID,
+            'type': 'user',
+            'id': id,
+        }]))[0]
+
+    if entity['uuid']:
+        cache.set_user(id, entity['uuid'])
+
+    updated_at = entity.get('lister_metadata', {}).get('updated_at')
+
+    return converters.updated_at_to_last_modified(updated_at)
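Finally, a hedged end-to-end sketch of `update_repo_entities`: the storage below is a minimal in-memory stand-in for the two swh.storage calls used in this file (real runs pass the object returned by `swh.storage.get_storage()`), and the repository payload is a hypothetical GitHub reply trimmed to the keys the converters actually read:

    from swh.lister.github import storage_utils

    class FakeStorage:
        """Stand-in implementing the two calls used above."""
        def __init__(self):
            self.entities = {}

        def entity_get_from_lister_metadata(self, query):
            # Unknown entities come back with a null uuid, as the code
            # above expects for never-seen users and repositories.
            return [{'uuid': None} for _ in query]

        def entity_add(self, entities):
            for entity in entities:
                self.entities[entity['uuid']] = entity

    repo = {
        'id': 42, 'full_name': 'example/repo', 'description': 'demo',
        'fork': False, 'updated_at': '2016-02-01T12:00:00Z',
        'owner': {'id': 7, 'login': 'example', 'type': 'User'},
    }

    storage = FakeStorage()
    new = storage_utils.update_repo_entities(storage, [repo])
    assert new[0]['lister_metadata']['id'] == 42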