Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/orchestrator.py
Show First 20 Lines • Show All 87 Lines • ▼ Show 20 Lines | def prepare_tasks(self): | ||||
opts['check_presence'], | opts['check_presence'], | ||||
opts['batch_size']) | opts['batch_size']) | ||||
tasks[name] = utils.get_task(self.TASK_NAMES[name]) | tasks[name] = utils.get_task(self.TASK_NAMES[name]) | ||||
self.indexers = indexers | self.indexers = indexers | ||||
self.tasks = tasks | self.tasks = tasks | ||||
def run(self, ids): | def run(self, ids): | ||||
all_results = [] | |||||
for name, (idx_class, filtering, batch_size) in self.indexers.items(): | for name, (idx_class, filtering, batch_size) in self.indexers.items(): | ||||
if filtering: | if filtering: | ||||
policy_update = 'ignore-dups' | policy_update = 'ignore-dups' | ||||
indexer_class = get_class(idx_class) | indexer_class = get_class(idx_class) | ||||
ids_filtered = list(indexer_class().filter(ids)) | ids_filtered = list(indexer_class().filter(ids)) | ||||
if not ids_filtered: | if not ids_filtered: | ||||
continue | continue | ||||
else: | else: | ||||
policy_update = 'update-dups' | policy_update = 'update-dups' | ||||
ids_filtered = ids | ids_filtered = ids | ||||
celery_tasks = [] | celery_tasks = [] | ||||
for ids_to_send in grouper(ids_filtered, batch_size): | for ids_to_send in grouper(ids_filtered, batch_size): | ||||
celery_task = self.tasks[name].s( | celery_task = self.tasks[name].s( | ||||
ids=list(ids_to_send), | ids=list(ids_to_send), | ||||
policy_update=policy_update) | policy_update=policy_update) | ||||
celery_tasks.append(celery_task) | celery_tasks.append(celery_task) | ||||
self._run_tasks(celery_tasks) | all_results.append(self._run_tasks(celery_tasks)) | ||||
return all_results | |||||
def _run_tasks(self, celery_tasks): | def _run_tasks(self, celery_tasks): | ||||
group(celery_tasks).delay() | return group(celery_tasks).delay() | ||||
class OrchestratorAllContentsIndexer(BaseOrchestratorIndexer): | class OrchestratorAllContentsIndexer(BaseOrchestratorIndexer): | ||||
"""Orchestrator which deals with batch of any types of contents. | """Orchestrator which deals with batch of any types of contents. | ||||
""" | """ | ||||
class OrchestratorTextContentsIndexer(BaseOrchestratorIndexer): | class OrchestratorTextContentsIndexer(BaseOrchestratorIndexer): | ||||
"""Orchestrator which deals with batch of text contents. | """Orchestrator which deals with batch of text contents. | ||||
""" | """ | ||||
CONFIG_BASE_FILENAME = 'indexer/orchestrator_text' | CONFIG_BASE_FILENAME = 'indexer/orchestrator_text' |