diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py --- a/swh/indexer/indexer.py +++ b/swh/indexer/indexer.py @@ -8,14 +8,11 @@ import logging import shutil import tempfile -import datetime -from copy import deepcopy from contextlib import contextmanager -from typing import Any, Dict, Tuple, Generator, Union, List, Optional +from typing import Any, Dict, Tuple, Generator, Union, List from typing import Set -from swh.scheduler import get_scheduler from swh.scheduler import CONFIG as SWH_CONFIG from swh.storage import get_storage @@ -287,40 +284,8 @@ """ pass - def next_step( - self, results: List[Dict], task: Optional[Dict[str, Any]] - ) -> None: - """Do something else with computations results (e.g. send to another - queue, ...). - - (This is not an abstractmethod since it is optional). - - Args: - results: List of results (dict) as returned - by index function. - task: a dict in the form expected by - `scheduler.backend.SchedulerBackend.create_tasks` - without `next_run`, plus an optional `result_name` key. - - Returns: - None - - """ - if task: - if getattr(self, 'scheduler', None): - scheduler = self.scheduler - else: - scheduler = get_scheduler(**self.config['scheduler']) - task = deepcopy(task) - result_name = task.pop('result_name', None) - task['next_run'] = datetime.datetime.now() - if result_name: - task['arguments']['kwargs'][result_name] = self.results - scheduler.create_tasks([task]) - @abc.abstractmethod - def run(self, ids, policy_update, - next_step=None, **kwargs): + def run(self, ids, policy_update, **kwargs): """Given a list of ids: - retrieves the data from the storage @@ -331,9 +296,6 @@ ids ([bytes]): id's identifier list policy_update (str): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them - next_step (dict): a dict in the form expected by - `scheduler.backend.SchedulerBackend.create_tasks` - without `next_run`, plus a `result_name` key. **kwargs: passed to the `index` method """ @@ -352,8 +314,7 @@ """ - def run(self, ids, policy_update, - next_step=None, **kwargs): + def run(self, ids, policy_update, **kwargs): """Given a list of ids: - retrieve the content from the storage @@ -365,9 +326,6 @@ policy_update (str): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them - next_step (dict): a dict in the form expected by - `scheduler.backend.SchedulerBackend.create_tasks` - without `next_run`, plus an optional `result_name` key. **kwargs: passed to the `index` method """ @@ -388,7 +346,6 @@ self.persist_index_computations(results, policy_update) self.results = results - return self.next_step(results, task=next_step) except Exception: if not self.catch_exceptions: raise @@ -568,9 +525,6 @@ origin_urls ([str]): list of origin urls. policy_update (str): either 'update-dups' or 'ignore-dups' to respectively update duplicates (default) or ignore them - next_step (dict): a dict in the form expected by - `scheduler.backend.SchedulerBackend.create_tasks` without - `next_run`, plus an optional `result_name` key. parse_ids (bool): Do we need to parse id or not (default) **kwargs: passed to the `index` method @@ -579,7 +533,6 @@ self.persist_index_computations(results, policy_update) self.results = results - return self.next_step(results, task=next_step) def index_list(self, origins: List[Any], **kwargs: Any) -> List[Dict]: results = [] @@ -607,7 +560,7 @@ class. """ - def run(self, ids, policy_update, next_step=None): + def run(self, ids, policy_update): """Given a list of sha1_gits: - retrieve revisions from storage @@ -641,4 +594,3 @@ 'Problem when processing revision') self.persist_index_computations(results, policy_update) self.results = results - return self.next_step(results, task=next_step)