Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/tasks.py
# Copyright (C) 2016-2017 The Software Heritage developers | # Copyright (C) 2016-2017 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import logging | import logging | ||||
from swh.scheduler.task import Task | import celery | ||||
from .orchestrator import OrchestratorAllContentsIndexer | from .orchestrator import OrchestratorAllContentsIndexer | ||||
from .orchestrator import OrchestratorTextContentsIndexer | from .orchestrator import OrchestratorTextContentsIndexer | ||||
from .mimetype import ContentMimetypeIndexer | from .mimetype import ContentMimetypeIndexer | ||||
from .language import ContentLanguageIndexer | from .language import ContentLanguageIndexer | ||||
from .ctags import CtagsIndexer | from .ctags import CtagsIndexer | ||||
from .fossology_license import ContentFossologyLicenseIndexer | from .fossology_license import ContentFossologyLicenseIndexer | ||||
from .rehash import RecomputeChecksums | from .rehash import RecomputeChecksums | ||||
from .metadata import RevisionMetadataIndexer, OriginMetadataIndexer | |||||
logging.basicConfig(level=logging.INFO) | logging.basicConfig(level=logging.INFO) | ||||
class Task(celery.Task):
    """Base class shared by the indexer tasks declared in this module.

    Subclasses set the ``Indexer`` class attribute to the indexer class
    to use; ``run_task`` builds one instance of it per call and runs it.

    """
    def run_task(self, *args, **kwargs):
        """Instantiate ``self.Indexer``, run it once and return its results.

        Args:
            *args: positional arguments forwarded unchanged to the
                indexer's ``run`` method.
            **kwargs: keyword arguments forwarded unchanged to the
                indexer's ``run`` method.

        Returns:
            The ``results`` attribute of the indexer after the run, or
            None when the indexer does not define one.

        """
        indexer = self.Indexer()
        # Hold on to the instance rather than the return value of run():
        # the previous code called .run() a second time, without any
        # argument, on whatever the first run() returned.
        indexer.run(*args, **kwargs)
        # NOTE(review): defaulting to None keeps tasks whose indexer has no
        # `results` attribute returning nothing instead of raising --
        # confirm every indexer is expected to expose `results`.
        return getattr(indexer, 'results', None)
class SWHOrchestratorAllContentsTask(Task):
    """Main task in charge of reading batch contents (of any type) and
    broadcasting them back to other tasks.

    """
    task_queue = 'swh_indexer_orchestrator_content_all'

    # Indexer class instantiated and run by the inherited run_task; this
    # replaces the former per-class run_task override.
    Indexer = OrchestratorAllContentsIndexer
class SWHOrchestratorTextContentsTask(Task):
    """Main task in charge of reading batch contents (of type text) and
    broadcasting them back to other tasks.

    """
    task_queue = 'swh_indexer_orchestrator_content_text'

    # Indexer class instantiated and run by the inherited run_task; this
    # replaces the former per-class run_task override.
    Indexer = OrchestratorTextContentsIndexer
class SWHRevisionMetadataTask(Task):
    """Task which runs the RevisionMetadataIndexer.

    """
    # Indexer class instantiated and run by the inherited run_task.
    Indexer = RevisionMetadataIndexer

    task_queue = 'swh_indexer_revision_metadata'

    # This is the only task in the module that sets a serializer.
    serializer = 'msgpack'
class SWHOriginMetadataTask(Task):
    """Task which runs the OriginMetadataIndexer.

    """
    # Indexer class instantiated and run by the inherited run_task.
    Indexer = OriginMetadataIndexer

    task_queue = 'swh_indexer_origin_intrinsic_metadata'
class SWHContentMimetypeTask(Task):
    """Task which computes the mimetype, encoding from the sha1's content.

    """
    task_queue = 'swh_indexer_content_mimetype'

    # Indexer class instantiated and run by the inherited run_task; this
    # replaces the former per-class run_task override.
    Indexer = ContentMimetypeIndexer
class SWHContentLanguageTask(Task):
    """Task which computes the language from the sha1's content.

    """
    task_queue = 'swh_indexer_content_language'

    def run_task(self, *args, **kwargs):
        """Run a fresh ContentLanguageIndexer with the given arguments.

        All arguments are forwarded unchanged to the indexer's ``run``
        method; nothing is returned.

        """
        # NOTE(review): the other tasks in this module declare an
        # `Indexer` attribute instead of overriding run_task -- confirm
        # whether this override is kept on purpose.
        language_indexer = ContentLanguageIndexer()
        language_indexer.run(*args, **kwargs)
class SWHCtagsTask(Task):
    """Task which computes ctags from the sha1's content.

    """
    task_queue = 'swh_indexer_content_ctags'

    # Indexer class instantiated and run by the inherited run_task; this
    # replaces the former per-class run_task override.
    Indexer = CtagsIndexer
class SWHContentFossologyLicenseTask(Task):
    """Task which computes licenses from the sha1's content.

    """
    task_queue = 'swh_indexer_content_fossology_license'

    # Indexer class instantiated and run by the inherited run_task; this
    # replaces the former per-class run_task override.
    Indexer = ContentFossologyLicenseIndexer
class SWHRecomputeChecksumsTask(Task):
    """Task which recomputes hashes and possibly new ones.

    """
    task_queue = 'swh_indexer_content_rehash'

    # Indexer class instantiated and run by the inherited run_task; this
    # replaces the former per-class run_task override.
    Indexer = RecomputeChecksums