diff --git a/swh/cloner/git/worker/tasks.py b/swh/cloner/git/worker/tasks.py
index 56be1b0..30a5466 100644
--- a/swh/cloner/git/worker/tasks.py
+++ b/swh/cloner/git/worker/tasks.py
@@ -1,210 +1,210 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import resource
 import os
 import json

 from datetime import datetime

 from celery.utils import log
 from celery.exceptions import SoftTimeLimitExceeded

 from .celery import conf
 from swh.cloner.git import git, file
 from swh.cloner.git.storage import db, models
 from swh.core import scheduling

 logger = log.get_task_logger(__name__)

 MAIN_CONF = conf.read_conf()

 FSIZE_LIMIT = MAIN_CONF['clone_limit_size']


 # FIXME: works around json's limitation: it does not know how to serialize
 # timedelta objects (cf. json encoder)
 def execute_with_measure(key_measure, fn, data, timedelta=False):
     """Execute fn with `data` as parameter and measure its wall clock time.
     - The result of the computation is returned with the measured time
       stored at key `key_measure`.
     - timedelta determines the format of that time: if False (the default),
       it is stored in seconds, otherwise as a Python timedelta.
     """
     try:
         start, data, end = datetime.utcnow(), fn(data), datetime.utcnow()
         diff_time = end - start
         data.update({key_measure:
                      diff_time.total_seconds() if timedelta is False
                      else diff_time})
         return data
     except SoftTimeLimitExceeded:
         logger.error('Soft time limit exceeded. Clean up.')
         if 'clone_repo' in data:
             cleanup(data)
         raise SoftTimeLimitExceeded  # raise it again for the error callback


 def filter_keys(data_keys, result):
     """Remove the no-longer-needed data_keys from result (if present).
     """
     for data_key in data_keys:
         if data_key in result:
             result.pop(data_key)
     return result


 def report_task_summary(task_id, status, result):
     """Build the tuple of values to persist as the task's result.
     """
     # retrieve the start datetime and remove it from the data persisted
     # in the `result` json
     task_start_date = result.pop('task_start_date')
     # task_duration may be missing due to a runtime failure
     task_duration = \
         result.pop('task_duration') if 'task_duration' in result else None
     result_stdout = result.pop('stdout', '') if 'stdout' in result else ''
     result_stderr = result.pop('stderr', '') if 'stderr' in result else ''
     result = filter_keys(['source_folder', 'clone_repo', 'repo_name',
                           'repo_url'], result)
     repo_id = result.pop('repo_id')
     json_result = json.dumps(result)
-    return repo_id, task_id, task_start_date, task_duration, status, \
-        json_result, result_stdout, result_stderr
+    return (repo_id, task_id, task_start_date, task_duration, status,
+            json_result, result_stdout, result_stderr)


 def persist(task_id, status, result):
     """Persist the task_id's result with status.
     """
     # FIXME Deal with potential db problem...
     with db.connect(MAIN_CONF['db_url']) as db_conn:
         res = report_task_summary(task_id, status=status, result=result)
         models.persist_task_result(db_conn, *res)


 def on_success_cb(task, retval, task_id, args, kwargs):
     """Handler function called when a task is in 'SUCCESS' status!
     """
     persist(task_id, status=True, result=retval)


 def on_failure_cb(task, exc, task_id, args, kwargs, einfo):
     """Handler function called when a task is in 'FAILURE' status!
     """
     result, = args
     # errors may have occurred before the cleanup could run
     if 'clone_repo' in result:
         cleanup(result)
     result.update({'error_message': str(exc),
                    'stderr': str(einfo)})
     persist(task_id, status=False, result=result)


 def orchestrate_clone_with_measure(model_data):
     """Orchestrate the cloning of a repository and measure its duration.
     """
     model_data.update({'task_start_date': datetime.utcnow()})
     return execute_with_measure('task_duration',
                                 orchestrate_clone,
                                 model_data,
                                 timedelta=True)


 def orchestrate_clone(model_data):
     """Clone, du, rsync, and clean up a repository.
     """
     model_data = execute_with_measure('time_clone', clone, model_data)
     if 'clone_repo' not in model_data:
         logger.info(model_data)
         return model_data

     model_data = size_as_bytes(model_data)
     logger.info(model_data)

     model_data = execute_with_measure('time_rsync', rsync, model_data)
     if 'time_rsync' not in model_data:
         logger.info(model_data)
         return model_data

     model_data = execute_with_measure('time_cleanup', cleanup, model_data)
     return model_data


 def clone(model_data):
     """Clone the repository.
     An error is raised if the repository exceeds the accepted size limit.
     """
     resource.setrlimit(resource.RLIMIT_FSIZE, (FSIZE_LIMIT, FSIZE_LIMIT))
     source_folder = MAIN_CONF['mount']
     clone_repo, stdout = git.clone_or_nothing(model_data['repo_url'],
                                               source_folder,
                                               model_data['repo_name'],
                                               MAIN_CONF['witness-file-name'])
     model_data.update({'source_folder': source_folder,
                        'clone_repo': clone_repo,
                        'stdout': stdout})
     return model_data


 def fsck(model_data):
     """Execute a git fsck on a repository folder.
     """
     stdout = git.fsck(model_data['clone_repo'])
     model_data['stdout'] = model_data.get('stdout', '') + stdout
     return model_data


 def rsync(model_data):
     """Rsync source_folder/repo_name from the current machine to the
     remote host.
     The source path is built as
     source_folder/./<first char of repo_name>/<repo_name>.
     """
     prepared_source_folder = os.path.join(model_data['source_folder'],
                                           './',
                                           model_data['repo_name'][0],
                                           model_data['repo_name'])
     stdout = file.rsync(MAIN_CONF['ssh-host'],
                         MAIN_CONF['ssh-access-command'],
                         prepared_source_folder,
                         MAIN_CONF.get('ssh-host-destination-folder', ''))
     model_data['stdout'] = model_data.get('stdout', '') + stdout
     return model_data


 def cleanup(model_data):
     """Remove the cloned repository's directory.
     """
     file.rm(model_data['clone_repo'])
     return model_data


 def size_as_bytes(model_data):
     """Count the repository's size on disk, in bytes.
     """
     clone_repo = model_data['clone_repo']
     model_data.update(
         {'clone_repo_size': file.count_size_in_bytes(clone_repo)})
     return model_data


 class CloneRepository(scheduling.Task):
     def run(self, model_data):
         return orchestrate_clone_with_measure(model_data)

     def on_success(self, retval, task_id, args, kwargs):
         return on_success_cb(self, retval, task_id, args, kwargs)

     def on_failure(self, exc, task_id, args, kwargs, einfo):
         return on_failure_cb(self, exc, task_id, args, kwargs, einfo)