diff --git a/swh/indexer/producer.py b/swh/indexer/producer.py --- a/swh/indexer/producer.py +++ b/swh/indexer/producer.py @@ -9,7 +9,7 @@ from swh.core import utils from swh.model import hashutil -from swh.scheduler.celery_backend.config import app +from swh.scheduler.utils import get_task def read_from_stdin(): @@ -58,7 +58,7 @@ ', '.join(possible_tasks)) return - task = app.tasks[TASK_NAMES[task_name]] + task = get_task(TASK_NAMES[task_name]) if limit: run_with_limit(task, int(limit), batch) diff --git a/swh/indexer/rehash.py b/swh/indexer/rehash.py new file mode 100644 --- /dev/null +++ b/swh/indexer/rehash.py @@ -0,0 +1,148 @@ +# Copyright (C) 2017 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from collections import defaultdict + +from swh.model import hashutil +from swh.core import utils +from swh.core.config import SWHConfig +from swh.storage import get_storage + + +class RecomputeChecksums(SWHConfig): + """Class in charge of (re)computing content's hashes. + + Hashes to compute are defined across 2 configuration options: + + - compute_checksums ([str]): list of hash algorithms that + swh.model.hashutil.hash_data function should be able to deal + with. For variable-length checksums, a desired checksum length + should also be provided. Their format is : e.g: blake2:512 + + - recompute_checksums (bool): a boolean to notify that we also + want to recompute potential existing hashes specified in + compute_checksums. Default to False. + + """ + DEFAULT_CONFIG = { + 'storage': ('dict', { + 'cls': 'remote', + 'args': { + 'url': 'http://localhost:5002/' + }, + }), + # the set of checksums that should be computed. For + # variable-length checksums a desired checksum length should also + # be provided. Examples: 'sha1_git', 'sha3:224', 'blake2:512', 'sha512' + 'compute_checksums': ( + 'list[str]', []), + # whether checksums that already exist in the DB should be + # recomputed/updated or left untouched + 'recompute_checksums': ('bool', False), + # Number of contents to retrieve blobs at the same time + 'batch_size_retrieve_content': ('int', 10), + # Number of contents to update at the same time + 'batch_size_update': ('int', 100) + } + + CONFIG_BASE_FILENAME = 'storage/rehash' + + def __init__(self): + self.config = self.parse_config_file() + self.storage = get_storage(**self.config['storage']) + self.compute_checksums = self.config['compute_checksums'] + self.recompute_checksums = self.config[ + 'recompute_checksums'] + self.batch_size_retrieve_content = self.config[ + 'batch_size_retrieve_content'] + self.batch_size_update = self.config[ + 'batch_size_update'] + + if not self.compute_checksums: + raise ValueError('Checksums list should not be empty.') + + def _read_content_ids(self, contents): + """Read the content identifiers from the contents. + + """ + for c in contents: + h = c['sha1'] + if isinstance(h, str): + h = hashutil.hash_to_bytes(h) + + yield h + + def get_new_contents_metadata(self, all_contents): + """Retrieve raw contents and compute new checksums on the + contents. Unknown or corrupted contents are skipped. + + Args: + all_contents ([dict]): List of contents as dictionary with + the necessary primary keys + checksum_algorithms ([str]): List of checksums to compute + + Yields: + tuple of: content to update, list of checksums computed + + """ + for contents in utils.grouper(all_contents, + self.batch_size_retrieve_content): + contents = self.storage.content_get_metadata( + self._read_content_ids(contents)) + + for content in contents: + # Retrieve content's data + raw_contents = list(self.storage.content_get( + [content['sha1']])) + raw_content = raw_contents[0] + if not raw_content: + continue + + raw_content = raw_content['data'] + + if self.recompute_checksums: # Recompute checksums provided + # in compute_checksums options + checksums_to_compute = list(self.compute_checksums) + else: # Compute checkums provided in compute_checksums + # options not already defined for that content + checksums_to_compute = [h for h in self.compute_checksums + if not content.get(h)] + + if not checksums_to_compute: # Nothing to recompute + continue + + # Actually computing the checksums for that content + updated_content = hashutil.hash_data( + raw_content, algorithms=checksums_to_compute) + content.update(updated_content) + yield content, checksums_to_compute + + def run(self, contents): + """Given a list of content (dict): + - (re)compute a given set of checksums on contents + available in our object storage + - update those contents with the new metadata + + Args: + - contents ([dict]): contents as dictionary with + necessary keys. key present in such dictionary + should be the ones defined in the 'primary_key' + option. + + """ + for data in utils.grouper( + self.get_new_contents_metadata(contents), + self.batch_size_update): + + groups = defaultdict(list) + for content, keys_to_update in list(data): + keys = ','.join(keys_to_update) + groups[keys].append(content) + + for keys_to_update, contents in groups.items(): + keys = keys_to_update.split(',') + self.storage.content_update(contents, + keys=keys) diff --git a/swh/indexer/tasks.py b/swh/indexer/tasks.py --- a/swh/indexer/tasks.py +++ b/swh/indexer/tasks.py @@ -1,4 +1,4 @@ -# Copyright (C) 2016 The Software Heritage developers +# Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -11,6 +11,7 @@ from .language import ContentLanguageIndexer from .ctags import CtagsIndexer from .fossology_license import ContentFossologyLicenseIndexer +from .rehash import RecomputeChecksums class SWHOrchestratorAllContentsTask(Task): @@ -73,3 +74,13 @@ def run(self, *args, **kwargs): ContentFossologyLicenseIndexer().run(*args, **kwargs) + + +class SWHRecomputeChecksums(Task): + """Task which recomputes hashes and possibly new ones. + + """ + task_queue = 'swh_indexer_rehash' + + def run(self, *args, **kwargs): + RecomputeChecksums().run(*args, **kwargs)