diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py index 3f1fb8c..5c276c3 100644 --- a/swh/indexer/indexer.py +++ b/swh/indexer/indexer.py @@ -1,492 +1,521 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import os import logging import shutil import tempfile +import datetime +from copy import deepcopy from swh.storage import get_storage from swh.core.config import SWHConfig from swh.objstorage import get_objstorage from swh.objstorage.exc import ObjNotFoundError from swh.model import hashutil from swh.scheduler.utils import get_task +from swh.scheduler import get_scheduler from swh.indexer.storage import get_indexer_storage, INDEXER_CFG_KEY class DiskIndexer: """Mixin intended to be used with other SomethingIndexer classes. Indexers inheriting from this class are a category of indexers which needs the disk for their computations. Note: This expects `self.working_directory` variable defined at runtime. """ def write_to_temp(self, filename, data): """Write the sha1's content in a temporary file. Args: sha1 (str): the sha1 name filename (str): one of sha1's many filenames data (bytes): the sha1's content to write in temporary file Returns: The path to the temporary file created. That file is filled in with the raw content's data. """ os.makedirs(self.working_directory, exist_ok=True) temp_dir = tempfile.mkdtemp(dir=self.working_directory) content_path = os.path.join(temp_dir, filename) with open(content_path, 'wb') as f: f.write(data) return content_path def cleanup(self, content_path): """Remove content_path from working directory. Args: content_path (str): the file to remove """ temp_dir = os.path.dirname(content_path) shutil.rmtree(temp_dir) class BaseIndexer(SWHConfig, metaclass=abc.ABCMeta): """Base class for indexers to inherit from. The main entry point is the :func:`run` function which is in charge of triggering the computations on the batch dict/ids received. Indexers can: - filter out ids whose data has already been indexed. - retrieve ids data from storage or objstorage - index this data depending on the object and store the result in storage. To implement a new object type indexer, inherit from the BaseIndexer and implement indexing: :func:`run`: object_ids are different depending on object. For example: sha1 for content, sha1_git for revision, directory, release, and id for origin To implement a new concrete indexer, inherit from the object level classes: :class:`ContentIndexer`, :class:`RevisionIndexer`, :class:`OriginIndexer`. Then you need to implement the following functions: :func:`filter`: filter out data already indexed (in storage). This function is used by the orchestrator and not directly by the indexer (cf. swh.indexer.orchestrator.BaseOrchestratorIndexer). :func:`index_object`: compute index on id with data (retrieved from the storage or the objstorage by the id key) and return the resulting index computation. :func:`persist_index_computations`: persist the results of multiple index computations in the storage. The new indexer implementation can also override the following functions: :func:`prepare`: Configuration preparation for the indexer. When overriding, this must call the `super().prepare()` instruction. :func:`check`: Configuration check for the indexer. When overriding, this must call the `super().check()` instruction. :func:`register_tools`: This should return a dict of the tool(s) to use when indexing or filtering. """ CONFIG = 'indexer/base' DEFAULT_CONFIG = { INDEXER_CFG_KEY: ('dict', { 'cls': 'remote', 'args': { 'url': 'http://localhost:5007/' } }), # queue to reschedule if problem (none for no rescheduling, # the default) 'rescheduling_task': ('str', None), 'storage': ('dict', { 'cls': 'remote', 'args': { 'url': 'http://localhost:5002/', } }), 'objstorage': ('dict', { 'cls': 'multiplexer', 'args': { 'objstorages': [{ 'cls': 'filtered', 'args': { 'storage_conf': { 'cls': 'azure', 'args': { 'account_name': '0euwestswh', 'api_secret_key': 'secret', 'container_name': 'contents' } }, 'filters_conf': [ {'type': 'readonly'}, {'type': 'prefix', 'prefix': '0'} ] } }, { 'cls': 'filtered', 'args': { 'storage_conf': { 'cls': 'azure', 'args': { 'account_name': '1euwestswh', 'api_secret_key': 'secret', 'container_name': 'contents' } }, 'filters_conf': [ {'type': 'readonly'}, {'type': 'prefix', 'prefix': '1'} ] } }] }, }), } ADDITIONAL_CONFIG = {} def __init__(self): """Prepare and check that the indexer is ready to run. """ super().__init__() self.prepare() self.check() def prepare(self): """Prepare the indexer's needed runtime configuration. Without this step, the indexer cannot possibly run. """ self.config = self.parse_config_file( additional_configs=[self.ADDITIONAL_CONFIG]) if self.config['storage']: self.storage = get_storage(**self.config['storage']) objstorage = self.config['objstorage'] self.objstorage = get_objstorage(objstorage['cls'], objstorage['args']) idx_storage = self.config[INDEXER_CFG_KEY] self.idx_storage = get_indexer_storage(**idx_storage) rescheduling_task = self.config['rescheduling_task'] if rescheduling_task: self.rescheduling_task = get_task(rescheduling_task) else: self.rescheduling_task = None _log = logging.getLogger('requests.packages.urllib3.connectionpool') _log.setLevel(logging.WARN) self.log = logging.getLogger('swh.indexer') self.tools = list(self.register_tools(self.config['tools'])) def check(self): """Check the indexer's configuration is ok before proceeding. If ok, does nothing. If not raise error. """ if not self.tools: raise ValueError('Tools %s is unknown, cannot continue' % self.tools) def _prepare_tool(self, tool): """Prepare the tool dict to be compliant with the storage api. """ return {'tool_%s' % key: value for key, value in tool.items()} def register_tools(self, tools): """Permit to register tools to the storage. Add a sensible default which can be overridden if not sufficient. (For now, all indexers use only one tool) Expects the self.config['tools'] property to be set with one or more tools. Args: tools (dict/[dict]): Either a dict or a list of dict. Returns: List of dict with additional id key. Raises: ValueError if not a list nor a dict. """ tools = self.config['tools'] if isinstance(tools, list): tools = map(self._prepare_tool, tools) elif isinstance(tools, dict): tools = [self._prepare_tool(tools)] else: raise ValueError('Configuration tool(s) must be a dict or list!') return self.idx_storage.indexer_configuration_add(tools) @abc.abstractmethod def filter(self, ids): """Filter missing ids for that particular indexer. Args: ids ([bytes]): list of ids Yields: iterator of missing ids """ pass @abc.abstractmethod def index(self, id, data): """Index computation for the id and associated raw data. Args: id (bytes): identifier data (bytes): id's data from storage or objstorage depending on object type Returns: a dict that makes sense for the persist_index_computations function. """ pass @abc.abstractmethod def persist_index_computations(self, results, policy_update): """Persist the computation resulting from the index. Args: results ([result]): List of results. One result is the result of the index function. policy_update ([str]): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them Returns: None """ pass - def next_step(self, results): + def next_step(self, results, task): """Do something else with computations results (e.g. send to another queue, ...). (This is not an abstractmethod since it is optional). Args: results ([result]): List of results (dict) as returned by index function. + task (dict): a dict in the form expected by + `scheduler.backend.SchedulerBackend.create_tasks` + without `next_run`, plus a `result_name` key. Returns: None """ - pass + if task: + if getattr(self, 'scheduler', None): + scheduler = self.scheduler + else: + scheduler = get_scheduler(**self.config['scheduler']) + task = deepcopy(task) + result_name = task.pop('result_name') + task['next_run'] = datetime.datetime.now() + task['arguments']['kwargs'][result_name] = self.results + scheduler.create_tasks([task]) @abc.abstractmethod - def run(self, ids, policy_update, **kwargs): + def run(self, ids, policy_update, + next_step=None, **kwargs): """Given a list of ids: - retrieves the data from the storage - executes the indexing computations - stores the results (according to policy_update) Args: ids ([bytes]): id's identifier list - policy_update ([str]): either 'update-dups' or 'ignore-dups' to + policy_update (str): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them + next_step (dict): a dict in the form expected by + `scheduler.backend.SchedulerBackend.create_tasks` + without `next_run`, plus a `result_name` key. **kwargs: passed to the `index` method """ pass class ContentIndexer(BaseIndexer): """An object type indexer, inherits from the :class:`BaseIndexer` and implements Content indexing using the run method Note: the :class:`ContentIndexer` is not an instantiable object. To use it in another context, one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. """ - def run(self, ids, policy_update, **kwargs): + def run(self, ids, policy_update, + next_step=None, **kwargs): """Given a list of ids: - retrieve the content from the storage - execute the indexing computations - store the results (according to policy_update) Args: ids ([bytes]): sha1's identifier list - policy_update ([str]): either 'update-dups' or 'ignore-dups' to - respectively update duplicates or ignore - them + policy_update (str): either 'update-dups' or 'ignore-dups' to + respectively update duplicates or ignore + them + next_step (dict): a dict in the form expected by + `scheduler.backend.SchedulerBackend.create_tasks` + without `next_run`, plus a `result_name` key. **kwargs: passed to the `index` method """ results = [] try: for sha1 in ids: try: raw_content = self.objstorage.get(sha1) except ObjNotFoundError: self.log.warn('Content %s not found in objstorage' % hashutil.hash_to_hex(sha1)) continue res = self.index(sha1, raw_content, **kwargs) if res: # If no results, skip it results.append(res) self.persist_index_computations(results, policy_update) self.results = results - return self.next_step(results) + return self.next_step(results, task=next_step) except Exception: self.log.exception( 'Problem when reading contents metadata.') if self.rescheduling_task: self.log.warn('Rescheduling batch') self.rescheduling_task.delay(ids, policy_update) class OriginIndexer(BaseIndexer): """An object type indexer, inherits from the :class:`BaseIndexer` and implements Origin indexing using the run method Note: the :class:`OriginIndexer` is not an instantiable object. To use it in another context one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. """ - def run(self, ids, policy_update, parse_ids=False, **kwargs): + def run(self, ids, policy_update, + parse_ids=False, next_step=None, **kwargs): """Given a list of origin ids: - retrieve origins from storage - execute the indexing computations - store the results (according to policy_update) Args: ids ([Union[int, Tuple[str, bytes]]]): list of origin ids or (type, url) tuples. - policy_update ([str]): either 'update-dups' or 'ignore-dups' to + policy_update (str): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them - parse_ids ([bool]: If `True`, will try to convert `ids` + parse_ids (bool: If `True`, will try to convert `ids` from a human input to the valid type. + next_step (dict): a dict in the form expected by + `scheduler.backend.SchedulerBackend.create_tasks` + without `next_run`, plus a `result_name` key. **kwargs: passed to the `index` method """ if parse_ids: ids = [ o.split('+', 1) if ':' in o else int(o) # type+url or id for o in ids] results = [] for id_ in ids: if isinstance(id_, (tuple, list)): if len(id_) != 2: raise TypeError('Expected a (type, url) tuple.') (type_, url) = id_ params = {'type': type_, 'url': url} elif isinstance(id_, int): params = {'id': id_} else: raise TypeError('Invalid value in "ids": %r' % id_) origin = self.storage.origin_get(params) if not origin: self.log.warn('Origins %s not found in storage' % list(ids)) continue try: res = self.index(origin, **kwargs) if origin: # If no results, skip it results.append(res) except Exception: self.log.exception( 'Problem when processing origin %s' % id_) self.persist_index_computations(results, policy_update) self.results = results - return self.next_step(results) + return self.next_step(results, task=next_step) class RevisionIndexer(BaseIndexer): """An object type indexer, inherits from the :class:`BaseIndexer` and implements Revision indexing using the run method Note: the :class:`RevisionIndexer` is not an instantiable object. To use it in another context one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. """ - def run(self, ids, policy_update): + def run(self, ids, policy_update, next_step=None): """Given a list of sha1_gits: - retrieve revisions from storage - execute the indexing computations - store the results (according to policy_update) Args: - ids ([bytes]): sha1_git's identifier list - policy_update ([str]): either 'update-dups' or 'ignore-dups' to - respectively update duplicates or ignore - them + ids ([bytes or str]): sha1_git's identifier list + policy_update (str): either 'update-dups' or 'ignore-dups' to + respectively update duplicates or ignore + them """ results = [] + ids = [id_.encode() if isinstance(id_, str) else id_ + for id_ in ids] revs = self.storage.revision_get(ids) for rev in revs: if not rev: self.log.warn('Revisions %s not found in storage' % list(map(hashutil.hash_to_hex, ids))) continue try: res = self.index(rev) if res: # If no results, skip it results.append(res) except Exception: self.log.exception( 'Problem when processing revision') self.persist_index_computations(results, policy_update) self.results = results - return self.next_step(results) + return self.next_step(results, task=next_step) diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py index 0e5a5a4..5a17314 100644 --- a/swh/indexer/metadata.py +++ b/swh/indexer/metadata.py @@ -1,337 +1,334 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import logging from swh.indexer.indexer import ContentIndexer, RevisionIndexer, OriginIndexer from swh.indexer.metadata_dictionary import MAPPINGS from swh.indexer.metadata_detector import detect_metadata from swh.indexer.metadata_detector import extract_minimal_metadata_dict from swh.indexer.storage import INDEXER_CFG_KEY from swh.model import hashutil class ContentMetadataIndexer(ContentIndexer): """Content-level indexer This indexer is in charge of: - filtering out content already indexed in content_metadata - reading content from objstorage with the content's id sha1 - computing translated_metadata by given context - using the metadata_dictionary as the 'swh-metadata-translator' tool - store result in content_metadata table """ CONFIG_BASE_FILENAME = 'indexer/metadata' def __init__(self, tool, config): # twisted way to use the exact same config of RevisionMetadataIndexer # object that uses internally ContentMetadataIndexer self.config = config self.config['tools'] = tool super().__init__() def filter(self, ids): """Filter out known sha1s and return only missing ones. """ yield from self.idx_storage.content_metadata_missing(( { 'id': sha1, 'indexer_configuration_id': self.tool['id'], } for sha1 in ids )) def index(self, id, data): """Index sha1s' content and store result. Args: id (bytes): content's identifier data (bytes): raw content in bytes Returns: dict: dictionary representing a content_metadata. If the translation wasn't successful the translated_metadata keys will be returned as None """ result = { 'id': id, 'indexer_configuration_id': self.tool['id'], 'translated_metadata': None } try: mapping_name = self.tool['tool_configuration']['context'] result['translated_metadata'] = MAPPINGS[mapping_name] \ .translate(data) # a twisted way to keep result with indexer object for get_results self.results.append(result) except Exception: self.log.exception( "Problem during tool retrieval of metadata translation") return result def persist_index_computations(self, results, policy_update): """Persist the results in storage. Args: results ([dict]): list of content_metadata, dict with the following keys: - id (bytes): content's identifier (sha1) - translated_metadata (jsonb): detected metadata policy_update ([str]): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ self.idx_storage.content_metadata_add( results, conflict_update=(policy_update == 'update-dups')) def get_results(self): """can be called only if run method was called before Returns: list: list of content_metadata entries calculated by current indexer """ return self.results class RevisionMetadataIndexer(RevisionIndexer): """Revision-level indexer This indexer is in charge of: - filtering revisions already indexed in revision_metadata table with defined computation tool - retrieve all entry_files in root directory - use metadata_detector for file_names containing metadata - compute metadata translation if necessary and possible (depends on tool) - send sha1s to content indexing if possible - store the results for revision """ CONFIG_BASE_FILENAME = 'indexer/metadata' ADDITIONAL_CONFIG = { 'tools': ('dict', { 'name': 'swh-metadata-detector', 'version': '0.0.2', 'configuration': { 'type': 'local', 'context': ['NpmMapping', 'CodemetaMapping'] }, }), } ContentMetadataIndexer = ContentMetadataIndexer def prepare(self): super().prepare() self.tool = self.tools[0] def filter(self, sha1_gits): """Filter out known sha1s and return only missing ones. """ yield from self.idx_storage.revision_metadata_missing(( { 'id': sha1_git, 'indexer_configuration_id': self.tool['id'], } for sha1_git in sha1_gits )) def index(self, rev): """Index rev by processing it and organizing result. use metadata_detector to iterate on filenames - if one filename detected -> sends file to content indexer - if multiple file detected -> translation needed at revision level Args: rev (bytes): revision artifact from storage Returns: dict: dictionary representing a revision_metadata, with keys: - - id (bytes): rev's identifier (sha1_git) + - id (str): rev's identifier (sha1_git) - indexer_configuration_id (bytes): tool used - translated_metadata (bytes): dict of retrieved metadata """ try: result = { - 'id': rev['id'], + 'id': rev['id'].decode(), 'indexer_configuration_id': self.tool['id'], 'translated_metadata': None } root_dir = rev['directory'] dir_ls = self.storage.directory_ls(root_dir, recursive=False) files = [entry for entry in dir_ls if entry['type'] == 'file'] detected_files = detect_metadata(files) result['translated_metadata'] = self.translate_revision_metadata( detected_files) - except Exception: + except Exception as e: self.log.exception( - 'Problem when indexing rev') + 'Problem when indexing rev: %r', e) return result def persist_index_computations(self, results, policy_update): """Persist the results in storage. Args: results ([dict]): list of content_mimetype, dict with the following keys: - id (bytes): content's identifier (sha1) - mimetype (bytes): mimetype in bytes - encoding (bytes): encoding in bytes policy_update ([str]): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ # TODO: add functions in storage to keep data in revision_metadata self.idx_storage.revision_metadata_add( results, conflict_update=(policy_update == 'update-dups')) def translate_revision_metadata(self, detected_files): """ Determine plan of action to translate metadata when containing one or multiple detected files: Args: detected_files (dict): dictionary mapping context names (e.g., "npm", "authors") to list of sha1 Returns: dict: dict with translated metadata according to the CodeMeta vocabulary """ translated_metadata = [] tool = { 'name': 'swh-metadata-translator', 'version': '0.0.2', 'configuration': { 'type': 'local', 'context': None }, } # TODO: iterate on each context, on each file # -> get raw_contents # -> translate each content config = { INDEXER_CFG_KEY: self.idx_storage, 'objstorage': self.objstorage } for context in detected_files.keys(): tool['configuration']['context'] = context c_metadata_indexer = self.ContentMetadataIndexer(tool, config) # sha1s that are in content_metadata table sha1s_in_storage = [] metadata_generator = self.idx_storage.content_metadata_get( detected_files[context]) for c in metadata_generator: # extracting translated_metadata sha1 = c['id'] sha1s_in_storage.append(sha1) local_metadata = c['translated_metadata'] # local metadata is aggregated if local_metadata: translated_metadata.append(local_metadata) sha1s_filtered = [item for item in detected_files[context] if item not in sha1s_in_storage] if sha1s_filtered: # schedule indexation of content try: c_metadata_indexer.run(sha1s_filtered, policy_update='ignore-dups') # on the fly possibility: results = c_metadata_indexer.get_results() for result in results: local_metadata = result['translated_metadata'] translated_metadata.append(local_metadata) except Exception as e: self.log.warn("""Exception while indexing content""", e) # transform translated_metadata into min set with swh-metadata-detector min_metadata = extract_minimal_metadata_dict(translated_metadata) return min_metadata class OriginMetadataIndexer(OriginIndexer): def filter(self, ids): return ids - def run(self, revisions_metadata, policy_update, *, origin_head_pairs): + def run(self, revisions_metadata, policy_update, *, origin_head): """Expected to be called with the result of RevisionMetadataIndexer as first argument; ie. not a list of ids as other indexers would. Args: * `revisions_metadata` (List[dict]): contains metadata from revisions, along with the respective revision ids. It is passed by RevisionMetadataIndexer via a Celery chain triggered by OriginIndexer.next_step. * `policy_update`: `'ignore-dups'` or `'update-dups'` - * `origin_head_pairs` (List[dict]): list of dictionaries with + * `origin_head` (dict): {str(origin_id): rev_id.encode()} keys `origin_id` and `revision_id`, which is the result of OriginHeadIndexer. """ - origin_head_map = {pair['origin_id']: pair['revision_id'] - for pair in origin_head_pairs} + origin_head_map = {int(origin_id): rev_id + for (origin_id, rev_id) in origin_head.items()} # Fix up the argument order. revisions_metadata has to be the # first argument because of celery.chain; the next line calls # run() with the usual order, ie. origin ids first. return super().run(ids=list(origin_head_map), policy_update=policy_update, revisions_metadata=revisions_metadata, origin_head_map=origin_head_map) def index(self, origin, *, revisions_metadata, origin_head_map): # Get the last revision of the origin. revision_id = origin_head_map[origin['id']] # Get the metadata of that revision, and return it for revision_metadata in revisions_metadata: if revision_metadata['id'] == revision_id: return { 'origin_id': origin['id'], 'metadata': revision_metadata['translated_metadata'], 'from_revision': revision_id, 'indexer_configuration_id': revision_metadata['indexer_configuration_id'], } - # If you get this KeyError with a message like this: - # 'foo' not in [b'foo'] - # you should check you're not using JSON as task serializer raise KeyError('%r not in %r' % (revision_id, [r['id'] for r in revisions_metadata])) def persist_index_computations(self, results, policy_update): self.idx_storage.origin_intrinsic_metadata_add( results, conflict_update=(policy_update == 'update-dups')) @click.command() @click.option('--revs', '-i', help='Default sha1_git to lookup', multiple=True) def main(revs): _git_sha1s = list(map(hashutil.hash_to_bytes, revs)) rev_metadata_indexer = RevisionMetadataIndexer() rev_metadata_indexer.run(_git_sha1s, 'update-dups') if __name__ == '__main__': logging.basicConfig(level=logging.INFO) main() diff --git a/swh/indexer/origin_head.py b/swh/indexer/origin_head.py index e21d531..9de1aa0 100644 --- a/swh/indexer/origin_head.py +++ b/swh/indexer/origin_head.py @@ -1,195 +1,217 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import re import click import logging -from celery import chain - +from swh.scheduler import get_scheduler +from swh.scheduler.utils import create_task_dict from swh.indexer.indexer import OriginIndexer -from swh.indexer.tasks import OriginMetadata, RevisionMetadata class OriginHeadIndexer(OriginIndexer): """Origin-level indexer. This indexer is in charge of looking up the revision that acts as the "head" of an origin. In git, this is usually the commit pointed to by the 'master' branch.""" ADDITIONAL_CONFIG = { 'tools': ('dict', { 'name': 'origin-metadata', 'version': '0.0.1', 'configuration': {}, }), } CONFIG_BASE_FILENAME = 'indexer/origin_head' - revision_metadata_task = RevisionMetadata() - origin_intrinsic_metadata_task = OriginMetadata() + revision_metadata_task = 'revision_metadata' + origin_intrinsic_metadata_task = 'origin_metadata' def filter(self, ids): yield from ids def persist_index_computations(self, results, policy_update): """Do nothing. The indexer's results are not persistent, they should only be piped to another indexer via the orchestrator.""" pass - def next_step(self, results): + def next_step(self, results, task): """Once the head is found, call the RevisionMetadataIndexer on these revisions, then call the OriginMetadataIndexer with both the origin_id and the revision metadata, so it can copy the revision metadata to the origin's metadata. Args: results (Iterable[dict]): Iterable of return values from `index`. """ + super().next_step(results, task) if self.revision_metadata_task is None and \ self.origin_intrinsic_metadata_task is None: return assert self.revision_metadata_task is not None assert self.origin_intrinsic_metadata_task is not None - return chain( - self.revision_metadata_task.s( - ids=[res['revision_id'] for res in results], - policy_update='update-dups'), - self.origin_intrinsic_metadata_task.s( - origin_head_pairs=results, - policy_update='update-dups'), - )() + + # Second task to run after this one: copy the revision's metadata + # to the origin + sub_task = create_task_dict( + self.origin_intrinsic_metadata_task, + 'oneshot', + origin_head={ + str(result['origin_id']): + result['revision_id'].decode() + for result in results}, + policy_update='update-dups', + ) + del sub_task['next_run'] # Not json-serializable + + # First task to run after this one: index the metadata of the + # revision + task = create_task_dict( + self.revision_metadata_task, + 'oneshot', + ids=[res['revision_id'].decode() for res in results], + policy_update='update-dups', + next_step={ + **sub_task, + 'result_name': 'revisions_metadata'}, + ) + if getattr(self, 'scheduler', None): + scheduler = self.scheduler + else: + scheduler = get_scheduler(**self.config['scheduler']) + scheduler.create_tasks([task]) # Dispatch def index(self, origin): origin_id = origin['id'] latest_snapshot = self.storage.snapshot_get_latest(origin_id) method = getattr(self, '_try_get_%s_head' % origin['type'], None) if method is None: method = self._try_get_head_generic rev_id = method(latest_snapshot) if rev_id is None: return None result = { 'origin_id': origin_id, 'revision_id': rev_id, } return result # VCSs def _try_get_vcs_head(self, snapshot): try: if isinstance(snapshot, dict): branches = snapshot['branches'] if branches[b'HEAD']['target_type'] == 'revision': return branches[b'HEAD']['target'] except KeyError: return None _try_get_hg_head = _try_get_git_head = _try_get_vcs_head # Tarballs _archive_filename_re = re.compile( rb'^' rb'(?P.*)[-_]' rb'(?P[0-9]+(\.[0-9])*)' rb'(?P[-+][a-zA-Z0-9.~]+?)?' rb'(?P(\.[a-zA-Z0-9]+)+)' rb'$') @classmethod def _parse_version(cls, filename): """Extracts the release version from an archive filename, to get an ordering whose maximum is likely to be the last version of the software >>> OriginHeadIndexer._parse_version(b'foo') (-inf,) >>> OriginHeadIndexer._parse_version(b'foo.tar.gz') (-inf,) >>> OriginHeadIndexer._parse_version(b'gnu-hello-0.0.1.tar.gz') (0, 0, 1, 0) >>> OriginHeadIndexer._parse_version(b'gnu-hello-0.0.1-beta2.tar.gz') (0, 0, 1, -1, 'beta2') >>> OriginHeadIndexer._parse_version(b'gnu-hello-0.0.1+foobar.tar.gz') (0, 0, 1, 1, 'foobar') """ res = cls._archive_filename_re.match(filename) if res is None: return (float('-infinity'),) version = [int(n) for n in res.group('version').decode().split('.')] if res.group('preversion') is None: version.append(0) else: preversion = res.group('preversion').decode() if preversion.startswith('-'): version.append(-1) version.append(preversion[1:]) elif preversion.startswith('+'): version.append(1) version.append(preversion[1:]) else: assert False, res.group('preversion') return tuple(version) def _try_get_ftp_head(self, snapshot): archive_names = list(snapshot['branches']) max_archive_name = max(archive_names, key=self._parse_version) r = self._try_resolve_target(snapshot['branches'], max_archive_name) return r # Generic def _try_get_head_generic(self, snapshot): # Works on 'deposit', 'svn', and 'pypi'. try: if isinstance(snapshot, dict): branches = snapshot['branches'] except KeyError: return None else: return ( self._try_resolve_target(branches, b'HEAD') or self._try_resolve_target(branches, b'master') ) def _try_resolve_target(self, branches, target_name): try: target = branches[target_name] while target['target_type'] == 'alias': target = branches[target['target']] if target['target_type'] == 'revision': return target['target'] elif target['target_type'] == 'content': return None # TODO elif target['target_type'] == 'directory': return None # TODO elif target['target_type'] == 'release': return None # TODO else: assert False except KeyError: return None @click.command() @click.option('--origins', '-i', help='Origins to lookup, in the "type+url" format', multiple=True) def main(origins): rev_metadata_indexer = OriginHeadIndexer() rev_metadata_indexer.run(origins, 'update-dups', parse_ids=True) if __name__ == '__main__': logging.basicConfig(level=logging.INFO) main() diff --git a/swh/indexer/tests/test_metadata.py b/swh/indexer/tests/test_metadata.py index 56fffc3..b16f741 100644 --- a/swh/indexer/tests/test_metadata.py +++ b/swh/indexer/tests/test_metadata.py @@ -1,384 +1,384 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest import logging from swh.indexer.metadata_dictionary import CROSSWALK_TABLE, MAPPINGS from swh.indexer.metadata_detector import detect_metadata from swh.indexer.metadata_detector import extract_minimal_metadata_dict from swh.indexer.metadata import ContentMetadataIndexer from swh.indexer.metadata import RevisionMetadataIndexer from swh.indexer.tests.test_utils import MockObjStorage, MockStorage from swh.indexer.tests.test_utils import MockIndexerStorage class TestContentMetadataIndexer(ContentMetadataIndexer): """Specific Metadata whose configuration is enough to satisfy the indexing tests. """ def prepare(self): self.config.update({ 'rescheduling_task': None, }) self.idx_storage = MockIndexerStorage() self.log = logging.getLogger('swh.indexer') self.objstorage = MockObjStorage() self.destination_task = None self.rescheduling_task = self.config['rescheduling_task'] self.tools = self.register_tools(self.config['tools']) self.tool = self.tools[0] self.results = [] class TestRevisionMetadataIndexer(RevisionMetadataIndexer): """Specific indexer whose configuration is enough to satisfy the indexing tests. """ ContentMetadataIndexer = TestContentMetadataIndexer def prepare(self): self.config = { 'rescheduling_task': None, 'storage': { 'cls': 'remote', 'args': { 'url': 'http://localhost:9999', } }, 'tools': { 'name': 'swh-metadata-detector', 'version': '0.0.2', 'configuration': { 'type': 'local', 'context': 'NpmMapping' } } } self.storage = MockStorage() self.idx_storage = MockIndexerStorage() self.log = logging.getLogger('swh.indexer') self.objstorage = MockObjStorage() self.destination_task = None self.rescheduling_task = self.config['rescheduling_task'] self.tools = self.register_tools(self.config['tools']) self.tool = self.tools[0] self.results = [] class Metadata(unittest.TestCase): """ Tests metadata_mock_tool tool for Metadata detection """ def setUp(self): """ shows the entire diff in the results """ self.maxDiff = None self.content_tool = { 'name': 'swh-metadata-translator', 'version': '0.0.2', 'configuration': { 'type': 'local', 'context': 'NpmMapping' } } MockIndexerStorage.added_data = [] def test_crosstable(self): self.assertEqual(CROSSWALK_TABLE['NodeJS'], { 'repository': 'codeRepository', 'os': 'operatingSystem', 'cpu': 'processorRequirements', 'engines': 'processorRequirements', 'dependencies': 'softwareRequirements', 'bundleDependencies': 'softwareRequirements', 'bundledDependencies': 'softwareRequirements', 'peerDependencies': 'softwareRequirements', 'author': 'creator', 'author.email': 'email', 'author.name': 'name', 'contributor': 'contributor', 'keywords': 'keywords', 'license': 'license', 'version': 'version', 'description': 'description', 'name': 'name', 'devDependencies': 'softwareSuggestions', 'optionalDependencies': 'softwareSuggestions', 'bugs': 'issueTracker', 'homepage': 'url' }) def test_compute_metadata_none(self): """ testing content empty content is empty should return None """ # given content = b"" # None if no metadata was found or an error occurred declared_metadata = None # when result = MAPPINGS["NpmMapping"].translate(content) # then self.assertEqual(declared_metadata, result) def test_compute_metadata_npm(self): """ testing only computation of metadata with hard_mapping_npm """ # given content = b""" { "name": "test_metadata", "version": "0.0.2", "description": "Simple package.json test for indexer", "repository": { "type": "git", "url": "https://github.com/moranegg/metadata_test" } } """ declared_metadata = { 'name': 'test_metadata', 'version': '0.0.2', 'description': 'Simple package.json test for indexer', 'codeRepository': { 'type': 'git', 'url': 'https://github.com/moranegg/metadata_test' }, 'other': {} } # when result = MAPPINGS["NpmMapping"].translate(content) # then self.assertEqual(declared_metadata, result) def test_extract_minimal_metadata_dict(self): """ Test the creation of a coherent minimal metadata set """ # given metadata_list = [{ 'name': 'test_1', 'version': '0.0.2', 'description': 'Simple package.json test for indexer', 'codeRepository': { 'type': 'git', 'url': 'https://github.com/moranegg/metadata_test' }, 'other': {} }, { 'name': 'test_0_1', 'version': '0.0.2', 'description': 'Simple package.json test for indexer', 'codeRepository': { 'type': 'git', 'url': 'https://github.com/moranegg/metadata_test' }, 'other': {} }, { 'name': 'test_metadata', 'version': '0.0.2', 'author': 'moranegg', 'other': {} }] # when results = extract_minimal_metadata_dict(metadata_list) # then expected_results = { "developmentStatus": None, "version": ['0.0.2'], "operatingSystem": None, "description": ['Simple package.json test for indexer'], "keywords": None, "issueTracker": None, "name": ['test_1', 'test_0_1', 'test_metadata'], "author": ['moranegg'], "relatedLink": None, "url": None, "license": None, "maintainer": None, "email": None, "softwareRequirements": None, "identifier": None, "codeRepository": [{ 'type': 'git', 'url': 'https://github.com/moranegg/metadata_test' }] } self.assertEqual(expected_results, results) def test_index_content_metadata_npm(self): """ testing NPM with package.json - one sha1 uses a file that can't be translated to metadata and should return None in the translated metadata """ # given sha1s = ['26a9f72a7c87cc9205725cfd879f514ff4f3d8d5', 'd4c647f0fc257591cc9ba1722484229780d1c607', '02fb2c89e14f7fab46701478c83779c7beb7b069'] # this metadata indexer computes only metadata for package.json # in npm context with a hard mapping metadata_indexer = TestContentMetadataIndexer( tool=self.content_tool, config={}) # when metadata_indexer.run(sha1s, policy_update='ignore-dups') results = metadata_indexer.idx_storage.added_data expected_results = [('content_metadata', False, [{ 'indexer_configuration_id': 30, 'translated_metadata': { 'other': {}, 'codeRepository': { 'type': 'git', 'url': 'https://github.com/moranegg/metadata_test' }, 'description': 'Simple package.json test for indexer', 'name': 'test_metadata', 'version': '0.0.1' }, 'id': '26a9f72a7c87cc9205725cfd879f514ff4f3d8d5' }, { 'indexer_configuration_id': 30, 'translated_metadata': { 'softwareRequirements': { 'JSONStream': '~1.3.1', 'abbrev': '~1.1.0', 'ansi-regex': '~2.1.1', 'ansicolors': '~0.3.2', 'ansistyles': '~0.1.3' }, 'issueTracker': { 'url': 'https://github.com/npm/npm/issues' }, 'creator': 'Isaac Z. Schlueter (http://blog.izs.me)', 'codeRepository': { 'type': 'git', 'url': 'https://github.com/npm/npm' }, 'description': 'a package manager for JavaScript', 'softwareSuggestions': { 'tacks': '~1.2.6', 'tap': '~10.3.2' }, 'license': 'Artistic-2.0', 'version': '5.0.3', 'other': { 'preferGlobal': True, 'config': { 'publishtest': False } }, 'name': 'npm', 'keywords': [ 'install', 'modules', 'package manager', 'package.json' ], 'url': 'https://docs.npmjs.com/' }, 'id': 'd4c647f0fc257591cc9ba1722484229780d1c607' }, { 'indexer_configuration_id': 30, 'translated_metadata': None, 'id': '02fb2c89e14f7fab46701478c83779c7beb7b069' }])] # The assertion below returns False sometimes because of nested lists self.assertEqual(expected_results, results) def test_detect_metadata_package_json(self): # given df = [{ 'sha1_git': b'abc', 'name': b'index.js', 'target': b'abc', 'length': 897, 'status': 'visible', 'type': 'file', 'perms': 33188, 'dir_id': b'dir_a', 'sha1': b'bcd' }, { 'sha1_git': b'aab', 'name': b'package.json', 'target': b'aab', 'length': 712, 'status': 'visible', 'type': 'file', 'perms': 33188, 'dir_id': b'dir_a', 'sha1': b'cde' }] # when results = detect_metadata(df) expected_results = { 'NpmMapping': [ b'cde' ] } # then self.assertEqual(expected_results, results) def test_revision_metadata_indexer(self): metadata_indexer = TestRevisionMetadataIndexer() sha1_gits = [ b'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f', ] metadata_indexer.run(sha1_gits, 'update-dups') results = metadata_indexer.idx_storage.added_data expected_results = [('revision_metadata', True, [{ - 'id': b'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f', + 'id': '8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f', 'translated_metadata': { 'identifier': None, 'maintainer': None, 'url': [ 'https://github.com/librariesio/yarn-parser#readme' ], 'codeRepository': [{ 'type': 'git', 'url': 'git+https://github.com/librariesio/yarn-parser.git' }], 'author': ['Andrew Nesbitt'], 'license': ['AGPL-3.0'], 'version': ['1.0.0'], 'description': [ 'Tiny web service for parsing yarn.lock files' ], 'relatedLink': None, 'developmentStatus': None, 'operatingSystem': None, 'issueTracker': [{ 'url': 'https://github.com/librariesio/yarn-parser/issues' }], 'softwareRequirements': [{ 'express': '^4.14.0', 'yarn': '^0.21.0', 'body-parser': '^1.15.2' }], 'name': ['yarn-parser'], 'keywords': [['yarn', 'parse', 'lock', 'dependencies']], 'email': None }, 'indexer_configuration_id': 7 }])] # then self.assertEqual(expected_results, results) diff --git a/swh/indexer/tests/test_origin_metadata.py b/swh/indexer/tests/test_origin_metadata.py index 84e218b..1b82b64 100644 --- a/swh/indexer/tests/test_origin_metadata.py +++ b/swh/indexer/tests/test_origin_metadata.py @@ -1,126 +1,142 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import time import logging import unittest from celery import task from swh.indexer.metadata import OriginMetadataIndexer from swh.indexer.tests.test_utils import MockObjStorage, MockStorage from swh.indexer.tests.test_utils import MockIndexerStorage from swh.indexer.tests.test_origin_head import TestOriginHeadIndexer from swh.indexer.tests.test_metadata import TestRevisionMetadataIndexer -from swh.scheduler.tests.celery_testing import CeleryTestFixture -from swh.indexer.tests import start_worker_thread +from swh.scheduler.tests.scheduler_testing import SchedulerTestFixture class TestOriginMetadataIndexer(OriginMetadataIndexer): def prepare(self): self.config = { 'storage': { 'cls': 'remote', 'args': { 'url': 'http://localhost:9999', } }, 'tools': { 'name': 'origin-metadata', 'version': '0.0.1', 'configuration': {} } } self.storage = MockStorage() self.idx_storage = MockIndexerStorage() self.log = logging.getLogger('swh.indexer') self.objstorage = MockObjStorage() self.destination_task = None self.tools = self.register_tools(self.config['tools']) self.tool = self.tools[0] self.results = [] @task def revision_metadata_test_task(*args, **kwargs): indexer = TestRevisionMetadataIndexer() indexer.run(*args, **kwargs) return indexer.results @task def origin_intrinsic_metadata_test_task(*args, **kwargs): indexer = TestOriginMetadataIndexer() indexer.run(*args, **kwargs) return indexer.results class TestOriginHeadIndexer(TestOriginHeadIndexer): - revision_metadata_task = revision_metadata_test_task - origin_intrinsic_metadata_task = origin_intrinsic_metadata_test_task + revision_metadata_task = 'revision_metadata_test_task' + origin_intrinsic_metadata_task = 'origin_intrinsic_metadata_test_task' -class TestOriginMetadata(CeleryTestFixture, unittest.TestCase): +class TestOriginMetadata(SchedulerTestFixture, unittest.TestCase): def setUp(self): super().setUp() self.maxDiff = None MockIndexerStorage.added_data = [] + self.add_scheduler_task_type( + 'revision_metadata_test_task', + 'swh.indexer.tests.test_origin_metadata.' + 'revision_metadata_test_task') + self.add_scheduler_task_type( + 'origin_intrinsic_metadata_test_task', + 'swh.indexer.tests.test_origin_metadata.' + 'origin_intrinsic_metadata_test_task') + TestRevisionMetadataIndexer.scheduler = self.scheduler + + def tearDown(self): + del TestRevisionMetadataIndexer.scheduler + super().tearDown() def test_pipeline(self): indexer = TestOriginHeadIndexer() - with start_worker_thread(): - promise = indexer.run( - ["git+https://github.com/librariesio/yarn-parser"], - policy_update='update-dups', - parse_ids=True) - promise.get() + indexer.scheduler = self.scheduler + indexer.run( + ["git+https://github.com/librariesio/yarn-parser"], + policy_update='update-dups', + parse_ids=True) + + self.run_ready_tasks() # Run the first task + time.sleep(0.1) # Give it time to complete and schedule the 2nd one + self.run_ready_tasks() # Run the second task metadata = { 'identifier': None, 'maintainer': None, 'url': [ 'https://github.com/librariesio/yarn-parser#readme' ], 'codeRepository': [{ 'type': 'git', 'url': 'git+https://github.com/librariesio/yarn-parser.git' }], 'author': ['Andrew Nesbitt'], 'license': ['AGPL-3.0'], 'version': ['1.0.0'], 'description': [ 'Tiny web service for parsing yarn.lock files' ], 'relatedLink': None, 'developmentStatus': None, 'operatingSystem': None, 'issueTracker': [{ 'url': 'https://github.com/librariesio/yarn-parser/issues' }], 'softwareRequirements': [{ 'express': '^4.14.0', 'yarn': '^0.21.0', 'body-parser': '^1.15.2' }], 'name': ['yarn-parser'], 'keywords': [['yarn', 'parse', 'lock', 'dependencies']], 'email': None } rev_metadata = { - 'id': b'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f', + 'id': '8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f', 'translated_metadata': metadata, 'indexer_configuration_id': 7, } origin_metadata = { 'origin_id': 54974445, - 'from_revision': b'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f', + 'from_revision': '8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f', 'metadata': metadata, 'indexer_configuration_id': 7, } expected_results = [ ('origin_intrinsic_metadata', True, [origin_metadata]), ('revision_metadata', True, [rev_metadata])] results = list(indexer.idx_storage.added_data) self.assertCountEqual(expected_results, results)