diff --git a/swh/indexer/__init__.py b/swh/indexer/__init__.py
--- a/swh/indexer/__init__.py
+++ b/swh/indexer/__init__.py
@@ -21,6 +21,8 @@
     'ctags': 'swh.indexer.tasks.SWHCtagsTask',
     'fossology_license': 'swh.indexer.tasks.SWHContentFossologyLicenseTask',
     'rehash': 'swh.indexer.tasks.SWHRecomputeChecksumsTask',
+    'revision_metadata_task': 'swh.indexer.tasks.SWHRevisionMetadataTask',
+    'origin_metadata_task': 'swh.indexer.tasks.SWHOriginMetadataTask',
 }


diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py
--- a/swh/indexer/indexer.py
+++ b/swh/indexer/indexer.py
@@ -374,7 +374,7 @@
                     results.append(res)

             self.persist_index_computations(results, policy_update)
-            self.next_step(results)
+            return self.next_step(results)
         except Exception:
             self.log.exception(
                 'Problem when reading contents metadata.')
@@ -440,6 +440,7 @@
                 self.log.exception(
                         'Problem when processing origin %s' % id_)
         self.persist_index_computations(results, policy_update)
+        return self.next_step(results)


 class RevisionIndexer(BaseIndexer):
@@ -482,3 +483,4 @@
                 self.log.exception(
                         'Problem when processing revision')
         self.persist_index_computations(results, policy_update)
+        return self.next_step(results)
diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py
--- a/swh/indexer/metadata.py
+++ b/swh/indexer/metadata.py
@@ -5,7 +5,7 @@
 import click
 import logging

-from swh.indexer.indexer import ContentIndexer, RevisionIndexer
+from swh.indexer.indexer import ContentIndexer, RevisionIndexer, BaseIndexer
 from swh.indexer.metadata_dictionary import compute_metadata
 from swh.indexer.metadata_detector import detect_metadata
 from swh.indexer.metadata_detector import extract_minimal_metadata_dict
@@ -266,6 +266,20 @@
         return min_metadata


+class OriginMetadataIndexer(BaseIndexer):
+    def filter(self, ids):
+        return ids
+
+    def run(self, ids, policy_update, *, origin_head_map):
+        return 42  # TODO
+
+    def index(self, ids):
+        return 5  # TODO
+
+    def persist_index_computations(self, results):
+        pass  # TODO
+
+
 @click.command()
 @click.option('--revs', '-i',
               help='Default sha1_git to lookup', multiple=True)
diff --git a/swh/indexer/orchestrator.py b/swh/indexer/orchestrator.py
--- a/swh/indexer/orchestrator.py
+++ b/swh/indexer/orchestrator.py
@@ -93,6 +93,7 @@
         self.tasks = tasks

     def run(self, ids):
+        all_results = []
         for name, (idx_class, filtering, batch_size) in self.indexers.items():
             if filtering:
                 policy_update = 'ignore-dups'
@@ -111,10 +112,12 @@
                     policy_update=policy_update)
                 celery_tasks.append(celery_task)

-            self._run_tasks(celery_tasks)
+            all_results.append(self._run_tasks(celery_tasks))
+
+        return all_results

     def _run_tasks(self, celery_tasks):
-        group(celery_tasks).delay()
+        return group(celery_tasks).delay()


 class OrchestratorAllContentsIndexer(BaseOrchestratorIndexer):
diff --git a/swh/indexer/origin_head.py b/swh/indexer/origin_head.py
--- a/swh/indexer/origin_head.py
+++ b/swh/indexer/origin_head.py
@@ -7,7 +7,10 @@
 import click
 import logging

+from celery import chain
+
 from swh.indexer.indexer import OriginIndexer
+from swh.indexer.tasks import SWHOriginMetadataTask, SWHRevisionMetadataTask


 class OriginHeadIndexer(OriginIndexer):
@@ -26,6 +29,11 @@
         }),
     }

