diff --git a/swh/deposit/injection/scheduler.py b/swh/deposit/injection/scheduler.py index 4737f37e..4e03d207 100644 --- a/swh/deposit/injection/scheduler.py +++ b/swh/deposit/injection/scheduler.py @@ -1,212 +1,212 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Module in charge of sending deposit loading/checking as either celery task or scheduled one-shot tasks. """ import click import logging from abc import ABCMeta, abstractmethod from celery import group from swh.core import utils from swh.core.config import SWHConfig from swh.deposit.config import setup_django_for, DEPOSIT_STATUS_READY from swh.deposit.config import DEPOSIT_STATUS_READY_FOR_CHECKS from swh.scheduler.utils import get_task, create_oneshot_task_dict class SWHScheduling(SWHConfig, metaclass=ABCMeta): """Base swh scheduling class to aggregate the schedule deposit injection. """ CONFIG_BASE_FILENAME = 'deposit/server' DEFAULT_CONFIG = { 'dry_run': ('bool', False), } ADDITIONAL_CONFIG = {} def __init__(self): super().__init__() self.config = self.parse_config_file( additional_configs=[self.ADDITIONAL_CONFIG]) self.log = logging.getLogger('swh.deposit.scheduling') @abstractmethod def schedule(self, deposits): """Schedule the new deposit injection. Args: data (dict): Deposit aggregated data Returns: None """ pass class SWHCeleryScheduling(SWHScheduling): """Deposit injection as Celery task scheduling. """ def __init__(self, config=None): super().__init__() if config: self.config.update(**config) self.dry_run = self.config['dry_run'] self.check = self.config['check'] if self.check: - task_name = 'swh.deposit.injection.tasks.DepositChecksTsk' + task_name = 'swh.deposit.injection.tasks.ChecksDepositTsk' else: task_name = 'swh.deposit.injection.tasks.LoadDepositArchiveTsk' self.task = get_task(task_name) def _convert(self, deposits): """Convert tuple to celery task signature. """ task = self.task for archive_url, meta_url, update_url, check_url in deposits: if self.check: yield task.s(deposit_check_url=check_url) else: yield task.s(archive_url=archive_url, deposit_meta_url=meta_url, deposit_update_url=update_url) def schedule(self, deposits): """Schedule the new deposit injection directly through celery. Args: depositdata (dict): Deposit aggregated information. Returns: None """ if self.dry_run: return return group(self._convert(deposits)).delay() class SWHSchedulerScheduling(SWHScheduling): """Deposit injection through SWH's task scheduling interface. """ ADDITIONAL_CONFIG = {} def __init__(self, config=None): super().__init__() from swh.scheduler.backend import SchedulerBackend if config: self.config.update(**config) self.dry_run = self.config['dry_run'] self.scheduler = SchedulerBackend(**self.config) self.check = self.config['check'] def _convert(self, deposits): """Convert tuple to one-shot scheduling tasks. """ for archive_url, meta_url, update_url, check_url in deposits: if self.check: task = create_oneshot_task_dict( 'swh-deposit-archive-checks', deposit_check_url=check_url) else: task = create_oneshot_task_dict( 'swh-deposit-archive-ingestion', archive_url=archive_url, deposit_meta_url=meta_url, deposit_update_url=update_url) yield task def schedule(self, deposits): """Schedule the new deposit injection through swh.scheduler's api. Args: deposits (dict): Deposit aggregated information. """ if self.dry_run: return self.scheduler.create_tasks(self._convert(deposits)) def get_deposit_by(status): """Filter deposit given a specific status. """ from swh.deposit.models import Deposit yield from Deposit.objects.filter(status=status) def prepare_task_arguments(check): """Convert deposit to argument for task to be executed. """ from swh.deposit.config import PRIVATE_GET_RAW_CONTENT from swh.deposit.config import PRIVATE_GET_DEPOSIT_METADATA from swh.deposit.config import PRIVATE_PUT_DEPOSIT from swh.deposit.config import PRIVATE_CHECK_DEPOSIT from django.core.urlresolvers import reverse if check: status = DEPOSIT_STATUS_READY_FOR_CHECKS else: status = DEPOSIT_STATUS_READY for deposit in get_deposit_by(status): args = [deposit.collection.name, deposit.id] archive_url = reverse(PRIVATE_GET_RAW_CONTENT, args=args) meta_url = reverse(PRIVATE_GET_DEPOSIT_METADATA, args=args) update_url = reverse(PRIVATE_PUT_DEPOSIT, args=args) check_url = reverse(PRIVATE_CHECK_DEPOSIT, args=args) yield archive_url, meta_url, update_url, check_url @click.command( help='Schedule one-shot deposit injections') @click.option('--platform', default='development', help='development or production platform') @click.option('--scheduling-method', default='celery', help='Scheduling method') @click.option('--batch-size', default=1000, type=click.INT, help='Task batch size') @click.option('--dry-run/--no-dry-run', is_flag=True, default=False, help='Dry run') @click.option('--check', is_flag=True, default=False) def main(platform, scheduling_method, batch_size, dry_run, check): setup_django_for(platform) override_config = {} if dry_run: override_config['dry_run'] = dry_run override_config['check'] = check if scheduling_method == 'celery': scheduling = SWHCeleryScheduling(override_config) elif scheduling_method == 'swh-scheduler': scheduling = SWHSchedulerScheduling(override_config) else: raise ValueError( 'Only `celery` or `swh-scheduler` values are accepted') for deposits in utils.grouper(prepare_task_arguments(check), batch_size): scheduling.schedule(deposits) if __name__ == '__main__': main() diff --git a/swh/deposit/injection/tasks.py b/swh/deposit/injection/tasks.py index 9eaaea5c..ce5d800a 100644 --- a/swh/deposit/injection/tasks.py +++ b/swh/deposit/injection/tasks.py @@ -1,51 +1,51 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.scheduler.task import Task from swh.deposit.injection.loader import DepositLoader from swh.deposit.injection.checker import DepositChecker class LoadDepositArchiveTsk(Task): """Deposit archive ingestion task described by the following steps: 1. Retrieve tarball from deposit's private api and store locally in a temporary directory 2. Trigger the ingestion 3. clean up the temporary directory 4. Update the deposit's status according to result using the deposit's private update status api """ task_queue = 'swh_loader_deposit' def run_task(self, *, archive_url, deposit_meta_url, deposit_update_url): """Import a deposit tarball into swh. Args: see :func:`DepositLoader.load`. """ loader = DepositLoader() loader.log = self.log loader.load(archive_url=archive_url, deposit_meta_url=deposit_meta_url, deposit_update_url=deposit_update_url) -class DepositChecksTsk(Task): +class ChecksDepositTsk(Task): """Deposit checks task. """ task_queue = 'swh_checker_deposit' def run_task(self, deposit_check_url): """Check a deposit's status Args: see :func:`DepositChecker.check`. """ checker = DepositChecker() checker.log = self.log checker.check(deposit_check_url)