Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/tasks.py
# Copyright (C) 2016-2017 The Software Heritage developers | # Copyright (C) 2016-2017 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import logging | import logging | ||||
from swh.scheduler.task import Task | import celery | ||||
from .orchestrator import OrchestratorAllContentsIndexer | from .orchestrator import OrchestratorAllContentsIndexer | ||||
from .orchestrator import OrchestratorTextContentsIndexer | from .orchestrator import OrchestratorTextContentsIndexer | ||||
from .mimetype import ContentMimetypeIndexer | from .mimetype import ContentMimetypeIndexer | ||||
from .language import ContentLanguageIndexer | from .language import ContentLanguageIndexer | ||||
from .ctags import CtagsIndexer | from .ctags import CtagsIndexer | ||||
from .fossology_license import ContentFossologyLicenseIndexer | from .fossology_license import ContentFossologyLicenseIndexer | ||||
from .rehash import RecomputeChecksums | from .rehash import RecomputeChecksums | ||||
from .metadata import RevisionMetadataIndexer, OriginMetadataIndexer | |||||
logging.basicConfig(level=logging.INFO) | logging.basicConfig(level=logging.INFO) | ||||
class SWHOrchestratorAllContentsTask(Task): | class Task(celery.Task): | ||||
def run_task(self, *args, **kwargs): | |||||
indexer = self.Indexer().run(*args, **kwargs) | |||||
ardumont: Something is not clear here, what's the indexer? Is that an indexer or a chain?
At first i… | |||||
Done Inline ActionsOops, good catch. run was not meant to be called twice, that's totally a TypeError (which is not caught by tests because this code is mocked). vlorentz: Oops, good catch. `run` was not meant to be called twice, that's totally a TypeError (which is… | |||||
return indexer.results | |||||
class OrchestratorAllContentsTask(Task): | |||||
Not Done Inline Actions@vlorentz And that's wrong. I did not see that ;) In production: Oct 25 17:32:54 worker01 python3[79159]: [2018-10-25 17:32:54,017: ERROR/MainProcess] Task swh.indexer.tasks.OrchestratorAllContents[b39b8a5b-a046-4e0e-98b0-88aeab65af5a] raised unexpected: NotImplementedError('Tasks must define the run method.',) Traceback (most recent call last): File "/usr/lib/python3/dist-packages/celery/app/trace.py", line 240, in trace_task R = retval = fun(*args, **kwargs) File "/usr/lib/python3/dist-packages/celery/app/trace.py", line 438, in __protected_call__ return self.run(*args, **kwargs) File "/usr/lib/python3/dist-packages/celery/app/task.py", line 437, in run raise NotImplementedError('Tasks must define the run method.') NotImplementedError: Tasks must define the run method. We must keep the swh.scheduler.task here. ardumont: @vlorentz And that's wrong. I did not see that ;)
In production:
```
Oct 25 17:32:54 worker01… | |||||
Done Inline ActionsLet's just rename run_task to run vlorentz: Let's just rename `run_task` to `run` | |||||
"""Main task in charge of reading batch contents (of any type) and | """Main task in charge of reading batch contents (of any type) and | ||||
broadcasting them back to other tasks. | broadcasting them back to other tasks. | ||||
""" | """ | ||||
task_queue = 'swh_indexer_orchestrator_content_all' | task_queue = 'swh_indexer_orchestrator_content_all' | ||||
def run_task(self, *args, **kwargs): | Indexer = OrchestratorAllContentsIndexer | ||||
OrchestratorAllContentsIndexer().run(*args, **kwargs) | |||||
class SWHOrchestratorTextContentsTask(Task): | class OrchestratorTextContentsTask(Task): | ||||
"""Main task in charge of reading batch contents (of type text) and | """Main task in charge of reading batch contents (of type text) and | ||||
broadcasting them back to other tasks. | broadcasting them back to other tasks. | ||||
""" | """ | ||||
task_queue = 'swh_indexer_orchestrator_content_text' | task_queue = 'swh_indexer_orchestrator_content_text' | ||||
def run_task(self, *args, **kwargs): | Indexer = OrchestratorTextContentsIndexer | ||||
OrchestratorTextContentsIndexer().run(*args, **kwargs) | |||||
class RevisionMetadataTask(Task): | |||||
task_queue = 'swh_indexer_revision_metadata' | |||||
serializer = 'msgpack' | |||||
Indexer = RevisionMetadataIndexer | |||||
class SWHContentMimetypeTask(Task): | |||||
class OriginMetadataTask(Task): | |||||
task_queue = 'swh_indexer_origin_intrinsic_metadata' | |||||
Indexer = OriginMetadataIndexer | |||||
class ContentMimetypeTask(Task): | |||||
"""Task which computes the mimetype, encoding from the sha1's content. | """Task which computes the mimetype, encoding from the sha1's content. | ||||
""" | """ | ||||
task_queue = 'swh_indexer_content_mimetype' | task_queue = 'swh_indexer_content_mimetype' | ||||
def run_task(self, *args, **kwargs): | Indexer = ContentMimetypeIndexer | ||||
ContentMimetypeIndexer().run(*args, **kwargs) | |||||
class SWHContentLanguageTask(Task): | class ContentLanguageTask(Task): | ||||
"""Task which computes the language from the sha1's content. | """Task which computes the language from the sha1's content. | ||||
""" | """ | ||||
task_queue = 'swh_indexer_content_language' | task_queue = 'swh_indexer_content_language' | ||||
def run_task(self, *args, **kwargs): | def run_task(self, *args, **kwargs): | ||||
ContentLanguageIndexer().run(*args, **kwargs) | ContentLanguageIndexer().run(*args, **kwargs) | ||||
class SWHCtagsTask(Task): | class CtagsTask(Task): | ||||
"""Task which computes ctags from the sha1's content. | """Task which computes ctags from the sha1's content. | ||||
""" | """ | ||||
task_queue = 'swh_indexer_content_ctags' | task_queue = 'swh_indexer_content_ctags' | ||||
def run_task(self, *args, **kwargs): | Indexer = CtagsIndexer | ||||
CtagsIndexer().run(*args, **kwargs) | |||||
class SWHContentFossologyLicenseTask(Task): | class ContentFossologyLicenseTask(Task): | ||||
"""Task which computes licenses from the sha1's content. | """Task which computes licenses from the sha1's content. | ||||
""" | """ | ||||
task_queue = 'swh_indexer_content_fossology_license' | task_queue = 'swh_indexer_content_fossology_license' | ||||
def run_task(self, *args, **kwargs): | Indexer = ContentFossologyLicenseIndexer | ||||
ContentFossologyLicenseIndexer().run(*args, **kwargs) | |||||
class SWHRecomputeChecksumsTask(Task): | class RecomputeChecksumsTask(Task): | ||||
"""Task which recomputes hashes and possibly new ones. | """Task which recomputes hashes and possibly new ones. | ||||
""" | """ | ||||
task_queue = 'swh_indexer_content_rehash' | task_queue = 'swh_indexer_content_rehash' | ||||
def run_task(self, *args, **kwargs): | Indexer = RecomputeChecksums | ||||
RecomputeChecksums().run(*args, **kwargs) |
Something is not clear here, what's the indexer? Is that an indexer or a chain?
At first i thought you called the indexer twice...
Now i'm just wondering ;)
I have the feeling that's the indexer's next_step call which returns a celery chain (or something, well, a celery task or group of tasks...). But i'm 100% sure ;)
Can you please rename indexer to something more explicit?