diff --git a/swh/deposit/injection/scheduler.py b/swh/deposit/injection/scheduler.py index c6de590a..5c2fbaa4 100644 --- a/swh/deposit/injection/scheduler.py +++ b/swh/deposit/injection/scheduler.py @@ -1,203 +1,228 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -"""Module in charge of sending deposit injection as celery task or -scheduled one-shot tasks. +"""Module in charge of sending deposit loading/checking as either +celery task or scheduled one-shot tasks. """ import click import logging from abc import ABCMeta, abstractmethod from celery import group from swh.core import utils from swh.core.config import SWHConfig from swh.deposit.config import setup_django_for, DEPOSIT_STATUS_READY +from swh.deposit.config import DEPOSIT_STATUS_READY_FOR_CHECKS class SWHScheduling(SWHConfig, metaclass=ABCMeta): """Base swh scheduling class to aggregate the schedule deposit injection. """ CONFIG_BASE_FILENAME = 'deposit/server' DEFAULT_CONFIG = { 'dry_run': ('bool', False), } + ADDITIONAL_CONFIG = {} + def __init__(self): super().__init__() self.config = self.parse_config_file( additional_configs=[self.ADDITIONAL_CONFIG]) self.log = logging.getLogger('swh.deposit.scheduling') @abstractmethod def schedule(self, deposits): """Schedule the new deposit injection. Args: data (dict): Deposit aggregated data Returns: None """ pass class SWHCeleryScheduling(SWHScheduling): """Deposit injection as Celery task scheduling. """ - ADDITIONAL_CONFIG = { - 'task_name': ( - 'str', 'swh.deposit.injection.tasks.LoadDepositArchiveTsk'), - } - def __init__(self, config=None): super().__init__() - from swh.scheduler import utils - self.task_name = self.config['task_name'] - self.task = utils.get_task(self.task_name) if config: self.config.update(**config) self.dry_run = self.config['dry_run'] + self.check = self.config['check'] + if self.check: + task_name = 'swh.deposit.injection.tasks.DepositChecksTsk' + else: + task_name = 'swh.deposit.injection.tasks.LoadDepositArchiveTsk' + from swh.scheduler import utils + self.task = utils.get_task(task_name) def _convert(self, deposits): """Convert tuple to celery task signature. """ task = self.task - for archive_url, deposit_meta_url, deposit_update_url in deposits: - yield task.s(archive_url=archive_url, - deposit_meta_url=deposit_meta_url, - deposit_update_url=deposit_update_url) + for archive_url, meta_url, update_url, check_url in deposits: + if self.check: + yield task.s(deposit_check_url=check_url) + else: + yield task.s(archive_url=archive_url, + deposit_meta_url=meta_url, + deposit_update_url=update_url) def schedule(self, deposits): """Schedule the new deposit injection directly through celery. Args: depositdata (dict): Deposit aggregated information. Returns: None """ if self.dry_run: return return group(self._convert(deposits)).delay() -class SWHScheduling(SWHScheduling): +class SWHSchedulerScheduling(SWHScheduling): """Deposit injection through SWH's task scheduling interface. """ ADDITIONAL_CONFIG = {} def __init__(self, config=None): super().__init__() from swh.scheduler.backend import SchedulerBackend if config: self.config.update(**config) self.dry_run = self.config['dry_run'] self.scheduler = SchedulerBackend(**self.config) + self.check = self.config['check'] def _convert(self, deposits): """Convert tuple to one-shot scheduling tasks. """ import datetime - for archive_url, deposit_meta_url, deposit_update_url in deposits: + for archive_url, meta_url, update_url, check_url in deposits: + if self.check: + type = 'swh-deposit-archive-checks' + kwargs = { + 'deposit_check_url': check_url + } + else: + type = 'swh-deposit-archive-ingestion' + kwargs = { + 'archive_url': archive_url, + 'deposit_meta_url': meta_url, + 'deposit_update_url': update_url, + } + yield { 'policy': 'oneshot', - 'type': 'swh-deposit-archive-ingestion', + 'type': type, 'next_run': datetime.datetime.now(tz=datetime.timezone.utc), 'arguments': { 'args': [], - 'kwargs': { - 'archive_url': archive_url, - 'deposit_meta_url': deposit_meta_url, - 'deposit_update_url': deposit_update_url, - }, + 'kwargs': kwargs, } } def schedule(self, deposits): """Schedule the new deposit injection through swh.scheduler's api. Args: deposits (dict): Deposit aggregated information. """ if self.dry_run: return self.scheduler.create_tasks(self._convert(deposits)) -def get_deposit_ready(): - """Retrieve deposit ready to be task executed. +def get_deposit_by(status): + """Filter deposit given a specific status. """ from swh.deposit.models import Deposit - yield from Deposit.objects.filter(status=DEPOSIT_STATUS_READY) + yield from Deposit.objects.filter(status=status) -def prepare_task_arguments(): +def prepare_task_arguments(check): """Convert deposit to argument for task to be executed. """ from swh.deposit.config import PRIVATE_GET_RAW_CONTENT from swh.deposit.config import PRIVATE_GET_DEPOSIT_METADATA from swh.deposit.config import PRIVATE_PUT_DEPOSIT + from swh.deposit.config import PRIVATE_CHECK_DEPOSIT from django.core.urlresolvers import reverse - for deposit in get_deposit_ready(): + if check: + status = DEPOSIT_STATUS_READY_FOR_CHECKS + else: + status = DEPOSIT_STATUS_READY + + for deposit in get_deposit_by(status): args = [deposit.collection.name, deposit.id] archive_url = reverse( PRIVATE_GET_RAW_CONTENT, args=args) - deposit_meta_url = reverse( + meta_url = reverse( PRIVATE_GET_DEPOSIT_METADATA, args=args) - deposit_update_url = reverse( + update_url = reverse( PRIVATE_PUT_DEPOSIT, args=args) + check_url = reverse( + PRIVATE_CHECK_DEPOSIT, args=args) - yield archive_url, deposit_meta_url, deposit_update_url + yield archive_url, meta_url, update_url, check_url @click.command( help='Schedule one-shot deposit injections') @click.option('--platform', default='development', help='development or production platform') @click.option('--scheduling-method', default='celery', help='Scheduling method') @click.option('--batch-size', default=1000, type=click.INT, help='Task batch size') @click.option('--dry-run/--no-dry-run', is_flag=True, default=False, help='Dry run') -def main(platform, scheduling_method, batch_size, dry_run): +@click.option('--check', is_flag=True, default=False) +def main(platform, scheduling_method, batch_size, dry_run, check): setup_django_for(platform) override_config = {} if dry_run: override_config['dry_run'] = dry_run + override_config['check'] = check if scheduling_method == 'celery': scheduling = SWHCeleryScheduling(override_config) elif scheduling_method == 'swh-scheduler': - scheduling = SWHScheduling(override_config) + scheduling = SWHSchedulerScheduling(override_config) else: raise ValueError( 'Only `celery` or `swh-scheduler` values are accepted') - for deposits in utils.grouper(prepare_task_arguments(), batch_size): + for deposits in utils.grouper(prepare_task_arguments(check), batch_size): scheduling.schedule(deposits) if __name__ == '__main__': main() diff --git a/swh/deposit/injection/tasks.py b/swh/deposit/injection/tasks.py index 8d9dced5..e588fb35 100644 --- a/swh/deposit/injection/tasks.py +++ b/swh/deposit/injection/tasks.py @@ -1,51 +1,51 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.scheduler.task import Task from swh.deposit.injection.loader import DepositLoader from swh.deposit.injection.checker import DepositChecker class LoadDepositArchiveTsk(Task): """Deposit archive ingestion task described by the following steps: 1. Retrieve tarball from deposit's private api and store locally in a temporary directory 2. Trigger the ingestion 3. clean up the temporary directory 4. Update the deposit's status according to result using the deposit's private update status api """ - task_queue = 'swh_deposit_archive' + task_queue = 'swh_deposit_archives' def run_task(self, *, archive_url, deposit_meta_url, deposit_update_url): """Import a deposit tarball into swh. Args: see :func:`DepositLoader.load`. """ loader = DepositLoader() loader.log = self.log loader.load(archive_url=archive_url, deposit_meta_url=deposit_meta_url, deposit_update_url=deposit_update_url) class DepositChecksTsk(Task): """Deposit checks task. """ task_queue = 'swh_deposit_checks' - def run_task(self, *, deposit_check_url, deposit_update_url): + def run_task(self, deposit_check_url): """Check a deposit's status Args: see :func:`DepositChecker.check`. """ checker = DepositChecker() checker.log = self.log - checker.check(deposit_check_url=deposit_check_url) + checker.check(deposit_check_url)