diff --git a/bin/test-bulk-loader-all b/bin/test-bulk-loader-all index 52ec8e1..bb6aea3 100755 --- a/bin/test-bulk-loader-all +++ b/bin/test-bulk-loader-all @@ -1,109 +1,114 @@ #!/usr/bin/env python3 import datetime import logging import os import sys import psycopg2 import psycopg2.extras from swh.core import config from swh.loader.git.git import BulkLoader DEFAULT_CONFIG = { 'lister_db': ('str', 'dbname=lister-github'), 'repo_basepath': ('str', '/srv/storage/space/data/github'), 'dry_run': ('bool', True), 'db': ('str', 'dbname=softwareheritage-dev'), 'storage_base': ('str', '/tmp/swh-loader-git/test'), 'repo_path': ('str', None), 'origin': ('int', -1), 'authority': ('int', 1), 'validity': ('str', '2015-01-01 00:00:00+00'), 'create_origin': ('bool', True), 'send_contents': ('bool', True), 'send_directories': ('bool', True), 'send_revisions': ('bool', True), 'send_releases': ('bool', True), 'send_occurrences': ('bool', True), 'content_packet_size': ('int', 100000), 'directory_packet_size': ('int', 25000), 'revision_packet_size': ('int', 100000), 'release_packet_size': ('int', 100000), 'occurrence_packet_size': ('int', 100000), } logging.basicConfig(level=logging.INFO, format='%(asctime)s %(name)s %(levelname)s %(message)s') logger = logging.getLogger('test-bulk-loader-all') base_config = config.read(sys.argv[1], DEFAULT_CONFIG) def process_one_repository(base_config, repo_name): my_config = base_config.copy() basepath = my_config['repo_basepath'] my_path = os.path.join(basepath, repo_name[0], repo_name) if not os.path.exists(my_path): logger.error('Repository %s does not exist at %s' % (repo_name, my_path)) return witness_file = os.path.join(my_path, 'witness') if not os.path.exists(witness_file): logger.warn('No witness file for repository %s, using default value ' '%s' % (repo_name, my_config['validity'])) else: validity_timestamp = os.stat(witness_file).st_mtime my_config['validity'] = "%s+00" % datetime.datetime.utcfromtimestamp( validity_timestamp) logging.info('Processing repository %s fetched on %s' % ( repo_name, my_config['validity'])) if my_config['dry_run']: return loader = BulkLoader(my_config) + origin = loader.get_origin() + if origin['id']: + logger.info('Repository %s already loaded (origin id=%s), skipping' % ( + repo_name, origin['id'])) + return loader.process() def list_random_repos(config): db = psycopg2.connect(config['lister_db'], cursor_factory=psycopg2.extras.NamedTupleCursor) query = '''select full_name from repos_random_sample(0.001) r inner join crawl_history c on r.id = c.repo where status=true''' cur = db.cursor() cur.execute(query) ret = cur.fetchall() cur.close() db.close() return ret processed_repos = set() while True: logger.info('listing 0.001% random repos') random_repos = list_random_repos(base_config) logger.info('done') for repo in random_repos: repo_name = repo.full_name if repo_name not in processed_repos: try: process_one_repository(base_config, repo_name) except Exception: logger.exception('Failed processing repository %s' % repo_name) finally: processed_repos.add(repo_name)