Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/indexer.py
Show First 20 Lines • Show All 313 Lines • ▼ Show 20 Lines | def next_step(self, results): | ||||
Returns: | Returns: | ||||
None | None | ||||
""" | """ | ||||
pass | pass | ||||
@abc.abstractmethod | @abc.abstractmethod | ||||
def run(self, ids, policy_update): | def run(self, ids, policy_update, **kwargs): | ||||
"""Given a list of ids: | """Given a list of ids: | ||||
- retrieves the data from the storage | - retrieves the data from the storage | ||||
- executes the indexing computations | - executes the indexing computations | ||||
- stores the results (according to policy_update) | - stores the results (according to policy_update) | ||||
Args: | Args: | ||||
ids ([bytes]): id's identifier list | ids ([bytes]): id's identifier list | ||||
policy_update ([str]): either 'update-dups' or 'ignore-dups' to | policy_update ([str]): either 'update-dups' or 'ignore-dups' to | ||||
respectively update duplicates or ignore them | respectively update duplicates or ignore them | ||||
**kwargs: passed to the `index` method | |||||
""" | """ | ||||
pass | pass | ||||
class ContentIndexer(BaseIndexer): | class ContentIndexer(BaseIndexer): | ||||
"""An object type indexer, inherits from the :class:`BaseIndexer` and | """An object type indexer, inherits from the :class:`BaseIndexer` and | ||||
implements Content indexing using the run method | implements Content indexing using the run method | ||||
Note: the :class:`ContentIndexer` is not an instantiable | Note: the :class:`ContentIndexer` is not an instantiable | ||||
object. To use it in another context, one should inherit from this | object. To use it in another context, one should inherit from this | ||||
class and override the methods mentioned in the | class and override the methods mentioned in the | ||||
:class:`BaseIndexer` class. | :class:`BaseIndexer` class. | ||||
""" | """ | ||||
def run(self, ids, policy_update): | def run(self, ids, policy_update, **kwargs): | ||||
"""Given a list of ids: | """Given a list of ids: | ||||
- retrieve the content from the storage | - retrieve the content from the storage | ||||
- execute the indexing computations | - execute the indexing computations | ||||
- store the results (according to policy_update) | - store the results (according to policy_update) | ||||
Args: | Args: | ||||
ids ([bytes]): sha1's identifier list | ids ([bytes]): sha1's identifier list | ||||
policy_update ([str]): either 'update-dups' or 'ignore-dups' to | policy_update ([str]): either 'update-dups' or 'ignore-dups' to | ||||
respectively update duplicates or ignore | respectively update duplicates or ignore | ||||
them | them | ||||
**kwargs: passed to the `index` method | |||||
""" | """ | ||||
results = [] | results = [] | ||||
try: | try: | ||||
for sha1 in ids: | for sha1 in ids: | ||||
try: | try: | ||||
raw_content = self.objstorage.get(sha1) | raw_content = self.objstorage.get(sha1) | ||||
except ObjNotFoundError: | except ObjNotFoundError: | ||||
self.log.warn('Content %s not found in objstorage' % | self.log.warn('Content %s not found in objstorage' % | ||||
hashutil.hash_to_hex(sha1)) | hashutil.hash_to_hex(sha1)) | ||||
continue | continue | ||||
res = self.index(sha1, raw_content) | res = self.index(sha1, raw_content, **kwargs) | ||||
if res: # If no results, skip it | if res: # If no results, skip it | ||||
results.append(res) | results.append(res) | ||||
self.persist_index_computations(results, policy_update) | self.persist_index_computations(results, policy_update) | ||||
self.next_step(results) | self.results = results | ||||
moranegg: ping @ardumont
I remember that there was a reason why there is no return to run, but maybe I'm… | |||||
Not Done Inline ActionsPreviously next_step did not return anything (triggered a new delayed task - mimetype indexer towards orchestrator text, and that was all IIRC). I think that changes in that diff (yes, take a look at the OriginHeadIndexer's implementation below). To sum up, as usual, it's ok to adapt if it's needed. ardumont: Previously `next_step` did not return anything (triggered a new delayed task - mimetype indexer… | |||||
return self.next_step(results) | |||||
except Exception: | except Exception: | ||||
self.log.exception( | self.log.exception( | ||||
'Problem when reading contents metadata.') | 'Problem when reading contents metadata.') | ||||
if self.rescheduling_task: | if self.rescheduling_task: | ||||
self.log.warn('Rescheduling batch') | self.log.warn('Rescheduling batch') | ||||
self.rescheduling_task.delay(ids, policy_update) | self.rescheduling_task.delay(ids, policy_update) | ||||
class OriginIndexer(BaseIndexer): | class OriginIndexer(BaseIndexer): | ||||
"""An object type indexer, inherits from the :class:`BaseIndexer` and | """An object type indexer, inherits from the :class:`BaseIndexer` and | ||||
implements Origin indexing using the run method | implements Origin indexing using the run method | ||||
Note: the :class:`OriginIndexer` is not an instantiable object. | Note: the :class:`OriginIndexer` is not an instantiable object. | ||||
To use it in another context one should inherit from this class | To use it in another context one should inherit from this class | ||||
and override the methods mentioned in the :class:`BaseIndexer` | and override the methods mentioned in the :class:`BaseIndexer` | ||||
class. | class. | ||||
""" | """ | ||||
def run(self, ids, policy_update, parse_ids=False): | def run(self, ids, policy_update, parse_ids=False, **kwargs): | ||||
"""Given a list of origin ids: | """Given a list of origin ids: | ||||
- retrieve origins from storage | - retrieve origins from storage | ||||
- execute the indexing computations | - execute the indexing computations | ||||
- store the results (according to policy_update) | - store the results (according to policy_update) | ||||
Args: | Args: | ||||
ids ([Union[int, Tuple[str, bytes]]]): list of origin ids or | ids ([Union[int, Tuple[str, bytes]]]): list of origin ids or | ||||
(type, url) tuples. | (type, url) tuples. | ||||
policy_update ([str]): either 'update-dups' or 'ignore-dups' to | policy_update ([str]): either 'update-dups' or 'ignore-dups' to | ||||
respectively update duplicates or ignore | respectively update duplicates or ignore | ||||
them | them | ||||
parse_ids ([bool]: If `True`, will try to convert `ids` | parse_ids ([bool]: If `True`, will try to convert `ids` | ||||
from a human input to the valid type. | from a human input to the valid type. | ||||
**kwargs: passed to the `index` method | |||||
""" | """ | ||||
if parse_ids: | if parse_ids: | ||||
ids = [ | ids = [ | ||||
o.split('+', 1) if ':' in o else int(o) # type+url or id | o.split('+', 1) if ':' in o else int(o) # type+url or id | ||||
for o in ids] | for o in ids] | ||||
results = [] | results = [] | ||||
for id_ in ids: | for id_ in ids: | ||||
if isinstance(id_, (tuple, list)): | if isinstance(id_, (tuple, list)): | ||||
if len(id_) != 2: | if len(id_) != 2: | ||||
raise TypeError('Expected a (type, url) tuple.') | raise TypeError('Expected a (type, url) tuple.') | ||||
(type_, url) = id_ | (type_, url) = id_ | ||||
params = {'type': type_, 'url': url} | params = {'type': type_, 'url': url} | ||||
elif isinstance(id_, int): | elif isinstance(id_, int): | ||||
params = {'id': id_} | params = {'id': id_} | ||||
else: | else: | ||||
raise TypeError('Invalid value for "ids": %r' % id_) | raise TypeError('Invalid value in "ids": %r' % id_) | ||||
origin = self.storage.origin_get(params) | origin = self.storage.origin_get(params) | ||||
if not origin: | if not origin: | ||||
self.log.warn('Origins %s not found in storage' % | self.log.warn('Origins %s not found in storage' % | ||||
list(ids)) | list(ids)) | ||||
continue | continue | ||||
try: | try: | ||||
res = self.index(origin) | res = self.index(origin, **kwargs) | ||||
if origin: # If no results, skip it | if origin: # If no results, skip it | ||||
results.append(res) | results.append(res) | ||||
except Exception: | except Exception: | ||||
self.log.exception( | self.log.exception( | ||||
'Problem when processing origin %s' % id_) | 'Problem when processing origin %s' % id_) | ||||
self.persist_index_computations(results, policy_update) | self.persist_index_computations(results, policy_update) | ||||
self.results = results | |||||
return self.next_step(results) | |||||
class RevisionIndexer(BaseIndexer): | class RevisionIndexer(BaseIndexer): | ||||
"""An object type indexer, inherits from the :class:`BaseIndexer` and | """An object type indexer, inherits from the :class:`BaseIndexer` and | ||||
implements Revision indexing using the run method | implements Revision indexing using the run method | ||||
Note: the :class:`RevisionIndexer` is not an instantiable object. | Note: the :class:`RevisionIndexer` is not an instantiable object. | ||||
To use it in another context one should inherit from this class | To use it in another context one should inherit from this class | ||||
Show All 26 Lines | def run(self, ids, policy_update): | ||||
try: | try: | ||||
res = self.index(rev) | res = self.index(rev) | ||||
if res: # If no results, skip it | if res: # If no results, skip it | ||||
results.append(res) | results.append(res) | ||||
except Exception: | except Exception: | ||||
self.log.exception( | self.log.exception( | ||||
'Problem when processing revision') | 'Problem when processing revision') | ||||
self.persist_index_computations(results, policy_update) | self.persist_index_computations(results, policy_update) | ||||
self.results = results | |||||
return self.next_step(results) |
ping @ardumont
I remember that there was a reason why there is no return to run, but maybe I'm wrong