diff --git a/swh/indexer/ctags.py b/swh/indexer/ctags.py
--- a/swh/indexer/ctags.py
+++ b/swh/indexer/ctags.py
@@ -10,7 +10,7 @@
 from swh.model import hashutil
 
 from .language import compute_language
-from .indexer import BaseIndexer, DiskIndexer
+from .indexer import ContentIndexer, DiskIndexer
 
 
 # Options used to compute tags
@@ -54,7 +54,7 @@
 }
 
 
-class CtagsIndexer(BaseIndexer, DiskIndexer):
+class CtagsIndexer(ContentIndexer, DiskIndexer):
     CONFIG_BASE_FILENAME = 'indexer/ctags'
 
     ADDITIONAL_CONFIG = {
@@ -80,7 +80,7 @@
         self.working_directory = self.config['workdir']
         self.language_map = self.config['languages']
 
-    def filter_contents(self, sha1s):
+    def filter(self, sha1s):
         """Filter out known sha1s and return only missing ones.
 
         """
@@ -91,7 +91,7 @@
             } for sha1 in sha1s
         ))
 
-    def index_content(self, sha1, raw_content):
+    def index(self, sha1, raw_content):
         """Index sha1s' content and store result.
 
         Args:
diff --git a/swh/indexer/fossology_license.py b/swh/indexer/fossology_license.py
--- a/swh/indexer/fossology_license.py
+++ b/swh/indexer/fossology_license.py
@@ -8,7 +8,7 @@
 
 from swh.model import hashutil
 
-from .indexer import BaseIndexer, DiskIndexer
+from .indexer import ContentIndexer, DiskIndexer
 
 
 def compute_license(path, log=None):
@@ -46,7 +46,7 @@
 }
 
 
-class ContentFossologyLicenseIndexer(BaseIndexer, DiskIndexer):
+class ContentFossologyLicenseIndexer(ContentIndexer, DiskIndexer):
     """Indexer in charge of:
     - filtering out content already indexed
     - reading content from objstorage per the content's id (sha1)
@@ -71,7 +71,7 @@
         super().prepare()
         self.working_directory = self.config['workdir']
 
-    def filter_contents(self, sha1s):
+    def filter(self, sha1s):
         """Filter out known sha1s and return only missing ones.
 
         """
@@ -82,7 +82,7 @@
             } for sha1 in sha1s
         ))
 
-    def index_content(self, sha1, raw_content):
+    def index(self, sha1, raw_content):
         """Index sha1s' content and store result.
 
         Args:
diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py
--- a/swh/indexer/indexer.py
+++ b/swh/indexer/indexer.py
@@ -69,24 +69,34 @@
     """Base class for indexers to inherit from.
 
     The main entry point is the `run` functions which is in charge to
-    trigger the computations on the sha1s batch received.
+    trigger the computations on the ids batch received.
 
     Indexers can:
-    - filter out sha1 whose data has already been indexed.
-    - retrieve sha1's content from objstorage, index this content then
-      store the result in storage.
+    - filter out ids whose data has already been indexed.
+    - retrieve each id's data from storage or objstorage
+    - index this data depending on the object type and store the result in storage.
 
-    To implement a new index, inherit from this class and implement
-    the following functions:
+    To implement a new object type indexer, inherit from the BaseIndexer and
+    implement the indexing process:
 
-    - def filter_contents(self, sha1s): filter out data already
+    - def run(self, object_ids, policy_update): object_ids are different
+      depending on the object type. For example: sha1 for content, sha1_git
+      for revision, directory, release, and id for origin
+
+    To implement a new concrete indexer, inherit from the object level classes:
+    ContentIndexer, RevisionIndexer
+    (later on, OriginIndexer will also be available)
+
+    Then you need to implement the following functions:
+
+    - def filter(self, ids): filter out data already
       indexed (in storage). This function is used by the
      orchestrator and not directly by the indexer
      (cf. swh.indexer.orchestrator.BaseOrchestratorIndexer).
-    - def index_content(self, sha1, raw_content): compute index on
-      sha1 with data raw_content (retrieved in the objstorage by the
-      sha1 key) and return the resulting index computation.
+    - def index(self, id, data): compute index on
+      id with data (retrieved from the storage or the objstorage by the
+      id key) and return the resulting index computation.
 
     - def persist_index_computations(self, results, policy_update):
       persist the results of multiple index computations in the
@@ -212,25 +222,26 @@
         return self.storage.indexer_configuration_get(tool)
 
     @abc.abstractmethod
-    def filter_contents(self, sha1s):
-        """Filter missing sha1 for that particular indexer.
+    def filter(self, ids):
+        """Filter missing ids for that particular indexer.
 
         Args:
-            sha1s ([bytes]): list of contents' sha1
+            ids ([bytes]): list of ids
 
         Yields:
-            iterator of missing sha1
+            iterator of missing ids
 
         """
         pass
 
     @abc.abstractmethod
-    def index_content(self, sha1, content):
+    def index(self, id, data):
         """Index computation for the sha1 and associated raw content.
 
         Args:
-            sha1 (bytes): sha1 identifier
-            content (bytes): sha1's raw content
+            id (bytes): object identifier
+            data (bytes): id's data, from storage or objstorage depending on
+                the object type
 
         Returns:
             a dict that makes sense for the persist_index_computations
@@ -245,7 +256,7 @@
 
         Args:
             results ([result]): List of results. One result is the
-                result of the index_content function.
+                result of the index function.
 
             policy_update ([str]): either 'update-dups' or 'ignore-dups' to
                 respectively update duplicates or ignore them
@@ -263,7 +274,7 @@
 
         Args:
             results ([result]): List of results (dict) as returned
-                by index_content function.
+                by the index function.
 
         Returns:
             None
@@ -271,6 +282,32 @@
         """
         pass
 
+    @abc.abstractmethod
+    def run(self, ids, policy_update):
+        """Given a list of ids:
+        - retrieves the data from the storage
+        - executes the indexing computations
+        - stores the results (according to policy_update)
+
+        Args:
+            ids ([bytes]): list of object identifiers
+            policy_update ([str]): either 'update-dups' or 'ignore-dups' to
+                respectively update duplicates or ignore them
+
+        """
+        pass
+
+
+class ContentIndexer(BaseIndexer):
+    """
+    An object type indexer that inherits from BaseIndexer and
+    implements the indexing process for contents with its run method.
+
+    Note: the ContentIndexer is not instantiable on its own; to use it
+    in another context, refer to the instructions in the BaseIndexer
+    documentation.
+    """
+
     def run(self, sha1s, policy_update):
         """Given a list of sha1s:
         - retrieve the content from the storage
@@ -292,7 +329,7 @@
                 self.log.warn('Content %s not found in objstorage' %
                               hashutil.hash_to_hex(sha1))
                 continue
-            res = self.index_content(sha1, raw_content)
+            res = self.index(sha1, raw_content)
             if res:  # If no results, skip it
                 results.append(res)
@@ -304,3 +341,46 @@
         if self.rescheduling_task:
             self.log.warn('Rescheduling batch')
             self.rescheduling_task.delay(sha1s, policy_update)
+
+
+class RevisionIndexer(BaseIndexer):
+    """
+    An object type indexer that inherits from BaseIndexer and
+    implements the indexing process for revisions with its run method.
+
+    Note: the RevisionIndexer is not instantiable on its own; to use it
+    in another context, refer to the instructions in the BaseIndexer
+    documentation.
+    """
+
+    def run(self, sha1_gits, policy_update):
+        """
+        Given a list of sha1_gits:
+        - retrieve revisions from storage
+        - execute the indexing computations
+        - store the results (according to policy_update)
+
+        Args:
+            sha1_gits ([bytes]): list of sha1_git identifiers
+            policy_update ([str]): either 'update-dups' or 'ignore-dups' to
+                respectively update duplicates or ignore them
+
+        """
+        results = []
+        try:
+            for sha1_git in sha1_gits:
+                try:
+                    revs = self.storage.revision_get([sha1_git])
+                except ValueError:
+                    self.log.warn('Revision %s not found in storage' %
+                                  hashutil.hash_to_hex(sha1_git))
+                    continue
+                for rev in revs:
+                    if rev:  # If no revision, skip it
+                        res = self.index(rev)
+                        if res:  # If no results, skip it
+                            results.append(res)
+            self.persist_index_computations(results, policy_update)
+        except Exception:
+            self.log.exception(
+                'Problem when processing revision')
diff --git a/swh/indexer/language.py b/swh/indexer/language.py
--- a/swh/indexer/language.py
+++ b/swh/indexer/language.py
@@ -10,7 +10,7 @@
 from pygments.util import ClassNotFound
 from chardet.universaldetector import UniversalDetector
 
-from .indexer import BaseIndexer
+from .indexer import ContentIndexer
 
 
 def _cleanup_classname(classname):
@@ -107,7 +107,7 @@
 }
 
 
-class ContentLanguageIndexer(BaseIndexer):
+class ContentLanguageIndexer(ContentIndexer):
     """Indexer in charge of:
     - filtering out content already indexed
     - reading content from objstorage per the content's id (sha1)
@@ -134,7 +134,7 @@
         c = self.config
         self.max_content_size = c['tools']['configuration']['max_content_size']
 
-    def filter_contents(self, sha1s):
+    def filter(self, sha1s):
         """Filter out known sha1s and return only missing ones.
 
         """
@@ -145,7 +145,7 @@
             } for sha1 in sha1s
         ))
 
-    def index_content(self, sha1, raw_content):
+    def index(self, sha1, raw_content):
         """Index sha1s' content and store result.
 
         Args:
diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py
--- a/swh/indexer/metadata.py
+++ b/swh/indexer/metadata.py
@@ -3,17 +3,22 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from .indexer import BaseIndexer
+from swh.indexer.indexer import ContentIndexer, RevisionIndexer
 from swh.indexer.metadata_dictionary import compute_metadata
+from swh.indexer.metadata_detector import detect_metadata
+from swh.indexer.metadata_detector import extract_minimal_metadata_dict
+from swh.model import hashutil
+from swh.objstorage.exc import ObjNotFoundError
 
 
-class ContentMetadataIndexer(BaseIndexer):
-    """Indexer in charge of:
-    - filtering out content already indexed
+
+class ContentMetadataIndexer(ContentIndexer):
+    """Indexer at content level in charge of:
+    - filtering out content already indexed in content_metadata
     - reading content from objstorage with the content's id sha1
     - computing translated_metadata by given context
-    - using the MetadataDict and a tool for each context
-    - store result in storage
+    - using the metadata_dictionary as the 'swh-metadata-translator' tool
+    - storing the result in the content_metadata table
     """
     CONFIG_BASE_FILENAME = 'indexer/metadata'
 
@@ -28,10 +33,17 @@
         }),
     }
 
+    def __init__(self):
+        self.config = self.parse_config_file(
+            config_filename="~/.config/swh/storage.yml",
+            additional_configs=[self.ADDITIONAL_CONFIG])
+        super().__init__()
+
     def prepare(self):
         super().prepare()
+        self.results = []
 
-    def filter_contents(self, sha1s):
+    def filter(self, sha1s):
         """Filter out known sha1s and return only missing ones.
         """
         yield from self.storage.content_metadata_missing((
             {
                 'id': sha1,
                 'indexer_configuration_id': self.tools['id'],
             } for sha1 in sha1s
         ))
 
@@ -41,7 +53,7 @@
-    def index_content(self, sha1, raw_content):
+    def index(self, sha1, raw_content):
         """Index sha1s' content and store result.
 
         Args:
@@ -63,6 +75,8 @@
             context = self.tools['configuration']['context']
             result['translated_metadata'] = compute_metadata(
                 context, raw_content)
+            # keep the result on the indexer so get_results() can return it
+            self.results.append(result)
         except:
             self.log.exception(
                 "Problem during tool retrieval of metadata translation")
@@ -82,3 +96,162 @@
         """
         self.storage.content_metadata_add(
             results, conflict_update=(policy_update == 'update-dups'))
+
+    def get_results(self):
+        """
+        Can only be called after the run method has been called.
+
+        Returns:
+            results (list): list of content_metadata entries computed
+            by the current indexer
+        """
+        return self.results
+
+
+class RevisionMetadataIndexer(RevisionIndexer):
+    """Indexer at revision level in charge of:
+    - filtering revisions already indexed in the revision_metadata table
+      with the defined computation tool
+    - retrieving all entry_files in the root directory
+    - using metadata_detector for file_names containing metadata
+    - computing metadata translation if necessary and possible (depends on tool)
+    - sending sha1s to content indexing if possible
+    - storing the results for the revision
+
+    """
+    ADDITIONAL_CONFIG = {
+        'destination_queue': ('str', None),
+        'tools': ('dict', {
+            'name': 'swh-metadata-detector',
+            'version': '0.1',
+            'configuration': {
+                'type': 'local',
+                'contexts': ['npm']
+            },
+        }),
+    }
+
+    def prepare(self):
+        super().prepare()
+        self.tools = self.retrieve_tools_information()
+
+    def filter(self, sha1_gits):
+        """Filter out known sha1s and return only missing ones.
+
+        """
+        yield from self.storage.revision_metadata_missing((
+            {
+                'id': sha1_git,
+                'indexer_configuration_id': self.tools['id'],
+            } for sha1_git in sha1_gits
+        ))
+
+    def index(self, rev):
+        """Index rev by processing it and organizing the result.
+        Uses metadata_detector to iterate on filenames:
+        - if one filename is detected -> sends the file to the content indexer
+        - if multiple files are detected -> translation needed at revision level
+
+        Args:
+            rev (dict): revision artifact from storage
+
+        Returns:
+            A dict, representing a revision_metadata, with keys:
+            - id (bytes): rev's identifier (sha1_git)
+            - indexer_configuration_id (bytes): tool used
+            - translated_metadata (bytes): dict of retrieved metadata
+
+        """
+        try:
+            result = {
+                'id': rev['id'],
+                'indexer_configuration_id': self.tools['id'],
+                'translated_metadata': None
+            }
+
+            root_dir = rev['directory']
+            dir_ls = self.storage.directory_ls(root_dir, recursive=True)
+            files = (entry for entry in dir_ls if entry['type'] == 'file')
+            detected_files = detect_metadata(files)
+            result['translated_metadata'] = self.translate_revision_metadata(
+                detected_files)
+        except Exception:
+            self.log.exception(
+                'Problem when indexing rev')
+
+        return result
+
+    def persist_index_computations(self, results, policy_update):
+        """Persist the results in storage.
+
+        Args:
+            results ([dict]): list of revision_metadata, dict with the
+                following keys:
+                - id (bytes): revision's identifier (sha1_git)
+                - indexer_configuration_id (bytes): tool used
+                - translated_metadata (bytes): dict of retrieved metadata
+            policy_update ([str]): either 'update-dups' or 'ignore-dups' to
+                respectively update duplicates or ignore them
+
+        """
+        self.storage.revision_metadata_add(
+            results, conflict_update=(policy_update == 'update-dups'))
+
+    def translate_revision_metadata(self, detected_files):
+        """
+        Determine the plan of action to translate metadata from
+        one or multiple detected files.
+        Args:
+            - detected_files : dict with context name and list of sha1s
+              (e.g : {'npm' : [sha1_1, sha1_2],
+                      'authors': sha1_3})
+
+        Returns:
+            - translated_metadata: dict with the CodeMeta vocabulary
+        """
+        translated_metadata = []
+
+        # TODO: iterate on each context, on each file
+        # -> get raw_contents
+        # -> translate each content
+        for context in detected_files.keys():
+            for sha1 in detected_files[context]:
+                try:
+                    raw_content = self.objstorage.get(sha1)
+                except ObjNotFoundError:
+                    self.log.warn('Content %s not found in objstorage' %
+                                  hashutil.hash_to_hex(sha1))
+                    continue
+                # send the raw_content to the 'swh-metadata-translator' tool
+                local_metadata = compute_metadata(context, raw_content)
+                # aggregating metadata
+                translated_metadata.append(local_metadata)
+        # for now this method does not call the ContentMetadataIndexer
+        # due to a configuration issue that should be resolved with a better
+        # configuration management plan; it should look like this:
+        #####################################################################
+        #   send sha1s to ContentMetadataIndexer
+        #   c_metadata_indexer = ContentMetadataIndexer()
+        #   c_metadata_indexer.run(sha1s, policy_update='ignore-dups')
+        #   translated_metadata = c_metadata_indexer.get_results()
+        #####################################################################
+        # open questions:
+        # do we keep the translated_metadata of a single file at revision
+        # level? the swh-metadata-translator keeps undefined categories
+        # under an 'other' key; should we delete this?
+        translated_metadata = extract_minimal_metadata_dict(translated_metadata)
+        return translated_metadata
+
+
+def main():
+    rev_metadata_indexer = RevisionMetadataIndexer()
+    sha1_git1 = hashutil.hash_to_bytes(
+        '8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f')
+    sha1_git2 = hashutil.hash_to_bytes(
+        '8dbb6aeb036e7fd80664eb8bfd1507881af1ba94')
+    sha1_gits = [sha1_git1, sha1_git2]
+    rev_metadata_indexer.run(sha1_gits, 'update-dups')
+
+
+if __name__ == '__main__':
+    main()
diff --git a/swh/indexer/mimetype.py b/swh/indexer/mimetype.py
--- a/swh/indexer/mimetype.py
+++ b/swh/indexer/mimetype.py
@@ -8,7 +8,7 @@
 from subprocess import Popen, PIPE
 
 from swh.scheduler import utils
-from .indexer import BaseIndexer
+from .indexer import ContentIndexer
 
 
 def compute_mimetype_encoding(raw_content):
@@ -35,7 +35,7 @@
 }
 
 
-class ContentMimetypeIndexer(BaseIndexer):
+class ContentMimetypeIndexer(ContentIndexer):
     """Indexer in charge of:
     - filtering out content already indexed
     - reading content from objstorage per the content's id (sha1)
@@ -67,7 +67,7 @@
             self.task_destination = None
         self.tools = self.retrieve_tools_information()
 
-    def filter_contents(self, sha1s):
+    def filter(self, sha1s):
         """Filter out known sha1s and return only missing ones.
 
         """
@@ -78,7 +78,7 @@
             } for sha1 in sha1s
        ))
 
-    def index_content(self, sha1, raw_content):
+    def index(self, sha1, raw_content):
         """Index sha1s' content and store result.
 
         Args:
diff --git a/swh/indexer/orchestrator.py b/swh/indexer/orchestrator.py
--- a/swh/indexer/orchestrator.py
+++ b/swh/indexer/orchestrator.py
@@ -98,7 +98,7 @@
             if filtering:
                 policy_update = 'ignore-dups'
                 indexer_class = get_class(idx_class)
-                sha1s_filtered = list(indexer_class().filter_contents(sha1s))
+                sha1s_filtered = list(indexer_class().filter(sha1s))
 
                 if not sha1s_filtered:
                     continue
                 else:
diff --git a/swh/indexer/tests/test_metadata.py b/swh/indexer/tests/test_metadata.py
--- a/swh/indexer/tests/test_metadata.py
+++ b/swh/indexer/tests/test_metadata.py
@@ -19,16 +19,69 @@
         self.state = metadata
         self.conflict_update = conflict_update
 
+    def revision_metadata_add(self, metadata, conflict_update=None):
+        self.state = metadata
+        self.conflict_update = conflict_update
+
     def indexer_configuration_get(self, tool):
-        return {
-            'id': 30,
-            'name': 'hard_mapping_npm',
-            'version': '0.1',
-            'configuration': {
-                'type': 'local',
-                'context': 'npm'
-            },
-        }
+        if tool['tool_name'] == 'swh-metadata-translator':
+            return {
+                'id': 30,
+                'name': 'swh-metadata-translator',
+                'version': '0.1',
+                'configuration': {
+                    'type': 'local',
+                    'context': 'npm'
+                },
+            }
+        elif tool['tool_name'] == 'swh-metadata-detector':
+            return {
+                'id': 7,
+                'name': 'swh-metadata-detector',
+                'version': '0.1',
+                'configuration': {
+                    'type': 'local',
+                    'context': 'npm'
+                },
+            }
+
+    def revision_missing(self, revisions, cur=None):
+        pass
+
+    def revision_get(self, revisions):
+        """Get all revisions from storage
+        Args: an iterable of revision ids
+        Returns: an iterable of revisions as dictionaries
+        (or None if the revision doesn't exist)
+        """
+        pass
+
+    def directory_get(self,
+                      directories,
+                      cur=None):
+        """Get information on directories.
+
+        Args:
+            - directories: an iterable of directory ids
+
+        Returns:
+            List of directories as dicts with keys and associated values.
+
+        """
+        pass
+
+    def directory_ls(self, directory, recursive=False, cur=None):
+        """Get entries for one directory.
+
+        Args:
+            - directory: the directory to list entries from.
+            - recursive: if the flag is on, list entries recursively from this directory.
+
+        Returns:
+            List of entries for such a directory.
+
+        """
+        pass
 
 
 class TestMetadataIndexer(ContentMetadataIndexer):
@@ -53,6 +106,32 @@
         self.task_destination = None
         self.rescheduling_task = self.config['rescheduling_task']
         self.tools = self.retrieve_tools_information()
+        self.results = []
+
+
+# class TestRevisionMetadataIndexer(RevisionMetadataIndexer):
+#     """Specific indexer whose configuration is enough to satisfy the
+#        indexing tests.
+#     """
+#     def prepare(self):
+#         self.config = {
+#             'rescheduling_task': None,
+#             'tools': {
+#                 'name': 'swh-metadata-detector',
+#                 'version': '0.0.1',
+#                 'configuration': {
+#                     'type': 'local',
+#                     'context': 'npm'
+#                 }
+#             }
+#         }
+#         self.storage = MockStorage()
+#         self.log = logging.getLogger('swh.indexer')
+#         self.objstorage = MockObjStorage()
+#         self.task_destination = None
+#         self.rescheduling_task = self.config['rescheduling_task']
+#         self.tools = self.retrieve_tools_information()
+#         self.results = []
 
 
 class Metadata(unittest.TestCase):
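
Reviewer's note, not part of the patch: a minimal sketch of what a concrete indexer looks like under the renamed API (filter/index instead of filter_contents/index_content), with ContentIndexer.run driving the pipeline as introduced in swh/indexer/indexer.py above. The class name ContentLengthIndexer, the 'length' result key, and the pass-through filter are hypothetical placeholders; a real indexer would call the *_missing and *_add storage endpoints matching its result table, as the indexers in this patch do.

# Illustrative sketch only -- not part of this patch. All names below
# (ContentLengthIndexer, the 'length' result key) are hypothetical.
from swh.indexer.indexer import ContentIndexer


class ContentLengthIndexer(ContentIndexer):
    """Toy indexer that records the length of each raw content."""

    def filter(self, sha1s):
        # A real indexer would call the *_missing storage endpoint
        # matching its result table; this sketch filters nothing.
        yield from sha1s

    def index(self, sha1, raw_content):
        # Return a dict shaped for persist_index_computations.
        return {'id': sha1, 'length': len(raw_content)}

    def persist_index_computations(self, results, policy_update):
        # A real indexer would call a *_add storage endpoint with
        # conflict_update=(policy_update == 'update-dups').
        pass

# The inherited ContentIndexer.run drives the whole pipeline:
#   ContentLengthIndexer().run(sha1s, policy_update='ignore-dups')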