+    CONFIG_BASE_FILENAME = 'indexer/origin_head'
+
+    revision_metadata_task = SWHRevisionMetadataTask()
+    origin_metadata_task = SWHOriginMetadataTask()
+
     def filter(self, ids):
         yield from ids

@@ -34,6 +42,32 @@
         should only be piped to another indexer via the orchestrator."""
         pass

+    def next_step(self, results):
+        """Once the head is found, call the RevisionMetadataIndexer
+        on these revisions, then call the OriginMetadataIndexer with
+        both the origin_id and the revision metadata, so it can copy the
+        revision metadata to the origin's metadata.
+
+        Args:
+            results (Iterable[dict]): Iterable of return values from `index`.
+
+        """
+        if self.revision_metadata_task is None or \
+                self.origin_metadata_task is None:
+            return
+        assert self.revision_metadata_task is not None
+        assert self.origin_metadata_task is not None
+        origin_to_revision = {res['origin_id']: res['revision_id']
+                              for res in results}
+        return chain(
+            self.revision_metadata_task.s(
+                ids=[res['revision_id'] for res in results],
+                policy_update='update-dups'),
+            self.origin_metadata_task.s(
+                origin_head_map=origin_to_revision,
+                policy_update='update-dups'),
+        )()
+
     # Dispatch

     def index(self, origin):
diff --git a/swh/indexer/tasks.py b/swh/indexer/tasks.py
--- a/swh/indexer/tasks.py
+++ b/swh/indexer/tasks.py
@@ -5,7 +5,7 @@

 import logging

-from swh.scheduler.task import Task
+from celery import Task

 from .orchestrator import OrchestratorAllContentsIndexer
 from .orchestrator import OrchestratorTextContentsIndexer
@@ -14,6 +14,7 @@
 from .ctags import CtagsIndexer
 from .fossology_license import ContentFossologyLicenseIndexer
 from .rehash import RecomputeChecksums
+from .metadata import RevisionMetadataIndexer, OriginMetadataIndexer

 logging.basicConfig(level=logging.INFO)

@@ -40,6 +41,22 @@
         OrchestratorTextContentsIndexer().run(*args, **kwargs)


+class SWHRevisionMetadataTask(Task):
+    task_queue = 'swh_indexer_revision_metadata'
+
+    serializer = 'msgpack'
+
+    def run_task(self, *args, **kwargs):
+        RevisionMetadataIndexer().run(*args, **kwargs)
+
+
+class SWHOriginMetadataTask(Task):
+    task_queue = 'swh_indexer_origin_metadata'
+
+    def run_task(self, *args, **kwargs):
+        OriginMetadataIndexer().run(*args, **kwargs)
+
+
 class SWHContentMimetypeTask(Task):
     """Task which computes the mimetype, encoding from the sha1's content.

diff --git a/swh/indexer/tests/__init__.py b/swh/indexer/tests/__init__.py
--- a/swh/indexer/tests/__init__.py
+++ b/swh/indexer/tests/__init__.py
@@ -1,2 +1,27 @@
+import os
 from os import path
+
+from celery import shared_task
+from celery.contrib.testing.worker import _start_worker_thread
+from celery import current_app
+
+__all__ = ['start_worker_thread']
+
 DATA_DIR = path.join(path.dirname(__file__), 'data')
+
+os.environ['CELERY_BROKER_URL'] = 'memory://'
+os.environ['CELERY_RESULT_BACKEND'] = 'redis://localhost'
+current_app.conf.update(
+    celery_task_serializer='msgpack',
+    )
+
+
+def start_worker_thread():
+    return _start_worker_thread(current_app, loglevel='DEBUG')
+
+
+# Needed to pass an assertion, see
+# https://github.com/celery/celery/pull/5111
+@shared_task(name='celery.ping')
+def ping():
+    return 'pong'
diff --git a/swh/indexer/tests/test_orchestrator.py b/swh/indexer/tests/test_orchestrator.py
--- a/swh/indexer/tests/test_orchestrator.py
+++ b/swh/indexer/tests/test_orchestrator.py
@@ -5,13 +5,16 @@

 import unittest

