diff --git a/swh/indexer/__init__.py b/swh/indexer/__init__.py
--- a/swh/indexer/__init__.py
+++ b/swh/indexer/__init__.py
@@ -20,6 +20,7 @@
     'language': 'swh.indexer.tasks.SWHContentLanguageTask',
     'ctags': 'swh.indexer.tasks.SWHCtagsTask',
     'fossology_license': 'swh.indexer.tasks.SWHContentFossologyLicenseTask',
+    'rehash': 'swh.indexer.tasks.SWHRecomputeChecksums',
 }
 
 
diff --git a/swh/indexer/producer.py b/swh/indexer/producer.py
--- a/swh/indexer/producer.py
+++ b/swh/indexer/producer.py
@@ -9,7 +9,7 @@
 
 from swh.core import utils
 from swh.model import hashutil
-from swh.scheduler.celery_backend.config import app
+from swh.scheduler.utils import get_task
 
 
 def read_from_stdin():
@@ -17,19 +17,26 @@
         yield hashutil.hash_to_bytes(sha1.strip())
 
 
-def gen_sha1(batch):
+def gen_sha1(batch, dict_with_key=None):
     """Generate batch of grouped sha1s from the objstorage.
 
     """
-    for sha1s in utils.grouper(read_from_stdin(), batch):
-        sha1s = list(sha1s)
-        random.shuffle(sha1s)
-        yield sha1s
+    def _gen():
+        for sha1s in utils.grouper(read_from_stdin(), batch):
+            sha1s = list(sha1s)
+            random.shuffle(sha1s)
+            yield sha1s
+
+    if dict_with_key:
+        for sha1s in _gen():
+            yield [{dict_with_key: sha1} for sha1 in sha1s]
+    else:
+        yield from _gen()
 
 
-def run_with_limit(task, limit, batch):
+def run_with_limit(task, limit, batch, dict_with_key=None):
     count = 0
-    for sha1s in gen_sha1(batch):
+    for sha1s in gen_sha1(batch, dict_with_key):
         count += len(sha1s)
         print('%s sent - [%s, ...]' % (len(sha1s), sha1s[0]))
         task.delay(sha1s)
@@ -37,8 +44,8 @@
             return
 
 
-def run_no_limit(task, batch):
-    for sha1s in gen_sha1(batch):
+def run(task, batch, dict_with_key=None):
+    for sha1s in gen_sha1(batch, dict_with_key):
         print('%s sent - [%s, ...]' % (len(sha1s), sha1s[0]))
         task.delay(sha1s)
 
@@ -47,7 +54,15 @@
 @click.option('--limit', default=None, help='Limit the number of data to read')
 @click.option('--batch', default='10', help='Group data by batch')
 @click.option('--task-name', default='orchestrator_all', help='')
-def main(limit, batch, task_name):
+@click.option('--dict-with-key', default=None)
+def main(limit, batch, task_name, dict_with_key):
+    """Read sha1 from stdin and send them for indexing.
+
+    By default, send directly list of hashes. Using the
+    --dict-with-key, this will send dict list with one key mentioned
+    as parameter to the dict-with-key flag.
+
+    """
     batch = int(batch)
 
     from . import tasks, TASK_NAMES  # noqa
@@ -58,12 +73,12 @@
               ', '.join(possible_tasks))
         return
 
-    task = app.tasks[TASK_NAMES[task_name]]
+    task = get_task(TASK_NAMES[task_name])
 
     if limit:
-        run_with_limit(task, int(limit), batch)
+        run_with_limit(task, int(limit), batch, dict_with_key)
     else:
-        run_no_limit(task, batch)
+        run(task, batch, dict_with_key)
 
 
 if __name__ == '__main__':
