Page Menu · Home · Software Heritage

D2753.diff
No One · Temporary

D2753.diff

diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py
--- a/swh/indexer/indexer.py
+++ b/swh/indexer/indexer.py
@@ -8,14 +8,11 @@
import logging
import shutil
import tempfile
-import datetime
-from copy import deepcopy
from contextlib import contextmanager
-from typing import Any, Dict, Tuple, Generator, Union, List, Optional
+from typing import Any, Dict, Tuple, Generator, Union, List
from typing import Set
-from swh.scheduler import get_scheduler
from swh.scheduler import CONFIG as SWH_CONFIG
from swh.storage import get_storage
@@ -287,40 +284,8 @@
"""
pass
- def next_step(
- self, results: List[Dict], task: Optional[Dict[str, Any]]
- ) -> None:
- """Do something else with computations results (e.g. send to another
- queue, ...).
-
- (This is not an abstractmethod since it is optional).
-
- Args:
- results: List of results (dict) as returned
- by index function.
- task: a dict in the form expected by
- `scheduler.backend.SchedulerBackend.create_tasks`
- without `next_run`, plus an optional `result_name` key.
-
- Returns:
- None
-
- """
- if task:
- if getattr(self, 'scheduler', None):
- scheduler = self.scheduler
- else:
- scheduler = get_scheduler(**self.config['scheduler'])
- task = deepcopy(task)
- result_name = task.pop('result_name', None)
- task['next_run'] = datetime.datetime.now()
- if result_name:
- task['arguments']['kwargs'][result_name] = self.results
- scheduler.create_tasks([task])
-
@abc.abstractmethod
- def run(self, ids, policy_update,
- next_step=None, **kwargs):
+ def run(self, ids, policy_update, **kwargs):
"""Given a list of ids:
- retrieves the data from the storage
@@ -331,9 +296,6 @@
ids ([bytes]): id's identifier list
policy_update (str): either 'update-dups' or 'ignore-dups' to
respectively update duplicates or ignore them
- next_step (dict): a dict in the form expected by
- `scheduler.backend.SchedulerBackend.create_tasks`
- without `next_run`, plus a `result_name` key.
**kwargs: passed to the `index` method
"""
@@ -352,8 +314,7 @@
"""
- def run(self, ids, policy_update,
- next_step=None, **kwargs):
+ def run(self, ids, policy_update, **kwargs):
"""Given a list of ids:
- retrieve the content from the storage
@@ -365,9 +326,6 @@
policy_update (str): either 'update-dups' or 'ignore-dups' to
respectively update duplicates or ignore
them
- next_step (dict): a dict in the form expected by
- `scheduler.backend.SchedulerBackend.create_tasks`
- without `next_run`, plus an optional `result_name` key.
**kwargs: passed to the `index` method
"""
@@ -388,7 +346,6 @@
self.persist_index_computations(results, policy_update)
self.results = results
- return self.next_step(results, task=next_step)
except Exception:
if not self.catch_exceptions:
raise
@@ -568,9 +525,6 @@
origin_urls ([str]): list of origin urls.
policy_update (str): either 'update-dups' or 'ignore-dups' to
respectively update duplicates (default) or ignore them
- next_step (dict): a dict in the form expected by
- `scheduler.backend.SchedulerBackend.create_tasks` without
- `next_run`, plus an optional `result_name` key.
parse_ids (bool): Do we need to parse id or not (default)
**kwargs: passed to the `index` method
@@ -579,7 +533,6 @@
self.persist_index_computations(results, policy_update)
self.results = results
- return self.next_step(results, task=next_step)
def index_list(self, origins: List[Any], **kwargs: Any) -> List[Dict]:
results = []
@@ -607,7 +560,7 @@
class.
"""
- def run(self, ids, policy_update, next_step=None):
+ def run(self, ids, policy_update):
"""Given a list of sha1_gits:
- retrieve revisions from storage
@@ -641,4 +594,3 @@
'Problem when processing revision')
self.persist_index_computations(results, policy_update)
self.results = results
- return self.next_step(results, task=next_step)

File Metadata

Mime Type
text/plain
Expires
Tue, Dec 17, 4:59 AM (2 w, 1 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3220488

Event Timeline