Page MenuHomeSoftware Heritage

rehash.py
No OneTemporary

rehash.py

# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
import itertools
from collections import defaultdict
from typing import Dict, Any, Tuple, List, Generator
from swh.core import utils
from swh.core.config import SWHConfig
from swh.model import hashutil
from swh.objstorage import get_objstorage
from swh.objstorage.exc import ObjNotFoundError
from swh.storage import get_storage
class RecomputeChecksums(SWHConfig):
"""Class in charge of (re)computing content's hashes.
Hashes to compute are defined across 2 configuration options:
compute_checksums ([str])
list of hash algorithms that
py:func:`swh.model.hashutil.MultiHash.from_data` function should
be able to deal with. For variable-length checksums, a desired
checksum length should also be provided. Their format is
<algorithm's name>:<variable-length> e.g: blake2:512
recompute_checksums (bool)
a boolean to notify that we also want to recompute potential existing
hashes specified in compute_checksums. Default to False.
"""
DEFAULT_CONFIG = {
# The storage to read from or update metadata to
'storage': ('dict', {
'cls': 'remote',
'args': {
'url': 'http://localhost:5002/'
},
}),
# The objstorage to read contents' data from
'objstorage': ('dict', {
'cls': 'pathslicing',
'args': {
'root': '/srv/softwareheritage/objects',
'slicing': '0:2/2:4/4:6',
},
}),
# the set of checksums that should be computed.
# Examples: 'sha1_git', 'blake2b512', 'blake2s256'
'compute_checksums': (
'list[str]', []),
# whether checksums that already exist in the DB should be
# recomputed/updated or left untouched
'recompute_checksums': ('bool', False),
# Number of contents to retrieve blobs at the same time
'batch_size_retrieve_content': ('int', 10),
# Number of contents to update at the same time
'batch_size_update': ('int', 100),
}
CONFIG_BASE_FILENAME = 'indexer/rehash'
def __init__(self) -> None:
self.config = self.parse_config_file()
self.storage = get_storage(**self.config['storage'])
self.objstorage = get_objstorage(**self.config['objstorage'])
self.compute_checksums = self.config['compute_checksums']
self.recompute_checksums = self.config[
'recompute_checksums']
self.batch_size_retrieve_content = self.config[
'batch_size_retrieve_content']
self.batch_size_update = self.config[
'batch_size_update']
self.log = logging.getLogger('swh.indexer.rehash')
if not self.compute_checksums:
raise ValueError('Checksums list should not be empty.')
def _read_content_ids(
self, contents: List[Dict[str, Any]]
) -> Generator[bytes, Any, None]:
"""Read the content identifiers from the contents.
"""
for c in contents:
h = c['sha1']
if isinstance(h, str):
h = hashutil.hash_to_bytes(h)
yield h
def get_new_contents_metadata(
self, all_contents: List[Dict[str, Any]]
) -> Generator[Tuple[Dict[str, Any], List[Any]], Any, None]:
"""Retrieve raw contents and compute new checksums on the
contents. Unknown or corrupted contents are skipped.
Args:
all_contents: List of contents as dictionary with
the necessary primary keys
Yields:
tuple: tuple of (content to update, list of checksums computed)
"""
content_ids = self._read_content_ids(all_contents)
for contents in utils.grouper(content_ids,
self.batch_size_retrieve_content):
contents_iter = itertools.tee(contents, 2)
try:
content_metadata = self.storage.content_get_metadata(
[s for s in contents_iter[0]])
except Exception:
self.log.exception(
'Problem when reading contents metadata.')
continue
for content in content_metadata:
# Recompute checksums provided in compute_checksums options
if self.recompute_checksums:
checksums_to_compute = list(self.compute_checksums)
else:
# Compute checksums provided in compute_checksums
# options not already defined for that content
checksums_to_compute = [h for h in self.compute_checksums
if not content.get(h)]
if not checksums_to_compute: # Nothing to recompute
continue
try:
raw_content = self.objstorage.get(content['sha1'])
except ObjNotFoundError:
self.log.warning('Content %s not found in objstorage!' %
content['sha1'])
continue
content_hashes = hashutil.MultiHash.from_data(
raw_content, hash_names=checksums_to_compute).digest()
content.update(content_hashes)
yield content, checksums_to_compute
def run(self, contents: List[Dict[str, Any]]) -> Dict:
"""Given a list of content:
- (re)compute a given set of checksums on contents available in our
object storage
- update those contents with the new metadata
Args:
contents: contents as dictionary with necessary keys.
key present in such dictionary should be the ones defined in
the 'primary_key' option.
Returns:
A summary dict with key 'status', task' status and 'count' the
number of updated contents.
"""
status = 'uneventful'
count = 0
for data in utils.grouper(
self.get_new_contents_metadata(contents),
self.batch_size_update):
groups: Dict[str, List[Any]] = defaultdict(list)
for content, keys_to_update in data:
keys = ','.join(keys_to_update)
groups[keys].append(content)
for keys_to_update, contents in groups.items():
keys = keys_to_update.split(',')
try:
self.storage.content_update(contents,
keys=keys)
count += len(contents)
status = 'eventful'
except Exception:
self.log.exception('Problem during update.')
continue
return {
'status': status,
'count': count,
}

File Metadata

Mime Type
text/x-python
Expires
Jun 4 2025, 7:38 PM (10 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3366381

Event Timeline