Patch summary (free text ahead of the first "diff --git" header):
  * swh/indexer/tasks.py: the SchedulerTask subclasses (Task, StatusTask and
    one class per indexer) are removed; each indexer is exposed as a function
    decorated with @app.task(name=__name__ + '.<FormerClassName>').
  * Result tasks return getattr(results, 'results', results); status tasks
    return {'status': 'eventful' if results else 'uneventful'}.
  * Review fix: range_mimetype and range_license now call
    Indexer().run(*args, **kwargs) like every other task here (and like the
    removed StatusTask.run_task); previously they only constructed the
    indexer with the task arguments and never ran it.
  * swh/indexer/tests/conftest.py: 'swh.indexer.tasks' is added to
    celery_includes.
diff --git a/swh/indexer/tasks.py b/swh/indexer/tasks.py index 6b7372f..f11ccc4 100644 --- a/swh/indexer/tasks.py +++ b/swh/indexer/tasks.py @@ -1,119 +1,77 @@ # Copyright (C) 2016-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import logging -from swh.scheduler.task import Task as SchedulerTask +from celery import current_app as app from .mimetype import MimetypeIndexer, MimetypeRangeIndexer from .language import LanguageIndexer from .ctags import CtagsIndexer from .fossology_license import ( FossologyLicenseIndexer, FossologyLicenseRangeIndexer ) from .rehash import RecomputeChecksums from .metadata import RevisionMetadataIndexer, OriginMetadataIndexer from .origin_head import OriginHeadIndexer -logging.basicConfig(level=logging.INFO) +@app.task(name=__name__ + '.RevisionMetadata') +def revision_metadata(*args, **kwargs): + results = RevisionMetadataIndexer().run(*args, **kwargs) + return getattr(results, 'results', results) -class Task(SchedulerTask): - """Task whose results is needed for other computations. - """ - def run_task(self, *args, **kwargs): - indexer = self.Indexer().run(*args, **kwargs) - if hasattr(indexer, 'results'): # indexer tasks - return indexer.results - return indexer +@app.task(name=__name__ + '.OriginMetadata') +def origin_metadata(*args, **kwargs): + results = OriginMetadataIndexer().run(*args, **kwargs) + return getattr(results, 'results', results) -class StatusTask(SchedulerTask): - """Task which returns a status either eventful or uneventful. 
+@app.task(name=__name__ + '.OriginHead') +def origin_head(*args, **kwargs): + results = OriginHeadIndexer().run(*args, **kwargs) + return getattr(results, 'results', results) - """ - def run_task(self, *args, **kwargs): - results = self.Indexer().run(*args, **kwargs) - return {'status': 'eventful' if results else 'uneventful'} +@app.task(name=__name__ + '.ContentLanguage') +def content_language(*args, **kwargs): + results = LanguageIndexer().run(*args, **kwargs) + return getattr(results, 'results', results) -class RevisionMetadata(Task): - task_queue = 'swh_indexer_revision_metadata' - serializer = 'msgpack' +@app.task(name=__name__ + '.Ctags') +def ctags(*args, **kwargs): + results = CtagsIndexer().run(*args, **kwargs) + return getattr(results, 'results', results) - Indexer = RevisionMetadataIndexer +@app.task(name=__name__ + '.ContentFossologyLicense') +def fossology_license(*args, **kwargs): + results = FossologyLicenseIndexer().run(*args, **kwargs) + return getattr(results, 'results', results) -class OriginMetadata(Task): - task_queue = 'swh_indexer_origin_intrinsic_metadata' - Indexer = OriginMetadataIndexer +@app.task(name=__name__ + '.RecomputeChecksums') +def recompute_checksums(*args, **kwargs): + results = RecomputeChecksums().run(*args, **kwargs) + return getattr(results, 'results', results) -class OriginHead(Task): - task_queue = 'swh_indexer_origin_head' +@app.task(name=__name__ + '.ContentMimetype') +def mimetype(*args, **kwargs): + results = MimetypeIndexer().run(*args, **kwargs) + return {'status': 'eventful' if results else 'uneventful'} - Indexer = OriginHeadIndexer +@app.task(name=__name__ + '.ContentRangeMimetype') +def range_mimetype(*args, **kwargs): + results = MimetypeRangeIndexer().run(*args, **kwargs) + return {'status': 'eventful' if results else 'uneventful'} -class ContentMimetype(StatusTask): - """Compute (mimetype, encoding) on a list of sha1s' content. 
- """ - task_queue = 'swh_indexer_content_mimetype' - Indexer = MimetypeIndexer - - -class ContentRangeMimetype(StatusTask): - """Compute (mimetype, encoding) on a range of sha1s. - - """ - task_queue = 'swh_indexer_content_mimetype_range' - Indexer = MimetypeRangeIndexer - - -class ContentLanguage(Task): - """Task which computes the language from the sha1's content. - - """ - task_queue = 'swh_indexer_content_language' - - Indexer = LanguageIndexer - - -class Ctags(Task): - """Task which computes ctags from the sha1's content. - - """ - task_queue = 'swh_indexer_content_ctags' - - Indexer = CtagsIndexer - - -class ContentFossologyLicense(Task): - """Compute fossology licenses on a list of sha1s' content. - - """ - task_queue = 'swh_indexer_content_fossology_license' - Indexer = FossologyLicenseIndexer - - -class ContentRangeFossologyLicense(StatusTask): - """Compute fossology license on a range of sha1s. - - """ - task_queue = 'swh_indexer_content_fossology_license_range' - Indexer = FossologyLicenseRangeIndexer - - -class RecomputeChecksums(Task): - """Task which recomputes hashes and possibly new ones. 
- - """ - task_queue = 'swh_indexer_content_rehash' - - Indexer = RecomputeChecksums +@app.task(name=__name__ + '.ContentRangeFossologyLicense') +def range_license(*args, **kwargs): + results = FossologyLicenseRangeIndexer().run(*args, **kwargs) + return {'status': 'eventful' if results else 'uneventful'} diff --git a/swh/indexer/tests/conftest.py b/swh/indexer/tests/conftest.py index 2636e7a..7392517 100644 --- a/swh/indexer/tests/conftest.py +++ b/swh/indexer/tests/conftest.py @@ -1,28 +1,29 @@ import pytest from datetime import timedelta from swh.scheduler.tests.conftest import * # noqa TASK_NAMES = ['revision_metadata', 'origin_intrinsic_metadata'] @pytest.fixture def indexer_scheduler(swh_scheduler): for taskname in TASK_NAMES: swh_scheduler.create_task_type({ 'type': taskname, 'description': 'The {} indexer testing task'.format(taskname), 'backend_name': 'swh.indexer.tests.tasks.{}'.format(taskname), 'default_interval': timedelta(days=1), 'min_interval': timedelta(hours=6), 'max_interval': timedelta(days=12), 'num_retries': 3, }) return swh_scheduler @pytest.fixture(scope='session') def celery_includes(): return [ 'swh.indexer.tests.tasks', + 'swh.indexer.tasks', ]