diff --git a/swh/cloner/git/git.py b/swh/cloner/git/git.py index 9a575e2..dfa25e6 100755 --- a/swh/cloner/git/git.py +++ b/swh/cloner/git/git.py @@ -1,49 +1,50 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import logging from swh.cloner.git import file, proc def clone_or_nothing(repo_url, mount_repo_folder, repo_name, witness_filename): """Clone a git repo_name (user/repo) repository repo_url. Check if a witness file exists. If it exists, do nothing. Else, check the existence of the clone. If it exists, it's a partial clone so it's cleaned-up. Otherwise, clone the repository and create a witness file after that. This function returns a tuple, first the full path to the local clone and second, the git clone command output. """ full_repo_folder = os.path.join(mount_repo_folder, repo_name[0], repo_name) full_witness_filename = os.path.join(full_repo_folder, witness_filename) if os.path.isfile(full_witness_filename): logging.warn('Clone %s already present!' % repo_name) return full_repo_folder else: logging.info('Clone %s in %s' % (repo_url, full_repo_folder)) if os.path.isdir(full_repo_folder): # cleanup partial repository logging.info('Clean partial clone %s!' % repo_name) file.rm(full_repo_folder) stdout = basic_clone(repo_url, full_repo_folder) file.touch(full_witness_filename) return full_repo_folder, stdout + def basic_clone(repo_url, repo_dest_folder): """Clone a repository url in the destination folder `repo_dest_folder`. Returns the output of the git clone command. """ return proc.execute(['git', 'clone', '--bare', '--quiet', repo_url, repo_dest_folder]) def fsck(repo_folder): """Compute a git fsck on a git repository. Return the output of the git fsck command. """ return proc.execute(['git', '--git-dir', repo_folder, 'fsck', '--full']) diff --git a/swh/cloner/git/producer/clones.py b/swh/cloner/git/producer/clones.py index 24d828c..ec8b249 100644 --- a/swh/cloner/git/producer/clones.py +++ b/swh/cloner/git/producer/clones.py @@ -1,74 +1,77 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information + import logging from swh.cloner.git.storage import db, models from swh.cloner.git.worker import tasks + def just_print_repos(repository_scheme, repos): """Only print what should be posted in queue. Args: repository_scheme: not used repos: Generator of tuple repository id, repository name to print. Returns: None Raises: None """ print('This is a dry run, will only display the repositories to load...') for (repo_id, repo_name) in repos: print('load repo %s into queue' % repo_name) def post_to_task_queue(repository_scheme, repos): """Load repositories to queue. Args: Generator of repository to print. Returns: None Raises: None """ for (repo_id, repo_name) in repos: logging.info('load repo %s into queue' % repo_name) repo_url = repository_scheme % repo_name model_data = {'repo_id': repo_id, 'repo_url': repo_url, 'repo_name': repo_name} tasks.CloneRepository().delay(model_data) _run_fn = { - True : just_print_repos + True: just_print_repos } def produce(conf): """Produce a list of repositories to clone in the queue. Args: conf: a dictionary of setup - db_url: the setup string to access the db - - dry_run: optional flag setup by cli to avoid actually producing real messages + - dry_run: optional flag setup by cli to avoid actually producing + real messages - repository_scheme: the uri to use for cloning repositories """ db_url = conf['db_url'] dry_run = conf['dry_run'] repository_scheme = conf['repository_scheme'] run_fn = _run_fn.get(dry_run, post_to_task_queue) with db.connect(db_url) as db_conn: repos = models.load_repos(db_conn) run_fn(repository_scheme, repos) diff --git a/swh/cloner/git/storage/models.py b/swh/cloner/git/storage/models.py index 128df53..8f482ee 100644 --- a/swh/cloner/git/storage/models.py +++ b/swh/cloner/git/storage/models.py @@ -1,33 +1,35 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.cloner.git.storage import db def load_repos(db_conn): """List the repository. Limit the number of repository to load. """ - yield from db.query_fetch(db_conn, 'select id, full_name from missing_orig_repos order by id') + yield from db.query_fetch( + db_conn, + 'select id, full_name from missing_orig_repos order by id') def persist_task_result(db_conn, repo_id, task_id, task_start_date, task_duration, status, json_result, stdout, stderr): """Persist the task's result. """ return db.query_execute(db_conn, ("""INSERT INTO crawl_history (repo, task_id, date, duration, status, result, stdout, stderr) VALUES(%s, %s, %s, %s, %s, %s, %s, %s) """, (repo_id, task_id, task_start_date, task_duration, status, json_result, stdout, stderr))) diff --git a/swh/cloner/git/worker/celery.py b/swh/cloner/git/worker/celery.py index f41b09e..f54499f 100644 --- a/swh/cloner/git/worker/celery.py +++ b/swh/cloner/git/worker/celery.py @@ -1,70 +1,71 @@ # coding: utf-8 # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from celery import Celery from . import conf app = Celery('swh.cloner.git', include=['swh.cloner.git.worker.tasks']) CONF = conf.read_conf() # source: http://celery.readthedocs.org/en/latest/configuration.html # Optional configuration, see the application user guide. app.conf.update( # Default broker URL. This must be an URL in the form of: # transport://userid:password@hostname:port/virtual_host - # Only the scheme part (transport://) is required, the rest is optional, and - # defaults to the specific transports default values. - # The transport part is the broker implementation to use, and the default is - # amqp, which uses librabbitmq by default or falls back to pyamqp if that is - # not installed. Also there are many other choices including redis, beanstalk, - # sqlalchemy, django, mongodb, couchdb. It can also be a fully qualified path - # to your own transport implementation. - BROKER_URL = CONF['queue_url'], # 'amqp://guest:guest@localhost:5672//', + # Only the scheme part (transport://) is required, the rest is optional, + # and defaults to the specific transports default values. + # The transport part is the broker implementation to use, and the default + # is amqp, which uses librabbitmq by default or falls back to pyamqp if + # that is not installed. Also there are many other choices including redis, + # beanstalk, sqlalchemy, django, mongodb, couchdb. It can also be a fully + # qualified path to your own transport implementation. + BROKER_URL=CONF['queue_url'], # 'amqp://guest:guest@localhost:5672//' # Time (in seconds, or a timedelta object) for when after stored task # tombstones will be deleted. CELERY_TASK_RESULT_EXPIRES=3600, # Late ack means the task messages will be acknowledged after the task has # been executed, not just before, which is the default behavior. CELERY_ACKS_LATE=True, # The backend used to store task results (tombstones). # Disabled by default. Can be one of the rpc, amqp, postgres, redit, cache, # mongodb, cassandra, etc... # CELERY_RESULT_BACKEND='rpc://', # CELERY_RESULT_BACKEND='db+postgresql://scott:tiger@localhost/mydatabase', # CELERY_RESULT_PERSISTENT=True, # A string identifying the default serialization method to use. # Can be pickle (default), json, yaml, msgpack or any custom serialization # methods that have been registered with kombu.serialization.registry CELERY_ACCEPT_CONTENT=['pickle', 'json'], # If True the task will report its status as “started” # when the task is executed by a worker. CELERY_TRACK_STARTED=True, # Default compression used for task messages. Can be gzip, bzip2 # (if available), or any custom compression schemes registered # in the Kombu compression registry. # CELERY_MESSAGE_COMPRESSION='bzip2', # Disable all rate limits, even if tasks has explicit rate limits set. # (Disabling rate limits altogether is recommended if you don’t have any # tasks using them.) CELERY_DISABLE_RATE_LIMITS=True, # Task hard time limit in seconds. The worker processing the task will be # killed and replaced with a new one when this is exceeded. # CELERYD_TASK_TIME_LIMIT=3600, # Task soft time limit in seconds. # The SoftTimeLimitExceeded exception will be raised when this is exceeded. - # The task can catch this to e.g. clean up before the hard time limit comes. + # The task can catch this to e.g. clean up before the hard time limit + # comes. CELERYD_TASK_SOFT_TIME_LIMIT=CONF['task_soft_time_limit'] ) if __name__ == '__main__': app.start() diff --git a/swh/cloner/git/worker/conf.py b/swh/cloner/git/worker/conf.py index ff1855a..023e676 100644 --- a/swh/cloner/git/worker/conf.py +++ b/swh/cloner/git/worker/conf.py @@ -1,44 +1,49 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information + import os from swh.core import config + # Default configuration file DEFAULT_CONF_FILE = '~/.config/swh/clones-worker.ini' # default configuration (can be overriden by the DEFAULT_CONF_FILE) DEFAULT_CONF = { # mount folder where to dump the clones 'mount': ('string', '/tmp/swh-git-cloner/'), # witness file built after the cloning. It's a witness of a success clone 'witness-file-name': ('string', 'witness'), # the ssh access key to the ssh-host - 'ssh-access-command': ('string', 'ssh -i ~/.ssh/inria/id-rsa-swh-rsync -l swh'), + 'ssh-access-command': ('string', + 'ssh -i ~/.ssh/inria/id-rsa-swh-rsync -l swh'), # ssh host 'ssh-host': ('string', 'tytso.inria.fr'), # destination folder on host 'ssh-host-destination-folder': ('string', ''), # url access to db - 'db_url': ('string', 'host= port= dbname= user= password='), + 'db_url': + ('string', + 'host= port= dbname= user= password='), # max file size limit allowed to git clone, in bytes 'clone_limit_size': ('int', '4294967296'), # 4 GB # the queue url to access for consuming tasks 'queue_url': ('string', 'amqp://guest:guest@localhost:5672//'), # soft time limit for a task, if exceeded, the worker cleans up # and stops 'task_soft_time_limit': ('int', '3600'), } def read_conf(conf_file=DEFAULT_CONF_FILE): """Read the user's configuration file. args contains the repo to parse. Transmit to the result. """ return config.read(os.path.expanduser(conf_file), DEFAULT_CONF)