diff --git a/swh/indexer/journal_client.py b/swh/indexer/journal_client.py
--- a/swh/indexer/journal_client.py
+++ b/swh/indexer/journal_client.py
@@ -26,7 +26,7 @@
         }),
         'origin_visit_tasks': ('List[dict]', [
             {
-                'type': 'indexer_full_origin_metadata',
+                'type': 'indexer_origin_metadata',
                 'kwargs': {
                     'policy_update': 'update-dups',
                     'parse_ids': False,
diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py
--- a/swh/indexer/metadata.py
+++ b/swh/indexer/metadata.py
@@ -4,7 +4,6 @@
 # See top-level LICENSE file for more information
 
 import click
-import itertools
 import logging
 
 from copy import deepcopy
@@ -257,62 +256,6 @@
 
 
 class OriginMetadataIndexer(OriginIndexer):
-    CONFIG_BASE_FILENAME = 'indexer/origin_intrinsic_metadata'
-
-    ADDITIONAL_CONFIG = {
-        'tools': ('list', [])
-    }
-
-    USE_TOOLS = False
-
-    def run(self, origin_head, policy_update):
-        """Expected to be called with the result of RevisionMetadataIndexer
-        as first argument; ie. not a list of ids as other indexers would.
-
-        Args:
-            origin_head (dict): {str(origin_id): rev_id}
-              keys `origin_id` and `revision_id`, which is the result
-              of OriginHeadIndexer.
-            policy_update (str): `'ignore-dups'` or `'update-dups'`
-        """
-        origin_head_map = {origin_id: hashutil.hash_to_bytes(rev_id)
-                           for (origin_id, rev_id) in origin_head.items()}
-
-        # Fix up the argument order. revisions_metadata has to be the
-        # first argument because of celery.chain; the next line calls
-        # run() with the usual order, ie. origin ids first.
-        return super().run(ids=list(origin_head_map),
-                           policy_update=policy_update,
-                           parse_ids=False,
-                           origin_head_map=origin_head_map)
-
-    def index(self, origin, *, origin_head_map):
-        # Get the last revision of the origin.
-        revision_id = origin_head_map[str(origin['id'])]
-
-        revision_metadata = self.idx_storage \
-            .revision_metadata_get([revision_id])
-
-        results = []
-        for item in revision_metadata:
-            assert item['id'] == revision_id
-            # Get the metadata of that revision, and return it
-            results.append({
-                'origin_id': origin['id'],
-                'metadata': item['translated_metadata'],
-                'from_revision': revision_id,
-                'indexer_configuration_id':
-                    item['tool']['id'],
-            })
-        return results
-
-    def persist_index_computations(self, results, policy_update):
-        self.idx_storage.origin_intrinsic_metadata_add(
-            list(itertools.chain(*results)),
-            conflict_update=(policy_update == 'update-dups'))
-
-
-class FullOriginMetadataIndexer(OriginIndexer):
     CONFIG_BASE_FILENAME = 'indexer/full_origin_intrinsic_metadata'
 
     ADDITIONAL_CONFIG = {
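Note: the hunk above keeps the former FullOriginMetadataIndexer body under the
OriginMetadataIndexer name; the surviving class body is elided by the diff
context. As a rough sketch only (the attribute names follow the
_prepare_sub_indexers hook exercised by the test changes further down; the
index() body here is an assumption, not the verbatim code), the merged indexer
drives its sub-indexers in-process:

    from swh.indexer.indexer import OriginIndexer
    from swh.indexer.metadata import RevisionMetadataIndexer
    from swh.indexer.origin_head import OriginHeadIndexer


    class OriginMetadataIndexerSketch(OriginIndexer):
        USE_TOOLS = False

        def _prepare_sub_indexers(self):
            # Assumed production wiring; the tests override this hook with
            # in-memory test indexers (see swh/indexer/tests/tasks.py below).
            self.origin_head_indexer = OriginHeadIndexer()
            self.revision_metadata_indexer = RevisionMetadataIndexer()

        def index(self, origin):
            # Hypothetical flow: find the head revision of the origin, index
            # its metadata, then derive the origin's intrinsic metadata from
            # the result -- all in one process, with no celery task chaining.
            head = self.origin_head_indexer.index(origin)
            if not head:
                return
            ...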
diff --git a/swh/indexer/origin_head.py b/swh/indexer/origin_head.py
--- a/swh/indexer/origin_head.py
+++ b/swh/indexer/origin_head.py
@@ -7,12 +7,8 @@
 import click
 import logging
 
-from swh.scheduler import get_scheduler
-from swh.scheduler.utils import create_task_dict
 from swh.indexer.indexer import OriginIndexer
-from swh.model.hashutil import hash_to_hex
-
 
 class OriginHeadIndexer(OriginIndexer):
     """Origin-level indexer.
 
@@ -28,10 +24,6 @@
             'version': '0.0.1',
             'configuration': {},
         }),
-        'tasks': ('dict', {
-            'revision_metadata': 'revision_metadata',
-            'origin_intrinsic_metadata': 'origin_metadata',
-        })
     }
 
     CONFIG_BASE_FILENAME = 'indexer/origin_head'
@@ -41,54 +33,6 @@
         should only be piped to another indexer."""
         pass
 
-    def next_step(self, results, task):
-        """Once the head is found, call the RevisionMetadataIndexer
-        on these revisions, then call the OriginMetadataIndexer with
-        both the origin_id and the revision metadata, so it can copy the
-        revision metadata to the origin's metadata.
-
-        Args:
-            results (Iterable[dict]): Iterable of return values from `index`.
-
-        """
-        super().next_step(results, task)
-        revision_metadata_task = self.config['tasks']['revision_metadata']
-        origin_intrinsic_metadata_task = self.config['tasks'][
-            'origin_intrinsic_metadata']
-        if revision_metadata_task is None and \
-                origin_intrinsic_metadata_task is None:
-            return
-        assert revision_metadata_task is not None
-        assert origin_intrinsic_metadata_task is not None
-
-        # Second task to run after this one: copy the revision's metadata
-        # to the origin
-        sub_task = create_task_dict(
-            origin_intrinsic_metadata_task,
-            'oneshot',
-            origin_head={
-                str(result['origin_id']):
-                    hash_to_hex(result['revision_id'])
-                for result in results},
-            policy_update='update-dups',
-        )
-        del sub_task['next_run']  # Not json-serializable
-
-        # First task to run after this one: index the metadata of the
-        # revision
-        task = create_task_dict(
-            revision_metadata_task,
-            'oneshot',
-            ids=[hash_to_hex(res['revision_id']) for res in results],
-            policy_update='update-dups',
-            next_step=sub_task,
-        )
-        if getattr(self, 'scheduler', None):
-            scheduler = self.scheduler
-        else:
-            scheduler = get_scheduler(**self.config['scheduler'])
-        scheduler.create_tasks([task])
-
     # Dispatch
 
     def index(self, origin):
diff --git a/swh/indexer/tasks.py b/swh/indexer/tasks.py
--- a/swh/indexer/tasks.py
+++ b/swh/indexer/tasks.py
@@ -14,7 +14,7 @@
 )
 from .rehash import RecomputeChecksums
 from .metadata import (
-    RevisionMetadataIndexer, OriginMetadataIndexer, FullOriginMetadataIndexer,
+    RevisionMetadataIndexer, OriginMetadataIndexer
 )
 from .origin_head import OriginHeadIndexer
@@ -31,12 +31,6 @@
     return getattr(results, 'results', results)
 
 
-@app.task(name=__name__ + '.FullOriginMetadata')
-def full_origin_metadata(*args, **kwargs):
-    results = FullOriginMetadataIndexer().run(*args, **kwargs)
-    return getattr(results, 'results', results)
-
-
 @app.task(name=__name__ + '.OriginHead')
 def origin_head(*args, **kwargs):
     results = OriginHeadIndexer().run(*args, **kwargs)
diff --git a/swh/indexer/tests/tasks.py b/swh/indexer/tests/tasks.py
--- a/swh/indexer/tests/tasks.py
+++ b/swh/indexer/tests/tasks.py
@@ -3,6 +3,7 @@
 from swh.indexer.metadata import (
     OriginMetadataIndexer, RevisionMetadataIndexer
 )
+from .test_origin_head import OriginHeadTestIndexer
 from .test_metadata import ContentMetadataTestIndexer
 from .utils import BASE_TEST_CONFIG
@@ -34,6 +35,10 @@
         'tools': []
     }
 
+    def _prepare_sub_indexers(self):
+        self.origin_head_indexer = OriginHeadTestIndexer()
+        self.revision_metadata_indexer = RevisionMetadataTestIndexer()
+
 
 @app.task
 def revision_metadata(*args, **kwargs):
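With next_step() and the scheduler indirection gone, the whole head ->
revision -> origin metadata pipeline runs in one synchronous call. A minimal
usage sketch (mirroring the updated test below; the journal client passes
policy_update='update-dups' via the kwargs shown in the first hunk):

    from swh.indexer.metadata import OriginMetadataIndexer

    # Before: OriginHeadIndexer.run() created scheduler tasks that chained
    # RevisionMetadata and then OriginMetadata as follow-up celery jobs.
    # After: one call indexes head, revision, and origin metadata in-process.
    indexer = OriginMetadataIndexer()
    indexer.run(["git+https://github.com/librariesio/yarn-parser"])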
diff --git a/swh/indexer/tests/test_origin_metadata.py b/swh/indexer/tests/test_origin_metadata.py
--- a/swh/indexer/tests/test_origin_metadata.py
+++ b/swh/indexer/tests/test_origin_metadata.py
@@ -5,19 +5,13 @@
 
 import pytest
 
-from celery.result import AsyncResult
 from unittest import mock
 
 from swh.objstorage.objstorage_in_memory import InMemoryObjStorage
 from swh.model.hashutil import hash_to_bytes
-from swh.scheduler.celery_backend.runner import run_ready_tasks
 from swh.storage.in_memory import Storage
 
-from swh.indexer.metadata import (
-    OriginMetadataIndexer, RevisionMetadataIndexer,
-    FullOriginMetadataIndexer
-)
-from swh.indexer.origin_head import OriginHeadIndexer
+from swh.indexer.metadata import OriginMetadataIndexer
 from swh.indexer.storage.in_memory import IndexerStorage
 
 from .utils import fill_storage, fill_obj_storage, BASE_TEST_CONFIG
@@ -43,104 +37,6 @@
 @mock.patch('swh.indexer.origin_head.OriginHeadIndexer.parse_config_file')
 @mock.patch('swh.indexer.storage.in_memory.IndexerStorage')
 @mock.patch('swh.storage.in_memory.Storage')
-def test_pipeline(storage_mock, idx_storage_mock,
-                  origin_head_parse_config, revision_metadata_parse_config,
-                  swh_app, celery_session_worker, indexer_scheduler):
-    scheduler = indexer_scheduler
-    # Always returns the same instance of the idx storage, because
-    # this function is called by each of the three indexers.
-    objstorage = InMemoryObjStorage()
-    storage = Storage()
-    idx_storage = IndexerStorage()
-
-    origin_head_parse_config.return_value = ORIGIN_HEAD_CONFIG
-    revision_metadata_parse_config.return_value = REVISION_METADATA_CONFIG
-    storage_mock.return_value = storage
-    idx_storage_mock.return_value = idx_storage
-
-    fill_obj_storage(objstorage)
-    fill_storage(storage)
-
-    # TODO: find a better way to share the ContentMetadataIndexer use
-    # the same objstorage instance.
-    import swh.objstorage
-    old_inmem_objstorage = swh.objstorage._STORAGE_CLASSES['memory']
-    swh.objstorage._STORAGE_CLASSES['memory'] = lambda: objstorage
-    try:
-        RevisionMetadataIndexer.scheduler = scheduler
-        OriginMetadataIndexer.scheduler = scheduler
-        indexer = OriginHeadIndexer()
-        indexer.scheduler = scheduler
-        indexer.run(["git+https://github.com/librariesio/yarn-parser"])
-        tasks = []
-
-        tasks.extend(run_ready_tasks(scheduler, swh_app))  # Run the first task
-        # Wait for the task to complete and schedule the 2nd one
-        task = [x for x in tasks if x['task'] == 1]
-        assert len(task) == 1
-        promise = AsyncResult(id=task[0]['backend_id'])
-        promise.wait()
-
-        tasks.extend(run_ready_tasks(scheduler, swh_app))  # Run the 2nd task
-        task = [x for x in tasks if x['task'] == 2]
-        assert len(task) == 1
-        promise = AsyncResult(id=task[0]['backend_id'])
-        promise.wait()
-    finally:
-        swh.objstorage._STORAGE_CLASSES['memory'] = old_inmem_objstorage
-        del RevisionMetadataIndexer.scheduler
-        del OriginMetadataIndexer.scheduler
-
-    origin = storage.origin_get({
-        'type': 'git',
-        'url': 'https://github.com/librariesio/yarn-parser'})
-    rev_id = hash_to_bytes('8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f')
-
-    metadata = {
-        '@context': 'https://doi.org/10.5063/schema/codemeta-2.0',
-        'url':
-            'https://github.com/librariesio/yarn-parser#readme',
-        'codeRepository':
-            'git+git+https://github.com/librariesio/yarn-parser.git',
-        'author': [{
-            'type': 'Person',
-            'name': 'Andrew Nesbitt'
-        }],
-        'license': 'https://spdx.org/licenses/AGPL-3.0',
-        'version': '1.0.0',
-        'description':
-            'Tiny web service for parsing yarn.lock files',
-        'issueTracker':
-            'https://github.com/librariesio/yarn-parser/issues',
-        'name': 'yarn-parser',
-        'keywords': ['yarn', 'parse', 'lock', 'dependencies'],
-    }
-    rev_metadata = {
-        'id': rev_id,
-        'translated_metadata': metadata,
-    }
-    origin_metadata = {
-        'origin_id': origin['id'],
-        'from_revision': rev_id,
-        'metadata': metadata,
-    }
-
-    results = list(indexer.idx_storage.revision_metadata_get([rev_id]))
-    for result in results:
-        del result['tool']
-    assert results == [rev_metadata]
-
-    results = list(indexer.idx_storage.origin_intrinsic_metadata_get([
-        origin['id']]))
-    for result in results:
-        del result['tool']
-    assert results == [origin_metadata]
-
-
-@mock.patch('swh.indexer.metadata.RevisionMetadataIndexer.parse_config_file')
-@mock.patch('swh.indexer.origin_head.OriginHeadIndexer.parse_config_file')
-@mock.patch('swh.indexer.storage.in_memory.IndexerStorage')
-@mock.patch('swh.storage.in_memory.Storage')
 def test_full_origin_metadata_indexer(
         storage_mock, idx_storage_mock, origin_head_parse_config,
         revision_metadata_parse_config):
@@ -164,7 +60,7 @@
     old_inmem_objstorage = swh.objstorage._STORAGE_CLASSES['memory']
     swh.objstorage._STORAGE_CLASSES['memory'] = lambda: objstorage
     try:
-        indexer = FullOriginMetadataIndexer()
+        indexer = OriginMetadataIndexer()
         indexer.storage = storage
         indexer.idx_storage = idx_storage
         indexer.run(["git+https://github.com/librariesio/yarn-parser"])
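For reference, checking what the run persisted uses the same idx_storage
queries as the removed test_pipeline; a sketch, assuming the storage and
idx_storage instances set up in the kept test:

    from swh.model.hashutil import hash_to_bytes

    origin = storage.origin_get({
        'type': 'git',
        'url': 'https://github.com/librariesio/yarn-parser'})
    rev_id = hash_to_bytes('8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f')

    # One revision_metadata row for the head revision, copied to one
    # origin_intrinsic_metadata row pointing back at it:
    rev_results = list(idx_storage.revision_metadata_get([rev_id]))
    origin_results = list(idx_storage.origin_intrinsic_metadata_get(
        [origin['id']]))
    assert rev_results and origin_results
    assert origin_results[0]['from_revision'] == rev_id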