swh/indexer/rehash.py
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
import itertools
from collections import defaultdict
from typing import Dict, Any, Tuple, List, Generator

from swh.core import utils
from swh.core.config import SWHConfig
from swh.model import hashutil
from swh.objstorage import get_objstorage
from swh.objstorage.exc import ObjNotFoundError
from swh.storage import get_storage
▲ Show 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | DEFAULT_CONFIG = { | ||||
# Number of contents to retrieve blobs at the same time | # Number of contents to retrieve blobs at the same time | ||||
'batch_size_retrieve_content': ('int', 10), | 'batch_size_retrieve_content': ('int', 10), | ||||
# Number of contents to update at the same time | # Number of contents to update at the same time | ||||
'batch_size_update': ('int', 100), | 'batch_size_update': ('int', 100), | ||||
} | } | ||||
CONFIG_BASE_FILENAME = 'indexer/rehash' | CONFIG_BASE_FILENAME = 'indexer/rehash' | ||||
def __init__(self): | def __init__(self) -> None: | ||||
self.config = self.parse_config_file() | self.config = self.parse_config_file() | ||||
self.storage = get_storage(**self.config['storage']) | self.storage = get_storage(**self.config['storage']) | ||||
self.objstorage = get_objstorage(**self.config['objstorage']) | self.objstorage = get_objstorage(**self.config['objstorage']) | ||||
self.compute_checksums = self.config['compute_checksums'] | self.compute_checksums = self.config['compute_checksums'] | ||||
self.recompute_checksums = self.config[ | self.recompute_checksums = self.config[ | ||||
'recompute_checksums'] | 'recompute_checksums'] | ||||
self.batch_size_retrieve_content = self.config[ | self.batch_size_retrieve_content = self.config[ | ||||
'batch_size_retrieve_content'] | 'batch_size_retrieve_content'] | ||||
self.batch_size_update = self.config[ | self.batch_size_update = self.config[ | ||||
'batch_size_update'] | 'batch_size_update'] | ||||
self.log = logging.getLogger('swh.indexer.rehash') | self.log = logging.getLogger('swh.indexer.rehash') | ||||
if not self.compute_checksums: | if not self.compute_checksums: | ||||
raise ValueError('Checksums list should not be empty.') | raise ValueError('Checksums list should not be empty.') | ||||
def _read_content_ids(self, contents): | def _read_content_ids( | ||||
self, contents: List[Dict[str, Any]] | |||||
vlorentz: Generator | |||||
) -> Generator[bytes, Any, None]: | |||||
"""Read the content identifiers from the contents. | """Read the content identifiers from the contents. | ||||
""" | """ | ||||
for c in contents: | for c in contents: | ||||
h = c['sha1'] | h = c['sha1'] | ||||
if isinstance(h, str): | if isinstance(h, str): | ||||
h = hashutil.hash_to_bytes(h) | h = hashutil.hash_to_bytes(h) | ||||
yield h | yield h | ||||
def get_new_contents_metadata(self, all_contents): | def get_new_contents_metadata( | ||||
self, all_contents: List[Dict[str, Any]] | |||||
) -> Generator[Tuple[Dict[str, Any], List[Any]], Any, None]: | |||||
"""Retrieve raw contents and compute new checksums on the | """Retrieve raw contents and compute new checksums on the | ||||
contents. Unknown or corrupted contents are skipped. | contents. Unknown or corrupted contents are skipped. | ||||
Args: | Args: | ||||
all_contents ([dict]): List of contents as dictionary with | all_contents: List of contents as dictionary with | ||||
the necessary primary keys | the necessary primary keys | ||||
checksum_algorithms ([str]): List of checksums to compute | |||||
Yields: | Yields: | ||||
tuple: tuple of (content to update, list of checksums computed) | tuple: tuple of (content to update, list of checksums computed) | ||||
""" | """ | ||||
content_ids = self._read_content_ids(all_contents) | content_ids = self._read_content_ids(all_contents) | ||||
for contents in utils.grouper(content_ids, | for contents in utils.grouper(content_ids, | ||||
self.batch_size_retrieve_content): | self.batch_size_retrieve_content): | ||||
Show All 26 Lines | ) -> Generator[Tuple[Dict[str, Any], List[Any]], Any, None]: | ||||
content['sha1']) | content['sha1']) | ||||
continue | continue | ||||
content_hashes = hashutil.MultiHash.from_data( | content_hashes = hashutil.MultiHash.from_data( | ||||
raw_content, hash_names=checksums_to_compute).digest() | raw_content, hash_names=checksums_to_compute).digest() | ||||
content.update(content_hashes) | content.update(content_hashes) | ||||
yield content, checksums_to_compute | yield content, checksums_to_compute | ||||
def run(self, contents): | def run(self, contents: List[Dict[str, Any]]) -> None: | ||||
"""Given a list of content: | """Given a list of content: | ||||
- (re)compute a given set of checksums on contents available in our | - (re)compute a given set of checksums on contents available in our | ||||
object storage | object storage | ||||
- update those contents with the new metadata | - update those contents with the new metadata | ||||
Args: | Args: | ||||
contents (dict): contents as dictionary with necessary keys. | contents: contents as dictionary with necessary keys. | ||||
key present in such dictionary should be the ones defined in | key present in such dictionary should be the ones defined in | ||||
the 'primary_key' option. | the 'primary_key' option. | ||||
""" | """ | ||||
for data in utils.grouper( | for data in utils.grouper( | ||||
self.get_new_contents_metadata(contents), | self.get_new_contents_metadata(contents), | ||||
self.batch_size_update): | self.batch_size_update): | ||||
groups = defaultdict(list) | groups: Dict[str, List[Any]] = defaultdict(list) | ||||
for content, keys_to_update in data: | for content, keys_to_update in data: | ||||
keys = ','.join(keys_to_update) | keys = ','.join(keys_to_update) | ||||
groups[keys].append(content) | groups[keys].append(content) | ||||
for keys_to_update, contents in groups.items(): | for keys_to_update, contents in groups.items(): | ||||
keys = keys_to_update.split(',') | keys = keys_to_update.split(',') | ||||
try: | try: | ||||
self.storage.content_update(contents, | self.storage.content_update(contents, | ||||
keys=keys) | keys=keys) | ||||
except Exception: | except Exception: | ||||
self.log.exception('Problem during update.') | self.log.exception('Problem during update.') | ||||
continue | continue |
Generator