diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py
index bc37773..a114d97 100644
--- a/swh/indexer/indexer.py
+++ b/swh/indexer/indexer.py
@@ -1,383 +1,386 @@
 # Copyright (C) 2016-2017  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import abc
 import os
 import logging
 import shutil
 import tempfile

 from swh.core.config import SWHConfig
 from swh.objstorage import get_objstorage
 from swh.objstorage.exc import ObjNotFoundError
 from swh.model import hashutil
 from swh.storage import get_storage
 from swh.scheduler.utils import get_task


 class DiskIndexer:
-    """Mixin intended to be used with other *Indexer classes.
+    """Mixin intended to be used with other SomethingIndexer classes.

-       Indexer* inheriting from this class are a category of indexers
-       which needs the disk for their computations.
+    Indexers inheriting from this class are a category of indexers which
+    need the disk for their computations.

-       Expects:
-           self.working_directory variable defined at runtime.
+    Note:
+        expects `self.working_directory` variable defined at runtime.

     """
     def __init__(self):
         super().__init__()

     def write_to_temp(self, filename, data):
         """Write the content's data to a temporary file.

         Args:
             filename (str): one of the content's many filenames
             data (bytes): the content's data to write to a temporary file

         Returns:
             The path to the temporary file created. That file is
             filled in with the raw content's data.

         """
         os.makedirs(self.working_directory, exist_ok=True)
         temp_dir = tempfile.mkdtemp(dir=self.working_directory)
         content_path = os.path.join(temp_dir, filename)

         with open(content_path, 'wb') as f:
             f.write(data)

         return content_path

     def cleanup(self, content_path):
         """Remove content_path from the working directory.

         Args:
             content_path (str): the file to remove

         """
         temp_dir = os.path.dirname(content_path)
         shutil.rmtree(temp_dir)
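# --- Illustrative sketch, not part of this patch ---------------------------
# A minimal example of how DiskIndexer is meant to be mixed in: dump raw
# content to disk, work on the file, then clean up. The class name and the
# working directory value are assumptions made for the example.

from swh.indexer.indexer import DiskIndexer

class DummyDiskUser(DiskIndexer):
    working_directory = '/tmp/swh.indexer.example'  # assumed location

indexer = DummyDiskUser()
path = indexer.write_to_temp(filename='content.sha1', data=b'hello world')
try:
    print(open(path, 'rb').read())  # the temporary file holds the raw data
finally:
    indexer.cleanup(path)  # removes the whole temporary directory
# ---------------------------------------------------------------------------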
 class BaseIndexer(SWHConfig, metaclass=abc.ABCMeta):
     """Base class for indexers to inherit from.

     The main entry point is the `run` function, which is in charge of
     triggering the computations on the batch of ids received.

     Indexers can:

+    - filter out ids whose data has already been indexed.
     - retrieve ids data from storage or objstorage
     - index this data depending on the object and store the result in
       storage.

     To implement a new object type indexer, inherit from the BaseIndexer and
-    implement the process of indexation :
+    implement the process of indexation:

-    - def run(self, object_ids, policy_update): object_ids are different
-      depending on object. For example: sha1 for content, sha1_git for
-      revision, directory, release, and id for origin
+    def run(self, object_ids, policy_update)
+        object_ids are different depending on the object. For example: sha1
+        for content; sha1_git for revision, directory and release; and id
+        for origin.

     To implement a new concrete indexer, inherit from the object level
     classes: ContentIndexer, RevisionIndexer (later on, OriginIndexer will
     also be available).

     Then you need to implement the following functions:

-    - def filter(self, ids): filter out data already
-      indexed (in storage). This function is used by the
-      orchestrator and not directly by the indexer
-      (cf. swh.indexer.orchestrator.BaseOrchestratorIndexer).
+    def filter(self, ids)
+        filter out data already indexed (in storage). This function is used
+        by the orchestrator and not directly by the indexer
+        (cf. swh.indexer.orchestrator.BaseOrchestratorIndexer).

-    - def index_object(self, id, data): compute index on
-      id with data (retrieved from the storage or the objstorage by the
-      id key) and return the resulting index computation.
+    def index_object(self, id, data)
+        compute index on id with data (retrieved from the storage or the
+        objstorage by the id key) and return the resulting index computation.

-    - def persist_index_computations(self, results, policy_update):
-      persist the results of multiple index computations in the
-      storage.
+    def persist_index_computations(self, results, policy_update)
+        persist the results of multiple index computations in the storage.

     The new indexer implementation can also override the following functions:

-    - def prepare(self): Configuration preparation for the indexer.
-      When overriding, this must call the super().prepare() function.
+    def prepare(self)
+        Configuration preparation for the indexer. When overriding, this
+        must call the super().prepare() function.

-    - def check(self): Configuration check for the indexer.
-      When overriding, this must call the super().check() function.
+    def check(self)
+        Configuration check for the indexer. When overriding, this must
+        call the super().check() function.

-    - def retrieve_tools_information(self): This should return a
-      dict of the tool(s) to use when indexing or filtering.
+    def retrieve_tools_information(self)
+        This should return a dict of the tool(s) to use when indexing or
+        filtering.

     """
     CONFIG = 'indexer/base'

     DEFAULT_CONFIG = {
         'storage': ('dict', {
             'host': 'uffizi',
             'cls': 'remote',
             'args': {'root': '/tmp/softwareheritage/objects',
                      'slicing': '0:2/2:4/4:6'}
         }),
         # queue to reschedule if problem (none for no rescheduling,
         # the default)
         'rescheduling_task': ('str', None),
         'objstorage': ('dict', {
             'cls': 'multiplexer',
             'args': {
                 'objstorages': [{
                     'cls': 'filtered',
                     'args': {
                         'storage_conf': {
                             'cls': 'azure-storage',
                             'args': {
                                 'account_name': '0euwestswh',
                                 'api_secret_key': 'secret',
                                 'container_name': 'contents'
                             }
                         },
                         'filters_conf': [
                             {'type': 'readonly'},
                             {'type': 'prefix', 'prefix': '0'}
                         ]
                     }
                 }, {
                     'cls': 'filtered',
                     'args': {
                         'storage_conf': {
                             'cls': 'azure-storage',
                             'args': {
                                 'account_name': '1euwestswh',
                                 'api_secret_key': 'secret',
                                 'container_name': 'contents'
                             }
                         },
                         'filters_conf': [
                             {'type': 'readonly'},
                             {'type': 'prefix', 'prefix': '1'}
                         ]
                     }
                 }]
             },
         }),
     }

     ADDITIONAL_CONFIG = {}

     def __init__(self):
         """Prepare and check that the indexer is ready to run.

         """
         super().__init__()
         self.prepare()
         self.check()

     def prepare(self):
         """Prepare the indexer's needed runtime configuration.
         Without this step, the indexer cannot possibly run.

         """
         self.config = self.parse_config_file(
             additional_configs=[self.ADDITIONAL_CONFIG])
         objstorage = self.config['objstorage']
         self.objstorage = get_objstorage(objstorage['cls'],
                                          objstorage['args'])
         storage = self.config['storage']
         self.storage = get_storage(storage['cls'], storage['args'])
         rescheduling_task = self.config['rescheduling_task']
         if rescheduling_task:
             self.rescheduling_task = get_task(rescheduling_task)
         else:
             self.rescheduling_task = None
         l = logging.getLogger('requests.packages.urllib3.connectionpool')
         l.setLevel(logging.WARN)
         self.log = logging.getLogger('swh.indexer')
         self.tools = self.retrieve_tools_information()
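# --- Illustrative sketch, not part of this patch ---------------------------
# retrieve_tools_information() expects a 'tools' entry in the configuration;
# concrete indexers typically provide it through ADDITIONAL_CONFIG, as
# RevisionMetadataIndexer does below in metadata.py. The tool name and
# version here are assumptions.

ADDITIONAL_CONFIG = {
    'tools': ('dict', {
        'name': 'some-indexer-tool',      # hypothetical tool name
        'version': '0.0.1',
        'configuration': {'type': 'local'},
    }),
}
# prepare() merges this with DEFAULT_CONFIG via parse_config_file(), and
# check() then refuses to run if the tool is unknown to the storage.
# ---------------------------------------------------------------------------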
""" if not self.tools: raise ValueError('Tools %s is unknown, cannot continue' % self.config['tools']) def retrieve_tools_information(self): """Permit to define how to retrieve tool information based on configuration. Add a sensible default which can be overridden if not sufficient. (For now, all indexers use only one tool) """ tool = { 'tool_%s' % key: value for key, value in self.config['tools'].items() } return self.storage.indexer_configuration_get(tool) @abc.abstractmethod def filter(self, ids): """Filter missing ids for that particular indexer. Args: ids ([bytes]): list of ids Yields: iterator of missing ids """ pass @abc.abstractmethod def index(self, id, data): """Index computation for the id and associated raw data. Args: id (bytes): identifier data (bytes): id's data from storage or objstorage depending on object type Returns: a dict that makes sense for the persist_index_computations function. """ pass @abc.abstractmethod def persist_index_computations(self, results, policy_update): """Persist the computation resulting from the index. Args: results ([result]): List of results. One result is the result of the index function. policy_update ([str]): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them Returns: None """ pass def next_step(self, results): """Do something else with computations results (e.g. send to another queue, ...). (This is not an abstractmethod since it is optional). Args: results ([result]): List of results (dict) as returned by index function. Returns: None """ pass @abc.abstractmethod def run(self, ids, policy_update): """Given a list of ids: - retrieves the data from the storage - executes the indexing computations - stores the results (according to policy_update) Args: ids ([bytes]): id's identifier list policy_update ([str]): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ pass class ContentIndexer(BaseIndexer): """ An object type indexer, inherits from the BaseIndexer and implements the process of indexation for Contents using the run method Note: the ContentIndexer is not an instantiable object to use it in another context one should inherit from this class and override the methods mentioned in the BaseIndexer class """ def run(self, sha1s, policy_update): """Given a list of sha1s: - retrieve the content from the storage - execute the indexing computations - store the results (according to policy_update) Args: sha1s ([bytes]): sha1's identifier list policy_update ([str]): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ results = [] try: for sha1 in sha1s: try: raw_content = self.objstorage.get(sha1) except ObjNotFoundError: self.log.warn('Content %s not found in objstorage' % hashutil.hash_to_hex(sha1)) continue res = self.index(sha1, raw_content) if res: # If no results, skip it results.append(res) self.persist_index_computations(results, policy_update) self.next_step(results) except Exception: self.log.exception( 'Problem when reading contents metadata.') if self.rescheduling_task: self.log.warn('Rescheduling batch') self.rescheduling_task.delay(sha1s, policy_update) class RevisionIndexer(BaseIndexer): """ An object type indexer, inherits from the BaseIndexer and implements the process of indexation for Revisions using the run method Note: the RevisionIndexer is not an instantiable object to use it in another context one should inherit from this class and override the methods mentioned in the BaseIndexer class """ def run(self, 
 class RevisionIndexer(BaseIndexer):
     """An object type indexer; inherits from the BaseIndexer and
     implements the indexing process for Revisions through its run method.

     Note: the RevisionIndexer is not an instantiable object. To use it in
     another context, one should inherit from this class and override the
     methods mentioned in the BaseIndexer class.

     """
     def run(self, sha1_gits, policy_update):
         """Given a list of sha1_gits:

         - retrieve revisions from storage
         - execute the indexing computations
         - store the results (according to policy_update)

         Args:
             sha1_gits ([bytes]): sha1_git's identifier list
             policy_update ([str]): either 'update-dups' or 'ignore-dups' to
               respectively update duplicates or ignore them

         """
         results = []
         revs = self.storage.revision_get(sha1_gits)

         for rev in revs:
             if not rev:
                 self.log.warn('Revisions %s not found in storage' %
                               list(map(hashutil.hash_to_hex, sha1_gits)))
                 continue
             try:
                 res = self.index(rev)
                 if res:  # If no results, skip it
                     results.append(res)
             except Exception:
                 self.log.exception(
                     'Problem when processing revision')
         self.persist_index_computations(results, policy_update)
diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py
index 966537a..ee574f1 100644
--- a/swh/indexer/metadata.py
+++ b/swh/indexer/metadata.py
@@ -1,283 +1,294 @@
 # Copyright (C) 2017  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import click
 import logging

 from swh.indexer.indexer import ContentIndexer, RevisionIndexer
 from swh.indexer.metadata_dictionary import compute_metadata
 from swh.indexer.metadata_detector import detect_metadata
 from swh.indexer.metadata_detector import extract_minimal_metadata_dict
 from swh.model import hashutil


 class ContentMetadataIndexer(ContentIndexer):
-    """Indexer at content level in charge of:
+    """Content-level indexer

+    This indexer is in charge of:
+
     - filtering out content already indexed in content_metadata
     - reading content from objstorage with the content's id sha1
     - computing translated_metadata by given context
     - using the metadata_dictionary as the 'swh-metadata-translator' tool
     - store result in content_metadata table
+
     """
     CONFIG_BASE_FILENAME = 'indexer/metadata'

     def __init__(self, tool, config):
         self.tool = tool
         # twisted way to use the exact same config as the
         # RevisionMetadataIndexer object that uses ContentMetadataIndexer
         # internally
         self.config = config
         super().__init__()

     def prepare(self):
         self.results = []
         if self.config['storage']:
             self.storage = self.config['storage']
         if self.config['objstorage']:
             self.objstorage = self.config['objstorage']
         l = logging.getLogger('requests.packages.urllib3.connectionpool')
         l.setLevel(logging.WARN)
         self.log = logging.getLogger('swh.indexer')
         self.tools = self.retrieve_tools_information()

     def retrieve_tools_information(self):
         self.config['tools'] = self.tool
         return super().retrieve_tools_information()

     def filter(self, sha1s):
         """Filter out known sha1s and return only missing ones.

         """
         yield from self.storage.content_metadata_missing((
             {
                 'id': sha1,
                 'indexer_configuration_id': self.tools['id'],
             } for sha1 in sha1s
         ))
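# --- Illustrative sketch, not part of this patch ---------------------------
# Unlike other indexers, ContentMetadataIndexer is built with an explicit
# tool and config instead of reading a configuration file; this is how
# RevisionMetadataIndexer instantiates it further down. A condensed view,
# where `storage` and `objstorage` stand for already-prepared instances:

tool = {
    'name': 'swh-metadata-translator',
    'version': '0.0.1',
    'configuration': {'type': 'local', 'context': 'npm'},
}

def make_content_metadata_indexer(storage, objstorage):
    config = {'storage': storage, 'objstorage': objstorage}
    return ContentMetadataIndexer(tool, config)
# ---------------------------------------------------------------------------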
     def index(self, sha1, raw_content):
         """Index sha1's content and store the result.

         Args:
             sha1 (bytes): content's identifier
             raw_content (bytes): raw content in bytes

         Returns:
-            result (dict): representing a content_metadata
-            if translation wasn't successful the translated_metadata keys
-            will be kept as None
+            dict: dictionary representing a content_metadata. If the
+            translation wasn't successful, the translated_metadata keys will
+            be returned as None.

         """
         result = {
             'id': sha1,
             'indexer_configuration_id': self.tools['id'],
             'translated_metadata': None
         }
         try:
             context = self.tools['tool_configuration']['context']
             result['translated_metadata'] = compute_metadata(
                 context, raw_content)
             # a twisted way to keep the result with the indexer object, for
             # get_results
             self.results.append(result)
         except Exception:
             self.log.exception(
                 "Problem during tool retrieval of metadata translation")
         return result

     def persist_index_computations(self, results, policy_update):
         """Persist the results in storage.

         Args:
             results ([dict]): list of content_metadata, dict with the
               following keys:

               - id (bytes): content's identifier (sha1)
               - translated_metadata (jsonb): detected metadata

             policy_update ([str]): either 'update-dups' or 'ignore-dups' to
               respectively update duplicates or ignore them

         """
         self.storage.content_metadata_add(
             results, conflict_update=(policy_update == 'update-dups'))

     def get_results(self):
         """Can be called only if the run method was called before.

         Returns:
-            results (list): list of content_metadata entries calculated
-            by current indxer
+            list: list of content_metadata entries calculated by the
+            current indexer

         """
         return self.results


 class RevisionMetadataIndexer(RevisionIndexer):
-    """Indexer at Revision level in charge of:
+    """Revision-level indexer

+    This indexer is in charge of:
+
     - filtering revisions already indexed in revision_metadata table with
       defined computation tool
     - retrieve all entry_files in root directory
     - use metadata_detector for file_names containing metadata
     - compute metadata translation if necessary and possible (depends on
       tool)
     - send sha1s to content indexing if possible
     - store the results for revision

     """
     CONFIG_BASE_FILENAME = 'indexer/metadata'

     ADDITIONAL_CONFIG = {
         'tools': ('dict', {
             'name': 'swh-metadata-detector',
             'version': '0.0.1',
             'configuration': {
                 'type': 'local',
                 'context': ['npm', 'codemeta']
             },
         }),
     }

     def prepare(self):
         super().prepare()

     def filter(self, sha1_gits):
         """Filter out known sha1s and return only missing ones.

         """
         yield from self.storage.revision_metadata_missing((
             {
                 'id': sha1_git,
                 'indexer_configuration_id': self.tools['id'],
             } for sha1_git in sha1_gits
         ))
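# --- Illustrative sketch, not part of this patch ---------------------------
# The index() method below feeds directory entries to detect_metadata and
# gets back a mapping from context name to matching file sha1s. A rough
# sketch of the expected shapes; the entry keys and sha1 values are
# assumptions, only the return shape is documented (see
# translate_revision_metadata below):

from swh.indexer.metadata_detector import detect_metadata

files = [
    {'type': 'file', 'name': b'package.json', 'sha1': b'\x01' * 20},
    {'type': 'file', 'name': b'README.md', 'sha1': b'\x02' * 20},
]
detected_files = detect_metadata(files)
# e.g. {'npm': [<sha1 of package.json>]} -- only package.json maps to npm
# ---------------------------------------------------------------------------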
     def index(self, rev):
         """Index rev by processing it and organizing the result.

-        - use metadata_detector to iterate on filenames
-          - if one filename detected -> sends file to content indexer
-          - if multiple file detected -> translation needed at revision level
-        Args:
-            rev (bytes): revision artifact from storage
+        Use metadata_detector to iterate on filenames:
+
+        - if one filename is detected -> send the file to the content
+          indexer
+        - if multiple files are detected -> translation is needed at the
+          revision level

-        Returns:
-            A dict, representing a revision_metadata, with keys:
-            - id (bytes): rev's identifier (sha1_git)
-            - indexer_configuration_id (bytes): tool used
-            - translated_metadata (bytes): dict of retrieved metadata
+        Args:
+            rev (bytes): revision artifact from storage
+
+        Returns:
+            dict: dictionary representing a revision_metadata, with keys:
+
+            - id (bytes): rev's identifier (sha1_git)
+            - indexer_configuration_id (bytes): tool used
+            - translated_metadata (bytes): dict of retrieved metadata

         """
         try:
             result = {
                 'id': rev['id'],
                 'indexer_configuration_id': self.tools['id'],
                 'translated_metadata': None
             }

             root_dir = rev['directory']
             dir_ls = self.storage.directory_ls(root_dir, recursive=False)
             files = (entry for entry in dir_ls if entry['type'] == 'file')
             detected_files = detect_metadata(files)
             result['translated_metadata'] = self.translate_revision_metadata(
                 detected_files)
         except Exception:
             self.log.exception(
                 'Problem when indexing rev')
         return result

     def persist_index_computations(self, results, policy_update):
         """Persist the results in storage.

         Args:
             results ([dict]): list of revision_metadata, dict with the
               following keys:

               - id (bytes): revision's identifier (sha1_git)
               - translated_metadata (jsonb): detected metadata

             policy_update ([str]): either 'update-dups' or 'ignore-dups' to
               respectively update duplicates or ignore them

         """
         # TODO: add functions in storage to keep data in revision_metadata
         self.storage.revision_metadata_add(
             results, conflict_update=(policy_update == 'update-dups'))

     def translate_revision_metadata(self, detected_files):
         """Determine the plan of action to translate metadata, when it
         contains one or multiple detected files:

         Args:
-            - detected_files : dict with context name and list of sha1s
-              (e.g : {'npm' : [sha1_1, sha1_2],
-                      'authors': sha1_3})
+            detected_files (dict): dictionary mapping context names (e.g.,
+              "npm", "authors") to lists of sha1s

         Returns:
-            - translated_metadata: dict with the CodeMeta vocabulary
+            dict: dict with translated metadata according to the CodeMeta
+            vocabulary

         """
         translated_metadata = []
         tool = {
             'name': 'swh-metadata-translator',
             'version': '0.0.1',
             'configuration': {
                 'type': 'local',
                 'context': None
             },
         }
         # TODO: iterate on each context, on each file
         # -> get raw_contents
         # -> translate each content
         config = {
             'storage': self.storage,
             'objstorage': self.objstorage
         }
         for context in detected_files.keys():
             tool['configuration']['context'] = context
             c_metadata_indexer = ContentMetadataIndexer(tool, config)
             # sha1s that are in the content_metadata table
             sha1s_in_storage = []
             metadata_generator = self.storage.content_metadata_get(
                 detected_files[context])
             for c in metadata_generator:
                 # extracting translated_metadata
                 sha1 = c['id']
                 sha1s_in_storage.append(sha1)
                 local_metadata = c['translated_metadata']
                 # local metadata is aggregated
                 if local_metadata:
                     translated_metadata.append(local_metadata)

             sha1s_filtered = [item for item in detected_files[context]
                               if item not in sha1s_in_storage]

             if sha1s_filtered:
                 # schedule indexation of content
                 try:
                     c_metadata_indexer.run(sha1s_filtered,
                                            policy_update='ignore-dups')
                     # on-the-fly possibility:
                     results = c_metadata_indexer.get_results()

                     for result in results:
                         local_metadata = result['translated_metadata']
                         translated_metadata.append(local_metadata)

                 except Exception:
                     self.log.exception('Problem while indexing content')

         # transform translated_metadata into a minimal set with
         # swh-metadata-detector
         min_metadata = extract_minimal_metadata_dict(translated_metadata)
         return min_metadata


 @click.command()
 @click.option('--revs', '-i',
               default=['8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f',
                        '026040ea79dec1b49b4e3e7beda9132b6b26b51b',
                        '9699072e21eded4be8d45e3b8d543952533fa190'],
               help='Default sha1_git to lookup',
               multiple=True)
 def main(revs):
     _git_sha1s = list(map(hashutil.hash_to_bytes, revs))
     rev_metadata_indexer = RevisionMetadataIndexer()
     rev_metadata_indexer.run(_git_sha1s, 'update-dups')


 if __name__ == '__main__':
     logging.basicConfig(level=logging.INFO)
     main()
diff --git a/swh/indexer/metadata_dictionary.py b/swh/indexer/metadata_dictionary.py
index d68b65c..5e2ee14 100644
--- a/swh/indexer/metadata_dictionary.py
+++ b/swh/indexer/metadata_dictionary.py
@@ -1,204 +1,211 @@
 # Copyright (C) 2017  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import json


 def convert(raw_content):
     """Convert raw_content recursively:

+    - from bytes to string
+    - from string to dict
+
     Args:
-        - raw_content (bytes / string / dict)
+        raw_content (bytes / string / dict)

     Returns:
-        - Dict of content (if string was json, otherwise returns string)
+        dict: content (if the string was json; otherwise returns the string)

     """
     if isinstance(raw_content, bytes):
         return convert(raw_content.decode())
     if isinstance(raw_content, str):
         try:
             content = json.loads(raw_content)
             if content:
                 return content
             else:
                 return raw_content
         except json.decoder.JSONDecodeError:
             return raw_content
     if isinstance(raw_content, dict):
         return raw_content


 class BaseMapping():
     """Base class for mappings to inherit from.

     To implement a new mapping:

+    - inherit this class
+    - add a local property self.mapping
+    - override the translate function

     """
     def translate(self, content_dict):
         """Translate content by mapping its keys to the npm mapping (for
         now a hard-coded mapping).

         Args:
-            - context_text (text) : should be json
+            content_dict (dict): content to translate

         Returns:
-            - translated_metadata (dict): jsonb form needed for the indexer
+            dict: translated metadata in the jsonb form needed by the
+            indexer

         """
         translated_metadata = {}
         default = 'other'
         translated_metadata['other'] = {}
         try:
             for k, v in content_dict.items():
                 try:
                     term = self.mapping.get(k, default)
                     if term not in translated_metadata:
                         translated_metadata[term] = v
                         continue
                     if isinstance(translated_metadata[term], str):
                         in_value = translated_metadata[term]
                         translated_metadata[term] = [in_value, v]
                         continue
                     if isinstance(translated_metadata[term], list):
                         translated_metadata[term].append(v)
                         continue
                     if isinstance(translated_metadata[term], dict):
                         translated_metadata[term][k] = v
                         continue
                 except KeyError:
                     self.log.exception(
                         "Problem during item mapping")
                     continue
         except Exception:
             return None
         return translated_metadata
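# --- Illustrative sketch, not part of this patch ---------------------------
# What BaseMapping.translate (just defined above) does with colliding terms:
# the first value is stored as-is, a second one promotes the entry to a
# list, and unmapped keys land under 'other'. Toy mapping, not a real
# crosswalk.

class ToyMapping(BaseMapping):
    mapping = {'deps': 'softwareRequirements',
               'requires': 'softwareRequirements'}

toy = ToyMapping()
print(toy.translate({'deps': 'libA', 'requires': 'libB', 'surprise': 42}))
# -> {'other': {'surprise': 42}, 'softwareRequirements': ['libA', 'libB']}
# ---------------------------------------------------------------------------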
 class NpmMapping(BaseMapping):
     """Dedicated class for NPM (package.json) mapping and translation.

     """
     mapping = {
         'repository': 'codeRepository',
         'os': 'operatingSystem',
         'cpu': 'processorRequirements',
         'engines': 'processorRequirements',
         'dependencies': 'softwareRequirements',
         'bundleDependencies': 'softwareRequirements',
         'peerDependencies': 'softwareRequirements',
         'author': 'author',
         'contributor': 'contributor',
         'keywords': 'keywords',
         'license': 'license',
         'version': 'version',
         'description': 'description',
         'name': 'name',
         'devDependencies': 'softwareSuggestions',
         'optionalDependencies': 'softwareSuggestions',
         'bugs': 'issueTracker',
         'homepage': 'url'
     }

     def translate(self, raw_content):
         content_dict = convert(raw_content)
         return super().translate(content_dict)


 class MavenMapping(BaseMapping):
     """Dedicated class for Maven (pom.xml) mapping and translation.

     """
     mapping = {
         'license': 'license',
         'version': 'version',
         'description': 'description',
         'name': 'name',
         'prerequisites': 'softwareRequirements',
         'repositories': 'codeRepository',
         'groupId': 'identifier',
         'ciManagement': 'contIntegration',
         'issuesManagement': 'issueTracker',
     }

     def translate(self, raw_content):
         content = convert(raw_content)
         # parse content from xml to dict
         return super().translate(content)


 class DoapMapping(BaseMapping):
     mapping = {
     }

     def translate(self, raw_content):
         content = convert(raw_content)
         # parse content from xml to dict
         return super().translate(content)


 def parse_xml(content):
     """Parse content from xml to a python dict.

     Args:
         content (text): the string form of the raw_content (in xml)

     Returns:
         dict: a python dict of the content after parsing

     """
     # TODO: check that the content is xml, then use an xml parser to build
     # the dict
     return content


 mapping_tool_fn = {
     "npm": NpmMapping(),
     "maven": MavenMapping(),
     "doap_xml": DoapMapping()
 }
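# --- Illustrative sketch, not part of this patch ---------------------------
# parse_xml above is still a stub. One possible stdlib-only direction,
# flattening an XML document into a dict of tag -> text/children (a rough
# approximation that ignores attributes and repeated tags):

import xml.etree.ElementTree as ET

def parse_xml_sketch(content):
    def to_dict(elem):
        children = list(elem)
        if not children:
            return elem.text
        return {child.tag: to_dict(child) for child in children}
    return to_dict(ET.fromstring(content))

print(parse_xml_sketch('<project><name>demo</name></project>'))
# -> {'name': 'demo'}
# ---------------------------------------------------------------------------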
 def compute_metadata(context, raw_content):
     """First landing method: a dispatcher that sends content to the right
     function to carry out the real parsing of syntax and translation of
     terms.

     Args:
-        - context (text) : defines to which function/tool
-          the content is sent
-        - content (text): the string form of the raw_content
+        context (text): defines to which function/tool the content is sent
+        raw_content (text): the string form of the raw_content

     Returns:
-        - translated_metadata (dict): jsonb form needed for the indexer
-          to store in storage
+        dict: translated metadata jsonb dictionary needed for the indexer
+        to store in storage

     """
     if raw_content is None or raw_content == b"":
         return None

     # TODO: keep the mapping out of the code (maybe fetch the crosswalk
     # from storage?); if fetched from storage, it should be done once per
     # batch of sha1s
     dictionary = mapping_tool_fn[context]
     translated_metadata = dictionary.translate(raw_content)
     return translated_metadata


 def main():
     raw_content = """{"name": "test_name", "unknown_term": "ut"}"""
     raw_content1 = b"""{"name": "test_name",
                         "unknown_term": "ut",
                         "prerequisites": "packageXYZ"}"""
     result = compute_metadata("npm", raw_content)
     result1 = compute_metadata("maven", raw_content1)

     print(result)
     print(result1)


 if __name__ == "__main__":
     main()
diff --git a/swh/indexer/orchestrator.py b/swh/indexer/orchestrator.py
index 414a535..1411443 100644
--- a/swh/indexer/orchestrator.py
+++ b/swh/indexer/orchestrator.py
@@ -1,128 +1,123 @@
 # Copyright (C) 2016-2017  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import random

 from celery import group

 from swh.core.config import SWHConfig
 from swh.core.utils import grouper
 from swh.scheduler import utils

 from . import TASK_NAMES, INDEXER_CLASSES


 def get_class(clazz):
     """Get a symbol class dynamically by its fully qualified name string
     representation.

     """
     parts = clazz.split('.')
     module = '.'.join(parts[:-1])
     m = __import__(module)
     for comp in parts[1:]:
         m = getattr(m, comp)
     return m
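# --- Illustrative sketch, not part of this patch ---------------------------
# get_class walks attribute by attribute from the top-level package, so it
# resolves any dotted path, not only classes:

print(get_class('os.path.join'))             # <function join ...>
print(get_class('collections.OrderedDict'))  # <class 'collections.OrderedDict'>
# ---------------------------------------------------------------------------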
""" CONFIG_BASE_FILENAME = 'indexer/orchestrator' DEFAULT_CONFIG = { 'indexers': ('dict', { 'mimetype': { 'batch_size': 10, 'check_presence': True, }, }), } def __init__(self): super().__init__() self.config = self.parse_config_file() indexer_names = list(self.config['indexers'].keys()) random.shuffle(indexer_names) indexers = {} tasks = {} for name in indexer_names: if name not in TASK_NAMES: raise ValueError('%s must be one of %s' % ( name, TASK_NAMES.keys())) opts = self.config['indexers'][name] indexers[name] = ( INDEXER_CLASSES[name], opts['check_presence'], opts['batch_size']) tasks[name] = utils.get_task(TASK_NAMES[name]) self.indexers = indexers self.tasks = tasks def run(self, ids): for name, (idx_class, filtering, batch_size) in self.indexers.items(): if filtering: policy_update = 'ignore-dups' indexer_class = get_class(idx_class) ids_filtered = list(indexer_class().filter(ids)) if not ids_filtered: continue else: policy_update = 'update-dups' ids_filtered = ids celery_tasks = [] for ids_to_send in grouper(ids_filtered, batch_size): celery_task = self.tasks[name].s( ids=list(ids_to_send), policy_update=policy_update) celery_tasks.append(celery_task) group(celery_tasks).delay() class OrchestratorAllContentsIndexer(BaseOrchestratorIndexer): """Orchestrator which deals with batch of any types of contents. """ class OrchestratorTextContentsIndexer(BaseOrchestratorIndexer): """Orchestrator which deals with batch of text contents. """ CONFIG_BASE_FILENAME = 'indexer/orchestrator_text' diff --git a/swh/indexer/rehash.py b/swh/indexer/rehash.py index dc35a0b..4530e2d 100644 --- a/swh/indexer/rehash.py +++ b/swh/indexer/rehash.py @@ -1,189 +1,189 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import itertools from collections import defaultdict from swh.core import utils from swh.core.config import SWHConfig from swh.model import hashutil from swh.objstorage import get_objstorage from swh.objstorage.exc import ObjNotFoundError from swh.storage import get_storage from swh.scheduler.utils import get_task class RecomputeChecksums(SWHConfig): """Class in charge of (re)computing content's hashes. Hashes to compute are defined across 2 configuration options: - - compute_checksums ([str]): list of hash algorithms that - swh.model.hashutil.hash_data function should be able to deal - with. For variable-length checksums, a desired checksum length - should also be provided. Their format is : e.g: blake2:512 + compute_checksums ([str]) + list of hash algorithms that py:func:`swh.model.hashutil.hash_data` + function should be able to deal with. For variable-length checksums, a + desired checksum length should also be provided. Their format is + : e.g: blake2:512 - - recompute_checksums (bool): a boolean to notify that we also - want to recompute potential existing hashes specified in - compute_checksums. Default to False. + recompute_checksums (bool) + a boolean to notify that we also want to recompute potential existing + hashes specified in compute_checksums. Default to False. 
""" DEFAULT_CONFIG = { # The storage to read from or update metadata to 'storage': ('dict', { 'cls': 'remote', 'args': { 'url': 'http://localhost:5002/' }, }), # The objstorage to read contents' data from 'objstorage': ('dict', { 'cls': 'pathslicing', 'args': { 'root': '/srv/softwareheritage/objects', 'slicing': '0:2/2:4/4:6', }, }), # the set of checksums that should be computed. # Examples: 'sha1_git', 'blake2b512', 'blake2s256' 'compute_checksums': ( 'list[str]', []), # whether checksums that already exist in the DB should be # recomputed/updated or left untouched 'recompute_checksums': ('bool', False), # Number of contents to retrieve blobs at the same time 'batch_size_retrieve_content': ('int', 10), # Number of contents to update at the same time 'batch_size_update': ('int', 100), # Rescheduling task on error (if None, nothing is done) 'rescheduling_task': ('str', None), } CONFIG_BASE_FILENAME = 'indexer/rehash' def __init__(self): self.config = self.parse_config_file() self.storage = get_storage(**self.config['storage']) self.objstorage = get_objstorage(**self.config['objstorage']) self.compute_checksums = self.config['compute_checksums'] self.recompute_checksums = self.config[ 'recompute_checksums'] self.batch_size_retrieve_content = self.config[ 'batch_size_retrieve_content'] self.batch_size_update = self.config[ 'batch_size_update'] self.log = logging.getLogger('swh.indexer.rehash') rescheduling_task = self.config['rescheduling_task'] if rescheduling_task: self.rescheduling_task = get_task(rescheduling_task) else: self.rescheduling_task = None if not self.compute_checksums: raise ValueError('Checksums list should not be empty.') def _read_content_ids(self, contents): """Read the content identifiers from the contents. """ for c in contents: h = c['sha1'] if isinstance(h, str): h = hashutil.hash_to_bytes(h) yield h def get_new_contents_metadata(self, all_contents): """Retrieve raw contents and compute new checksums on the contents. Unknown or corrupted contents are skipped. Args: all_contents ([dict]): List of contents as dictionary with the necessary primary keys checksum_algorithms ([str]): List of checksums to compute Yields: tuple of: content to update, list of checksums computed """ content_ids = self._read_content_ids(all_contents) for contents in utils.grouper(content_ids, self.batch_size_retrieve_content): contents_iter = itertools.tee(contents, 2) try: content_metadata = self.storage.content_get_metadata( [s for s in contents_iter[0]]) except Exception: self.log.exception( 'Problem when reading contents metadata.') if self.rescheduling_task: self.log.warn('Rescheduling batch.') cs = [{'sha1': sha1} for sha1 in contents_iter[1]] self.rescheduling_task.delay(cs) continue for content in content_metadata: if self.recompute_checksums: # Recompute checksums provided # in compute_checksums options checksums_to_compute = list(self.compute_checksums) else: # Compute checkums provided in compute_checksums # options not already defined for that content checksums_to_compute = [h for h in self.compute_checksums if not content.get(h)] if not checksums_to_compute: # Nothing to recompute continue try: raw_content = self.objstorage.get(content['sha1']) except ObjNotFoundError: self.log.warn('Content %s not found in objstorage!' 
     def get_new_contents_metadata(self, all_contents):
         """Retrieve raw contents and compute new checksums on the
         contents. Unknown or corrupted contents are skipped.

         Args:
             all_contents ([dict]): List of contents as dictionaries with
               the necessary primary keys

         Yields:
             tuple of: content to update, list of checksums computed

         """
         content_ids = self._read_content_ids(all_contents)
         for contents in utils.grouper(content_ids,
                                       self.batch_size_retrieve_content):
             contents_iter = itertools.tee(contents, 2)
             try:
                 content_metadata = self.storage.content_get_metadata(
                     [s for s in contents_iter[0]])
             except Exception:
                 self.log.exception(
                     'Problem when reading contents metadata.')
                 if self.rescheduling_task:
                     self.log.warn('Rescheduling batch.')
                     cs = [{'sha1': sha1} for sha1 in contents_iter[1]]
                     self.rescheduling_task.delay(cs)
                 continue

             for content in content_metadata:
                 if self.recompute_checksums:
                     # Recompute checksums provided in compute_checksums
                     # options
                     checksums_to_compute = list(self.compute_checksums)
                 else:
                     # Compute checksums provided in compute_checksums
                     # options not already defined for that content
                     checksums_to_compute = [h for h in self.compute_checksums
                                             if not content.get(h)]

                 if not checksums_to_compute:  # Nothing to recompute
                     continue

                 try:
                     raw_content = self.objstorage.get(content['sha1'])
                 except ObjNotFoundError:
                     self.log.warn('Content %s not found in objstorage!' %
                                   content['sha1'])
                     continue

                 # Actually computing the checksums for that content
                 updated_content = hashutil.hash_data(
                     raw_content, algorithms=checksums_to_compute)
                 content.update(updated_content)
                 yield content, checksums_to_compute

     def run(self, contents):
-        """Given a list of content (dict):
-        - (re)compute a given set of checksums on contents
-          available in our object storage
-        - update those contents with the new metadata
-
-        Args:
-            - contents ([dict]): contents as dictionary with
-              necessary keys. key present in such dictionary
-              should be the ones defined in the 'primary_key'
-              option.
+        """Given a list of contents:
+
+        - (re)compute a given set of checksums on the contents available in
+          our object storage
+        - update those contents with the new metadata
+
+        Args:
+            contents ([dict]): contents as dictionaries with the necessary
+              keys. The keys present in such a dictionary should be the
+              ones defined in the 'primary_key' option.

         """
         for data in utils.grouper(
                 self.get_new_contents_metadata(contents),
                 self.batch_size_update):

             groups = defaultdict(list)
             for content, keys_to_update in data:
                 keys = ','.join(keys_to_update)
                 groups[keys].append(content)

             for keys_to_update, contents in groups.items():
                 keys = keys_to_update.split(',')
                 try:
                     self.storage.content_update(contents, keys=keys)
                 except Exception:
                     self.log.exception('Problem during update.')
                     if self.rescheduling_task:
                         self.log.warn('Rescheduling batch.')
                         cs = [{'sha1': c['sha1']} for c in contents]
                         self.rescheduling_task.delay(cs)
                     continue
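# --- Illustrative sketch, not part of this patch ---------------------------
# What a recomputation produces for one content, approximated here with
# hashlib (blake2s256 corresponds to blake2s with a 32-byte digest; the
# real hashutil.hash_data returns raw digests rather than hex strings):

import hashlib

raw_content = b'hello world'
updated_content = {
    'sha256': hashlib.sha256(raw_content).hexdigest(),
    'blake2s256': hashlib.blake2s(raw_content, digest_size=32).hexdigest(),
}
print(updated_content)
# ---------------------------------------------------------------------------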