Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/indexer.py
# Copyright (C) 2016-2018 The Software Heritage developers | # Copyright (C) 2016-2018 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import abc | import abc | ||||
import ast | |||||
import os | import os | ||||
import logging | import logging | ||||
import shutil | import shutil | ||||
import tempfile | import tempfile | ||||
import datetime | import datetime | ||||
from copy import deepcopy | from copy import deepcopy | ||||
from contextlib import contextmanager | from contextlib import contextmanager | ||||
▲ Show 20 Lines • Show All 505 Lines • ▼ Show 20 Lines | def run(self, start, end, skip_existing=True, **kwargs): | ||||
if not self.catch_exceptions: | if not self.catch_exceptions: | ||||
raise | raise | ||||
self.log.exception( | self.log.exception( | ||||
'Problem when computing metadata.') | 'Problem when computing metadata.') | ||||
finally: | finally: | ||||
return with_indexed_data | return with_indexed_data | ||||
def origin_get_params(id_): | |||||
"""From any of the two types of origin identifiers (int or | |||||
type+url), returns a dict that can be passed to Storage.origin_get. | |||||
Also accepts JSON-encoded forms of these (used via the task scheduler). | |||||
>>> from pprint import pprint | |||||
>>> origin_get_params(123) | |||||
{'id': 123} | |||||
>>> pprint(origin_get_params(['git', 'https://example.com/foo.git'])) | |||||
{'type': 'git', 'url': 'https://example.com/foo.git'} | |||||
>>> origin_get_params("123") | |||||
{'id': 123} | |||||
>>> pprint(origin_get_params('["git", "https://example.com/foo.git"]')) | |||||
{'type': 'git', 'url': 'https://example.com/foo.git'} | |||||
""" | |||||
if isinstance(id_, str): | |||||
# Data coming from JSON, which requires string keys, so | |||||
# one extra level of deserialization is needed | |||||
id_ = ast.literal_eval(id_) | |||||
if isinstance(id_, (tuple, list)): | |||||
if len(id_) != 2: | |||||
raise TypeError('Expected a (type, url) tuple.') | |||||
(type_, url) = id_ | |||||
params = {'type': type_, 'url': url} | |||||
elif isinstance(id_, int): | |||||
params = {'id': id_} | |||||
else: | |||||
raise TypeError('Invalid value in "ids": %r' % id_) | |||||
return params | |||||
class OriginIndexer(BaseIndexer): | class OriginIndexer(BaseIndexer): | ||||
"""An object type indexer, inherits from the :class:`BaseIndexer` and | """An object type indexer, inherits from the :class:`BaseIndexer` and | ||||
implements Origin indexing using the run method | implements Origin indexing using the run method | ||||
Note: the :class:`OriginIndexer` is not an instantiable object. | Note: the :class:`OriginIndexer` is not an instantiable object. | ||||
To use it in another context one should inherit from this class | To use it in another context one should inherit from this class | ||||
and override the methods mentioned in the :class:`BaseIndexer` | and override the methods mentioned in the :class:`BaseIndexer` | ||||
class. | class. | ||||
""" | """ | ||||
def run(self, ids, policy_update='update-dups', parse_ids=True, | def run(self, origin_urls, policy_update='update-dups', | ||||
next_step=None, **kwargs): | next_step=None, **kwargs): | ||||
"""Given a list of origin ids: | """Given a list of origin ids: | ||||
- retrieve origins from storage | - retrieve origins from storage | ||||
- execute the indexing computations | - execute the indexing computations | ||||
- store the results (according to policy_update) | - store the results (according to policy_update) | ||||
Args: | Args: | ||||
ids ([Union[int, Tuple[str, bytes]]]): list of origin ids or | ids ([Union[int, Tuple[str, bytes]]]): list of origin ids or | ||||
(type, url) tuples. | (type, url) tuples. | ||||
policy_update (str): either 'update-dups' or 'ignore-dups' to | policy_update (str): either 'update-dups' or 'ignore-dups' to | ||||
respectively update duplicates (default) or ignore them | respectively update duplicates (default) or ignore them | ||||
next_step (dict): a dict in the form expected by | next_step (dict): a dict in the form expected by | ||||
`scheduler.backend.SchedulerBackend.create_tasks` without | `scheduler.backend.SchedulerBackend.create_tasks` without | ||||
`next_run`, plus an optional `result_name` key. | `next_run`, plus an optional `result_name` key. | ||||
parse_ids (bool): Do we need to parse id or not (default) | parse_ids (bool): Do we need to parse id or not (default) | ||||
**kwargs: passed to the `index` method | **kwargs: passed to the `index` method | ||||
""" | """ | ||||
if parse_ids: | results = self.index_list(origin_urls, **kwargs) | ||||
ids = [o.split('+', 1) if ':' in o else int(o) # type+url or id | |||||
for o in ids] | |||||
origins_filtered = [] | |||||
origins = self.storage.origin_get( | |||||
[origin_get_params(id_) for id_ in ids]) | |||||
for (id_, origin) in zip(ids, origins): | |||||
if not origin: | |||||
self.log.warning('Origin %s not found in storage' % | |||||
id_) | |||||
continue | |||||
origins_filtered.append(origin) | |||||
results = self.index_list(origins_filtered, **kwargs) | |||||
self.persist_index_computations(results, policy_update) | self.persist_index_computations(results, policy_update) | ||||
self.results = results | self.results = results | ||||
return self.next_step(results, task=next_step) | return self.next_step(results, task=next_step) | ||||
def index_list(self, origins, **kwargs): | def index_list(self, origins, **kwargs): | ||||
results = [] | results = [] | ||||
for origin in origins: | for origin in origins: | ||||
▲ Show 20 Lines • Show All 58 Lines • Show Last 20 Lines |