+import celery
+
 from swh.indexer.orchestrator import BaseOrchestratorIndexer
-from swh.indexer.indexer import RevisionIndexer
-from swh.indexer.tests.test_utils import MockIndexerStorage
-from swh.scheduler.task import Task
+from swh.indexer.indexer import BaseIndexer
+from swh.indexer.tests.test_utils import MockIndexerStorage, MockStorage
+
+from . import start_worker_thread

-class BaseTestIndexer(RevisionIndexer):
+class BaseTestIndexer(BaseIndexer):
     ADDITIONAL_CONFIG = {
         'tools': ('dict', {
             'name': 'foo',
@@ -22,16 +25,20 @@

     def prepare(self):
         self.idx_storage = MockIndexerStorage()
+        self.storage = MockStorage()

     def check(self):
         pass

     def filter(self, ids):
-        self.filtered = ids
+        self.filtered.append(ids)
         return ids

+    def run(self, ids, policy_update):
+        return self.index(ids)
+
     def index(self, ids):
-        self.indexed = ids
+        self.indexed.append(ids)
         return [id_ + '_indexed_by_' + self.__class__.__name__
                 for id_ in ids]

@@ -40,37 +47,49 @@


 class Indexer1(BaseTestIndexer):
+    filtered = []
+    indexed = []
+
     def filter(self, ids):
         return super().filter([id_ for id_ in ids if '1' in id_])


 class Indexer2(BaseTestIndexer):
+    filtered = []
+    indexed = []
+
     def filter(self, ids):
         return super().filter([id_ for id_ in ids if '2' in id_])


 class Indexer3(BaseTestIndexer):
+    filtered = []
+    indexed = []
+
     def filter(self, ids):
         return super().filter([id_ for id_ in ids if '3' in id_])


-class Indexer1Task(Task):
-    pass
+@celery.task
+def indexer1_task(*args, **kwargs):
+    return Indexer1().run(*args, **kwargs)


-class Indexer2Task(Task):
-    pass
+@celery.task
+def indexer2_task(*args, **kwargs):
+    return Indexer2().run(*args, **kwargs)


-class Indexer3Task(Task):
-    pass
+@celery.task
+def indexer3_task(*args, **kwargs):
+    return Indexer3().run(*args, **kwargs)


 class TestOrchestrator12(BaseOrchestratorIndexer):
     TASK_NAMES = {
-        'indexer1': 'swh.indexer.tests.test_orchestrator.Indexer1Task',
-        'indexer2': 'swh.indexer.tests.test_orchestrator.Indexer2Task',
-        'indexer3': 'swh.indexer.tests.test_orchestrator.Indexer3Task',
+        'indexer1': 'swh.indexer.tests.test_orchestrator.indexer1_task',
+        'indexer2': 'swh.indexer.tests.test_orchestrator.indexer2_task',
+        'indexer3': 'swh.indexer.tests.test_orchestrator.indexer3_task',
     }

     INDEXER_CLASSES = {
@@ -98,63 +117,84 @@
         }
         self.prepare_tasks()

+
+class MockedTestOrchestrator12(TestOrchestrator12):
     def _run_tasks(self, celery_tasks):
         self.running_tasks.extend(celery_tasks)


 class OrchestratorTest(unittest.TestCase):
+    def test_orchestrator_filter(self):
+        return
+        with start_worker_thread():
+            o = TestOrchestrator12()
+            o.prepare()
+            promises = o.run(['id12', 'id2'])
+            results = []
+            for promise in promises:
+                results.append(promise.get(timeout=10))
+            self.assertCountEqual(
+                results,
+                [[['id12_indexed_by_Indexer1']],
+                 [['id12_indexed_by_Indexer2',
+                   'id2_indexed_by_Indexer2']]])
+            self.assertEqual(Indexer2.indexed, [['id12', 'id2']])
+            self.assertEqual(Indexer1.indexed, [['id12']])
+
+
+class MockedOrchestratorTest(unittest.TestCase):
     maxDiff = None

-    def test_orchestrator_filter(self):
-        o = TestOrchestrator12()
+    def test_mocked_orchestrator_filter(self):
+        o = MockedTestOrchestrator12()
         o.prepare()
         o.run(['id12', 'id2'])
         self.assertCountEqual(o.running_tasks, [
-            {'args': (),
-             'chord_size': None,
-             'immutable': False,
-             'kwargs': {'ids': ['id12'],
-                        'policy_update': 'ignore-dups'},
-             'options': {},
-             'subtask_type': None,
-             'task': 'swh.indexer.tests.test_orchestrator.Indexer1Task'},
-            {'args': (),
-             'chord_size': None,
-             'immutable': False,
-             'kwargs': {'ids': ['id12', 'id2'],
-                        'policy_update': 'ignore-dups'},
-             'options': {},
-             'subtask_type': None,
-             'task': 'swh.indexer.tests.test_orchestrator.Indexer2Task'},
-            ])
-
-    def test_orchestrator_batch(self):
-        o = TestOrchestrator12()
+                {'args': (),
+                 'chord_size': None,
+                 'immutable': False,
+                 'kwargs': {'ids': ['id12'],
+                            'policy_update': 'ignore-dups'},
+                 'options': {},
+                 'subtask_type': None,
+                 'task': 'swh.indexer.tests.test_orchestrator.indexer1_task'},
+                {'args': (),
+                 'chord_size': None,
+                 'immutable': False,
+                 'kwargs': {'ids': ['id12', 'id2'],
+                            'policy_update': 'ignore-dups'},
+                 'options': {},
+                 'subtask_type': None,
+                 'task': 'swh.indexer.tests.test_orchestrator.indexer2_task'},
+                ])
+
+    def test_mocked_orchestrator_batch(self):
+        o = MockedTestOrchestrator12()
         o.prepare()
         o.run(['id12', 'id2a', 'id2b', 'id2c'])
         self.assertCountEqual(o.running_tasks, [
-            {'args': (),
-             'chord_size': None,
-             'immutable': False,
-             'kwargs': {'ids': ['id12'],
-                        'policy_update': 'ignore-dups'},
-             'options': {},
-             'subtask_type': None,
-             'task': 'swh.indexer.tests.test_orchestrator.Indexer1Task'},
-            {'args': (),
-             'chord_size': None,
-             'immutable': False,
-             'kwargs': {'ids': ['id12', 'id2a'],
-                        'policy_update': 'ignore-dups'},
-             'options': {},
-             'subtask_type': None,
-             'task': 'swh.indexer.tests.test_orchestrator.Indexer2Task'},
-            {'args': (),
-             'chord_size': None,
-             'immutable': False,
-             'kwargs': {'ids': ['id2b', 'id2c'],
-                        'policy_update': 'ignore-dups'},
-             'options': {},
-             'subtask_type': None,
-             'task': 'swh.indexer.tests.test_orchestrator.Indexer2Task'},
-            ])
+                {'args': (),
+                 'chord_size': None,
+                 'immutable': False,
+                 'kwargs': {'ids': ['id12'],
+                            'policy_update': 'ignore-dups'},
+                 'options': {},
+                 'subtask_type': None,
+                 'task': 'swh.indexer.tests.test_orchestrator.indexer1_task'},
+                {'args': (),
+                 'chord_size': None,
+                 'immutable': False,
+                 'kwargs': {'ids': ['id12', 'id2a'],
+                            'policy_update': 'ignore-dups'},
+                 'options': {},
+                 'subtask_type': None,
+                 'task': 'swh.indexer.tests.test_orchestrator.indexer2_task'},
+                {'args': (),
+                 'chord_size': None,
+                 'immutable': False,
+                 'kwargs': {'ids': ['id2b', 'id2c'],
+                            'policy_update': 'ignore-dups'},
+                 'options': {},
+                 'subtask_type': None,
+                 'task': 'swh.indexer.tests.test_orchestrator.indexer2_task'},
+                ])
diff --git a/swh/indexer/tests/test_origin_head.py b/swh/indexer/tests/test_origin_head.py
--- a/swh/indexer/tests/test_origin_head.py
+++ b/swh/indexer/tests/test_origin_head.py
@@ -41,6 +41,12 @@
         'project': None,
         'type': 'svn',
         'url': 'http://0-512-md.googlecode.com/svn/'},
