diff --git a/swh/cloner/git/worker/conf.py b/swh/cloner/git/worker/conf.py
index a4ef1af..ff1855a 100644
--- a/swh/cloner/git/worker/conf.py
+++ b/swh/cloner/git/worker/conf.py
@@ -1,52 +1,44 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-import configparser
 import os
 
+from swh.core import config
+
 # Default configuration file
 DEFAULT_CONF_FILE = '~/.config/swh/clones-worker.ini'
 
 # default configuration (can be overridden by the DEFAULT_CONF_FILE)
 DEFAULT_CONF = {
     # mount folder where to dump the clones
-    'mount': '/tmp/swh-git-cloner/',
+    'mount': ('string', '/tmp/swh-git-cloner/'),
     # witness file built after the cloning. It's a witness of a successful clone
-    'witness-file-name': 'witness',
+    'witness-file-name': ('string', 'witness'),
     # the ssh access key to the ssh-host
-    'ssh-access-command': 'ssh -i ~/.ssh/inria/id-rsa-swh-rsync -l swh',
+    'ssh-access-command': ('string', 'ssh -i ~/.ssh/inria/id-rsa-swh-rsync -l swh'),
     # ssh host
-    'ssh-host': 'tytso.inria.fr',
+    'ssh-host': ('string', 'tytso.inria.fr'),
     # destination folder on host
-    'ssh-host-destination-folder': '',
+    'ssh-host-destination-folder': ('string', ''),
     # url access to db
-    'db_url': 'host= port= dbname= user= password=',
+    'db_url': ('string', 'host= port= dbname= user= password='),
     # max file size limit allowed to git clone, in bytes
-    'clone_limit_size': '4294967296',  # 4 GB
+    'clone_limit_size': ('int', '4294967296'),  # 4 GB
     # the queue url to access for consuming tasks
-    'queue_url': 'amqp://guest:guest@localhost:5672//',
+    'queue_url': ('string', 'amqp://guest:guest@localhost:5672//'),
     # soft time limit for a task; if exceeded, the worker cleans up
     # and stops
-    'task_soft_time_limit': '3600',
+    'task_soft_time_limit': ('int', '3600'),
 }
 
 
 def read_conf(conf_file=DEFAULT_CONF_FILE):
     """Read the user's configuration file and complete it with the typed
     defaults from DEFAULT_CONF.
     """
-    config = configparser.ConfigParser(defaults=DEFAULT_CONF)
-    config.read(os.path.expanduser(conf_file))
-    conf = config._sections['main']
-
-    # ensure the default keys are set if some are missing
-    for key in DEFAULT_CONF:
-        conf[key] = conf.get(key, DEFAULT_CONF[key])
-
-    return conf
+    return config.read(os.path.expanduser(conf_file), DEFAULT_CONF)
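
Note on the conf.py hunk: DEFAULT_CONF now declares each default as a
(type, value) pair, and swh.core.config.read is expected to coerce values
to the declared type, which is why the tasks.py hunk below can drop its
int(...) cast on clone_limit_size. A minimal sketch of that coercion,
assuming only what the diff shows; the read_typed name is hypothetical,
not swh.core's actual implementation, and the 'main' section name comes
from the removed configparser code:

    import configparser
    import os

    def read_typed(conf_file, default_conf):
        # Parse the ini file; missing keys fall back to the typed defaults.
        parser = configparser.ConfigParser()
        parser.read(os.path.expanduser(conf_file))
        raw = dict(parser['main']) if parser.has_section('main') else {}
        conf = {}
        for key, (value_type, default) in default_conf.items():
            value = raw.get(key, default)
            # Coerce per the declared type; anything else stays a string.
            conf[key] = int(value) if value_type == 'int' else value
        return conf

With the defaults declared this way, read_conf()['clone_limit_size'] comes
back as the integer 4294967296 instead of the string '4294967296'.
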
diff --git a/swh/cloner/git/worker/tasks.py b/swh/cloner/git/worker/tasks.py
index 0b3061b..0dbb186 100644
--- a/swh/cloner/git/worker/tasks.py
+++ b/swh/cloner/git/worker/tasks.py
@@ -1,198 +1,198 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import resource
 import os
 import json
 
 from datetime import datetime
 
 from celery.utils import log
 from celery.exceptions import SoftTimeLimitExceeded
 
 from .celery import app, conf
 from swh.cloner.git import git, file
 from swh.cloner.git.storage import db, models
 
 logger = log.get_task_logger(__name__)
 
 MAIN_CONF = conf.read_conf()
-FSIZE_LIMIT = int(MAIN_CONF['clone_limit_size'])
+FSIZE_LIMIT = MAIN_CONF['clone_limit_size']
 
 
 # FIXME: works around json's limitation: it does not know how to serialize
 # timedelta objects (cf. json encoder)
 def execute_with_measure(key_measure, fn, data, timedelta=False):
     """Execute fn with `data` as parameter and measure its `wall clock time`.
     - The result of such a computation is then returned with the measured
       time indexed at key `key_measure`
     - timedelta determines the format of that time: if False (the default)
       it is stored in seconds, otherwise as a Python timedelta
     """
     try:
         start, data, end = datetime.utcnow(), fn(data), datetime.utcnow()
         diff_time = end - start
         data.update({key_measure: diff_time.total_seconds()
                      if timedelta is False else diff_time})
         return data
     except SoftTimeLimitExceeded:
         logger.error('Soft time limit exceeded. Clean up.')
         if 'clone_repo' in data:
             cleanup(data)
         raise SoftTimeLimitExceeded  # raise it again for the error callback
 
 
 def filter_keys(data_keys, result):
     """Filter out no longer needed data_keys from result (if present).
     """
     for data_key in data_keys:
         if data_key in result:
             result.pop(data_key)
     return result
 
 
 def report_task_summary(task_id, status, result):
     """Prepare the task's result for persistence.
     """
     # retrieve the datetime and remove it from the data persisted in the
     # `result` json
     task_start_date = result.pop('task_start_date')
     # task_duration could be non-existent due to a runtime failure
     task_duration = \
         result.pop('task_duration') if 'task_duration' in result else None
     result_stdout = result.pop('stdout', '') if 'stdout' in result else ''
     result_stderr = result.pop('stderr', '') if 'stderr' in result else ''
     result = filter_keys(['source_folder', 'clone_repo', 'repo_name',
                           'repo_url'], result)
     repo_id = result.pop('repo_id')
     json_result = json.dumps(result)
     return repo_id, task_id, task_start_date, task_duration, status, \
         json_result, result_stdout, result_stderr
 
 
 def persist(task_id, status, result):
     """Persist the task_id's result with status.
     """
     # FIXME Deal with potential db problem...
     with db.connect(MAIN_CONF['db_url']) as db_conn:
         res = report_task_summary(task_id, status=status, result=result)
         models.persist_task_result(db_conn, *res)
 
 
 def on_success_cb(task, retval, task_id, args, kwargs):
     """Handler function called when a task ends in 'SUCCESS' status.
     """
     persist(task_id, status=True, result=retval)
 
 
 def on_failure_cb(task, exc, task_id, args, kwargs, einfo):
     """Handler function called when a task fails.
     """
     result, = args
     # potentially, errors occurred before the cleanup
     if 'clone_repo' in result:
         cleanup(result)
     result.update({'error_message': str(exc),
                    'stderr': str(einfo)})
     persist(task_id, status=False, result=result)
 
 
 @app.task(on_success=on_success_cb, on_failure=on_failure_cb)
 def orchestrate_clone_with_measure(model_data):
     """Orchestrate the cloning of a repository.
     """
     model_data.update({'task_start_date': datetime.utcnow()})
     return execute_with_measure('task_duration', orchestrate_clone,
                                 model_data, timedelta=True)
 
 
 def orchestrate_clone(model_data):
     """Clone, fsck, du, rsync, cleanup a repository.
     """
     model_data = execute_with_measure('time_clone', clone, model_data)
     if 'clone_repo' not in model_data:
         logger.info(model_data)
         return model_data
 
     model_data = size_as_bytes(model_data)
     logger.info(model_data)
 
     model_data = execute_with_measure('time_rsync', rsync, model_data)
     if 'time_rsync' not in model_data:
         logger.info(model_data)
         return model_data
 
     model_data = execute_with_measure('time_cleanup', cleanup, model_data)
     return model_data
 
 
""" resource.setrlimit(resource.RLIMIT_FSIZE, (FSIZE_LIMIT, FSIZE_LIMIT)) source_folder = MAIN_CONF['mount'] clone_repo, stdout = git.clone_or_nothing(model_data['repo_url'], source_folder, model_data['repo_name'], MAIN_CONF['witness-file-name']) model_data.update({'source_folder': source_folder, 'clone_repo': clone_repo, 'stdout': stdout}) return model_data def fsck(model_data): """Execute a git fsck on a repository folder. """ stdout = git.fsck(model_data['clone_repo']) model_data['stdout'] = model_data.get('stdout', '') + stdout return model_data def rsync(model_data): """Execute rsync to the host from the current machine the source_folder/repo_name. repo_name is expected to be of form / """ prepared_source_folder = os.path.join(model_data['source_folder'], './', model_data['repo_name'][0], model_data['repo_name']) stdout = file.rsync(MAIN_CONF['ssh-host'], MAIN_CONF['ssh-access-command'], prepared_source_folder, MAIN_CONF.get('ssh-host-destination-folder', '')) model_data['stdout'] = model_data.get('stdout', '') + stdout return model_data def cleanup(model_data): """Remove the source_folder directory. """ file.rm(model_data['clone_repo']) return model_data def size_as_bytes(model_data): """Count the repository's size on disk. """ clone_repo = model_data['clone_repo'] model_data.update( {'clone_repo_size': file.count_size_in_bytes(clone_repo)}) return model_data