diff --git a/swh/cloner/git/producer/clones.py b/swh/cloner/git/producer/clones.py
index 245726d..24d828c 100644
--- a/swh/cloner/git/producer/clones.py
+++ b/swh/cloner/git/producer/clones.py
@@ -1,75 +1,74 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import logging

 from swh.cloner.git.storage import db, models
 from swh.cloner.git.worker import tasks
-


 def just_print_repos(repository_scheme, repos):
     """Only print what would be posted to the queue.

     Args:
         repository_scheme: not used
         repos: generator of (repository id, repository name) tuples to print.

     Returns:
         None

     Raises:
         None

     """
     print('This is a dry run, will only display the repositories to load...')
     for (repo_id, repo_name) in repos:
         print('load repo %s into queue' % repo_name)


 def post_to_task_queue(repository_scheme, repos):
     """Load repositories into the queue.

     Args:
         repository_scheme: scheme used to build each repository's url
         repos: generator of (repository id, repository name) tuples to queue.

     Returns:
         None

     Raises:
         None

     """
     for (repo_id, repo_name) in repos:
         logging.info('load repo %s into queue' % repo_name)
         repo_url = repository_scheme % repo_name
         model_data = {'repo_id': repo_id,
                       'repo_url': repo_url,
                       'repo_name': repo_name}
-        tasks.orchestrate_clone_with_measure.delay(model_data)
+        tasks.CloneRepository().delay(model_data)


 _run_fn = {True: just_print_repos}


 def produce(conf):
     """Produce the list of repositories to clone into the queue.

     Args:
         conf: a configuration dictionary with keys:
         - db_url: the connection string used to access the db
         - dry_run: optional flag set by the cli to avoid actually producing
           real messages
         - repository_scheme: the uri scheme to use for cloning repositories

     """
     db_url = conf['db_url']
     dry_run = conf['dry_run']
     repository_scheme = conf['repository_scheme']

     run_fn = _run_fn.get(dry_run, post_to_task_queue)

     with db.connect(db_url) as db_conn:
         repos = models.load_repos(db_conn)
         run_fn(repository_scheme, repos)
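A minimal sketch (not part of the patch) of what the producer-side call above
amounts to, assuming swh.core.scheduling.Task follows celery's usual Task API,
where delay() serializes its argument and enqueues a run() call on a worker;
the repository values below are made up:

    from swh.cloner.git.worker import tasks

    model_data = {'repo_id': 42,                         # hypothetical id
                  'repo_url': 'git://example.org/repo',  # hypothetical url
                  'repo_name': 'repo'}                   # hypothetical name
    tasks.CloneRepository().delay(model_data)
    # A worker later executes CloneRepository.run(model_data); its outcome is
    # persisted through the task's on_success/on_failure hooks.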
""" try: start, data, end = datetime.utcnow(), fn(data), datetime.utcnow() diff_time = end - start data.update({key_measure: diff_time.total_seconds() if timedelta is False else diff_time}) return data except SoftTimeLimitExceeded: logger.error('Soft time limit exceeded. Clean up.') if 'clone_repo' in data: cleanup(data) raise SoftTimeLimitExceeded # raise it again for the error callback def filter_keys(data_keys, result): """Filter no longer needed data_keys in result (if they are present). """ for data_key in data_keys: if data_key in result: result.pop(data_key) return result def report_task_summary(task_id, status, result): """Persist the task's result. """ # retrieve the datetime and remove it from data to persist in `result` json task_start_date = result.pop('task_start_date') # task_duration could be non-existent due to runtime failure task_duration = \ result.pop('task_duration') if 'task_duration' in result else None result_stdout = result.pop('stdout', '') if 'stdout' in result else '' result_stderr = result.pop('stderr', '') if 'stderr' in result else '' result = filter_keys(['source_folder', 'clone_repo', 'repo_name', 'repo_url'], result) repo_id = result.pop('repo_id') json_result = json.dumps(result) return repo_id, task_id, task_start_date, task_duration, status, \ json_result, result_stdout, result_stderr def persist(task_id, status, result): """Persist the task_id's result with status. """ # FIXME Deal with potential db problem... with db.connect(MAIN_CONF['db_url']) as db_conn: res = report_task_summary(task_id, status=status, result=result) models.persist_task_result(db_conn, *res) def on_success_cb(task, retval, task_id, args, kwargs): """Handler function called when a task is in 'SUCCESS' status! """ persist(task_id, status=True, result=retval) def on_failure_cb(task, exc, task_id, args, kwargs, einfo): """Handler function when a task is done! """ result, = args # potentially, errors occurred before the cleanup if 'clone_repo' in result: cleanup(result) result.update({'error_message': str(exc), 'stderr': str(einfo)}) persist(task_id, status=False, result=result) -@app.task(on_success=on_success_cb, on_failure=on_failure_cb) def orchestrate_clone_with_measure(model_data): """Orchestrate the cloning of a repository. """ model_data.update({'task_start_date': datetime.utcnow()}) return execute_with_measure('task_duration', orchestrate_clone, model_data, timedelta=True) def orchestrate_clone(model_data): """Clone, fsck, du, rsync, cleanup a repository. """ model_data = execute_with_measure('time_clone', clone, model_data) if 'clone_repo' not in model_data: logger.info(model_data) return model_data model_data = size_as_bytes(model_data) logger.info(model_data) model_data = execute_with_measure('time_rsync', rsync, model_data) if 'time_rsync' not in model_data: logger.info(model_data) return model_data model_data = execute_with_measure('time_cleanup', cleanup, model_data) return model_data def clone(model_data): """Clone the repository. Error can be raised if the repository exceeds accepted limits. """ resource.setrlimit(resource.RLIMIT_FSIZE, (FSIZE_LIMIT, FSIZE_LIMIT)) source_folder = MAIN_CONF['mount'] clone_repo, stdout = git.clone_or_nothing(model_data['repo_url'], source_folder, model_data['repo_name'], MAIN_CONF['witness-file-name']) model_data.update({'source_folder': source_folder, 'clone_repo': clone_repo, 'stdout': stdout}) return model_data def fsck(model_data): """Execute a git fsck on a repository folder. 
""" stdout = git.fsck(model_data['clone_repo']) model_data['stdout'] = model_data.get('stdout', '') + stdout return model_data def rsync(model_data): """Execute rsync to the host from the current machine the source_folder/repo_name. repo_name is expected to be of form / """ prepared_source_folder = os.path.join(model_data['source_folder'], './', model_data['repo_name'][0], model_data['repo_name']) stdout = file.rsync(MAIN_CONF['ssh-host'], MAIN_CONF['ssh-access-command'], prepared_source_folder, MAIN_CONF.get('ssh-host-destination-folder', '')) model_data['stdout'] = model_data.get('stdout', '') + stdout return model_data def cleanup(model_data): """Remove the source_folder directory. """ file.rm(model_data['clone_repo']) return model_data def size_as_bytes(model_data): """Count the repository's size on disk. """ clone_repo = model_data['clone_repo'] model_data.update( {'clone_repo_size': file.count_size_in_bytes(clone_repo)}) return model_data + + +class CloneRepository(scheduling.Task): + def run(self, model_data): + return orchestrate_clone_with_measure(model_data) + + def on_success(self, retval, task_id, args, kwargs): + return on_success_cb(self, retval, task_id, args, kwargs) + + def on_failure(self, exc, task_id, args, kwargs, einfo): + return on_failure_cb(self, exc, task_id, args, kwargs, einfo)