Changeset View
Standalone View
swh/indexer/indexer.py
# Copyright (C) 2016-2018 The Software Heritage developers | # Copyright (C) 2016-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import abc | import abc | ||||
import os | import os | ||||
import logging | import logging | ||||
import shutil | import shutil | ||||
import tempfile | import tempfile | ||||
import datetime | import datetime | ||||
from copy import deepcopy | from copy import deepcopy | ||||
from contextlib import contextmanager | from contextlib import contextmanager | ||||
from typing import Any, Dict, Tuple | from typing import Any, Dict, Tuple, Generator, Union, List, Optional | ||||
from typing import Set | |||||
from swh.scheduler import get_scheduler | from swh.scheduler import get_scheduler | ||||
from swh.scheduler import CONFIG as SWH_CONFIG | from swh.scheduler import CONFIG as SWH_CONFIG | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.core.config import SWHConfig | from swh.core.config import SWHConfig | ||||
from swh.objstorage import get_objstorage | from swh.objstorage import get_objstorage | ||||
from swh.objstorage.exc import ObjNotFoundError | from swh.objstorage.exc import ObjNotFoundError | ||||
from swh.indexer.storage import get_indexer_storage, INDEXER_CFG_KEY | from swh.indexer.storage import get_indexer_storage, INDEXER_CFG_KEY | ||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.core import utils | from swh.core import utils | ||||
@contextmanager | @contextmanager | ||||
def write_to_temp(filename, data, working_directory): | def write_to_temp( | ||||
filename: str, data: bytes, working_directory: str | |||||
vlorentz: it yields strings | |||||
) -> Generator[str, None, None]: | |||||
"""Write the sha1's content in a temporary file. | """Write the sha1's content in a temporary file. | ||||
Args: | Args: | ||||
filename (str): one of sha1's many filenames | filename: one of sha1's many filenames | ||||
data (bytes): the sha1's content to write in temporary | data: the sha1's content to write in temporary | ||||
file | file | ||||
working_directory: the directory into which the | |||||
file is written | |||||
Done Inline ActionsAdd the working directory docstring, it's where the file is written. ardumont: Add the working directory docstring, it's where the file is written.
i also think it's a string… | |||||
Done Inline ActionsAdded to the docstring. Regarding the type of working_directory, I wasn't able to figure it out based on checking its use in this method, as well as from checking the methods that call this method through using grep. According to this, the type can be either a string or a bytes object representing a path. So if we're sure it's a string, I'll change it to str 👍 krithikvaidya: Added to the docstring.
Regarding the type of //working_directory//, I wasn't able to figure… | |||||
Done Inline ActionsYes, it could be bytes also. ardumont: Yes, it could be bytes also.
but i believe we only use it with string. | |||||
Returns: | Returns: | ||||
The path to the temporary file created. That file is | The path to the temporary file created. That file is | ||||
filled in with the raw content's data. | filled in with the raw content's data. | ||||
""" | """ | ||||
os.makedirs(working_directory, exist_ok=True) | os.makedirs(working_directory, exist_ok=True) | ||||
temp_dir = tempfile.mkdtemp(dir=working_directory) | temp_dir = tempfile.mkdtemp(dir=working_directory) | ||||
▲ Show 20 Lines • Show All 53 Lines • ▼ Show 20 Lines | :meth:`~BaseIndexer.check`: | ||||
Configuration check for the indexer. When overriding, this must call the | Configuration check for the indexer. When overriding, this must call the | ||||
`super().check()` instruction. | `super().check()` instruction. | ||||
:meth:`~BaseIndexer.register_tools`: | :meth:`~BaseIndexer.register_tools`: | ||||
This should return a dict of the tool(s) to use when indexing or | This should return a dict of the tool(s) to use when indexing or | ||||
filtering. | filtering. | ||||
""" | """ | ||||
results: List[Dict] | |||||
CONFIG = 'indexer/base' | CONFIG = 'indexer/base' | ||||
DEFAULT_CONFIG = { | DEFAULT_CONFIG = { | ||||
INDEXER_CFG_KEY: ('dict', { | INDEXER_CFG_KEY: ('dict', { | ||||
'cls': 'remote', | 'cls': 'remote', | ||||
'args': { | 'args': { | ||||
'url': 'http://localhost:5007/' | 'url': 'http://localhost:5007/' | ||||
} | } | ||||
Show All 15 Lines | class BaseIndexer(SWHConfig, metaclass=abc.ABCMeta): | ||||
ADDITIONAL_CONFIG = {} # type: Dict[str, Tuple[str, Any]] | ADDITIONAL_CONFIG = {} # type: Dict[str, Tuple[str, Any]] | ||||
USE_TOOLS = True | USE_TOOLS = True | ||||
catch_exceptions = True | catch_exceptions = True | ||||
"""Prevents exceptions in `index()` from raising too high. Set to False | """Prevents exceptions in `index()` from raising too high. Set to False | ||||
in tests to properly catch all exceptions.""" | in tests to properly catch all exceptions.""" | ||||
    # Scheduler backend instance, set up lazily by subclasses/config.
    # NOTE(review): typed Any because the concrete scheduler class depends
    # on configuration — confirm against get_scheduler's return type.
    scheduler: Any

    def __init__(self, config=None, **kw) -> None:
        """Prepare and check that the indexer is ready to run.

        Configuration is resolved in priority order: the explicit ``config``
        argument, then the scheduler-provided global ``SWH_CONFIG``, and
        finally the on-disk configuration file parsed by
        :meth:`SWHConfig.parse_config_file`.

        Args:
            config: explicit configuration dict (highest priority).
            **kw: optional keyword arguments forwarded to
                ``parse_config_file`` (``base_filename``,
                ``config_filename``, ``additional_configs``,
                ``global_config``); only used when neither ``config``
                nor ``SWH_CONFIG`` is provided.
        """
        super().__init__()
        if config is not None:
            # Explicit configuration takes precedence.
            self.config = config
        elif SWH_CONFIG:
            # Fall back to the configuration loaded by the scheduler runner.
            self.config = SWH_CONFIG.copy()
        else:
            # Finally, parse the configuration file on disk.
            config_keys = ('base_filename', 'config_filename',
                           'additional_configs', 'global_config')
            config_args = {k: v for k, v in kw.items() if k in config_keys}
            if self.ADDITIONAL_CONFIG:
                config_args.setdefault('additional_configs', []).append(
                    self.ADDITIONAL_CONFIG)
            self.config = self.parse_config_file(**config_args)
        # prepare() must run before check(): it sets the attributes
        # (tools, log, storages) that check() and the debug log rely on.
        self.prepare()
        self.check()
        self.log.debug('%s: config=%s', self, self.config)
    def prepare(self) -> None:
        """Prepare the indexer's needed runtime configuration.

        Without this step, the indexer cannot possibly run.

        Instantiates, from ``self.config``: the (optional) archive storage,
        the object storage, the indexer storage, the logger, the registered
        tools (when ``USE_TOOLS`` is set) and an empty ``results`` list.
        """
        # The archive storage is optional: only set when configured.
        config_storage = self.config.get('storage')
        if config_storage:
            self.storage = get_storage(**config_storage)

        objstorage = self.config['objstorage']
        self.objstorage = get_objstorage(objstorage['cls'],
                                         objstorage['args'])

        idx_storage = self.config[INDEXER_CFG_KEY]
        self.idx_storage = get_indexer_storage(**idx_storage)

        # Silence urllib3's connection-pool logging noise below WARN level.
        _log = logging.getLogger('requests.packages.urllib3.connectionpool')
        _log.setLevel(logging.WARN)
        self.log = logging.getLogger('swh.indexer')

        if self.USE_TOOLS:
            self.tools = list(self.register_tools(
                self.config.get('tools', [])))
        self.results = []
Done Inline Actionsannotation should preferably be on the class itself. vlorentz: annotation should preferably be on the class itself. | |||||
@property | @property | ||||
def tool(self): | def tool(self) -> Dict: | ||||
Done Inline ActionsIt's a Dict vlorentz: It's a Dict | |||||
return self.tools[0] | return self.tools[0] | ||||
def check(self): | def check(self) -> None: | ||||
"""Check the indexer's configuration is ok before proceeding. | """Check the indexer's configuration is ok before proceeding. | ||||
If ok, does nothing. If not raise error. | If ok, does nothing. If not raise error. | ||||
""" | """ | ||||
if self.USE_TOOLS and not self.tools: | if self.USE_TOOLS and not self.tools: | ||||
raise ValueError('Tools %s is unknown, cannot continue' % | raise ValueError('Tools %s is unknown, cannot continue' % | ||||
self.tools) | self.tools) | ||||
def _prepare_tool(self, tool): | def _prepare_tool(self, tool: Dict[str, Any]) -> Dict[str, Any]: | ||||
"""Prepare the tool dict to be compliant with the storage api. | """Prepare the tool dict to be compliant with the storage api. | ||||
""" | """ | ||||
return {'tool_%s' % key: value for key, value in tool.items()} | return {'tool_%s' % key: value for key, value in tool.items()} | ||||
def register_tools(self, tools): | def register_tools( | ||||
self, tools: Union[Dict[str, Any], List[Dict[str, Any]]] | |||||
) -> List[Dict[str, Any]]: | |||||
"""Permit to register tools to the storage. | """Permit to register tools to the storage. | ||||
Add a sensible default which can be overridden if not | Add a sensible default which can be overridden if not | ||||
sufficient. (For now, all indexers use only one tool) | sufficient. (For now, all indexers use only one tool) | ||||
Expects the self.config['tools'] property to be set with | Expects the self.config['tools'] property to be set with | ||||
one or more tools. | one or more tools. | ||||
Args: | Args: | ||||
tools (dict/[dict]): Either a dict or a list of dict. | tools: Either a dict or a list of dict. | ||||
Returns: | Returns: | ||||
list: List of dicts with additional id key. | list: List of dicts with additional id key. | ||||
Raises: | Raises: | ||||
ValueError: if not a list nor a dict. | ValueError: if not a list nor a dict. | ||||
""" | """ | ||||
if isinstance(tools, list): | if isinstance(tools, list): | ||||
tools = list(map(self._prepare_tool, tools)) | tools = list(map(self._prepare_tool, tools)) | ||||
elif isinstance(tools, dict): | elif isinstance(tools, dict): | ||||
tools = [self._prepare_tool(tools)] | tools = [self._prepare_tool(tools)] | ||||
else: | else: | ||||
raise ValueError('Configuration tool(s) must be a dict or list!') | raise ValueError('Configuration tool(s) must be a dict or list!') | ||||
if tools: | if tools: | ||||
return self.idx_storage.indexer_configuration_add(tools) | return self.idx_storage.indexer_configuration_add(tools) | ||||
else: | else: | ||||
return [] | return [] | ||||
    def index(
        self, id: bytes, data: bytes
    ) -> Dict[str, Any]:
        """Index computation for the id and associated raw data.

        Args:
            id: identifier
            data: id's data from storage or objstorage depending on
                object type

        Returns:
            dict: a dict that makes sense for the
            :meth:`.persist_index_computations` method.
        """
        # Subclasses must provide the actual indexing computation.
        raise NotImplementedError()
def filter(self, ids): | def filter(self, ids: List[bytes]) -> Generator[bytes, None, None]: | ||||
Done Inline Actionsit yields bytes vlorentz: it yields bytes | |||||
"""Filter missing ids for that particular indexer. | """Filter missing ids for that particular indexer. | ||||
Args: | Args: | ||||
ids ([bytes]): list of ids | ids: list of ids | ||||
Yields: | Yields: | ||||
iterator of missing ids | iterator of missing ids | ||||
""" | """ | ||||
yield from ids | yield from ids | ||||
    @abc.abstractmethod
    def persist_index_computations(self, results, policy_update):
        """Persist the computation resulting from the index.

        Args:
            results ([result]): List of results. One result is the
                result of the index function.
            policy_update ([str]): either 'update-dups' or 'ignore-dups' to
                respectively update duplicates or ignore them

        Returns:
            None
        """
        pass
def next_step(self, results, task): | def next_step( | ||||
self, results: List[Dict], task: Optional[Dict[str, Any]] | |||||
) -> None: | |||||
"""Do something else with computations results (e.g. send to another | """Do something else with computations results (e.g. send to another | ||||
queue, ...). | queue, ...). | ||||
(This is not an abstractmethod since it is optional). | (This is not an abstractmethod since it is optional). | ||||
Args: | Args: | ||||
results ([result]): List of results (dict) as returned | results: List of results (dict) as returned | ||||
by index function. | by index function. | ||||
task (dict): a dict in the form expected by | task: a dict in the form expected by | ||||
`scheduler.backend.SchedulerBackend.create_tasks` | `scheduler.backend.SchedulerBackend.create_tasks` | ||||
without `next_run`, plus an optional `result_name` key. | without `next_run`, plus an optional `result_name` key. | ||||
Returns: | Returns: | ||||
None | None | ||||
""" | """ | ||||
if task: | if task: | ||||
▲ Show 20 Lines • Show All 94 Lines • ▼ Show 20 Lines | class ContentRangeIndexer(BaseIndexer): | ||||
To work on a list of ids, use the :class:`ContentIndexer` instead. | To work on a list of ids, use the :class:`ContentIndexer` instead. | ||||
Note: :class:`ContentRangeIndexer` is not an instantiable | Note: :class:`ContentRangeIndexer` is not an instantiable | ||||
object. To use it, one should inherit from this class and override | object. To use it, one should inherit from this class and override | ||||
the methods mentioned in the :class:`BaseIndexer` class. | the methods mentioned in the :class:`BaseIndexer` class. | ||||
""" | """ | ||||
    @abc.abstractmethod
    def indexed_contents_in_range(
        self, start: bytes, end: bytes
    ) -> Any:
        """Retrieve indexed contents within range [start, end].

        Args:
            start: Starting bound from range identifier
            end: End range identifier

        Yields:
            bytes: Content identifier present in the range ``[start, end]``
        """
        pass
def _list_contents_to_index(self, start, end, indexed): | def _list_contents_to_index( | ||||
self, start: bytes, end: bytes, indexed: Set[bytes] | |||||
) -> Generator[bytes, None, None]: | |||||
"""Compute from storage the new contents to index in the range [start, | """Compute from storage the new contents to index in the range [start, | ||||
end]. The already indexed contents are skipped. | end]. The already indexed contents are skipped. | ||||
Args: | Args: | ||||
start (bytes): Starting bound from range identifier | start: Starting bound from range identifier | ||||
end (bytes): End range identifier | end: End range identifier | ||||
indexed (Set[bytes]): Set of content already indexed. | indexed: Set of content already indexed. | ||||
Yields: | Yields: | ||||
bytes: Identifier of contents to index. | bytes: Identifier of contents to index. | ||||
""" | """ | ||||
if not isinstance(start, bytes) or not isinstance(end, bytes): | if not isinstance(start, bytes) or not isinstance(end, bytes): | ||||
raise TypeError('identifiers must be bytes, not %r and %r.' % | raise TypeError('identifiers must be bytes, not %r and %r.' % | ||||
(start, end)) | (start, end)) | ||||
while start: | while start: | ||||
result = self.storage.content_get_range(start, end) | result = self.storage.content_get_range(start, end) | ||||
contents = result['contents'] | contents = result['contents'] | ||||
for c in contents: | for c in contents: | ||||
_id = hashutil.hash_to_bytes(c['sha1']) | _id = hashutil.hash_to_bytes(c['sha1']) | ||||
if _id in indexed: | if _id in indexed: | ||||
continue | continue | ||||
yield _id | yield _id | ||||
start = result['next'] | start = result['next'] | ||||
def _index_contents(self, start, end, indexed, **kwargs): | def _index_contents( | ||||
self, start: bytes, end: bytes, indexed: Set[bytes], **kwargs: Any | |||||
Done Inline Actionsit yields dicts vlorentz: it yields dicts | |||||
Done Inline ActionsSendType should be None, not Any. https://docs.python.org/3/library/typing.html#typing.Generator vlorentz: SendType should be `None`, not `Any`. https://docs.python.org/3/library/typing.html#typing. | |||||
) -> Generator[Dict, None, None]: | |||||
"""Index the contents from within range [start, end] | """Index the contents from within range [start, end] | ||||
Args: | Args: | ||||
start (bytes): Starting bound from range identifier | start: Starting bound from range identifier | ||||
end (bytes): End range identifier | end: End range identifier | ||||
indexed (Set[bytes]): Set of content already indexed. | indexed: Set of content already indexed. | ||||
Yields: | Yields: | ||||
dict: Data indexed to persist using the indexer storage | dict: Data indexed to persist using the indexer storage | ||||
""" | """ | ||||
for sha1 in self._list_contents_to_index(start, end, indexed): | for sha1 in self._list_contents_to_index(start, end, indexed): | ||||
try: | try: | ||||
raw_content = self.objstorage.get(sha1) | raw_content = self.objstorage.get(sha1) | ||||
except ObjNotFoundError: | except ObjNotFoundError: | ||||
self.log.warning('Content %s not found in objstorage' % | self.log.warning('Content %s not found in objstorage' % | ||||
hashutil.hash_to_hex(sha1)) | hashutil.hash_to_hex(sha1)) | ||||
continue | continue | ||||
res = self.index(sha1, raw_content, **kwargs) | res = self.index(sha1, raw_content, **kwargs) # type: ignore | ||||
Done Inline ActionsNot sure which overriden index() is being called here krithikvaidya: Not sure which overriden index() is being called here | |||||
Done Inline Actionsyeah, it's fine for now [1] The one that matters here is the main class BaseIndex's index definition which you typed. [1] indexer has grown a bit and after a while and getting back to it, it's a tad unclear. ardumont: yeah, it's fine for now [1]
The one that matters here is the main class BaseIndex's index… | |||||
Done Inline ActionsAh I see krithikvaidya: Ah I see | |||||
if res: | if res: | ||||
if not isinstance(res['id'], bytes): | if not isinstance(res['id'], bytes): | ||||
raise TypeError( | raise TypeError( | ||||
'%r.index should return ids as bytes, not %r' % | '%r.index should return ids as bytes, not %r' % | ||||
(self.__class__.__name__, res['id'])) | (self.__class__.__name__, res['id'])) | ||||
yield res | yield res | ||||
def _index_with_skipping_already_done(self, start, end): | def _index_with_skipping_already_done( | ||||
self, start: bytes, end: bytes | |||||
Done Inline Actionsyields bytes vlorentz: yields bytes | |||||
Done Inline ActionsI'm not sure that this is the case, since this method yields from _index_contents(), and _index_contents() yields Dicts. krithikvaidya: I'm not sure that this is the case, since this method //yields from// _index_contents(), and… | |||||
Done Inline ActionsThen you should also fix the docstring vlorentz: Then you should also fix the docstring | |||||
Done Inline Actionssame comment on SendType vlorentz: same comment on SendType | |||||
) -> Generator[Dict, None, None]: | |||||
"""Index not already indexed contents in range [start, end]. | """Index not already indexed contents in range [start, end]. | ||||
Args: | Args: | ||||
start** (Union[bytes, str]): Starting range identifier | start: Starting range identifier | ||||
end (Union[bytes, str]): Ending range identifier | end: Ending range identifier | ||||
Done Inline Actionsremove the ** ardumont: remove the `**` | |||||
Yields: | Yields: | ||||
bytes: Content identifier present in the range | dict: Content identifier present in the range | ||||
``[start, end]`` which are not already indexed. | ``[start, end]`` which are not already indexed. | ||||
""" | """ | ||||
while start: | while start: | ||||
indexed_page = self.indexed_contents_in_range(start, end) | indexed_page = self.indexed_contents_in_range(start, end) | ||||
contents = indexed_page['ids'] | contents = indexed_page['ids'] | ||||
_end = contents[-1] if contents else end | _end = contents[-1] if contents else end | ||||
yield from self._index_contents( | yield from self._index_contents( | ||||
▲ Show 20 Lines • Show All 73 Lines • ▼ Show 20 Lines | def run(self, origin_urls, policy_update='update-dups', | ||||
""" | """ | ||||
results = self.index_list(origin_urls, **kwargs) | results = self.index_list(origin_urls, **kwargs) | ||||
self.persist_index_computations(results, policy_update) | self.persist_index_computations(results, policy_update) | ||||
self.results = results | self.results = results | ||||
return self.next_step(results, task=next_step) | return self.next_step(results, task=next_step) | ||||
def index_list(self, origins, **kwargs): | def index_list(self, origins: List[Any], **kwargs: Any) -> List[Dict]: | ||||
results = [] | results = [] | ||||
for origin in origins: | for origin in origins: | ||||
try: | try: | ||||
res = self.index(origin, **kwargs) | res = self.index(origin, **kwargs) | ||||
if res: # If no results, skip it | if res: # If no results, skip it | ||||
results.append(res) | results.append(res) | ||||
except Exception: | except Exception: | ||||
if not self.catch_exceptions: | if not self.catch_exceptions: | ||||
▲ Show 20 Lines • Show All 52 Lines • Show Last 20 Lines |
it yields strings