diff --git a/bin/test-bulk-loader-all b/bin/test-bulk-loader-all new file mode 100755 index 0000000..7a340b9 --- /dev/null +++ b/bin/test-bulk-loader-all @@ -0,0 +1,99 @@ +#!/usr/bin/env python3 + +import logging +import sys + +import psycopg2 +import psycopg2.extras +import pygit2 + +from swh.core import config +from swh.loader.git.git import BulkLoader + +DEFAULT_CONFIG = { + 'lister_db': ('str', 'dbname=lister-github'), + 'repo_basepath': ('str', '/srv/storage/space/data/github'), + 'dry_run': ('bool', True), + + 'db': ('str', 'dbname=softwareheritage-dev'), + 'storage_base': ('str', '/tmp/swh-loader-git/test'), + 'repo_path': ('str', None), + + 'origin': ('int', -1), + 'authority': ('int', 1), + 'validity': ('str', '2015-01-01 00:00:00+00'), + + 'create_origin': ('bool', True), + 'send_contents': ('bool', True), + 'send_directories': ('bool', True), + 'send_revisions': ('bool', True), + 'send_releases': ('bool', True), + 'send_occurrences': ('bool', True), + + 'content_packet_size': ('int', 100000), + 'directory_packet_size': ('int', 25000), + 'revision_packet_size': ('int', 100000), + 'release_packet_size': ('int', 100000), + 'occurrence_packet_size': ('int', 100000), +} + +logging.basicConfig(level=logging.INFO, + format='%(asctime)s %(name)s %(levelname)s %(message)s') + +base_config = config.read(sys.argv[1], DEFAULT_CONFIG) + +def process_one_repository(repo_name): + my_config = base_config.copy() + + basepath = my_config['repo_basepath'] + + my_path = os.path.join(basepath, repo_name[0], repo_name) + + if not os.path.exists(my_path): + logging.error('Repository %s does not exist at %s' % (repo_name, + my_path)) + return + + witness_file = os.path.join(my_path, 'witness') + if not os.path.exists(witness_file): + logging.warn('No witness file for repository %s, using default value ' + '%s' % (repo_name, my_config['validity'])) + else: + validity_timestamp = os.stat(witness_file).st_mtime + my_config['validity'] = "%s+00" % datetime.datetime.utcfromtimestamp( + validity_timestamp) + + logging.info('Processing repository %s fetched on %s' % (repo_name, + my_config['validity'])) + + if my_config['dry_run']: + return + + loader = BulkLoader(my_config) + loader.process() + +def list_random_repos(config): + db = psycopg2.connect(config['lister_db'], + cursor_factory=psycopg2.extras.NamedTupleCursor) + query = '''select full_name from repos_random_sample(0.001) r + inner join crawl_history c on r.id = c.repo where status=true''' + cur = db.cursor() + cur.execute(query) + ret = cur.fetchall() + cur.close() + db.close() + + return ret + +processed_repos = set() + +while True: + for repo in list_random_repos(): + repo_name = repo.full_name + if repo_name not in processed_repos: + try: + process_one_repository(repo_name) + except Exception: + logging.exception('Failed processing repository %s' % repo_name) + finally: + processed_repos.add(repo_name)