diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py
--- a/swh/indexer/indexer.py
+++ b/swh/indexer/indexer.py
@@ -8,6 +8,8 @@
 import logging
 import shutil
 import tempfile
+import datetime
+from copy import deepcopy
 
 from swh.storage import get_storage
 from swh.core.config import SWHConfig
@@ -15,6 +17,7 @@
 from swh.objstorage.exc import ObjNotFoundError
 from swh.model import hashutil
 from swh.scheduler.utils import get_task
+from swh.scheduler import get_scheduler
 
 from swh.indexer.storage import get_indexer_storage, INDEXER_CFG_KEY
 
@@ -302,7 +305,7 @@
         """
         pass
 
-    def next_step(self, results):
+    def next_step(self, results, task):
         """Do something else with computations results (e.g. send to
         another queue, ...).
 
@@ -311,15 +314,28 @@
         Args:
             results ([result]): List of results (dict) as returned
                                 by index function.
+            task (dict): a dict in the form expected by
+                `scheduler.backend.SchedulerBackend.create_tasks`
+                without `next_run`, plus a `result_name` key.
 
         Returns:
             None
 
         """
-        pass
+        if task:
+            if getattr(self, 'scheduler', None):
+                scheduler = self.scheduler
+            else:
+                scheduler = get_scheduler(**self.config['scheduler'])
+            task = deepcopy(task)
+            result_name = task.pop('result_name')
+            task['next_run'] = datetime.datetime.now()
+            task['arguments']['kwargs'][result_name] = self.results
+            scheduler.create_tasks([task])
 
     @abc.abstractmethod
-    def run(self, ids, policy_update, **kwargs):
+    def run(self, ids, policy_update,
+            next_step=None, **kwargs):
        """Given a list of ids:
 
        - retrieves the data from the storage
@@ -328,8 +344,11 @@
 
         Args:
             ids ([bytes]): id's identifier list
-            policy_update ([str]): either 'update-dups' or 'ignore-dups' to
+            policy_update (str): either 'update-dups' or 'ignore-dups' to
                 respectively update duplicates or ignore them
+            next_step (dict): a dict in the form expected by
+                `scheduler.backend.SchedulerBackend.create_tasks`
+                without `next_run`, plus a `result_name` key.
             **kwargs: passed to the `index` method
 
         """
@@ -347,7 +366,8 @@
 
     """
 
-    def run(self, ids, policy_update, **kwargs):
+    def run(self, ids, policy_update,
+            next_step=None, **kwargs):
         """Given a list of ids:
 
         - retrieve the content from the storage
@@ -356,9 +376,12 @@
 
         Args:
             ids ([bytes]): sha1's identifier list
-            policy_update ([str]): either 'update-dups' or 'ignore-dups' to
-                                   respectively update duplicates or ignore
-                                   them
+            policy_update (str): either 'update-dups' or 'ignore-dups' to
+                                 respectively update duplicates or ignore
+                                 them
+            next_step (dict): a dict in the form expected by
+                `scheduler.backend.SchedulerBackend.create_tasks`
+                without `next_run`, plus a `result_name` key.
             **kwargs: passed to the `index` method
 
         """
@@ -377,7 +400,7 @@
 
             self.persist_index_computations(results, policy_update)
             self.results = results
-            return self.next_step(results)
+            return self.next_step(results, task=next_step)
         except Exception:
             self.log.exception(
                 'Problem when reading contents metadata.')
@@ -396,7 +419,8 @@
     class.
 
     """
-    def run(self, ids, policy_update, parse_ids=False, **kwargs):
+    def run(self, ids, policy_update,
+            parse_ids=False, next_step=None, **kwargs):
         """Given a list of origin ids:
 
         - retrieve origins from storage
@@ -406,11 +430,14 @@
 
         Args:
             ids ([Union[int, Tuple[str, bytes]]]): list of origin ids or
                 (type, url) tuples.
-            policy_update ([str]): either 'update-dups' or 'ignore-dups' to
+            policy_update (str): either 'update-dups' or 'ignore-dups' to
                 respectively update duplicates or ignore them
-            parse_ids ([bool]: If `True`, will try to convert `ids`
+            parse_ids (bool): If `True`, will try to convert `ids`
                 from a human input to the valid type.
+            next_step (dict): a dict in the form expected by
+                `scheduler.backend.SchedulerBackend.create_tasks`
+                without `next_run`, plus a `result_name` key.
             **kwargs: passed to the `index` method
 
         """
@@ -445,7 +472,7 @@
                     'Problem when processing origin %s' % id_)
         self.persist_index_computations(results, policy_update)
         self.results = results
-        return self.next_step(results)
+        return self.next_step(results, task=next_step)
 
 
 class RevisionIndexer(BaseIndexer):
@@ -458,7 +485,7 @@
     class.
 
     """
-    def run(self, ids, policy_update):
+    def run(self, ids, policy_update, next_step=None):
         """Given a list of sha1_gits:
 
         - retrieve revisions from storage
@@ -466,13 +493,15 @@
         - store the results (according to policy_update)
 
         Args:
-            ids ([bytes]): sha1_git's identifier list
-            policy_update ([str]): either 'update-dups' or 'ignore-dups' to
-                                   respectively update duplicates or ignore
-                                   them
+            ids ([bytes or str]): sha1_git's identifier list
+            policy_update (str): either 'update-dups' or 'ignore-dups' to
+                                 respectively update duplicates or ignore
+                                 them
 
         """
         results = []
+        ids = [id_.encode() if isinstance(id_, str) else id_
+               for id_ in ids]
         revs = self.storage.revision_get(ids)
 
         for rev in revs:
@@ -489,4 +518,4 @@
                 'Problem when processing revision')
         self.persist_index_computations(results, policy_update)
         self.results = results
-        return self.next_step(results)
+        return self.next_step(results, task=next_step)
diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py
--- a/swh/indexer/metadata.py
+++ b/swh/indexer/metadata.py
@@ -158,14 +158,14 @@
         Returns:
             dict: dictionary representing a revision_metadata, with keys:
 
-            - id (bytes): rev's identifier (sha1_git)
+            - id (str): rev's identifier (sha1_git)
             - indexer_configuration_id (bytes): tool used
            - translated_metadata (bytes): dict of retrieved metadata
 
         """
         try:
             result = {
-                'id': rev['id'],
+                'id': rev['id'].decode(),
                 'indexer_configuration_id': self.tool['id'],
                 'translated_metadata': None
             }
@@ -176,9 +176,9 @@
             detected_files = detect_metadata(files)
             result['translated_metadata'] = self.translate_revision_metadata(
                 detected_files)
-        except Exception:
+        except Exception as e:
             self.log.exception(
-                'Problem when indexing rev')
+                'Problem when indexing rev: %r', e)
         return result
 
     def persist_index_computations(self, results, policy_update):
@@ -271,7 +271,7 @@
     def filter(self, ids):
         return ids
 
-    def run(self, revisions_metadata, policy_update, *, origin_head_pairs):
+    def run(self, revisions_metadata, policy_update, *, origin_head):
         """Expected to be called with the result of RevisionMetadataIndexer
         as first argument; ie. not a list of ids as other indexers would.
 
@@ -282,12 +282,12 @@
          passed by RevisionMetadataIndexer via a Celery chain triggered by
          OriginIndexer.next_step.
        * `policy_update`: `'ignore-dups'` or `'update-dups'`
-        * `origin_head_pairs` (List[dict]): list of dictionaries with
-          keys `origin_id` and `revision_id`, which is the result of
-          OriginHeadIndexer.
+        * `origin_head` (dict): maps the origin id (as a string) to
+          the id of its head revision, as computed by
+          OriginHeadIndexer.
         """
-        origin_head_map = {pair['origin_id']: pair['revision_id']
-                           for pair in origin_head_pairs}
+        origin_head_map = {int(origin_id): rev_id
+                           for (origin_id, rev_id) in origin_head.items()}
 
         # Fix up the argument order. revisions_metadata has to be the
         # first argument because of celery.chain; the next line calls
@@ -312,9 +312,6 @@
                     revision_metadata['indexer_configuration_id'],
             }
 
-        # If you get this KeyError with a message like this:
-        #     'foo' not in [b'foo']
-        # you should check you're not using JSON as task serializer
         raise KeyError('%r not in %r' %
                        (revision_id, [r['id'] for r in revisions_metadata]))
diff --git a/swh/indexer/origin_head.py b/swh/indexer/origin_head.py
--- a/swh/indexer/origin_head.py
+++ b/swh/indexer/origin_head.py
@@ -7,10 +7,9 @@
 import click
 import logging
 
-from celery import chain
-
+from swh.scheduler import get_scheduler
+from swh.scheduler.utils import create_task_dict
 from swh.indexer.indexer import OriginIndexer
-from swh.indexer.tasks import OriginMetadata, RevisionMetadata
 
 
 class OriginHeadIndexer(OriginIndexer):
@@ -31,8 +30,8 @@
 
     CONFIG_BASE_FILENAME = 'indexer/origin_head'
 
-    revision_metadata_task = RevisionMetadata()
-    origin_intrinsic_metadata_task = OriginMetadata()
+    revision_metadata_task = 'revision_metadata'
+    origin_intrinsic_metadata_task = 'origin_metadata'
 
     def filter(self, ids):
         yield from ids
@@ -42,7 +41,7 @@
         should only be piped to another indexer via the orchestrator."""
         pass
 
-    def next_step(self, results):
+    def next_step(self, results, task):
         """Once the head is found, call the RevisionMetadataIndexer
         on these revisions, then call the OriginMetadataIndexer with
         both the origin_id and the revision metadata, so it can copy the
@@ -52,19 +51,42 @@
             results (Iterable[dict]): Iterable of return values from `index`.
 
         """
+        super().next_step(results, task)
         if self.revision_metadata_task is None and \
                 self.origin_intrinsic_metadata_task is None:
             return
         assert self.revision_metadata_task is not None
         assert self.origin_intrinsic_metadata_task is not None
-        return chain(
-            self.revision_metadata_task.s(
-                ids=[res['revision_id'] for res in results],
-                policy_update='update-dups'),
-            self.origin_intrinsic_metadata_task.s(
-                origin_head_pairs=results,
-                policy_update='update-dups'),
-            )()
+
+        # Second task to run after this one: copy the revision's metadata
+        # to the origin
+        sub_task = create_task_dict(
+            self.origin_intrinsic_metadata_task,
+            'oneshot',
+            origin_head={
+                str(result['origin_id']):
+                    result['revision_id'].decode()
+                for result in results},
+            policy_update='update-dups',
+        )
+        del sub_task['next_run']  # Not json-serializable
+
+        # First task to run after this one: index the metadata of the
+        # revision
+        task = create_task_dict(
+            self.revision_metadata_task,
+            'oneshot',
+            ids=[res['revision_id'].decode() for res in results],
+            policy_update='update-dups',
+            next_step={
+                **sub_task,
+                'result_name': 'revisions_metadata'},
+        )
+        if getattr(self, 'scheduler', None):
+            scheduler = self.scheduler
+        else:
+            scheduler = get_scheduler(**self.config['scheduler'])
+        scheduler.create_tasks([task])  # Dispatch
diff --git a/swh/indexer/tests/test_metadata.py b/swh/indexer/tests/test_metadata.py
--- a/swh/indexer/tests/test_metadata.py
+++ b/swh/indexer/tests/test_metadata.py
@@ -346,7 +346,7 @@
 
         results = metadata_indexer.idx_storage.added_data
         expected_results = [('revision_metadata', True, [{
-            'id': b'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f',
+            'id': '8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f',
             'translated_metadata': {
                 'identifier': None,
                 'maintainer': None,
diff --git a/swh/indexer/tests/test_origin_metadata.py b/swh/indexer/tests/test_origin_metadata.py
--- a/swh/indexer/tests/test_origin_metadata.py
+++ b/swh/indexer/tests/test_origin_metadata.py
@@ -3,6 +3,7 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import time
 import logging
 import unittest
 from celery import task
@@ -13,8 +14,7 @@
 from swh.indexer.tests.test_origin_head import TestOriginHeadIndexer
 from swh.indexer.tests.test_metadata import TestRevisionMetadataIndexer
 
-from swh.scheduler.tests.celery_testing import CeleryTestFixture
-from swh.indexer.tests import start_worker_thread
+from swh.scheduler.tests.scheduler_testing import SchedulerTestFixture
 
 
 class TestOriginMetadataIndexer(OriginMetadataIndexer):
@@ -57,24 +57,40 @@
 
 
 class TestOriginHeadIndexer(TestOriginHeadIndexer):
-    revision_metadata_task = revision_metadata_test_task
-    origin_intrinsic_metadata_task = origin_intrinsic_metadata_test_task
+    revision_metadata_task = 'revision_metadata_test_task'
+    origin_intrinsic_metadata_task = 'origin_intrinsic_metadata_test_task'
 
 
-class TestOriginMetadata(CeleryTestFixture, unittest.TestCase):
+class TestOriginMetadata(SchedulerTestFixture, unittest.TestCase):
     def setUp(self):
         super().setUp()
         self.maxDiff = None
         MockIndexerStorage.added_data = []
+        self.add_scheduler_task_type(
+            'revision_metadata_test_task',
+            'swh.indexer.tests.test_origin_metadata.'
+            'revision_metadata_test_task')
+        self.add_scheduler_task_type(
+            'origin_intrinsic_metadata_test_task',
+            'swh.indexer.tests.test_origin_metadata.'
+            'origin_intrinsic_metadata_test_task')
+        TestRevisionMetadataIndexer.scheduler = self.scheduler
+
+    def tearDown(self):
+        del TestRevisionMetadataIndexer.scheduler
+        super().tearDown()
 
     def test_pipeline(self):
         indexer = TestOriginHeadIndexer()
-        with start_worker_thread():
-            promise = indexer.run(
-                ["git+https://github.com/librariesio/yarn-parser"],
-                policy_update='update-dups',
-                parse_ids=True)
-            promise.get()
+        indexer.scheduler = self.scheduler
+        indexer.run(
+            ["git+https://github.com/librariesio/yarn-parser"],
+            policy_update='update-dups',
+            parse_ids=True)
+
+        self.run_ready_tasks()  # Run the first task
+        time.sleep(0.1)  # Give it time to complete and schedule the 2nd one
+        self.run_ready_tasks()  # Run the second task
 
         metadata = {
             'identifier': None,
@@ -108,13 +124,13 @@
             'email': None
         }
         rev_metadata = {
-            'id': b'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f',
+            'id': '8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f',
            'translated_metadata': metadata,
            'indexer_configuration_id': 7,
         }
         origin_metadata = {
             'origin_id': 54974445,
-            'from_revision': b'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f',
+            'from_revision': '8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f',
             'metadata': metadata,
             'indexer_configuration_id': 7,
         }
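
Usage sketch (not part of the patch): how a caller is expected to drive the new `next_step` argument. The indexer instance and the 'origin_metadata' task type name below are illustrative assumptions; only `create_task_dict`, the `result_name` key and the `arguments['kwargs']` layout are taken from the diff above.

    from swh.scheduler.utils import create_task_dict

    def run_with_follow_up(indexer, ids):
        """Run `indexer` (any RevisionIndexer subclass instance) on `ids`,
        then let the scheduler fire a follow-up 'origin_metadata' task that
        receives this run's results under the `revisions_metadata` kwarg."""
        follow_up = create_task_dict(
            'origin_metadata',        # assumed task type registered in the scheduler
            'oneshot',
            policy_update='update-dups',
        )
        del follow_up['next_run']     # not JSON-serializable; next_step() sets it
        follow_up['result_name'] = 'revisions_metadata'

        indexer.run(ids, policy_update='update-dups', next_step=follow_up)
        # After persisting its results, BaseIndexer.next_step copies them into
        # follow_up['arguments']['kwargs']['revisions_metadata'], sets
        # follow_up['next_run'], and calls scheduler.create_tasks([follow_up]).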
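For orientation, the chained task that OriginHeadIndexer.next_step hands to the scheduler ends up shaped roughly as below. Only `next_run`, `arguments['kwargs']` and the nested `result_name` key are relied on by the new code; the other keys ('type', 'policy') are assumptions about what `create_task_dict` returns, and the values are abridged placeholders.

    # Approximate shape of the task created by OriginHeadIndexer.next_step.
    task = {
        'type': 'revision_metadata',
        'policy': 'oneshot',
        'next_run': '<datetime set by create_task_dict>',
        'arguments': {
            'args': [],
            'kwargs': {
                'ids': ['<head revision sha1_git>'],
                'policy_update': 'update-dups',
                'next_step': {               # the origin_metadata sub-task
                    'type': 'origin_metadata',
                    'policy': 'oneshot',
                    'result_name': 'revisions_metadata',
                    'arguments': {
                        'args': [],
                        'kwargs': {
                            'origin_head': {'<origin id>': '<head revision sha1_git>'},
                            'policy_update': 'update-dups',
                        },
                    },
                    # 'next_run' was deleted; BaseIndexer.next_step re-adds it
                    # when the revision_metadata task finishes.
                },
            },
        },
    }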