diff --git a/swh/indexer/rehash.py b/swh/indexer/rehash.py
new file mode 100644
--- /dev/null
+++ b/swh/indexer/rehash.py
@@ -0,0 +1,148 @@
+# Copyright (C) 2017 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from collections import defaultdict
+
+from swh.model import hashutil
+from swh.core import utils
+from swh.core.config import SWHConfig
+from swh.storage import get_storage
+
+
+class RecomputeChecksums(SWHConfig):
+    """Class in charge of (re)computing content's hashes.
+
+    Hashes to compute are defined across 2 configuration options:
+
+    - compute_checksums ([str]): list of hash algorithms that
+      swh.model.hashutil.hash_data function should be able to deal
+      with. For variable-length checksums, a desired checksum length
+      should also be provided. Their format is <algorithm's name>:<checksum's
+      length>: e.g: blake2:512
+
+    - recompute_checksums (bool): a boolean to notify that we also
+      want to recompute potential existing hashes specified in
+      compute_checksums. Default to False.
+
+    """
+    DEFAULT_CONFIG = {
+        'storage': ('dict', {
+            'cls': 'remote',
+            'args': {
+                'url': 'http://localhost:5002/'
+            },
+        }),
+        # the set of checksums that should be computed. For
+        # variable-length checksums a desired checksum length should also
+        # be provided. Examples: 'sha1_git', 'sha3:224', 'blake2:512', 'sha512'
+        'compute_checksums': (
+            'list[str]', []),
+        # whether checksums that already exist in the DB should be
+        # recomputed/updated or left untouched
+        'recompute_checksums': ('bool', False),
+        # Number of contents to retrieve blobs at the same time
+        'batch_size_retrieve_content': ('int', 10),
+        # Number of contents to update at the same time
+        'batch_size_update': ('int', 100)
+    }
+
+    CONFIG_BASE_FILENAME = 'storage/rehash'
+
+    def __init__(self):
+        self.config = self.parse_config_file()
+        self.storage = get_storage(**self.config['storage'])
+        self.compute_checksums = self.config['compute_checksums']
+        self.recompute_checksums = self.config[
+            'recompute_checksums']
+        self.batch_size_retrieve_content = self.config[
+            'batch_size_retrieve_content']
+        self.batch_size_update = self.config[
+            'batch_size_update']
+
+        if not self.compute_checksums:
+            raise ValueError('Checksums list should not be empty.')
+
+    def _read_content_ids(self, contents):
+        """Read the content identifiers from the contents.
+
+        """
+        for c in contents:
+            h = c['sha1']
+            if isinstance(h, str):
+                h = hashutil.hash_to_bytes(h)
+
+            yield h
+
+    def get_new_contents_metadata(self, all_contents):
+        """Retrieve raw contents and compute new checksums on the
+        contents. Unknown or corrupted contents are skipped.
+
+        Args:
+            all_contents ([dict]): List of contents as dictionary with
+                the necessary primary keys (including the 'sha1' used
+                to retrieve the raw content from the object storage)
+
+        Yields:
+            tuple of: content to update, list of checksums computed
+
+        """
+        for contents in utils.grouper(all_contents,
+                                      self.batch_size_retrieve_content):
+            contents = self.storage.content_get_metadata(
+                self._read_content_ids(contents))
+
+            for content in contents:
+                # Retrieve content's data
+                raw_contents = list(self.storage.content_get(
+                    [content['sha1']]))
+                raw_content = raw_contents[0]
+                if not raw_content:
+                    continue
+
+                raw_content = raw_content['data']
+
+                if self.recompute_checksums:  # Recompute checksums provided
+                                              # in compute_checksums options
+                    checksums_to_compute = list(self.compute_checksums)
+                else:  # Compute checksums provided in compute_checksums
+                       # options not already defined for that content
+                    checksums_to_compute = [h for h in self.compute_checksums
+                                            if not content.get(h)]
+
+                if not checksums_to_compute:  # Nothing to recompute
+                    continue
+
+                # Actually computing the checksums for that content
+                updated_content = hashutil.hash_data(
+                    raw_content, algorithms=checksums_to_compute)
+                content.update(updated_content)
+                yield content, checksums_to_compute
+
+    def run(self, contents):
+        """Given a list of content (dict):
+        - (re)compute a given set of checksums on contents
+          available in our object storage
+        - update those contents with the new metadata
+
+        Args:
+            - contents ([dict]): contents as dictionary with
+              the necessary keys. Each dictionary must at least
+              hold the 'sha1' key used to retrieve the content
+              from the storage.
+
+        """
+        for data in utils.grouper(
+                self.get_new_contents_metadata(contents),
+                self.batch_size_update):
+
+            groups = defaultdict(list)
+            for content, keys_to_update in list(data):
+                keys = ','.join(keys_to_update)
+                groups[keys].append(content)
+
+            for keys_to_update, contents in groups.items():
+                keys = keys_to_update.split(',')
+                self.storage.content_update(contents,
+                                            keys=keys)
diff --git a/swh/indexer/tasks.py b/swh/indexer/tasks.py
--- a/swh/indexer/tasks.py
+++ b/swh/indexer/tasks.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2016 The Software Heritage developers
+# Copyright (C) 2016-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -11,6 +11,7 @@
 from .language import ContentLanguageIndexer
 from .ctags import CtagsIndexer
 from .fossology_license import ContentFossologyLicenseIndexer
+from .rehash import RecomputeChecksums
 
 
 class SWHOrchestratorAllContentsTask(Task):
@@ -73,3 +74,13 @@
 
     def run(self, *args, **kwargs):
         ContentFossologyLicenseIndexer().run(*args, **kwargs)
+
+
+class SWHRecomputeChecksums(Task):
+    """Task which recomputes hashes and possibly new ones.
+
+    """
+    task_queue = 'swh_indexer_rehash'
+
+    def run(self, *args, **kwargs):
+        RecomputeChecksums().run(*args, **kwargs)