class SWHScheduling(SWHConfig, metaclass=ABCMeta):
    """Abstract base class shared by the deposit injection schedulers.

    On construction it calls ``parse_config_file`` (with
    ``ADDITIONAL_CONFIG`` as additional configuration) and stores the
    result in ``self.config``; it also sets up ``self.log``.  Subclasses
    provide the scheduling strategy by implementing :meth:`schedule`.

    """
    CONFIG_BASE_FILENAME = 'deposit/server'

    DEFAULT_CONFIG = {'dry_run': ('bool', False)}

    ADDITIONAL_CONFIG = {}

    def __init__(self):
        super().__init__()
        extra_configs = [self.ADDITIONAL_CONFIG]
        self.config = self.parse_config_file(additional_configs=extra_configs)
        self.log = logging.getLogger('swh.deposit.scheduling')

    @abstractmethod
    def schedule(self, deposits):
        """Schedule the new deposit injection.

        Args:
            deposits: Deposit aggregated data to schedule

        Returns:
            None

        """
""" def __init__(self, config=None): super().__init__() if config: self.config.update(**config) self.dry_run = self.config['dry_run'] self.check = self.config['check'] if self.check: task_name = 'swh.deposit.injection.tasks.ChecksDepositTsk' else: task_name = 'swh.deposit.injection.tasks.LoadDepositArchiveTsk' self.task = get_task(task_name) def _convert(self, deposits): """Convert tuple to celery task signature. """ task = self.task for archive_url, meta_url, update_url, check_url in deposits: if self.check: yield task.s(deposit_check_url=check_url) else: yield task.s(archive_url=archive_url, deposit_meta_url=meta_url, deposit_update_url=update_url) def schedule(self, deposits): """Schedule the new deposit injection directly through celery. Args: depositdata (dict): Deposit aggregated information. Returns: None """ if self.dry_run: return return group(self._convert(deposits)).delay() class SWHSchedulerScheduling(SWHScheduling): """Deposit injection through SWH's task scheduling interface. """ ADDITIONAL_CONFIG = {} def __init__(self, config=None): super().__init__() from swh.scheduler.backend import SchedulerBackend if config: self.config.update(**config) self.dry_run = self.config['dry_run'] self.scheduler = SchedulerBackend(**self.config) self.check = self.config['check'] def _convert(self, deposits): """Convert tuple to one-shot scheduling tasks. """ for archive_url, meta_url, update_url, check_url in deposits: if self.check: task = create_oneshot_task_dict( 'swh-deposit-archive-checks', deposit_check_url=check_url) else: task = create_oneshot_task_dict( - 'swh-deposit-archive-ingestion', + 'swh-deposit-archive-injection', archive_url=archive_url, deposit_meta_url=meta_url, deposit_update_url=update_url) yield task def schedule(self, deposits): """Schedule the new deposit injection through swh.scheduler's api. Args: deposits (dict): Deposit aggregated information. 
""" if self.dry_run: return self.scheduler.create_tasks(self._convert(deposits)) def get_deposit_by(status): """Filter deposit given a specific status. """ from swh.deposit.models import Deposit yield from Deposit.objects.filter(status=status) def prepare_task_arguments(check): """Convert deposit to argument for task to be executed. """ from swh.deposit.config import PRIVATE_GET_RAW_CONTENT from swh.deposit.config import PRIVATE_GET_DEPOSIT_METADATA from swh.deposit.config import PRIVATE_PUT_DEPOSIT from swh.deposit.config import PRIVATE_CHECK_DEPOSIT from django.core.urlresolvers import reverse if check: status = DEPOSIT_STATUS_READY_FOR_CHECKS else: status = DEPOSIT_STATUS_READY for deposit in get_deposit_by(status): args = [deposit.collection.name, deposit.id] archive_url = reverse(PRIVATE_GET_RAW_CONTENT, args=args) meta_url = reverse(PRIVATE_GET_DEPOSIT_METADATA, args=args) update_url = reverse(PRIVATE_PUT_DEPOSIT, args=args) check_url = reverse(PRIVATE_CHECK_DEPOSIT, args=args) yield archive_url, meta_url, update_url, check_url @click.command( help='Schedule one-shot deposit injections') @click.option('--platform', default='development', help='development or production platform') @click.option('--scheduling-method', default='celery', help='Scheduling method') @click.option('--batch-size', default=1000, type=click.INT, help='Task batch size') @click.option('--dry-run/--no-dry-run', is_flag=True, default=False, help='Dry run') @click.option('--check', is_flag=True, default=False) def main(platform, scheduling_method, batch_size, dry_run, check): setup_django_for(platform) override_config = {} if dry_run: override_config['dry_run'] = dry_run override_config['check'] = check if scheduling_method == 'celery': scheduling = SWHCeleryScheduling(override_config) elif scheduling_method == 'swh-scheduler': scheduling = SWHSchedulerScheduling(override_config) else: raise ValueError( 'Only `celery` or `swh-scheduler` values are accepted') for deposits in 
class LoadDepositArchiveTsk(Task):
    """Deposit archive injection task.

    Steps, as described for this task:

       1. Retrieve tarball from deposit's private api and store
          locally in a temporary directory
       2. Trigger the injection
       3. Clean up the temporary directory
       4. Update the deposit's status according to result using the
          deposit's private update status api

    """
    task_queue = 'swh_loader_deposit'

    def run_task(self, *, archive_url, deposit_meta_url, deposit_update_url):
        """Import a deposit tarball into swh.

        Args: see :func:`DepositLoader.load`.

        """
        deposit_loader = DepositLoader()
        # Make the loader log through this task's logger.
        deposit_loader.log = self.log
        load_arguments = {
            'archive_url': archive_url,
            'deposit_meta_url': deposit_meta_url,
            'deposit_update_url': deposit_update_url,
        }
        deposit_loader.load(**load_arguments)


class ChecksDepositTsk(Task):
    """Deposit checks task.

    """
    task_queue = 'swh_checker_deposit'

    def run_task(self, deposit_check_url):
        """Check a deposit's status.

        Args: see :func:`DepositChecker.check`.

        """
        deposit_checker = DepositChecker()
        # Make the checker log through this task's logger.
        deposit_checker.log = self.log
        deposit_checker.check(deposit_check_url)
""" checker = DepositChecker() checker.log = self.log checker.check(deposit_check_url) diff --git a/swh/deposit/signals.py b/swh/deposit/signals.py index f70ad763..9863d9aa 100644 --- a/swh/deposit/signals.py +++ b/swh/deposit/signals.py @@ -1,83 +1,83 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Module in charge of defining some uncoupled actions on deposit. Typically, checking that the archives deposited are ok are not directly testing in the request/answer to avoid too long computations. So this is done in the deposit_on_status_ready_for_check callback. """ from django.db.models.signals import post_save from django.dispatch import receiver from .models import Deposit from .config import SWHDefaultConfig, DEPOSIT_STATUS_READY from .config import DEPOSIT_STATUS_READY_FOR_CHECKS @receiver(post_save, sender=Deposit) def post_deposit_save(sender, instance, created, raw, using, update_fields, **kwargs): """When a deposit is saved, check for the deposit's status change and schedule actions accordingly. When the status passes to ready-for-checks, schedule checks. When the status pass to ready, schedule loading. Otherwise, do nothing. Args: sender (Deposit): The model class instance (Deposit): The actual instance being saved created (bool): True if a new record was created raw (bool): True if the model is saved exactly as presented (i.e. when loading a fixture). 
One should not query/modify other records in the database as the database might not be in a consistent state yet using: The database alias being used update_fields: The set of fields to update as passed to Model.save(), or None if update_fields wasn’t passed to save() """ default_config = SWHDefaultConfig() if not default_config.config['checks']: return if instance.status not in {DEPOSIT_STATUS_READY_FOR_CHECKS, DEPOSIT_STATUS_READY}: return from django.core.urlresolvers import reverse from swh.scheduler.utils import create_oneshot_task_dict args = [instance.collection.name, instance.id] if instance.status == DEPOSIT_STATUS_READY_FOR_CHECKS: # schedule archive check from swh.deposit.config import PRIVATE_CHECK_DEPOSIT check_url = reverse(PRIVATE_CHECK_DEPOSIT, args=args) task = create_oneshot_task_dict( 'swh-deposit-archive-checks', archive_check_url=check_url) else: # instance.status == DEPOSIT_STATUS_READY: # schedule loading from swh.deposit.config import PRIVATE_GET_RAW_CONTENT from swh.deposit.config import PRIVATE_GET_DEPOSIT_METADATA from swh.deposit.config import PRIVATE_PUT_DEPOSIT archive_url = reverse(PRIVATE_GET_RAW_CONTENT, args=args) meta_url = reverse(PRIVATE_GET_DEPOSIT_METADATA, args=args) update_url = reverse(PRIVATE_PUT_DEPOSIT, args=args) task = create_oneshot_task_dict( - 'swh-deposit-archive-ingestion', + 'swh-deposit-archive-injection', archive_url=archive_url, deposit_meta_url=meta_url, deposit_update_url=update_url) default_config.scheduler.create_tasks([task]) diff --git a/swh/deposit/tests/api/test_deposit_update_status.py b/swh/deposit/tests/api/test_deposit_update_status.py index 909ad0ad..6532a966 100644 --- a/swh/deposit/tests/api/test_deposit_update_status.py +++ b/swh/deposit/tests/api/test_deposit_update_status.py @@ -1,119 +1,119 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See 
class UpdateDepositStatusTest(APITestCase, BasicTestCase):
    """Update the deposit's status scenario

    """
    def setUp(self):
        super().setUp()
        deposit = Deposit(status=DEPOSIT_STATUS_READY,
                          collection=self.collection,
                          client=self.user)
        deposit.save()
        self.deposit = Deposit.objects.get(pk=deposit.id)
        assert self.deposit.status == DEPOSIT_STATUS_READY

    def _put(self, payload):
        """PUT `payload` as JSON on the private update endpoint of the
           deposit created in setUp and return the response.

        """
        url = reverse(PRIVATE_PUT_DEPOSIT,
                      args=[self.collection.name, self.deposit.id])
        return self.client.put(
            url,
            content_type='application/json',
            data=json.dumps(payload))

    @istest
    def update_deposit_status(self):
        """Existing status for update should return a 204 response

        """
        # 'success' is excluded here: it is covered by dedicated tests
        # below, with and without a revision id.
        possible_status = set(DEPOSIT_STATUS_DETAIL.keys()) - {'success'}

        for _status in possible_status:
            response = self._put({'status': _status})

            self.assertEqual(response.status_code,
                             status.HTTP_204_NO_CONTENT)

            deposit = Deposit.objects.get(pk=self.deposit.id)
            self.assertEqual(deposit.status, _status)

    @istest
    def update_deposit_with_success_injection_and_swh_id(self):
        """'success' status with a revision id should return a 204
           response and record the id as the deposit's swh_id

        """
        expected_status = 'success'
        expected_id = revision_id = '47dc6b4636c7f6cba0df83e3d5490bf4334d987e'
        response = self._put({
            'status': expected_status,
            'revision_id': revision_id,
        })

        self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)

        deposit = Deposit.objects.get(pk=self.deposit.id)
        self.assertEqual(deposit.status, expected_status)
        self.assertEqual(deposit.swh_id, expected_id)

    @istest
    def update_deposit_status_will_fail_with_unknown_status(self):
        """Unknown status for update should return a 400 response

        """
        response = self._put({'status': 'unknown'})

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

    @istest
    def update_deposit_status_will_fail_with_no_status_key(self):
        """No status provided for update should return a 400 response

        """
        response = self._put({'something': 'something'})

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

    @istest
    def update_deposit_status_success_without_swh_id_fail(self):
        """Providing 'success' status without swh_id should return a 400

        """
        response = self._put({'status': 'success'})

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)