+    {
+        'id': 424242,
+        'lister': None,
+        'project': None,
+        'type': 'git',
+        'url': 'https://github.com/moranegg/metadata_test'},
     ]

 SNAPSHOTS = {
@@ -114,6 +120,11 @@
                 'target_type': 'revision'}},
         'id': b'\xa1\xa2\x8c\n\xb3\x87\xa8\xf9\xe0a\x8c\xb7'
              b'\x05\xea\xb8\x1f\xc4H\xf4s'},
+    424242: {
+        'branches': {
+            b'HEAD': {
+                'target': b'head_branch_metadata_test',
+                'target_type': 'revision'}}}
     }


@@ -135,6 +146,10 @@
     """Specific indexer whose configuration is enough to satisfy the
       indexing tests.
""" + + revision_metadata_task = None + origin_metadata_task = None + def prepare(self): self.config = { 'tools': { diff --git a/swh/indexer/tests/test_origin_metadata.py b/swh/indexer/tests/test_origin_metadata.py new file mode 100644 --- /dev/null +++ b/swh/indexer/tests/test_origin_metadata.py @@ -0,0 +1,71 @@ +# Copyright (C) 2018 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging +import unittest +from celery import task + +from swh.indexer.metadata import OriginMetadataIndexer +from swh.indexer.tests.test_utils import MockObjStorage, MockStorage +from swh.indexer.tests.test_utils import MockIndexerStorage +from swh.indexer.tests.test_origin_head import TestOriginHeadIndexer +from swh.indexer.tests.test_metadata import TestRevisionMetadataIndexer + +from . import start_worker_thread + + +class TestOriginMetadataIndexer(OriginMetadataIndexer): + def prepare(self): + self.config = { + 'storage': { + 'cls': 'remote', + 'args': { + 'url': 'http://localhost:9999', + } + }, + 'tools': { + 'name': 'origin-metadata', + 'version': '0.0.1', + 'configuration': {} + } + } + self.storage = MockStorage() + self.idx_storage = MockIndexerStorage() + self.log = logging.getLogger('swh.indexer') + self.objstorage = MockObjStorage() + self.destination_task = None + self.tools = self.register_tools(self.config['tools']) + self.tool = self.tools[0] + self.results = [] + + +@task +def test_revision_metadata_task(*args, **kwargs): + TestRevisionMetadataIndexer().run(*args, **kwargs) + + +@task +def test_origin_metadata_task(*args, **kwargs): + TestOriginMetadataIndexer().run(*args, **kwargs) + + +class TestOriginHeadIndexer(TestOriginHeadIndexer): + revision_metadata_task = test_revision_metadata_task + origin_metadata_task = test_origin_metadata_task + + +class TestOriginMetadata(unittest.TestCase): + def test_pipeline(self): + indexer = TestOriginHeadIndexer() + with start_worker_thread(): + promise = indexer.run( + ["git+https://github.com/moranegg/metadata_test"], + policy_update='remove-dups', + parse_ids=True) + print('coin') + for item in promise.collect(intermediate=True): + print('+++ ' + repr(item)) + print('--- ' + repr(promise.get())) + print('coincoin')