Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7066320
D2622.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
22 KB
Subscribers
None
D2622.diff
View Options
diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py
--- a/swh/indexer/indexer.py
+++ b/swh/indexer/indexer.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2016-2018 The Software Heritage developers
+# Copyright (C) 2016-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -12,7 +12,8 @@
from copy import deepcopy
from contextlib import contextmanager
-from typing import Any, Dict, Tuple
+from typing import Any, Dict, Tuple, Generator, Union, List, Optional
+from typing import Set
from swh.scheduler import get_scheduler
from swh.scheduler import CONFIG as SWH_CONFIG
@@ -27,13 +28,17 @@
@contextmanager
-def write_to_temp(filename, data, working_directory):
+def write_to_temp(
+ filename: str, data: bytes, working_directory: str
+) -> Generator[str, None, None]:
"""Write the sha1's content in a temporary file.
Args:
- filename (str): one of sha1's many filenames
- data (bytes): the sha1's content to write in temporary
+ filename: one of sha1's many filenames
+ data: the sha1's content to write in temporary
file
+ working_directory: the directory into which the
+ file is written
Returns:
The path to the temporary file created. That file is
@@ -103,6 +108,8 @@
filtering.
"""
+ results: List[Dict]
+
CONFIG = 'indexer/base'
DEFAULT_CONFIG = {
@@ -134,7 +141,9 @@
"""Prevents exceptions in `index()` from raising too high. Set to False
in tests to properly catch all exceptions."""
- def __init__(self, config=None, **kw):
+ scheduler: Any
+
+ def __init__(self, config=None, **kw) -> None:
"""Prepare and check that the indexer is ready to run.
"""
@@ -155,7 +164,7 @@
self.check()
self.log.debug('%s: config=%s', self, self.config)
- def prepare(self):
+ def prepare(self) -> None:
"""Prepare the indexer's needed runtime configuration.
Without this step, the indexer cannot possibly run.
@@ -181,10 +190,10 @@
self.results = []
@property
- def tool(self):
+ def tool(self) -> Dict:
return self.tools[0]
- def check(self):
+ def check(self) -> None:
"""Check the indexer's configuration is ok before proceeding.
If ok, does nothing. If not raise error.
@@ -193,13 +202,15 @@
raise ValueError('Tools %s is unknown, cannot continue' %
self.tools)
- def _prepare_tool(self, tool):
+ def _prepare_tool(self, tool: Dict[str, Any]) -> Dict[str, Any]:
"""Prepare the tool dict to be compliant with the storage api.
"""
return {'tool_%s' % key: value for key, value in tool.items()}
- def register_tools(self, tools):
+ def register_tools(
+ self, tools: Union[Dict[str, Any], List[Dict[str, Any]]]
+ ) -> List[Dict[str, Any]]:
"""Permit to register tools to the storage.
Add a sensible default which can be overridden if not
@@ -209,7 +220,7 @@
one or more tools.
Args:
- tools (dict/[dict]): Either a dict or a list of dict.
+ tools: Either a dict or a list of dict.
Returns:
list: List of dicts with additional id key.
@@ -230,12 +241,14 @@
else:
return []
- def index(self, id, data):
+ def index(
+ self, id: bytes, data: bytes
+ ) -> Dict[str, Any]:
"""Index computation for the id and associated raw data.
Args:
- id (bytes): identifier
- data (bytes): id's data from storage or objstorage depending on
+ id: identifier
+ data: id's data from storage or objstorage depending on
object type
Returns:
@@ -245,11 +258,11 @@
"""
raise NotImplementedError()
- def filter(self, ids):
+ def filter(self, ids: List[bytes]) -> Generator[bytes, None, None]:
"""Filter missing ids for that particular indexer.
Args:
- ids ([bytes]): list of ids
+ ids: list of ids
Yields:
iterator of missing ids
@@ -274,16 +287,18 @@
"""
pass
- def next_step(self, results, task):
+ def next_step(
+ self, results: List[Dict], task: Optional[Dict[str, Any]]
+ ) -> None:
"""Do something else with computations results (e.g. send to another
queue, ...).
(This is not an abstractmethod since it is optional).
Args:
- results ([result]): List of results (dict) as returned
+ results: List of results (dict) as returned
by index function.
- task (dict): a dict in the form expected by
+ task: a dict in the form expected by
`scheduler.backend.SchedulerBackend.create_tasks`
without `next_run`, plus an optional `result_name` key.
@@ -394,12 +409,14 @@
"""
@abc.abstractmethod
- def indexed_contents_in_range(self, start, end):
+ def indexed_contents_in_range(
+ self, start: bytes, end: bytes
+ ) -> Any:
"""Retrieve indexed contents within range [start, end].
Args:
- start (bytes): Starting bound from range identifier
- end (bytes): End range identifier
+ start: Starting bound from range identifier
+ end: End range identifier
Yields:
bytes: Content identifier present in the range ``[start, end]``
@@ -407,14 +424,16 @@
"""
pass
- def _list_contents_to_index(self, start, end, indexed):
+ def _list_contents_to_index(
+ self, start: bytes, end: bytes, indexed: Set[bytes]
+ ) -> Generator[bytes, None, None]:
"""Compute from storage the new contents to index in the range [start,
end]. The already indexed contents are skipped.
Args:
- start (bytes): Starting bound from range identifier
- end (bytes): End range identifier
- indexed (Set[bytes]): Set of content already indexed.
+ start: Starting bound from range identifier
+ end: End range identifier
+ indexed: Set of content already indexed.
Yields:
bytes: Identifier of contents to index.
@@ -433,13 +452,15 @@
yield _id
start = result['next']
- def _index_contents(self, start, end, indexed, **kwargs):
+ def _index_contents(
+ self, start: bytes, end: bytes, indexed: Set[bytes], **kwargs: Any
+ ) -> Generator[Dict, None, None]:
"""Index the contents from within range [start, end]
Args:
- start (bytes): Starting bound from range identifier
- end (bytes): End range identifier
- indexed (Set[bytes]): Set of content already indexed.
+ start: Starting bound from range identifier
+ end: End range identifier
+ indexed: Set of content already indexed.
Yields:
dict: Data indexed to persist using the indexer storage
@@ -452,7 +473,7 @@
self.log.warning('Content %s not found in objstorage' %
hashutil.hash_to_hex(sha1))
continue
- res = self.index(sha1, raw_content, **kwargs)
+ res = self.index(sha1, raw_content, **kwargs) # type: ignore
if res:
if not isinstance(res['id'], bytes):
raise TypeError(
@@ -460,15 +481,17 @@
(self.__class__.__name__, res['id']))
yield res
- def _index_with_skipping_already_done(self, start, end):
+ def _index_with_skipping_already_done(
+ self, start: bytes, end: bytes
+ ) -> Generator[Dict, None, None]:
"""Index not already indexed contents in range [start, end].
Args:
- start** (Union[bytes, str]): Starting range identifier
- end (Union[bytes, str]): Ending range identifier
+ start: Starting range identifier
+ end: Ending range identifier
Yields:
- bytes: Content identifier present in the range
+ dict: Content identifier present in the range
``[start, end]`` which are not already indexed.
"""
@@ -558,7 +581,7 @@
self.results = results
return self.next_step(results, task=next_step)
- def index_list(self, origins, **kwargs):
+ def index_list(self, origins: List[Any], **kwargs: Any) -> List[Dict]:
results = []
for origin in origins:
try:
diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py
--- a/swh/indexer/metadata.py
+++ b/swh/indexer/metadata.py
@@ -1,10 +1,12 @@
-# Copyright (C) 2017-2018 The Software Heritage developers
+# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from copy import deepcopy
+from typing import Any, List, Dict, Tuple, Callable, Generator
+
from swh.core.utils import grouper
from swh.indexer.codemeta import merge_documents
@@ -21,7 +23,10 @@
ORIGIN_GET_BATCH_SIZE = 10
-def call_with_batches(f, args, batch_size):
+def call_with_batches(
+ f: Callable[[List[Dict[str, Any]]], Dict['str', Any]],
+ args: List[Dict[str, str]], batch_size: int
+) -> Generator[str, None, None]:
"""Calls a function with batches of args, and concatenates the results.
"""
groups = grouper(args, batch_size)
@@ -82,15 +87,17 @@
return None
return result
- def persist_index_computations(self, results, policy_update):
+ def persist_index_computations(
+ self, results: List[Dict], policy_update: str
+ ) -> None:
"""Persist the results in storage.
Args:
- results ([dict]): list of content_metadata, dict with the
+ results: list of content_metadata, dict with the
following keys:
- id (bytes): content's identifier (sha1)
- metadata (jsonb): detected metadata
- policy_update ([str]): either 'update-dups' or 'ignore-dups' to
+ policy_update: either 'update-dups' or 'ignore-dups' to
respectively update duplicates or ignore them
"""
@@ -179,16 +186,18 @@
'Problem when indexing rev: %r', e)
return result
- def persist_index_computations(self, results, policy_update):
+ def persist_index_computations(
+ self, results: List[Dict], policy_update: str
+ ) -> None:
"""Persist the results in storage.
Args:
- results ([dict]): list of content_mimetype, dict with the
+ results: list of content_mimetype, dict with the
following keys:
- id (bytes): content's identifier (sha1)
- mimetype (bytes): mimetype in bytes
- encoding (bytes): encoding in bytes
- policy_update ([str]): either 'update-dups' or 'ignore-dups' to
+ policy_update: either 'update-dups' or 'ignore-dups' to
respectively update duplicates or ignore them
"""
@@ -198,13 +207,14 @@
results, conflict_update=(policy_update == 'update-dups'))
def translate_revision_intrinsic_metadata(
- self, detected_files, log_suffix):
+ self, detected_files: Dict[str, List[Any]], log_suffix: str
+ ) -> Tuple[List[Any], List[Any]]:
"""
Determine plan of action to translate metadata when containing
one or multiple detected files:
Args:
- detected_files (dict): dictionary mapping context names (e.g.,
+ detected_files: dictionary mapping context names (e.g.,
"npm", "authors") to list of sha1
Returns:
@@ -272,7 +282,7 @@
USE_TOOLS = False
- def __init__(self, config=None, **kwargs):
+ def __init__(self, config=None, **kwargs) -> None:
super().__init__(config=config, **kwargs)
self.origin_head_indexer = OriginHeadIndexer(config=config)
self.revision_metadata_indexer = RevisionMetadataIndexer(config=config)
@@ -313,14 +323,16 @@
results.append((orig_metadata, rev_metadata))
return results
- def persist_index_computations(self, results, policy_update):
+ def persist_index_computations(
+ self, results: List[Dict], policy_update: str
+ ) -> None:
conflict_update = (policy_update == 'update-dups')
# Deduplicate revisions
- rev_metadata = []
- orig_metadata = []
- revs_to_delete = []
- origs_to_delete = []
+ rev_metadata: List[Any] = []
+ orig_metadata: List[Any] = []
+ revs_to_delete: List[Any] = []
+ origs_to_delete: List[Any] = []
for (orig_item, rev_item) in results:
assert rev_item['metadata'] == orig_item['metadata']
if not rev_item['metadata'] or \
diff --git a/swh/indexer/mimetype.py b/swh/indexer/mimetype.py
--- a/swh/indexer/mimetype.py
+++ b/swh/indexer/mimetype.py
@@ -1,12 +1,11 @@
-# Copyright (C) 2016-2018 The Software Heritage developers
+# Copyright (C) 2016-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from typing import Optional, Dict, Any, List
import magic
-from typing import Optional
-
from .indexer import ContentIndexer, ContentRangeIndexer
if not hasattr(magic.Magic, 'from_buffer'):
@@ -15,15 +14,14 @@
'was imported instead.')
-def compute_mimetype_encoding(raw_content):
+def compute_mimetype_encoding(raw_content: bytes) -> Dict[str, bytes]:
"""Determine mimetype and encoding from the raw content.
Args:
- raw_content (bytes): content's raw data
+ raw_content: content's raw data
Returns:
- dict: mimetype and encoding key and corresponding values
- (as bytes).
+ dict: mimetype and encoding key and corresponding values.
"""
m = magic.Magic(mime=True, mime_encoding=True)
@@ -41,6 +39,8 @@
See :class:`MimetypeIndexer` and :class:`MimetypeRangeIndexer`
"""
+ tool: Dict[str, Any]
+ idx_storage: Any
ADDITIONAL_CONFIG = {
'tools': ('dict', {
'name': 'file',
@@ -55,36 +55,38 @@
CONFIG_BASE_FILENAME = 'indexer/mimetype' # type: Optional[str]
- def index(self, id, data):
+ def index(self, id: bytes, data: bytes) -> Dict[str, Any]:
"""Index sha1s' content and store result.
Args:
- id (bytes): content's identifier
- data (bytes): raw content in bytes
+ id: content's identifier
+ data: raw content in bytes
Returns:
dict: content's mimetype; dict keys being
- - **id** (bytes): content's identifier (sha1)
- - **mimetype** (bytes): mimetype in bytes
- - **encoding** (bytes): encoding in bytes
+ - id: content's identifier (sha1)
+ - mimetype: mimetype in bytes
+ - encoding: encoding in bytes
"""
properties = compute_mimetype_encoding(data)
properties.update({
'id': id,
'indexer_configuration_id': self.tool['id'],
- })
+ })
return properties
- def persist_index_computations(self, results, policy_update):
+ def persist_index_computations(
+ self, results: List[Dict], policy_update: List[str]
+ ) -> None:
"""Persist the results in storage.
Args:
- results ([dict]): list of content's mimetype dicts
+ results: list of content's mimetype dicts
(see :meth:`.index`)
- policy_update ([str]): either 'update-dups' or 'ignore-dups' to
+ policy_update: either 'update-dups' or 'ignore-dups' to
respectively update duplicates or ignore them
"""
@@ -128,18 +130,21 @@
- stores result in storage
"""
- def indexed_contents_in_range(self, start, end):
+
+ def indexed_contents_in_range(
+ self, start: bytes, end: bytes
+ ) -> Dict[str, Optional[bytes]]:
"""Retrieve indexed content id within range [start, end].
Args:
- start (bytes): Starting bound from range identifier
- end (bytes): End range identifier
+ start: Starting bound from range identifier
+ end: End range identifier
Returns:
dict: a dict with keys:
- - **ids** [bytes]: iterable of content ids within the range.
- - **next** (Optional[bytes]): The next range of sha1 starts at
+ - ids: iterable of content ids within the range.
+ - next: The next range of sha1 starts at
this sha1 if any
"""
diff --git a/swh/indexer/origin_head.py b/swh/indexer/origin_head.py
--- a/swh/indexer/origin_head.py
+++ b/swh/indexer/origin_head.py
@@ -1,8 +1,10 @@
-# Copyright (C) 2018 The Software Heritage developers
+# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from typing import List, Tuple, Any, Dict, Union
+
import re
import click
import logging
@@ -20,7 +22,9 @@
USE_TOOLS = False
- def persist_index_computations(self, results, policy_update):
+ def persist_index_computations(
+ self, results: Any, policy_update: str
+ ) -> None:
"""Do nothing. The indexer's results are not persistent, they
should only be piped to another indexer."""
pass
@@ -58,7 +62,9 @@
rb'$')
@classmethod
- def _parse_version(cls, filename):
+ def _parse_version(
+ cls: Any, filename: str
+ ) -> Tuple[Union[float, int], ...]:
"""Extracts the release version from an archive filename,
to get an ordering whose maximum is likely to be the last
version of the software
@@ -92,7 +98,7 @@
assert False, res.group('preversion')
return tuple(version)
- def _try_get_ftp_head(self, snapshot):
+ def _try_get_ftp_head(self, snapshot: Dict[str, Any]) -> Any:
archive_names = list(snapshot['branches'])
max_archive_name = max(archive_names, key=self._parse_version)
r = self._try_resolve_target(snapshot['branches'], max_archive_name)
@@ -100,7 +106,9 @@
# Generic
- def _try_get_head_generic(self, snapshot):
+ def _try_get_head_generic(
+ self, snapshot: Dict[str, Any]
+ ) -> Any:
# Works on 'deposit', 'pypi', and VCSs.
try:
branches = snapshot['branches']
@@ -112,7 +120,7 @@
self._try_resolve_target(branches, b'master')
)
- def _try_resolve_target(self, branches, target_name):
+ def _try_resolve_target(self, branches: Dict, target_name: bytes) -> Any:
try:
target = branches[target_name]
if target is None:
@@ -140,7 +148,7 @@
@click.option('--origins', '-i',
help='Origins to lookup, in the "type+url" format',
multiple=True)
-def main(origins):
+def main(origins: List[str]) -> None:
rev_metadata_indexer = OriginHeadIndexer()
rev_metadata_indexer.run(origins)
diff --git a/swh/indexer/rehash.py b/swh/indexer/rehash.py
--- a/swh/indexer/rehash.py
+++ b/swh/indexer/rehash.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2017-2018 The Software Heritage developers
+# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -7,6 +7,7 @@
import itertools
from collections import defaultdict
+from typing import Dict, Any, Tuple, List, Generator
from swh.core import utils
from swh.core.config import SWHConfig
@@ -64,7 +65,7 @@
CONFIG_BASE_FILENAME = 'indexer/rehash'
- def __init__(self):
+ def __init__(self) -> None:
self.config = self.parse_config_file()
self.storage = get_storage(**self.config['storage'])
self.objstorage = get_objstorage(**self.config['objstorage'])
@@ -80,7 +81,9 @@
if not self.compute_checksums:
raise ValueError('Checksums list should not be empty.')
- def _read_content_ids(self, contents):
+ def _read_content_ids(
+ self, contents: List[Dict[str, Any]]
+ ) -> Generator[bytes, Any, None]:
"""Read the content identifiers from the contents.
"""
@@ -91,14 +94,15 @@
yield h
- def get_new_contents_metadata(self, all_contents):
+ def get_new_contents_metadata(
+ self, all_contents: List[Dict[str, Any]]
+ ) -> Generator[Tuple[Dict[str, Any], List[Any]], Any, None]:
"""Retrieve raw contents and compute new checksums on the
contents. Unknown or corrupted contents are skipped.
Args:
- all_contents ([dict]): List of contents as dictionary with
+ all_contents: List of contents as dictionary with
the necessary primary keys
- checksum_algorithms ([str]): List of checksums to compute
Yields:
tuple: tuple of (content to update, list of checksums computed)
@@ -141,7 +145,7 @@
content.update(content_hashes)
yield content, checksums_to_compute
- def run(self, contents):
+ def run(self, contents: List[Dict[str, Any]]) -> None:
"""Given a list of content:
- (re)compute a given set of checksums on contents available in our
@@ -149,7 +153,7 @@
- update those contents with the new metadata
Args:
- contents (dict): contents as dictionary with necessary keys.
+ contents: contents as dictionary with necessary keys.
key present in such dictionary should be the ones defined in
the 'primary_key' option.
@@ -158,7 +162,7 @@
self.get_new_contents_metadata(contents),
self.batch_size_update):
- groups = defaultdict(list)
+ groups: Dict[str, List[Any]] = defaultdict(list)
for content, keys_to_update in data:
keys = ','.join(keys_to_update)
groups[keys].append(content)
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Nov 5 2024, 5:46 AM (10 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3222514
Attached To
D2622: Add type annotations to indexer classes
Event Timeline
Log In to Comment