diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py
index 9d38a54..ce4e7a7 100644
--- a/swh/indexer/indexer.py
+++ b/swh/indexer/indexer.py
@@ -1,506 +1,506 @@
 # Copyright (C) 2016-2018  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import abc
 import os
 import logging
 import shutil
 import tempfile
 import datetime

 from copy import deepcopy

 from swh.storage import get_storage
 from swh.core.config import SWHConfig
 from swh.objstorage import get_objstorage
 from swh.objstorage.exc import ObjNotFoundError
 from swh.model import hashutil
+from swh.scheduler import get_scheduler
 from swh.indexer.storage import get_indexer_storage, INDEXER_CFG_KEY


 class DiskIndexer:
     """Mixin intended to be used with other SomethingIndexer classes.

        Indexers inheriting from this class are a category of indexers
        which need the disk for their computations.

        Note: This expects `self.working_directory` variable defined at
        runtime.

     """
     def write_to_temp(self, filename, data):
         """Write the sha1's content in a temporary file.

         Args:
             filename (str): one of sha1's many filenames
             data (bytes): the sha1's content to write in temporary file

         Returns:
             The path to the temporary file created. That file is
             filled in with the raw content's data.

         """
         os.makedirs(self.working_directory, exist_ok=True)
         temp_dir = tempfile.mkdtemp(dir=self.working_directory)
         content_path = os.path.join(temp_dir, filename)

         with open(content_path, 'wb') as f:
             f.write(data)

         return content_path

     def cleanup(self, content_path):
         """Remove content_path from working directory.

         Args:
             content_path (str): the file to remove

         """
         temp_dir = os.path.dirname(content_path)
         shutil.rmtree(temp_dir)


 class BaseIndexer(SWHConfig, metaclass=abc.ABCMeta):
     """Base class for indexers to inherit from.

     The main entry point is the :func:`run` function which is in
     charge of triggering the computations on the batch of ids
     received.

     Indexers can:

     - filter out ids whose data has already been indexed.
     - retrieve ids data from storage or objstorage
     - index this data depending on the object and store the result in
       storage.

     To implement a new object type indexer, inherit from the
     BaseIndexer and implement indexing:

     :func:`run`:
       object_ids are different depending on object. For example: sha1 for
       content, sha1_git for revision, directory, release, and id for origin

     To implement a new concrete indexer, inherit from the object level
     classes: :class:`ContentIndexer`, :class:`RevisionIndexer`,
     :class:`OriginIndexer`.

     Then you need to implement the following functions:

     :func:`filter`:
       filter out data already indexed (in storage). This function is used by
       the orchestrator and not directly by the indexer
       (cf. swh.indexer.orchestrator.BaseOrchestratorIndexer).

     :func:`index`:
       compute index on id with data (retrieved from the storage or the
       objstorage by the id key) and return the resulting index computation.

     :func:`persist_index_computations`:
       persist the results of multiple index computations in the storage.

     The new indexer implementation can also override the following functions:

     :func:`prepare`:
       Configuration preparation for the indexer. When overriding, this must
       call the `super().prepare()` instruction.

     :func:`check`:
       Configuration check for the indexer. When overriding, this must call
       the `super().check()` instruction.

     :func:`register_tools`:
       This should return a dict of the tool(s) to use when indexing or
       filtering.
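+
+    A minimal sketch of a concrete content indexer; the ``mytool`` entry
+    and the computed result are hypothetical, for illustration only::
+
+        class TrivialSizeIndexer(ContentIndexer):
+            ADDITIONAL_CONFIG = {
+                'tools': ('dict', {'name': 'mytool', 'version': '1.0',
+                                   'configuration': {}}),
+            }
+
+            def filter(self, ids):
+                yield from ids  # this sketch skips deduplication
+
+            def index(self, id, data):
+                # compute something from the raw content
+                return {'id': id, 'length': len(data),
+                        'indexer_configuration_id': self.tools[0]['id']}
+
+            def persist_index_computations(self, results, policy_update):
+                pass  # write `results` to the indexer storage here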
""" CONFIG = 'indexer/base' DEFAULT_CONFIG = { INDEXER_CFG_KEY: ('dict', { 'cls': 'remote', 'args': { 'url': 'http://localhost:5007/' } }), 'storage': ('dict', { 'cls': 'remote', 'args': { 'url': 'http://localhost:5002/', } }), 'objstorage': ('dict', { 'cls': 'multiplexer', 'args': { 'objstorages': [{ 'cls': 'filtered', 'args': { 'storage_conf': { 'cls': 'azure', 'args': { 'account_name': '0euwestswh', 'api_secret_key': 'secret', 'container_name': 'contents' } }, 'filters_conf': [ {'type': 'readonly'}, {'type': 'prefix', 'prefix': '0'} ] } }, { 'cls': 'filtered', 'args': { 'storage_conf': { 'cls': 'azure', 'args': { 'account_name': '1euwestswh', 'api_secret_key': 'secret', 'container_name': 'contents' } }, 'filters_conf': [ {'type': 'readonly'}, {'type': 'prefix', 'prefix': '1'} ] } }] }, }), } ADDITIONAL_CONFIG = {} def __init__(self): """Prepare and check that the indexer is ready to run. """ super().__init__() self.prepare() self.check() def prepare(self): """Prepare the indexer's needed runtime configuration. Without this step, the indexer cannot possibly run. """ self.config = self.parse_config_file( additional_configs=[self.ADDITIONAL_CONFIG]) if self.config['storage']: self.storage = get_storage(**self.config['storage']) objstorage = self.config['objstorage'] self.objstorage = get_objstorage(objstorage['cls'], objstorage['args']) idx_storage = self.config[INDEXER_CFG_KEY] self.idx_storage = get_indexer_storage(**idx_storage) _log = logging.getLogger('requests.packages.urllib3.connectionpool') _log.setLevel(logging.WARN) self.log = logging.getLogger('swh.indexer') self.tools = list(self.register_tools(self.config['tools'])) def check(self): """Check the indexer's configuration is ok before proceeding. If ok, does nothing. If not raise error. """ if not self.tools: raise ValueError('Tools %s is unknown, cannot continue' % self.tools) def _prepare_tool(self, tool): """Prepare the tool dict to be compliant with the storage api. """ return {'tool_%s' % key: value for key, value in tool.items()} def register_tools(self, tools): """Permit to register tools to the storage. Add a sensible default which can be overridden if not sufficient. (For now, all indexers use only one tool) Expects the self.config['tools'] property to be set with one or more tools. Args: tools (dict/[dict]): Either a dict or a list of dict. Returns: List of dict with additional id key. Raises: ValueError if not a list nor a dict. """ tools = self.config['tools'] if isinstance(tools, list): tools = map(self._prepare_tool, tools) elif isinstance(tools, dict): tools = [self._prepare_tool(tools)] else: raise ValueError('Configuration tool(s) must be a dict or list!') return self.idx_storage.indexer_configuration_add(tools) @abc.abstractmethod def filter(self, ids): """Filter missing ids for that particular indexer. Args: ids ([bytes]): list of ids Yields: iterator of missing ids """ pass @abc.abstractmethod def index(self, id, data): """Index computation for the id and associated raw data. Args: id (bytes): identifier data (bytes): id's data from storage or objstorage depending on object type Returns: a dict that makes sense for the persist_index_computations function. """ pass @abc.abstractmethod def persist_index_computations(self, results, policy_update): """Persist the computation resulting from the index. Args: results ([result]): List of results. One result is the result of the index function. 
             policy_update (str): either 'update-dups' or 'ignore-dups' to
               respectively update duplicates or ignore them

         Returns:
             None

         """
         pass

     def next_step(self, results, task):
         """Do something else with computations results (e.g. send to another
         queue, ...).

         (This is not an abstractmethod since it is optional).

         Args:
             results ([result]): List of results (dict) as returned
               by index function.
             task (dict): a dict in the form expected by
               `scheduler.backend.SchedulerBackend.create_tasks` without
               `next_run`, plus a `result_name` key.

         Returns:
             None

         """
         if task:
             if getattr(self, 'scheduler', None):
                 scheduler = self.scheduler
             else:
                 scheduler = get_scheduler(**self.config['scheduler'])
             task = deepcopy(task)
             result_name = task.pop('result_name')
             task['next_run'] = datetime.datetime.now()
             task['arguments']['kwargs'][result_name] = self.results
             scheduler.create_tasks([task])

     @abc.abstractmethod
     def run(self, ids, policy_update, next_step=None, **kwargs):
         """Given a list of ids:

         - retrieves the data from the storage
         - executes the indexing computations
         - stores the results (according to policy_update)

         Args:
             ids ([bytes]): id's identifier list
             policy_update (str): either 'update-dups' or 'ignore-dups' to
               respectively update duplicates or ignore them
             next_step (dict): a dict in the form expected by
               `scheduler.backend.SchedulerBackend.create_tasks` without
               `next_run`, plus a `result_name` key.
             **kwargs: passed to the `index` method

         """
         pass


 class ContentIndexer(BaseIndexer):
     """An object type indexer, inherits from the :class:`BaseIndexer` and
     implements Content indexing using the run method.

     Note: the :class:`ContentIndexer` is not an instantiable
     object. To use it in another context, one should inherit from this
     class and override the methods mentioned in the
     :class:`BaseIndexer` class.

     """
     def run(self, ids, policy_update, next_step=None, **kwargs):
         """Given a list of ids:

         - retrieve the content from the storage
         - execute the indexing computations
         - store the results (according to policy_update)

         Args:
             ids ([bytes]): sha1's identifier list
             policy_update (str): either 'update-dups' or 'ignore-dups' to
               respectively update duplicates or ignore them
             next_step (dict): a dict in the form expected by
               `scheduler.backend.SchedulerBackend.create_tasks` without
               `next_run`, plus a `result_name` key.
             **kwargs: passed to the `index` method

         """
         results = []
         try:
             for sha1 in ids:
                 try:
                     raw_content = self.objstorage.get(sha1)
                 except ObjNotFoundError:
-                    self.log.warn('Content %s not found in objstorage' %
-                                  hashutil.hash_to_hex(sha1))
+                    self.log.warning('Content %s not found in objstorage' %
+                                     hashutil.hash_to_hex(sha1))
                     continue
                 res = self.index(sha1, raw_content, **kwargs)
                 if res:  # If no results, skip it
                     results.append(res)
             self.persist_index_computations(results, policy_update)
             self.results = results
             return self.next_step(results, task=next_step)
         except Exception:
             self.log.exception(
-                'Problem when reading contents metadata.')
+                'Problem when processing contents.')


 class OriginIndexer(BaseIndexer):
     """An object type indexer, inherits from the :class:`BaseIndexer` and
     implements Origin indexing using the run method.

     Note: the :class:`OriginIndexer` is not an instantiable object. To
     use it in another context one should inherit from this class and
     override the methods mentioned in the :class:`BaseIndexer` class.
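+
+    For example a concrete subclass (the ``MyOriginIndexer`` name is
+    hypothetical) could be invoked on origin ids or ``type+url`` strings::
+
+        MyOriginIndexer().run(
+            ids=['42', 'git+https://example.com/repo.git'],
+            policy_update='ignore-dups',
+            parse_ids=True)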
""" def run(self, ids, policy_update, parse_ids=False, next_step=None, **kwargs): """Given a list of origin ids: - retrieve origins from storage - execute the indexing computations - store the results (according to policy_update) Args: ids ([Union[int, Tuple[str, bytes]]]): list of origin ids or (type, url) tuples. policy_update (str): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them parse_ids (bool: If `True`, will try to convert `ids` from a human input to the valid type. next_step (dict): a dict in the form expected by `scheduler.backend.SchedulerBackend.create_tasks` without `next_run`, plus a `result_name` key. **kwargs: passed to the `index` method """ if parse_ids: ids = [ o.split('+', 1) if ':' in o else int(o) # type+url or id for o in ids] results = [] for id_ in ids: if isinstance(id_, (tuple, list)): if len(id_) != 2: raise TypeError('Expected a (type, url) tuple.') (type_, url) = id_ params = {'type': type_, 'url': url} elif isinstance(id_, int): params = {'id': id_} else: raise TypeError('Invalid value in "ids": %r' % id_) origin = self.storage.origin_get(params) if not origin: - self.log.warn('Origins %s not found in storage' % - list(ids)) + self.log.warning('Origins %s not found in storage' % + list(ids)) continue try: res = self.index(origin, **kwargs) if origin: # If no results, skip it results.append(res) except Exception: self.log.exception( 'Problem when processing origin %s' % id_) self.persist_index_computations(results, policy_update) self.results = results return self.next_step(results, task=next_step) class RevisionIndexer(BaseIndexer): """An object type indexer, inherits from the :class:`BaseIndexer` and implements Revision indexing using the run method Note: the :class:`RevisionIndexer` is not an instantiable object. To use it in another context one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. 
""" def run(self, ids, policy_update, next_step=None): """Given a list of sha1_gits: - retrieve revisions from storage - execute the indexing computations - store the results (according to policy_update) Args: ids ([bytes or str]): sha1_git's identifier list policy_update (str): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ results = [] ids = [id_.encode() if isinstance(id_, str) else id_ for id_ in ids] revs = self.storage.revision_get(ids) for rev in revs: if not rev: - self.log.warn('Revisions %s not found in storage' % - list(map(hashutil.hash_to_hex, ids))) + self.log.warning('Revisions %s not found in storage' % + list(map(hashutil.hash_to_hex, ids))) continue try: res = self.index(rev) if res: # If no results, skip it results.append(res) except Exception: self.log.exception( 'Problem when processing revision') self.persist_index_computations(results, policy_update) self.results = results return self.next_step(results, task=next_step) diff --git a/swh/indexer/language.py b/swh/indexer/language.py index 044d80a..5ac61ec 100644 --- a/swh/indexer/language.py +++ b/swh/indexer/language.py @@ -1,208 +1,209 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import io from pygments.lexers import guess_lexer from pygments.util import ClassNotFound from chardet.universaldetector import UniversalDetector from .indexer import ContentIndexer def _cleanup_classname(classname): """Determine the language from the pygments' lexer names. """ return classname.lower().replace(' ', '-') def _read_raw(raw_content, size=2048): """Read raw content in chunk. """ bs = io.BytesIO(raw_content) while True: chunk = bs.read(size) if not chunk: break yield chunk def _detect_encoding(raw_content): """Given a raw content, try and detect its encoding. """ detector = UniversalDetector() for chunk in _read_raw(raw_content): detector.feed(chunk) if detector.done: break detector.close() return detector.result['encoding'] def compute_language_from_chunk(encoding, length, raw_content, max_size, log=None): """Determine the raw content's language. Args: encoding (str): Encoding to use to decode the content length (int): raw_content's length raw_content (bytes): raw content to work with max_size (int): max size to split the raw content at Returns: Dict with keys: - lang: None if nothing found or the possible language """ try: if max_size <= length: raw_content = raw_content[0:max_size] content = raw_content.decode(encoding) lang = _cleanup_classname( guess_lexer(content).name) except ClassNotFound: lang = None except UnicodeDecodeError: raise except Exception: if log: log.exception('Problem during language detection, skipping') lang = None return { 'lang': lang } def compute_language(raw_content, encoding=None, log=None): """Determine the raw content's language. 

     Args:
         raw_content (bytes): raw content to work with

     Returns:
         Dict with keys:

         - lang: None if nothing found or the possible language

     """
     try:
-        encoding = _detect_encoding(raw_content)
+        if not encoding:
+            encoding = _detect_encoding(raw_content)
         content = raw_content.decode(encoding)
         lang = _cleanup_classname(
             guess_lexer(content).name)
     except ClassNotFound:
         lang = None
     except Exception:
         if log:
             log.exception('Problem during language detection, skipping')
         lang = None

     return {
         'lang': lang
     }


 class ContentLanguageIndexer(ContentIndexer):
     """Indexer in charge of:

     - filtering out content already indexed
     - reading content from objstorage per the content's id (sha1)
     - computing the language from that content
     - storing the result in storage

     """
     CONFIG_BASE_FILENAME = 'indexer/language'

     ADDITIONAL_CONFIG = {
         'tools': ('dict', {
             'name': 'pygments',
             'version': '2.0.1+dfsg-1.1+deb8u1',
             'configuration': {
                 'type': 'library',
                 'debian-package': 'python3-pygments',
                 'max_content_size': 10240,
             },
         }),
     }

     def prepare(self):
         super().prepare()
         c = self.config
         self.max_content_size = c['tools']['configuration']['max_content_size']
         self.tool = self.tools[0]

     def filter(self, ids):
         """Filter out known sha1s and return only missing ones.

         """
         yield from self.idx_storage.content_language_missing((
             {
                 'id': sha1,
                 'indexer_configuration_id': self.tool['id']
             } for sha1 in ids
         ))

     def index(self, id, data):
         """Index sha1s' content and store result.

         Args:
             id (bytes): content's identifier
             data (bytes): raw content in bytes

         Returns:
             A dict, representing a content_language, with keys:

             - id (bytes): content's identifier (sha1)
             - lang (bytes): detected language

         """
         result = {
             'id': id,
             'indexer_configuration_id': self.tool['id'],
             'lang': None,
         }
         encoding = _detect_encoding(data)

         if not encoding:
             return result

         _len = len(data)
         for i in range(0, 9):
             max_size = self.max_content_size + i
             try:
                 result = compute_language_from_chunk(
                     encoding, _len, data, max_size, log=self.log)
             except UnicodeDecodeError:
-                self.log.warn('Decoding failed on wrong byte chunk at [0-%s]'
-                              ', trying again at next ending byte.' %
-                              max_size)
+                self.log.warning(
+                    'Decoding failed on wrong byte chunk at [0-%s]'
+                    ', trying again at next ending byte.' % max_size)
                 continue

             # we found something, so we return it
             result.update({
                 'id': id,
                 'indexer_configuration_id': self.tool['id'],
             })
             break

         return result

     def persist_index_computations(self, results, policy_update):
         """Persist the results in storage.

         Args:
             results ([dict]): list of content_language, dict with the
               following keys:

               - id (bytes): content's identifier (sha1)
               - lang (bytes): detected language

             policy_update (str): either 'update-dups' or 'ignore-dups' to
               respectively update duplicates or ignore them

         """
         self.idx_storage.content_language_add(
             results, conflict_update=(policy_update == 'update-dups'))
diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py
index 5a17314..933716b 100644
--- a/swh/indexer/metadata.py
+++ b/swh/indexer/metadata.py
@@ -1,334 +1,334 @@
 # Copyright (C) 2017  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import click
 import logging

 from swh.indexer.indexer import ContentIndexer, RevisionIndexer, OriginIndexer
 from swh.indexer.metadata_dictionary import MAPPINGS
 from swh.indexer.metadata_detector import detect_metadata
 from swh.indexer.metadata_detector import extract_minimal_metadata_dict
 from swh.indexer.storage import INDEXER_CFG_KEY

 from swh.model import hashutil


 class ContentMetadataIndexer(ContentIndexer):
     """Content-level indexer

     This indexer is in charge of:

     - filtering out content already indexed in content_metadata
     - reading content from objstorage with the content's id sha1
     - computing translated_metadata by given context
     - using the metadata_dictionary as the 'swh-metadata-translator' tool
     - store result in content_metadata table

     """
     CONFIG_BASE_FILENAME = 'indexer/metadata'

     def __init__(self, tool, config):
         # roundabout way to reuse the exact same config as the
         # RevisionMetadataIndexer object that internally uses
         # ContentMetadataIndexer
         self.config = config
         self.config['tools'] = tool
+        self.results = []
         super().__init__()

+    def prepare(self):
+        super().prepare()
+        self.tool = self.tools[0]
+
     def filter(self, ids):
         """Filter out known sha1s and return only missing ones.

         """
         yield from self.idx_storage.content_metadata_missing((
             {
                 'id': sha1,
                 'indexer_configuration_id': self.tool['id'],
             } for sha1 in ids
         ))

     def index(self, id, data):
         """Index sha1s' content and store result.

         Args:
             id (bytes): content's identifier
             data (bytes): raw content in bytes

         Returns:
             dict: dictionary representing a content_metadata. If the
             translation wasn't successful the translated_metadata keys will
             be returned as None

         """
         result = {
             'id': id,
             'indexer_configuration_id': self.tool['id'],
             'translated_metadata': None
         }
         try:
             mapping_name = self.tool['tool_configuration']['context']
             result['translated_metadata'] = MAPPINGS[mapping_name] \
                 .translate(data)
             # keep the result on the indexer object for get_results
             self.results.append(result)
         except Exception:
             self.log.exception(
                 "Problem during tool retrieval of metadata translation")
         return result

     def persist_index_computations(self, results, policy_update):
         """Persist the results in storage.

         Args:
             results ([dict]): list of content_metadata, dict with the
               following keys:

               - id (bytes): content's identifier (sha1)
               - translated_metadata (jsonb): detected metadata

             policy_update (str): either 'update-dups' or 'ignore-dups' to
               respectively update duplicates or ignore them

         """
         self.idx_storage.content_metadata_add(
             results, conflict_update=(policy_update == 'update-dups'))

     def get_results(self):
         """Can only be called after the run method has been called.

         Returns:
             list: list of content_metadata entries calculated by
               current indexer

         """
         return self.results


 class RevisionMetadataIndexer(RevisionIndexer):
     """Revision-level indexer

     This indexer is in charge of:

     - filtering revisions already indexed in revision_metadata table with
       defined computation tool
     - retrieve all entry_files in root directory
     - use metadata_detector for file_names containing metadata
     - compute metadata translation if necessary and possible (depends on tool)
     - send sha1s to content indexing if possible
     - store the results for revision

     """
     CONFIG_BASE_FILENAME = 'indexer/metadata'

     ADDITIONAL_CONFIG = {
         'tools': ('dict', {
             'name': 'swh-metadata-detector',
             'version': '0.0.2',
             'configuration': {
                 'type': 'local',
                 'context': ['NpmMapping', 'CodemetaMapping']
             },
         }),
     }

     ContentMetadataIndexer = ContentMetadataIndexer

     def prepare(self):
         super().prepare()
         self.tool = self.tools[0]

     def filter(self, sha1_gits):
         """Filter out known sha1s and return only missing ones.

         """
         yield from self.idx_storage.revision_metadata_missing((
             {
                 'id': sha1_git,
                 'indexer_configuration_id': self.tool['id'],
             } for sha1_git in sha1_gits
         ))

     def index(self, rev):
         """Index rev by processing it and organizing result.

         Use metadata_detector to iterate on filenames:

         - if one filename detected -> sends file to content indexer
         - if multiple files detected -> translation needed at revision level

         Args:
             rev (dict): revision artifact from storage

         Returns:
             dict: dictionary representing a revision_metadata, with keys:

             - id (bytes): rev's identifier (sha1_git)
             - indexer_configuration_id (int): tool used
             - translated_metadata (dict): dict of retrieved metadata

         """
         try:
             result = {
-                'id': rev['id'].decode(),
+                'id': rev['id'],
                 'indexer_configuration_id': self.tool['id'],
                 'translated_metadata': None
             }

             root_dir = rev['directory']
             dir_ls = self.storage.directory_ls(root_dir, recursive=False)
             files = [entry for entry in dir_ls if entry['type'] == 'file']
             detected_files = detect_metadata(files)
             result['translated_metadata'] = self.translate_revision_metadata(
                 detected_files)
         except Exception as e:
             self.log.exception(
                 'Problem when indexing rev: %r', e)
         return result

     def persist_index_computations(self, results, policy_update):
         """Persist the results in storage.

         Args:
             results ([dict]): list of revision_metadata, dict with the
               following keys:

               - id (bytes): revision's identifier (sha1_git)
               - translated_metadata (dict): detected metadata

             policy_update (str): either 'update-dups' or 'ignore-dups' to
               respectively update duplicates or ignore them

         """
         # TODO: add functions in storage to keep data in revision_metadata
         self.idx_storage.revision_metadata_add(
             results, conflict_update=(policy_update == 'update-dups'))

     def translate_revision_metadata(self, detected_files):
         """
         Determine plan of action to translate metadata when containing
         one or multiple detected files:

         Args:
             detected_files (dict): dictionary mapping context names (e.g.,
               "npm", "authors") to list of sha1

         Returns:
             dict: dict with translated metadata according to the CodeMeta
             vocabulary

         """
         translated_metadata = []
         tool = {
             'name': 'swh-metadata-translator',
             'version': '0.0.2',
             'configuration': {
                 'type': 'local',
                 'context': None
             },
         }
         # TODO: iterate on each context, on each file
         # -> get raw_contents
         # -> translate each content
         config = {
             INDEXER_CFG_KEY: self.idx_storage,
             'objstorage': self.objstorage
         }
         for context in detected_files.keys():
             tool['configuration']['context'] = context
             c_metadata_indexer = self.ContentMetadataIndexer(tool, config)
             # sha1s that are in content_metadata table
             sha1s_in_storage = []
             metadata_generator = self.idx_storage.content_metadata_get(
                 detected_files[context])
             for c in metadata_generator:
                 # extracting translated_metadata
                 sha1 = c['id']
                 sha1s_in_storage.append(sha1)
                 local_metadata = c['translated_metadata']
                 # local metadata is aggregated
                 if local_metadata:
                     translated_metadata.append(local_metadata)

             sha1s_filtered = [item for item in detected_files[context]
                               if item not in sha1s_in_storage]

             if sha1s_filtered:
                 # schedule indexing of content
                 try:
                     c_metadata_indexer.run(sha1s_filtered,
                                            policy_update='ignore-dups')
                     # on the fly possibility:
                     results = c_metadata_indexer.get_results()
                     for result in results:
                         local_metadata = result['translated_metadata']
                         translated_metadata.append(local_metadata)
                 except Exception as e:
-                    self.log.warn("""Exception while indexing content""", e)
+                    self.log.warning('Exception while indexing content: %s',
+                                     e)

         # transform translated_metadata into min set with
         # swh-metadata-detector
         min_metadata = extract_minimal_metadata_dict(translated_metadata)
         return min_metadata


 class OriginMetadataIndexer(OriginIndexer):
     def filter(self, ids):
         return ids

     def run(self, revisions_metadata, policy_update, *, origin_head):
         """Expected to be called with the result of RevisionMetadataIndexer
         as first argument; ie. not a list of ids as other indexers would.

         Args:

             * `revisions_metadata` (List[dict]): contains metadata from
               revisions, along with the respective revision ids. It is
               passed by RevisionMetadataIndexer via a Celery chain
               triggered by OriginIndexer.next_step.

             * `policy_update`: `'ignore-dups'` or `'update-dups'`

             * `origin_head` (dict): maps the id of each origin (as a str)
               to the sha1_git of its head revision (as bytes), as computed
               by OriginHeadIndexer.

         """
         origin_head_map = {int(origin_id): rev_id
                            for (origin_id, rev_id) in origin_head.items()}

         # Fix up the argument order. revisions_metadata has to be the
         # first argument because of celery.chain; the next line calls
         # run() with the usual order, ie. origin ids first.
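+        # For instance (hypothetical values), origin_head={'42': b'abcd'}
+        # gives origin_head_map={42: b'abcd'}, and super().run() is called
+        # with ids=[42].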
         return super().run(ids=list(origin_head_map),
                            policy_update=policy_update,
                            revisions_metadata=revisions_metadata,
                            origin_head_map=origin_head_map)

     def index(self, origin, *, revisions_metadata, origin_head_map):
         # Get the last revision of the origin.
         revision_id = origin_head_map[origin['id']]

         # Get the metadata of that revision, and return it
         for revision_metadata in revisions_metadata:
             if revision_metadata['id'] == revision_id:
                 return {
                     'origin_id': origin['id'],
                     'metadata': revision_metadata['translated_metadata'],
                     'from_revision': revision_id,
                     'indexer_configuration_id':
                         revision_metadata['indexer_configuration_id'],
                 }

         raise KeyError('%r not in %r' %
                        (revision_id,
                         [r['id'] for r in revisions_metadata]))

     def persist_index_computations(self, results, policy_update):
         self.idx_storage.origin_intrinsic_metadata_add(
             results, conflict_update=(policy_update == 'update-dups'))


 @click.command()
 @click.option('--revs', '-i',
               help='Default sha1_git to lookup', multiple=True)
 def main(revs):
     _git_sha1s = list(map(hashutil.hash_to_bytes, revs))
     rev_metadata_indexer = RevisionMetadataIndexer()
     rev_metadata_indexer.run(_git_sha1s, 'update-dups')


 if __name__ == '__main__':
     logging.basicConfig(level=logging.INFO)
     main()
diff --git a/swh/indexer/rehash.py b/swh/indexer/rehash.py
index bc90bc9..d2697e0 100644
--- a/swh/indexer/rehash.py
+++ b/swh/indexer/rehash.py
@@ -1,172 +1,172 @@
 # Copyright (C) 2017-2018  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import logging
 import itertools

 from collections import defaultdict

 from swh.core import utils
 from swh.core.config import SWHConfig
 from swh.model import hashutil
 from swh.objstorage import get_objstorage
 from swh.objstorage.exc import ObjNotFoundError
 from swh.storage import get_storage


 class RecomputeChecksums(SWHConfig):
     """Class in charge of (re)computing content's hashes.

     Hashes to compute are defined across 2 configuration options:

     compute_checksums ([str])
       list of hash algorithms that the
       py:func:`swh.model.hashutil.MultiHash.from_data` function should
       be able to deal with. For variable-length checksums, a desired
       checksum length should also be provided. Their format is
       <algorithm-name>:<length>, e.g.: blake2:512

     recompute_checksums (bool)
       a boolean to notify that we also want to recompute potential
       existing hashes specified in compute_checksums. Defaults to False.

     """
     DEFAULT_CONFIG = {
         # The storage to read from or update metadata to
         'storage': ('dict', {
             'cls': 'remote',
             'args': {
                 'url': 'http://localhost:5002/'
             },
         }),
         # The objstorage to read contents' data from
         'objstorage': ('dict', {
             'cls': 'pathslicing',
             'args': {
                 'root': '/srv/softwareheritage/objects',
                 'slicing': '0:2/2:4/4:6',
             },
         }),
         # the set of checksums that should be computed.
         # Examples: 'sha1_git', 'blake2b512', 'blake2s256'
         'compute_checksums': (
             'list[str]', []),
         # whether checksums that already exist in the DB should be
         # recomputed/updated or left untouched
         'recompute_checksums': ('bool', False),
         # Number of contents to retrieve blobs at the same time
         'batch_size_retrieve_content': ('int', 10),
         # Number of contents to update at the same time
         'batch_size_update': ('int', 100),
     }

     CONFIG_BASE_FILENAME = 'indexer/rehash'

     def __init__(self):
         self.config = self.parse_config_file()
         self.storage = get_storage(**self.config['storage'])
         self.objstorage = get_objstorage(**self.config['objstorage'])
         self.compute_checksums = self.config['compute_checksums']
         self.recompute_checksums = self.config[
             'recompute_checksums']
         self.batch_size_retrieve_content = self.config[
             'batch_size_retrieve_content']
         self.batch_size_update = self.config[
             'batch_size_update']
         self.log = logging.getLogger('swh.indexer.rehash')

         if not self.compute_checksums:
             raise ValueError('Checksums list should not be empty.')

     def _read_content_ids(self, contents):
         """Read the content identifiers from the contents.

         """
         for c in contents:
             h = c['sha1']
             if isinstance(h, str):
                 h = hashutil.hash_to_bytes(h)

             yield h

     def get_new_contents_metadata(self, all_contents):
         """Retrieve raw contents and compute new checksums on the
            contents. Unknown or corrupted contents are skipped.

         Args:
             all_contents ([dict]): List of contents as dictionaries with
               the necessary primary keys

         Yields:
             tuple of: content to update, list of checksums computed

         """
         content_ids = self._read_content_ids(all_contents)
         for contents in utils.grouper(content_ids,
                                       self.batch_size_retrieve_content):
             contents_iter = itertools.tee(contents, 2)
             try:
                 content_metadata = self.storage.content_get_metadata(
                     [s for s in contents_iter[0]])
             except Exception:
                 self.log.exception(
                     'Problem when reading contents metadata.')
                 continue

             for content in content_metadata:
                 if self.recompute_checksums:
                     # Recompute checksums provided
                     # in compute_checksums options
                     checksums_to_compute = list(self.compute_checksums)
                 else:
                     # Compute checksums provided in compute_checksums
                     # options not already defined for that content
                     checksums_to_compute = [h for h in self.compute_checksums
                                             if not content.get(h)]

                 if not checksums_to_compute:  # Nothing to recompute
                     continue

                 try:
                     raw_content = self.objstorage.get(content['sha1'])
                 except ObjNotFoundError:
-                    self.log.warn('Content %s not found in objstorage!' %
-                                  content['sha1'])
+                    self.log.warning('Content %s not found in objstorage!' %
+                                     content['sha1'])
                     continue

                 content_hashes = hashutil.MultiHash.from_data(
                     raw_content,
                     hash_names=checksums_to_compute).digest()
                 content.update(content_hashes)
                 yield content, checksums_to_compute

     def run(self, contents):
         """Given a list of content:

         - (re)compute a given set of checksums on contents available in our
           object storage
         - update those contents with the new metadata

         Args:
             contents ([dict]): contents as dictionaries; each must hold at
               least the 'sha1' key used to retrieve the raw content

         """
         for data in utils.grouper(
                 self.get_new_contents_metadata(contents),
                 self.batch_size_update):

             groups = defaultdict(list)
             for content, keys_to_update in data:
                 keys = ','.join(keys_to_update)
                 groups[keys].append(content)

             for keys_to_update, contents in groups.items():
                 keys = keys_to_update.split(',')
                 try:
                     self.storage.content_update(contents, keys=keys)
                 except Exception:
                     self.log.exception('Problem during update.')
                     continue
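+
+
+if __name__ == '__main__':
+    # Minimal manual-run sketch, mirroring swh/indexer/metadata.py; it
+    # assumes a valid 'indexer/rehash' configuration file and uses a
+    # made-up sha1 purely for illustration.
+    logging.basicConfig(level=logging.INFO)
+    RecomputeChecksums().run(
+        [{'sha1': '0123456789abcdef0123456789abcdef01234567'}])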