diff --git a/swh/indexer/journal_client.py b/swh/indexer/journal_client.py
index 4db7389..c8f27ea 100644
--- a/swh/indexer/journal_client.py
+++ b/swh/indexer/journal_client.py
@@ -1,88 +1,88 @@
 # Copyright (C) 2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import logging

 from swh.journal.client import JournalClient
 from swh.scheduler import get_scheduler
 from swh.scheduler.utils import create_task_dict


 class IndexerJournalClient(JournalClient):
     """Client in charge of listing newly received origins and origin_visits
     in the swh journal.

     """
     CONFIG_BASE_FILENAME = 'indexer/journal_client'

     ADDITIONAL_CONFIG = {
         'scheduler': ('dict', {
             'cls': 'remote',
             'args': {
                 'url': 'http://localhost:5008/',
             }
         }),
         'origin_visit_tasks': ('List[dict]', [
             {
-                'type': 'indexer_full_origin_metadata',
+                'type': 'indexer_origin_metadata',
                 'kwargs': {
                     'policy_update': 'update-dups',
                     'parse_ids': False,
                 }
             }
         ]),
     }

     def __init__(self):
         super().__init__(extra_configuration={
             'object_types': ['origin_visit'],
         })
         self.scheduler = get_scheduler(**self.config['scheduler'])
         logging.info(
             'Starting indexer journal client with config %r',
             self.config)

     def process_objects(self, messages):
         assert set(messages) == {'origin_visit'}, set(messages)
         for origin_visit in messages['origin_visit']:
             self.process_origin_visit(origin_visit)

     def process_origin_visit(self, origin_visit):
         task_dicts = []
         logging.debug('processing origin visit %r', origin_visit)
         if origin_visit[b'status'] == b'full':
             for task_config in self.config['origin_visit_tasks']:
                 logging.info(
                     'Scheduling %s for visit of origin %d',
                     task_config['type'], origin_visit[b'origin'])
                 task_dicts.append(create_task_dict(
                     task_config['type'],
                     'oneshot',
                     [origin_visit[b'origin']],
                     **task_config['kwargs'],
                 ))
         else:
             logging.debug('status is not "full", ignoring.')

         if task_dicts:
             self.scheduler.create_tasks(task_dicts)


 if __name__ == '__main__':
     logging.basicConfig(
         level=logging.INFO,
         format='%(asctime)s %(process)d %(levelname)s %(message)s'
     )
     import click

     @click.command()
     def main():
         """Log the newly received origins and origin_visits.

         """
         IndexerJournalClient().process()

     main()
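(For orientation, not part of the patch: a sketch of what process_origin_visit() above does for a single journal message, under the default 'origin_visit_tasks' configuration. The message values are hypothetical.)

    # Hypothetical origin_visit message, as consumed by process_origin_visit():
    visit = {b'status': b'full', b'origin': 42}
    # For a 'full' visit, one task per configured entry is built:
    task = create_task_dict(
        'indexer_origin_metadata',      # task_config['type']
        'oneshot',
        [42],                           # [origin_visit[b'origin']]
        policy_update='update-dups',    # **task_config['kwargs']
        parse_ids=False,
    )
    # ...then handed to the scheduler: self.scheduler.create_tasks([task])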
""" IndexerJournalClient().process() main() diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py index a2a18b6..696754d 100644 --- a/swh/indexer/metadata.py +++ b/swh/indexer/metadata.py @@ -1,373 +1,316 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click -import itertools import logging from copy import deepcopy from swh.indexer.indexer import ContentIndexer, RevisionIndexer, OriginIndexer from swh.indexer.origin_head import OriginHeadIndexer from swh.indexer.metadata_dictionary import MAPPINGS from swh.indexer.metadata_detector import detect_metadata from swh.indexer.metadata_detector import extract_minimal_metadata_dict from swh.indexer.storage import INDEXER_CFG_KEY from swh.model import hashutil class ContentMetadataIndexer(ContentIndexer): """Content-level indexer This indexer is in charge of: - filtering out content already indexed in content_metadata - reading content from objstorage with the content's id sha1 - computing translated_metadata by given context - using the metadata_dictionary as the 'swh-metadata-translator' tool - store result in content_metadata table """ # Note: This used when the content metadata indexer is used alone # (not the case for example in the case of the RevisionMetadataIndexer) CONFIG_BASE_FILENAME = 'indexer/content_metadata' def filter(self, ids): """Filter out known sha1s and return only missing ones. """ yield from self.idx_storage.content_metadata_missing(( { 'id': sha1, 'indexer_configuration_id': self.tool['id'], } for sha1 in ids )) def index(self, id, data, log_suffix='unknown revision'): """Index sha1s' content and store result. Args: id (bytes): content's identifier data (bytes): raw content in bytes Returns: dict: dictionary representing a content_metadata. If the translation wasn't successful the translated_metadata keys will be returned as None """ result = { 'id': id, 'indexer_configuration_id': self.tool['id'], 'translated_metadata': None } try: mapping_name = self.tool['tool_configuration']['context'] log_suffix += ', content_id=%s' % hashutil.hash_to_hex(id) result['translated_metadata'] = \ MAPPINGS[mapping_name](log_suffix).translate(data) except Exception: self.log.exception( "Problem during metadata translation " "for content %s" % hashutil.hash_to_hex(id)) if result['translated_metadata'] is None: return None return result def persist_index_computations(self, results, policy_update): """Persist the results in storage. 
     def persist_index_computations(self, results, policy_update):
         """Persist the results in storage.

         Args:
             results ([dict]): list of content_metadata, dict with the
               following keys:

               - id (bytes): content's identifier (sha1)
               - translated_metadata (jsonb): detected metadata

             policy_update ([str]): either 'update-dups' or 'ignore-dups' to
               respectively update duplicates or ignore them

         """
         self.idx_storage.content_metadata_add(
             results, conflict_update=(policy_update == 'update-dups'))


 class RevisionMetadataIndexer(RevisionIndexer):
     """Revision-level indexer

     This indexer is in charge of:

     - filtering revisions already indexed in revision_metadata table with
       defined computation tool
     - retrieve all entry_files in root directory
     - use metadata_detector for file_names containing metadata
     - compute metadata translation if necessary and possible (depends on tool)
     - send sha1s to content indexing if possible
     - store the results for revision

     """
     CONFIG_BASE_FILENAME = 'indexer/revision_metadata'

     ADDITIONAL_CONFIG = {
         'tools': ('dict', {
             'name': 'swh-metadata-detector',
             'version': '0.0.2',
             'configuration': {
                 'type': 'local',
                 'context': list(MAPPINGS),
             },
         }),
     }

     def filter(self, sha1_gits):
         """Filter out known sha1s and return only missing ones.
         """
         yield from self.idx_storage.revision_metadata_missing((
             {
                 'id': sha1_git,
                 'indexer_configuration_id': self.tool['id'],
             } for sha1_git in sha1_gits
         ))

     def index(self, rev):
         """Index rev by processing it and organizing result.

         use metadata_detector to iterate on filenames

         - if one filename detected -> sends file to content indexer
         - if multiple files detected -> translation needed at revision level

         Args:
             rev (dict): revision artifact from storage

         Returns:
             dict: dictionary representing a revision_metadata, with keys:

             - id (str): rev's identifier (sha1_git)
             - indexer_configuration_id (bytes): tool used
             - translated_metadata: dict of retrieved metadata

         """
         result = {
             'id': rev['id'],
             'indexer_configuration_id': self.tool['id'],
             'translated_metadata': None
         }

         try:
             root_dir = rev['directory']
             dir_ls = self.storage.directory_ls(root_dir, recursive=False)
             files = [entry for entry in dir_ls if entry['type'] == 'file']
             detected_files = detect_metadata(files)
             result['translated_metadata'] = self.translate_revision_metadata(
                 detected_files,
                 log_suffix='revision=%s' % hashutil.hash_to_hex(rev['id'])
             )
         except Exception as e:
             self.log.exception(
                 'Problem when indexing rev: %r', e)
         return result
     def persist_index_computations(self, results, policy_update):
         """Persist the results in storage.

         Args:
             results ([dict]): list of revision_metadata, dict with the
               following keys:

               - id (bytes): revision's identifier (sha1_git)
               - translated_metadata (jsonb): detected metadata

             policy_update ([str]): either 'update-dups' or 'ignore-dups' to
               respectively update duplicates or ignore them

         """
         # TODO: add functions in storage to keep data in revision_metadata
         self.idx_storage.revision_metadata_add(
             results, conflict_update=(policy_update == 'update-dups'))

     def translate_revision_metadata(self, detected_files, log_suffix):
         """
         Determine the plan of action for translating metadata, given one
         or multiple detected files:

         Args:
             detected_files (dict): dictionary mapping context names (e.g.,
               "npm", "authors") to list of sha1

         Returns:
             dict: dict with translated metadata according to the CodeMeta
             vocabulary

         """
         translated_metadata = []
         tool = {
             'name': 'swh-metadata-translator',
             'version': '0.0.2',
             'configuration': {
                 'type': 'local',
                 'context': None
             },
         }
         # TODO: iterate on each context, on each file
         # -> get raw_contents
         # -> translate each content
         config = {
             k: self.config[k]
             for k in [INDEXER_CFG_KEY, 'objstorage', 'storage']
         }
         config['tools'] = [tool]
         for context in detected_files.keys():
             cfg = deepcopy(config)
             cfg['tools'][0]['configuration']['context'] = context
             c_metadata_indexer = ContentMetadataIndexer(config=cfg)
             # sha1s that are in content_metadata table
             sha1s_in_storage = []
             metadata_generator = self.idx_storage.content_metadata_get(
                 detected_files[context])
             for c in metadata_generator:
                 # extracting translated_metadata
                 sha1 = c['id']
                 sha1s_in_storage.append(sha1)
                 local_metadata = c['translated_metadata']
                 # local metadata is aggregated
                 if local_metadata:
                     translated_metadata.append(local_metadata)

             sha1s_filtered = [item for item in detected_files[context]
                               if item not in sha1s_in_storage]

             if sha1s_filtered:
                 # content indexing
                 try:
                     c_metadata_indexer.run(sha1s_filtered,
                                            policy_update='ignore-dups',
                                            log_suffix=log_suffix)
                     # on the fly possibility:
                     for result in c_metadata_indexer.results:
                         local_metadata = result['translated_metadata']
                         translated_metadata.append(local_metadata)

                 except Exception:
                     self.log.exception(
                         "Exception while indexing metadata on contents")

         # transform translated_metadata into min set with
         # swh-metadata-detector
         min_metadata = extract_minimal_metadata_dict(translated_metadata)
         return min_metadata


 class OriginMetadataIndexer(OriginIndexer):
-    CONFIG_BASE_FILENAME = 'indexer/origin_intrinsic_metadata'
-
-    ADDITIONAL_CONFIG = {
-        'tools': ('list', [])
-    }
-
-    USE_TOOLS = False
-
-    def run(self, origin_head, policy_update):
-        """Expected to be called with the result of RevisionMetadataIndexer
-        as first argument; ie. not a list of ids as other indexers would.
-
-        Args:
-            origin_head (dict): {str(origin_id): rev_id}
-              keys `origin_id` and `revision_id`, which is the result
-              of OriginHeadIndexer.
-            policy_update (str): `'ignore-dups'` or `'update-dups'`
-        """
-        origin_head_map = {origin_id: hashutil.hash_to_bytes(rev_id)
-                           for (origin_id, rev_id) in origin_head.items()}
-
-        # Fix up the argument order. revisions_metadata has to be the
-        # first argument because of celery.chain; the next line calls
-        # run() with the usual order, ie. origin ids first.
-        return super().run(ids=list(origin_head_map),
-                           policy_update=policy_update,
-                           parse_ids=False,
-                           origin_head_map=origin_head_map)
-
-    def index(self, origin, *, origin_head_map):
-        # Get the last revision of the origin.
-        revision_id = origin_head_map[str(origin['id'])]
-
-        revision_metadata = self.idx_storage \
-            .revision_metadata_get([revision_id])
-
-        results = []
-        for item in revision_metadata:
-            assert item['id'] == revision_id
-            # Get the metadata of that revision, and return it
-            results.append({
-                'origin_id': origin['id'],
-                'metadata': item['translated_metadata'],
-                'from_revision': revision_id,
-                'indexer_configuration_id':
-                    item['tool']['id'],
-            })
-        return results
-
-    def persist_index_computations(self, results, policy_update):
-        self.idx_storage.origin_intrinsic_metadata_add(
-            list(itertools.chain(*results)),
-            conflict_update=(policy_update == 'update-dups'))
-
-
-class FullOriginMetadataIndexer(OriginIndexer):
     CONFIG_BASE_FILENAME = 'indexer/full_origin_intrinsic_metadata'

     ADDITIONAL_CONFIG = {
         'tools': ('list', [])
     }

     USE_TOOLS = False

     def __init__(self):
         super().__init__()
         self.origin_head_indexer = OriginHeadIndexer()
         self.revision_metadata_indexer = RevisionMetadataIndexer()

     def index(self, origin):
         head_result = self.origin_head_indexer.index(origin)
         if not head_result:
             return
         rev_id = head_result['revision_id']

         rev = list(self.storage.revision_get([rev_id]))
         if not rev:
             self.warning('Missing head revision %s of origin %r',
                          (hashutil.hash_to_bytes(rev_id), origin))
             return
         assert len(rev) == 1
         rev = rev[0]
         rev_metadata = self.revision_metadata_indexer.index(rev)
         orig_metadata = {
             'from_revision': rev_metadata['id'],
             'origin_id': origin['id'],
             'metadata': rev_metadata['translated_metadata'],
             'indexer_configuration_id':
                 rev_metadata['indexer_configuration_id'],
         }
         return (orig_metadata, rev_metadata)

     def persist_index_computations(self, results, policy_update):
         self.idx_storage.revision_metadata_add(
             [rev_item for (orig_item, rev_item) in results],
             conflict_update=(policy_update == 'update-dups'))
         self.idx_storage.origin_intrinsic_metadata_add(
             [orig_item for (orig_item, rev_item) in results],
             conflict_update=(policy_update == 'update-dups'))


 @click.command()
 @click.option('--revs', '-i',
               help='Default sha1_git to lookup', multiple=True)
 def main(revs):
     _git_sha1s = list(map(hashutil.hash_to_bytes, revs))
     rev_metadata_indexer = RevisionMetadataIndexer()
     rev_metadata_indexer.run(_git_sha1s, 'update-dups')


 if __name__ == '__main__':
     logging.basicConfig(level=logging.INFO)
     main()
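(Not part of the patch: the merged OriginMetadataIndexer above now chains the head lookup, the revision indexing, and the origin-metadata copy in a single process, where the two classes it replaces relied on chained celery tasks. A minimal invocation sketch, assuming a fully configured storage/objstorage/indexer-storage environment; the origin URL is the one exercised by the test suite below.)

    indexer = OriginMetadataIndexer()
    indexer.run(['git+https://github.com/librariesio/yarn-parser'],
                policy_update='update-dups')
    # index() returns one (orig_metadata, rev_metadata) pair per origin;
    # persist_index_computations() then fills both the revision_metadata
    # and the origin_intrinsic_metadata tables in a single pass.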
diff --git a/swh/indexer/origin_head.py b/swh/indexer/origin_head.py
index ce63708..0a95cc9 100644
--- a/swh/indexer/origin_head.py
+++ b/swh/indexer/origin_head.py
@@ -1,218 +1,162 @@
 # Copyright (C) 2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import re

 import click
 import logging

-from swh.scheduler import get_scheduler
-from swh.scheduler.utils import create_task_dict
 from swh.indexer.indexer import OriginIndexer
-from swh.model.hashutil import hash_to_hex
-

 class OriginHeadIndexer(OriginIndexer):
     """Origin-level indexer.

     This indexer is in charge of looking up the revision that acts as the
     "head" of an origin.
     In git, this is usually the commit pointed to by the 'master'
     branch."""

     ADDITIONAL_CONFIG = {
         'tools': ('dict', {
             'name': 'origin-metadata',
             'version': '0.0.1',
             'configuration': {},
         }),
-        'tasks': ('dict', {
-            'revision_metadata': 'revision_metadata',
-            'origin_intrinsic_metadata': 'origin_metadata',
-        })
     }

     CONFIG_BASE_FILENAME = 'indexer/origin_head'

     def persist_index_computations(self, results, policy_update):
         """Do nothing. The indexer's results are not persistent, they
         should only be piped to another indexer."""
         pass

-    def next_step(self, results, task):
-        """Once the head is found, call the RevisionMetadataIndexer
-        on these revisions, then call the OriginMetadataIndexer with
-        both the origin_id and the revision metadata, so it can copy the
-        revision metadata to the origin's metadata.
-
-        Args:
-            results (Iterable[dict]): Iterable of return values from `index`.
-
-        """
-        super().next_step(results, task)
-        revision_metadata_task = self.config['tasks']['revision_metadata']
-        origin_intrinsic_metadata_task = self.config['tasks'][
-            'origin_intrinsic_metadata']
-        if revision_metadata_task is None and \
-                origin_intrinsic_metadata_task is None:
-            return
-        assert revision_metadata_task is not None
-        assert origin_intrinsic_metadata_task is not None
-
-        # Second task to run after this one: copy the revision's metadata
-        # to the origin
-        sub_task = create_task_dict(
-            origin_intrinsic_metadata_task,
-            'oneshot',
-            origin_head={
-                str(result['origin_id']):
-                    hash_to_hex(result['revision_id'])
-                for result in results},
-            policy_update='update-dups',
-        )
-        del sub_task['next_run']  # Not json-serializable
-
-        # First task to run after this one: index the metadata of the
-        # revision
-        task = create_task_dict(
-            revision_metadata_task,
-            'oneshot',
-            ids=[hash_to_hex(res['revision_id']) for res in results],
-            policy_update='update-dups',
-            next_step=sub_task,
-        )
-        if getattr(self, 'scheduler', None):
-            scheduler = self.scheduler
-        else:
-            scheduler = get_scheduler(**self.config['scheduler'])
-        scheduler.create_tasks([task])
-
     # Dispatch

     def index(self, origin):
         origin_id = origin['id']
         latest_snapshot = self.storage.snapshot_get_latest(origin_id)
         method = getattr(self, '_try_get_%s_head' % origin['type'], None)
         if method is None:
             method = self._try_get_head_generic
         rev_id = method(latest_snapshot)
         if rev_id is None:
             return None
         result = {
             'origin_id': origin_id,
             'revision_id': rev_id,
         }
         return result

     # VCSs

     def _try_get_vcs_head(self, snapshot):
         try:
             if isinstance(snapshot, dict):
                 branches = snapshot['branches']
                 if branches[b'HEAD']['target_type'] == 'revision':
                     return branches[b'HEAD']['target']
         except KeyError:
             return None

     _try_get_hg_head = _try_get_git_head = _try_get_vcs_head

     # Tarballs

     _archive_filename_re = re.compile(
             rb'^'
             rb'(?P<pkgname>.*)[-_]'
             rb'(?P<version>[0-9]+(\.[0-9]+)*)'
             rb'(?P<preversion>[-+][a-zA-Z0-9.~]+?)?'
             rb'(?P<extension>(\.[a-zA-Z0-9]+)+)'
             rb'$')

     @classmethod
     def _parse_version(cls, filename):
         """Extracts the release version from an archive filename,
         to get an ordering whose maximum is likely to be the last
         version of the software

         >>> OriginHeadIndexer._parse_version(b'foo')
         (-inf,)
         >>> OriginHeadIndexer._parse_version(b'foo.tar.gz')
         (-inf,)
         >>> OriginHeadIndexer._parse_version(b'gnu-hello-0.0.1.tar.gz')
         (0, 0, 1, 0)
         >>> OriginHeadIndexer._parse_version(b'gnu-hello-0.0.1-beta2.tar.gz')
         (0, 0, 1, -1, 'beta2')
         >>> OriginHeadIndexer._parse_version(b'gnu-hello-0.0.1+foobar.tar.gz')
         (0, 0, 1, 1, 'foobar')
         """
         res = cls._archive_filename_re.match(filename)
         if res is None:
             return (float('-infinity'),)
         version = [int(n) for n in res.group('version').decode().split('.')]
         if res.group('preversion') is None:
             version.append(0)
         else:
             preversion = res.group('preversion').decode()
             if preversion.startswith('-'):
                 version.append(-1)
                 version.append(preversion[1:])
             elif preversion.startswith('+'):
                 version.append(1)
                 version.append(preversion[1:])
             else:
                 assert False, res.group('preversion')
         return tuple(version)

     def _try_get_ftp_head(self, snapshot):
         archive_names = list(snapshot['branches'])
         max_archive_name = max(archive_names, key=self._parse_version)
         r = self._try_resolve_target(snapshot['branches'], max_archive_name)
         return r

     # Generic

     def _try_get_head_generic(self, snapshot):
         # Works on 'deposit', 'svn', and 'pypi'.
         try:
             if isinstance(snapshot, dict):
                 branches = snapshot['branches']
         except KeyError:
             return None
         else:
             return (
                 self._try_resolve_target(branches, b'HEAD')
                 or self._try_resolve_target(branches, b'master')
             )

     def _try_resolve_target(self, branches, target_name):
         try:
             target = branches[target_name]
             while target['target_type'] == 'alias':
                 target = branches[target['target']]
             if target['target_type'] == 'revision':
                 return target['target']
             elif target['target_type'] == 'content':
                 return None  # TODO
             elif target['target_type'] == 'directory':
                 return None  # TODO
             elif target['target_type'] == 'release':
                 return None  # TODO
             else:
                 assert False
         except KeyError:
             return None


 @click.command()
 @click.option('--origins', '-i',
               help='Origins to lookup, in the "type+url" format',
               multiple=True)
 def main(origins):
     rev_metadata_indexer = OriginHeadIndexer()
     rev_metadata_indexer.run(origins)


 if __name__ == '__main__':
     logging.basicConfig(level=logging.INFO)
     main()
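(Not part of the patch: an illustration of the ordering produced by _parse_version(), which is what lets _try_get_ftp_head() pick the most recent tarball with a plain max(). Pre-releases sort below the matching release through the -1 marker, post-releases above it through +1.)

    names = [b'gnu-hello-0.0.1-beta2.tar.gz',
             b'gnu-hello-0.0.1.tar.gz',
             b'gnu-hello-0.0.1+foobar.tar.gz']
    # keys: (0, 0, 1, -1, 'beta2') < (0, 0, 1, 0) < (0, 0, 1, 1, 'foobar')
    assert max(names, key=OriginHeadIndexer._parse_version) \
        == b'gnu-hello-0.0.1+foobar.tar.gz'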
diff --git a/swh/indexer/tasks.py b/swh/indexer/tasks.py
index ecc6daa..8c08675 100644
--- a/swh/indexer/tasks.py
+++ b/swh/indexer/tasks.py
@@ -1,85 +1,79 @@
 # Copyright (C) 2016-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from celery import current_app as app

 from .mimetype import MimetypeIndexer, MimetypeRangeIndexer
 from .language import LanguageIndexer
 from .ctags import CtagsIndexer
 from .fossology_license import (
     FossologyLicenseIndexer, FossologyLicenseRangeIndexer
 )
 from .rehash import RecomputeChecksums
 from .metadata import (
-    RevisionMetadataIndexer, OriginMetadataIndexer, FullOriginMetadataIndexer,
+    RevisionMetadataIndexer, OriginMetadataIndexer
 )
 from .origin_head import OriginHeadIndexer


 @app.task(name=__name__ + '.RevisionMetadata')
 def revision_metadata(*args, **kwargs):
     results = RevisionMetadataIndexer().run(*args, **kwargs)
     return getattr(results, 'results', results)


 @app.task(name=__name__ + '.OriginMetadata')
 def origin_metadata(*args, **kwargs):
     results = OriginMetadataIndexer().run(*args, **kwargs)
     return getattr(results, 'results', results)


-@app.task(name=__name__ + '.FullOriginMetadata')
-def full_origin_metadata(*args, **kwargs):
-    results = FullOriginMetadataIndexer().run(*args, **kwargs)
-    return getattr(results, 'results', results)
-
-
 @app.task(name=__name__ + '.OriginHead')
 def origin_head(*args, **kwargs):
     results = OriginHeadIndexer().run(*args, **kwargs)
     return getattr(results, 'results', results)


 @app.task(name=__name__ + '.ContentLanguage')
 def content_language(*args, **kwargs):
     results = LanguageIndexer().run(*args, **kwargs)
     return getattr(results, 'results', results)


 @app.task(name=__name__ + '.Ctags')
 def ctags(*args, **kwargs):
     results = CtagsIndexer().run(*args, **kwargs)
     return getattr(results, 'results', results)


 @app.task(name=__name__ + '.ContentFossologyLicense')
 def fossology_license(*args, **kwargs):
     results = FossologyLicenseIndexer().run(*args, **kwargs)
     return getattr(results, 'results', results)


 @app.task(name=__name__ + '.RecomputeChecksums')
 def recompute_checksums(*args, **kwargs):
     results = RecomputeChecksums().run(*args, **kwargs)
     return getattr(results, 'results', results)


 @app.task(name=__name__ + '.ContentMimetype')
 def mimetype(*args, **kwargs):
     results = MimetypeIndexer().run(*args, **kwargs)
     return {'status': 'eventful' if results else 'uneventful'}


 @app.task(name=__name__ + '.ContentRangeMimetype')
 def range_mimetype(*args, **kwargs):
     results = MimetypeRangeIndexer(*args, **kwargs)
     return {'status': 'eventful' if results else 'uneventful'}


 @app.task(name=__name__ + '.ContentRangeFossologyLicense')
 def range_license(*args, **kwargs):
     results = FossologyLicenseRangeIndexer(*args, **kwargs)
     return {'status': 'eventful' if results else 'uneventful'}
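(Not part of the patch: the getattr(results, 'results', results) idiom in the tasks above normalizes what the wrapped run() calls hand back. If run() returns the indexer object itself, celery gets its JSON-serializable .results list; if run() already returns a plain value, it passes through unchanged.)

    class _FakeIndexer:                  # stand-in for an indexer object
        results = [{'id': 'some-result'}]

    r = _FakeIndexer()
    assert getattr(r, 'results', r) == [{'id': 'some-result'}]
    assert getattr(None, 'results', None) is None  # plain value: unchanged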
diff --git a/swh/indexer/tests/tasks.py b/swh/indexer/tests/tasks.py
index cd5a425..48656a7 100644
--- a/swh/indexer/tests/tasks.py
+++ b/swh/indexer/tests/tasks.py
@@ -1,48 +1,53 @@
 from celery import current_app as app

 from swh.indexer.metadata import (
     OriginMetadataIndexer, RevisionMetadataIndexer
 )

+from .test_origin_head import OriginHeadTestIndexer
 from .test_metadata import ContentMetadataTestIndexer
 from .utils import BASE_TEST_CONFIG


 class RevisionMetadataTestIndexer(RevisionMetadataIndexer):
     """Specific indexer whose configuration is enough to satisfy the
     indexing tests.
     """
     ContentMetadataIndexer = ContentMetadataTestIndexer

     def parse_config_file(self, *args, **kwargs):
         return {
             **BASE_TEST_CONFIG,
             'tools': {
                 'name': 'swh-metadata-detector',
                 'version': '0.0.2',
                 'configuration': {
                     'type': 'local',
                     'context': 'NpmMapping'
                 }
             }
         }


 class OriginMetadataTestIndexer(OriginMetadataIndexer):
     def parse_config_file(self, *args, **kwargs):
         return {
             **BASE_TEST_CONFIG,
             'tools': []
         }

+    def _prepare_sub_indexers(self):
+        self.origin_head_indexer = OriginHeadTestIndexer()
+        self.revision_metadata_indexer = RevisionMetadataTestIndexer()
+

 @app.task
 def revision_metadata(*args, **kwargs):
     indexer = RevisionMetadataTestIndexer()
     indexer.run(*args, **kwargs)
     print('REV RESULT=', indexer.results)


 @app.task
 def origin_intrinsic_metadata(*args, **kwargs):
     indexer = OriginMetadataTestIndexer()
     indexer.run(*args, **kwargs)
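(Not part of the patch: the _prepare_sub_indexers() override above implies that OriginMetadataIndexer exposes a hook for swapping its sub-indexers; the hook's call site is not visible in the metadata.py hunk, so the wiring sketched here is an assumption.)

    # Assumed wiring (hypothetical): the constructor delegates sub-indexer
    # creation to a hook that test subclasses can override.
    class OriginMetadataIndexer(OriginIndexer):
        def __init__(self):
            super().__init__()
            self._prepare_sub_indexers()

        def _prepare_sub_indexers(self):
            self.origin_head_indexer = OriginHeadIndexer()
            self.revision_metadata_indexer = RevisionMetadataIndexer()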
""" ContentMetadataIndexer = ContentMetadataTestIndexer def parse_config_file(self, *args, **kwargs): return { **BASE_TEST_CONFIG, 'tools': { 'name': 'swh-metadata-detector', 'version': '0.0.2', 'configuration': { 'type': 'local', 'context': 'NpmMapping' } } } class OriginMetadataTestIndexer(OriginMetadataIndexer): def parse_config_file(self, *args, **kwargs): return { **BASE_TEST_CONFIG, 'tools': [] } + def _prepare_sub_indexers(self): + self.origin_head_indexer = OriginHeadTestIndexer() + self.revision_metadata_indexer = RevisionMetadataTestIndexer() + @app.task def revision_metadata(*args, **kwargs): indexer = RevisionMetadataTestIndexer() indexer.run(*args, **kwargs) print('REV RESULT=', indexer.results) @app.task def origin_intrinsic_metadata(*args, **kwargs): indexer = OriginMetadataTestIndexer() indexer.run(*args, **kwargs) diff --git a/swh/indexer/tests/test_origin_metadata.py b/swh/indexer/tests/test_origin_metadata.py index 24b611b..d46125b 100644 --- a/swh/indexer/tests/test_origin_metadata.py +++ b/swh/indexer/tests/test_origin_metadata.py @@ -1,217 +1,113 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest -from celery.result import AsyncResult from unittest import mock from swh.objstorage.objstorage_in_memory import InMemoryObjStorage from swh.model.hashutil import hash_to_bytes -from swh.scheduler.celery_backend.runner import run_ready_tasks from swh.storage.in_memory import Storage -from swh.indexer.metadata import ( - OriginMetadataIndexer, RevisionMetadataIndexer, - FullOriginMetadataIndexer -) -from swh.indexer.origin_head import OriginHeadIndexer +from swh.indexer.metadata import OriginMetadataIndexer from swh.indexer.storage.in_memory import IndexerStorage from .utils import fill_storage, fill_obj_storage, BASE_TEST_CONFIG from .test_metadata import REVISION_METADATA_CONFIG ORIGIN_HEAD_CONFIG = { **BASE_TEST_CONFIG, 'tools': { 'name': 'origin-metadata', 'version': '0.0.1', 'configuration': {}, }, 'tasks': { 'revision_metadata': 'revision_metadata', 'origin_intrinsic_metadata': 'origin_intrinsic_metadata', } } @pytest.mark.db -@mock.patch('swh.indexer.metadata.RevisionMetadataIndexer.parse_config_file') -@mock.patch('swh.indexer.origin_head.OriginHeadIndexer.parse_config_file') -@mock.patch('swh.indexer.storage.in_memory.IndexerStorage') -@mock.patch('swh.storage.in_memory.Storage') -def test_pipeline(storage_mock, idx_storage_mock, - origin_head_parse_config, revision_metadata_parse_config, - swh_app, celery_session_worker, indexer_scheduler): - scheduler = indexer_scheduler - # Always returns the same instance of the idx storage, because - # this function is called by each of the three indexers. - objstorage = InMemoryObjStorage() - storage = Storage() - idx_storage = IndexerStorage() - - origin_head_parse_config.return_value = ORIGIN_HEAD_CONFIG - revision_metadata_parse_config.return_value = REVISION_METADATA_CONFIG - storage_mock.return_value = storage - idx_storage_mock.return_value = idx_storage - - fill_obj_storage(objstorage) - fill_storage(storage) - - # TODO: find a better way to share the ContentMetadataIndexer use - # the same objstorage instance. 
-    import swh.objstorage
-    old_inmem_objstorage = swh.objstorage._STORAGE_CLASSES['memory']
-    swh.objstorage._STORAGE_CLASSES['memory'] = lambda: objstorage
-    try:
-        RevisionMetadataIndexer.scheduler = scheduler
-        OriginMetadataIndexer.scheduler = scheduler
-        indexer = OriginHeadIndexer()
-        indexer.scheduler = scheduler
-        indexer.run(["git+https://github.com/librariesio/yarn-parser"])
-        tasks = []
-
-        tasks.extend(run_ready_tasks(scheduler, swh_app))  # Run the first task
-        # Wait for the task to complete and schedule the 2nd one
-        task = [x for x in tasks if x['task'] == 1]
-        assert len(task) == 1
-        promise = AsyncResult(id=task[0]['backend_id'])
-        promise.wait()
-
-        tasks.extend(run_ready_tasks(scheduler, swh_app))  # Run the 2nd task
-        task = [x for x in tasks if x['task'] == 2]
-        assert len(task) == 1
-        promise = AsyncResult(id=task[0]['backend_id'])
-        promise.wait()
-    finally:
-        swh.objstorage._STORAGE_CLASSES['memory'] = old_inmem_objstorage
-        del RevisionMetadataIndexer.scheduler
-        del OriginMetadataIndexer.scheduler
-
-    origin = storage.origin_get({
-        'type': 'git',
-        'url': 'https://github.com/librariesio/yarn-parser'})
-    rev_id = hash_to_bytes('8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f')
-
-    metadata = {
-        '@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
-        'url':
-            'https://github.com/librariesio/yarn-parser#readme',
-        'codeRepository':
-            'git+git+https://github.com/librariesio/yarn-parser.git',
-        'author': [{
-            'type': 'Person',
-            'name': 'Andrew Nesbitt'
-        }],
-        'license': 'https://spdx.org/licenses/AGPL-3.0',
-        'version': '1.0.0',
-        'description':
-            'Tiny web service for parsing yarn.lock files',
-        'issueTracker':
-            'https://github.com/librariesio/yarn-parser/issues',
-        'name': 'yarn-parser',
-        'keywords': ['yarn', 'parse', 'lock', 'dependencies'],
-    }
-    rev_metadata = {
-        'id': rev_id,
-        'translated_metadata': metadata,
-    }
-    origin_metadata = {
-        'origin_id': origin['id'],
-        'from_revision': rev_id,
-        'metadata': metadata,
-    }
-
-    results = list(indexer.idx_storage.revision_metadata_get([rev_id]))
-    for result in results:
-        del result['tool']
-    assert results == [rev_metadata]
-
-    results = list(indexer.idx_storage.origin_intrinsic_metadata_get([
-        origin['id']]))
-    for result in results:
-        del result['tool']
-    assert results == [origin_metadata]
-
-
 @mock.patch('swh.indexer.metadata.RevisionMetadataIndexer.parse_config_file')
 @mock.patch('swh.indexer.origin_head.OriginHeadIndexer.parse_config_file')
 @mock.patch('swh.indexer.storage.in_memory.IndexerStorage')
 @mock.patch('swh.storage.in_memory.Storage')
 def test_full_origin_metadata_indexer(
         storage_mock, idx_storage_mock, origin_head_parse_config,
         revision_metadata_parse_config):
     # Always returns the same instance of the idx storage, because
     # this function is called by each of the three indexers.
     objstorage = InMemoryObjStorage()
     storage = Storage()
     idx_storage = IndexerStorage()

     origin_head_parse_config.return_value = ORIGIN_HEAD_CONFIG
     revision_metadata_parse_config.return_value = REVISION_METADATA_CONFIG
     storage_mock.return_value = storage
     idx_storage_mock.return_value = idx_storage

     fill_obj_storage(objstorage)
     fill_storage(storage)

     # TODO: find a better way to share the ContentMetadataIndexer use
     # the same objstorage instance.
     import swh.objstorage
     old_inmem_objstorage = swh.objstorage._STORAGE_CLASSES['memory']
     swh.objstorage._STORAGE_CLASSES['memory'] = lambda: objstorage
     try:
-        indexer = FullOriginMetadataIndexer()
+        indexer = OriginMetadataIndexer()
         indexer.storage = storage
         indexer.idx_storage = idx_storage
         indexer.run(["git+https://github.com/librariesio/yarn-parser"])
     finally:
         swh.objstorage._STORAGE_CLASSES['memory'] = old_inmem_objstorage

     origin = storage.origin_get({
         'type': 'git',
         'url': 'https://github.com/librariesio/yarn-parser'})
     rev_id = hash_to_bytes('8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f')

     metadata = {
         '@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
         'url':
             'https://github.com/librariesio/yarn-parser#readme',
         'codeRepository':
             'git+git+https://github.com/librariesio/yarn-parser.git',
         'author': [{
             'type': 'Person',
             'name': 'Andrew Nesbitt'
         }],
         'license': 'https://spdx.org/licenses/AGPL-3.0',
         'version': '1.0.0',
         'description':
             'Tiny web service for parsing yarn.lock files',
         'issueTracker':
             'https://github.com/librariesio/yarn-parser/issues',
         'name': 'yarn-parser',
         'keywords': ['yarn', 'parse', 'lock', 'dependencies'],
     }
     rev_metadata = {
         'id': rev_id,
         'translated_metadata': metadata,
     }
     origin_metadata = {
         'origin_id': origin['id'],
         'from_revision': rev_id,
         'metadata': metadata,
     }

     results = list(indexer.idx_storage.revision_metadata_get([rev_id]))
     for result in results:
         del result['tool']
     assert results == [rev_metadata]

     results = list(indexer.idx_storage.origin_intrinsic_metadata_get([
         origin['id']]))
     for result in results:
         del result['tool']
     assert results == [origin_metadata]