diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py --- a/swh/indexer/indexer.py +++ b/swh/indexer/indexer.py @@ -1,4 +1,4 @@ -# Copyright (C) 2016-2018 The Software Heritage developers +# Copyright (C) 2016-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -12,7 +12,8 @@ from copy import deepcopy from contextlib import contextmanager -from typing import Any, Dict, Tuple +from typing import Any, Dict, Tuple, Generator, Union, List, Optional +from typing import Set from swh.scheduler import get_scheduler from swh.scheduler import CONFIG as SWH_CONFIG @@ -27,13 +28,17 @@ @contextmanager -def write_to_temp(filename, data, working_directory): +def write_to_temp( + filename: str, data: bytes, working_directory: Any +) -> Generator[str, Any, None]: """Write the sha1's content in a temporary file. Args: - filename (str): one of sha1's many filenames - data (bytes): the sha1's content to write in temporary + filename: one of sha1's many filenames + data: the sha1's content to write in temporary file + working_directory: the directory into which the + file is written Returns: The path to the temporary file created. That file is @@ -103,6 +108,8 @@ filtering. """ + results: List[Dict] + CONFIG = 'indexer/base' DEFAULT_CONFIG = { @@ -134,7 +141,9 @@ """Prevents exceptions in `index()` from raising too high. Set to False in tests to properly catch all exceptions.""" - def __init__(self, config=None, **kw): + scheduler: Any + + def __init__(self, config=None, **kw) -> None: """Prepare and check that the indexer is ready to run. """ @@ -155,7 +164,7 @@ self.check() self.log.debug('%s: config=%s', self, self.config) - def prepare(self): + def prepare(self) -> None: """Prepare the indexer's needed runtime configuration. Without this step, the indexer cannot possibly run. @@ -181,10 +190,10 @@ self.results = [] @property - def tool(self): + def tool(self) -> Dict: return self.tools[0] - def check(self): + def check(self) -> None: """Check the indexer's configuration is ok before proceeding. If ok, does nothing. If not raise error. @@ -193,13 +202,15 @@ raise ValueError('Tools %s is unknown, cannot continue' % self.tools) - def _prepare_tool(self, tool): + def _prepare_tool(self, tool: Dict[str, Any]) -> Dict[str, Any]: """Prepare the tool dict to be compliant with the storage api. """ return {'tool_%s' % key: value for key, value in tool.items()} - def register_tools(self, tools): + def register_tools( + self, tools: Union[Dict[str, Any], List[Dict[str, Any]]] + ) -> List[Dict[str, Any]]: """Permit to register tools to the storage. Add a sensible default which can be overridden if not @@ -209,7 +220,7 @@ one or more tools. Args: - tools (dict/[dict]): Either a dict or a list of dict. + tools: Either a dict or a list of dict. Returns: list: List of dicts with additional id key. @@ -230,12 +241,14 @@ else: return [] - def index(self, id, data): + def index( + self, id: bytes, data: bytes + ) -> Dict[str, Any]: """Index computation for the id and associated raw data. Args: - id (bytes): identifier - data (bytes): id's data from storage or objstorage depending on + id: identifier + data: id's data from storage or objstorage depending on object type Returns: @@ -245,11 +258,11 @@ """ raise NotImplementedError() - def filter(self, ids): + def filter(self, ids: List[bytes]) -> Generator[bytes, Any, None]: """Filter missing ids for that particular indexer. Args: - ids ([bytes]): list of ids + ids: list of ids Yields: iterator of missing ids @@ -274,16 +287,18 @@ """ pass - def next_step(self, results, task): + def next_step( + self, results: List[Dict], task: Optional[Dict[str, Any]] + ) -> None: """Do something else with computations results (e.g. send to another queue, ...). (This is not an abstractmethod since it is optional). Args: - results ([result]): List of results (dict) as returned + results: List of results (dict) as returned by index function. - task (dict): a dict in the form expected by + task: a dict in the form expected by `scheduler.backend.SchedulerBackend.create_tasks` without `next_run`, plus an optional `result_name` key. @@ -394,12 +409,14 @@ """ @abc.abstractmethod - def indexed_contents_in_range(self, start, end): + def indexed_contents_in_range( + self, start: bytes, end: bytes + ) -> Any: """Retrieve indexed contents within range [start, end]. Args: - start (bytes): Starting bound from range identifier - end (bytes): End range identifier + start: Starting bound from range identifier + end: End range identifier Yields: bytes: Content identifier present in the range ``[start, end]`` @@ -407,14 +424,16 @@ """ pass - def _list_contents_to_index(self, start, end, indexed): + def _list_contents_to_index( + self, start: bytes, end: bytes, indexed: Set[bytes] + ) -> Generator[bytes, Any, None]: """Compute from storage the new contents to index in the range [start, end]. The already indexed contents are skipped. Args: - start (bytes): Starting bound from range identifier - end (bytes): End range identifier - indexed (Set[bytes]): Set of content already indexed. + start: Starting bound from range identifier + end: End range identifier + indexed: Set of content already indexed. Yields: bytes: Identifier of contents to index. @@ -433,13 +452,15 @@ yield _id start = result['next'] - def _index_contents(self, start, end, indexed, **kwargs): + def _index_contents( + self, start: bytes, end: bytes, indexed: Set[bytes], **kwargs: Any + ) -> Generator[Dict, Any, None]: """Index the contents from within range [start, end] Args: - start (bytes): Starting bound from range identifier - end (bytes): End range identifier - indexed (Set[bytes]): Set of content already indexed. + start: Starting bound from range identifier + end: End range identifier + indexed: Set of content already indexed. Yields: dict: Data indexed to persist using the indexer storage @@ -452,7 +473,7 @@ self.log.warning('Content %s not found in objstorage' % hashutil.hash_to_hex(sha1)) continue - res = self.index(sha1, raw_content, **kwargs) + res = self.index(sha1, raw_content, **kwargs) # type: ignore if res: if not isinstance(res['id'], bytes): raise TypeError( @@ -460,15 +481,17 @@ (self.__class__.__name__, res['id'])) yield res - def _index_with_skipping_already_done(self, start, end): + def _index_with_skipping_already_done( + self, start: bytes, end: bytes + ) -> Generator[Dict, Any, None]: """Index not already indexed contents in range [start, end]. Args: - start** (Union[bytes, str]): Starting range identifier - end (Union[bytes, str]): Ending range identifier + start: Starting range identifier + end: Ending range identifier Yields: - bytes: Content identifier present in the range + dict: Content identifier present in the range ``[start, end]`` which are not already indexed. """ @@ -558,7 +581,7 @@ self.results = results return self.next_step(results, task=next_step) - def index_list(self, origins, **kwargs): + def index_list(self, origins: List[Any], **kwargs: Any) -> List[Dict]: results = [] for origin in origins: try: diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py --- a/swh/indexer/metadata.py +++ b/swh/indexer/metadata.py @@ -1,10 +1,12 @@ -# Copyright (C) 2017-2018 The Software Heritage developers +# Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import deepcopy +from typing import Any, List, Dict, Tuple, Callable, Generator + from swh.core.utils import grouper from swh.indexer.codemeta import merge_documents @@ -21,7 +23,10 @@ ORIGIN_GET_BATCH_SIZE = 10 -def call_with_batches(f, args, batch_size): +def call_with_batches( + f: Callable[[List[Dict[str, Any]]], Dict['str', Any]], + args: List[Dict[str, str]], batch_size: int +) -> Generator: """Calls a function with batches of args, and concatenates the results. """ groups = grouper(args, batch_size) @@ -82,15 +87,17 @@ return None return result - def persist_index_computations(self, results, policy_update): + def persist_index_computations( + self, results: List[Dict], policy_update: str + ) -> None: """Persist the results in storage. Args: - results ([dict]): list of content_metadata, dict with the + results: list of content_metadata, dict with the following keys: - id (bytes): content's identifier (sha1) - metadata (jsonb): detected metadata - policy_update ([str]): either 'update-dups' or 'ignore-dups' to + policy_update: either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ @@ -179,16 +186,18 @@ 'Problem when indexing rev: %r', e) return result - def persist_index_computations(self, results, policy_update): + def persist_index_computations( + self, results: List[Dict], policy_update: str + ) -> None: """Persist the results in storage. Args: - results ([dict]): list of content_mimetype, dict with the + results: list of content_mimetype, dict with the following keys: - id (bytes): content's identifier (sha1) - mimetype (bytes): mimetype in bytes - encoding (bytes): encoding in bytes - policy_update ([str]): either 'update-dups' or 'ignore-dups' to + policy_update: either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ @@ -198,13 +207,14 @@ results, conflict_update=(policy_update == 'update-dups')) def translate_revision_intrinsic_metadata( - self, detected_files, log_suffix): + self, detected_files: Dict[str, List[Any]], log_suffix: str + ) -> Tuple[List[Any], List[Any]]: """ Determine plan of action to translate metadata when containing one or multiple detected files: Args: - detected_files (dict): dictionary mapping context names (e.g., + detected_files: dictionary mapping context names (e.g., "npm", "authors") to list of sha1 Returns: @@ -272,7 +282,7 @@ USE_TOOLS = False - def __init__(self, config=None, **kwargs): + def __init__(self, config=None, **kwargs) -> None: super().__init__(config=config, **kwargs) self.origin_head_indexer = OriginHeadIndexer(config=config) self.revision_metadata_indexer = RevisionMetadataIndexer(config=config) @@ -313,14 +323,16 @@ results.append((orig_metadata, rev_metadata)) return results - def persist_index_computations(self, results, policy_update): + def persist_index_computations( + self, results: List[Dict], policy_update: str + ) -> None: conflict_update = (policy_update == 'update-dups') # Deduplicate revisions - rev_metadata = [] - orig_metadata = [] - revs_to_delete = [] - origs_to_delete = [] + rev_metadata: List[Any] = [] + orig_metadata: List[Any] = [] + revs_to_delete: List[Any] = [] + origs_to_delete: List[Any] = [] for (orig_item, rev_item) in results: assert rev_item['metadata'] == orig_item['metadata'] if not rev_item['metadata'] or \ diff --git a/swh/indexer/mimetype.py b/swh/indexer/mimetype.py --- a/swh/indexer/mimetype.py +++ b/swh/indexer/mimetype.py @@ -1,12 +1,11 @@ -# Copyright (C) 2016-2018 The Software Heritage developers +# Copyright (C) 2016-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from typing import Optional, Dict, Any, List import magic -from typing import Optional - from .indexer import ContentIndexer, ContentRangeIndexer if not hasattr(magic.Magic, 'from_buffer'): @@ -15,15 +14,14 @@ 'was imported instead.') -def compute_mimetype_encoding(raw_content): +def compute_mimetype_encoding(raw_content: bytes) -> Dict[str, bytes]: """Determine mimetype and encoding from the raw content. Args: - raw_content (bytes): content's raw data + raw_content: content's raw data Returns: - dict: mimetype and encoding key and corresponding values - (as bytes). + dict: mimetype and encoding key and corresponding values. """ m = magic.Magic(mime=True, mime_encoding=True) @@ -41,6 +39,8 @@ See :class:`MimetypeIndexer` and :class:`MimetypeRangeIndexer` """ + tool: Dict[str, Any] + idx_storage: Any ADDITIONAL_CONFIG = { 'tools': ('dict', { 'name': 'file', @@ -55,36 +55,38 @@ CONFIG_BASE_FILENAME = 'indexer/mimetype' # type: Optional[str] - def index(self, id, data): + def index(self, id: bytes, data: bytes) -> Dict[str, Any]: """Index sha1s' content and store result. Args: - id (bytes): content's identifier - data (bytes): raw content in bytes + id: content's identifier + data: raw content in bytes Returns: dict: content's mimetype; dict keys being - - **id** (bytes): content's identifier (sha1) - - **mimetype** (bytes): mimetype in bytes - - **encoding** (bytes): encoding in bytes + - id: content's identifier (sha1) + - mimetype: mimetype in bytes + - encoding: encoding in bytes """ properties = compute_mimetype_encoding(data) properties.update({ 'id': id, 'indexer_configuration_id': self.tool['id'], - }) + }) return properties - def persist_index_computations(self, results, policy_update): + def persist_index_computations( + self, results: List[Dict], policy_update: List[str] + ) -> None: """Persist the results in storage. Args: - results ([dict]): list of content's mimetype dicts + results: list of content's mimetype dicts (see :meth:`.index`) - policy_update ([str]): either 'update-dups' or 'ignore-dups' to + policy_update: either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ @@ -128,18 +130,21 @@ - stores result in storage """ - def indexed_contents_in_range(self, start, end): + + def indexed_contents_in_range( + self, start: bytes, end: bytes + ) -> Dict[str, Optional[bytes]]: """Retrieve indexed content id within range [start, end]. Args: - start (bytes): Starting bound from range identifier - end (bytes): End range identifier + start: Starting bound from range identifier + end: End range identifier Returns: dict: a dict with keys: - - **ids** [bytes]: iterable of content ids within the range. - - **next** (Optional[bytes]): The next range of sha1 starts at + - ids: iterable of content ids within the range. + - next: The next range of sha1 starts at this sha1 if any """ diff --git a/swh/indexer/origin_head.py b/swh/indexer/origin_head.py --- a/swh/indexer/origin_head.py +++ b/swh/indexer/origin_head.py @@ -1,8 +1,10 @@ -# Copyright (C) 2018 The Software Heritage developers +# Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from typing import List, Tuple, Any, Dict, Union + import re import click import logging @@ -20,7 +22,9 @@ USE_TOOLS = False - def persist_index_computations(self, results, policy_update): + def persist_index_computations( + self, results: Any, policy_update: str + ) -> None: """Do nothing. The indexer's results are not persistent, they should only be piped to another indexer.""" pass @@ -58,7 +62,9 @@ rb'$') @classmethod - def _parse_version(cls, filename): + def _parse_version( + cls: Any, filename: str + ) -> Tuple[Union[float, int], ...]: """Extracts the release version from an archive filename, to get an ordering whose maximum is likely to be the last version of the software @@ -92,7 +98,7 @@ assert False, res.group('preversion') return tuple(version) - def _try_get_ftp_head(self, snapshot): + def _try_get_ftp_head(self, snapshot: Dict[str, Any]) -> Any: archive_names = list(snapshot['branches']) max_archive_name = max(archive_names, key=self._parse_version) r = self._try_resolve_target(snapshot['branches'], max_archive_name) @@ -100,7 +106,9 @@ # Generic - def _try_get_head_generic(self, snapshot): + def _try_get_head_generic( + self, snapshot: Dict[str, Any] + ) -> Any: # Works on 'deposit', 'pypi', and VCSs. try: branches = snapshot['branches'] @@ -112,7 +120,7 @@ self._try_resolve_target(branches, b'master') ) - def _try_resolve_target(self, branches, target_name): + def _try_resolve_target(self, branches: Dict, target_name: bytes) -> Any: try: target = branches[target_name] if target is None: @@ -140,7 +148,7 @@ @click.option('--origins', '-i', help='Origins to lookup, in the "type+url" format', multiple=True) -def main(origins): +def main(origins: List[str]) -> None: rev_metadata_indexer = OriginHeadIndexer() rev_metadata_indexer.run(origins) diff --git a/swh/indexer/rehash.py b/swh/indexer/rehash.py --- a/swh/indexer/rehash.py +++ b/swh/indexer/rehash.py @@ -1,4 +1,4 @@ -# Copyright (C) 2017-2018 The Software Heritage developers +# Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -7,6 +7,7 @@ import itertools from collections import defaultdict +from typing import Dict, Any, Tuple, List, Generator from swh.core import utils from swh.core.config import SWHConfig @@ -64,7 +65,7 @@ CONFIG_BASE_FILENAME = 'indexer/rehash' - def __init__(self): + def __init__(self) -> None: self.config = self.parse_config_file() self.storage = get_storage(**self.config['storage']) self.objstorage = get_objstorage(**self.config['objstorage']) @@ -80,7 +81,9 @@ if not self.compute_checksums: raise ValueError('Checksums list should not be empty.') - def _read_content_ids(self, contents): + def _read_content_ids( + self, contents: List[Dict[str, Any]] + ) -> Generator[bytes, Any, None]: """Read the content identifiers from the contents. """ @@ -91,14 +94,15 @@ yield h - def get_new_contents_metadata(self, all_contents): + def get_new_contents_metadata( + self, all_contents: List[Dict[str, Any]] + ) -> Generator[Tuple[Dict[str, Any], List[Any]], Any, None]: """Retrieve raw contents and compute new checksums on the contents. Unknown or corrupted contents are skipped. Args: - all_contents ([dict]): List of contents as dictionary with + all_contents: List of contents as dictionary with the necessary primary keys - checksum_algorithms ([str]): List of checksums to compute Yields: tuple: tuple of (content to update, list of checksums computed) @@ -141,7 +145,7 @@ content.update(content_hashes) yield content, checksums_to_compute - def run(self, contents): + def run(self, contents: List[Dict[str, Any]]) -> None: """Given a list of content: - (re)compute a given set of checksums on contents available in our @@ -149,7 +153,7 @@ - update those contents with the new metadata Args: - contents (dict): contents as dictionary with necessary keys. + contents: contents as dictionary with necessary keys. key present in such dictionary should be the ones defined in the 'primary_key' option. @@ -158,7 +162,7 @@ self.get_new_contents_metadata(contents), self.batch_size_update): - groups = defaultdict(list) + groups: Dict[str, List[Any]] = defaultdict(list) for content, keys_to_update in data: keys = ','.join(keys_to_update) groups[keys].append(content)