diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py --- a/swh/indexer/metadata.py +++ b/swh/indexer/metadata.py @@ -279,20 +279,16 @@ def filter(self, ids): return ids - def run(self, revisions_metadata, policy_update, *, origin_head): + def run(self, origin_head, policy_update): """Expected to be called with the result of RevisionMetadataIndexer as first argument; ie. not a list of ids as other indexers would. Args: - * `revisions_metadata` (List[dict]): contains metadata from - revisions, along with the respective revision ids. It is - passed by RevisionMetadataIndexer via a Celery chain - triggered by OriginIndexer.next_step. - * `policy_update`: `'ignore-dups'` or `'update-dups'` * `origin_head` (dict): {str(origin_id): rev_id.encode()} keys `origin_id` and `revision_id`, which is the result of OriginHeadIndexer. + * `policy_update`: `'ignore-dups'` or `'update-dups'` """ origin_head_map = {int(origin_id): rev_id for (origin_id, rev_id) in origin_head.items()} @@ -303,26 +299,25 @@ return super().run(ids=list(origin_head_map), policy_update=policy_update, parse_ids=False, - revisions_metadata=revisions_metadata, origin_head_map=origin_head_map) - def index(self, origin, *, revisions_metadata, origin_head_map): + def index(self, origin, *, origin_head_map): # Get the last revision of the origin. revision_id = origin_head_map[origin['id']] - # Get the metadata of that revision, and return it - for revision_metadata in revisions_metadata: - if revision_metadata['id'] == revision_id: - return { - 'origin_id': origin['id'], - 'metadata': revision_metadata['translated_metadata'], - 'from_revision': revision_id, - 'indexer_configuration_id': - revision_metadata['indexer_configuration_id'], - } - - raise KeyError('%r not in %r' % - (revision_id, [r['id'] for r in revisions_metadata])) + revision_metadata = self.idx_storage \ + .revision_metadata_get([revision_id]) + + for item in revision_metadata: + assert item['id'] == revision_id + # Get the metadata of that revision, and return it + return { + 'origin_id': origin['id'], + 'metadata': item['translated_metadata'], + 'from_revision': revision_id, + 'indexer_configuration_id': + item['indexer_configuration_id'], + } def persist_index_computations(self, results, policy_update): self.idx_storage.origin_intrinsic_metadata_add( diff --git a/swh/indexer/origin_head.py b/swh/indexer/origin_head.py --- a/swh/indexer/origin_head.py +++ b/swh/indexer/origin_head.py @@ -82,9 +82,7 @@ 'oneshot', ids=[res['revision_id'].decode() for res in results], policy_update='update-dups', - next_step={ - **sub_task, - 'result_name': 'revisions_metadata'}, + next_step=sub_task, ) if getattr(self, 'scheduler', None): scheduler = self.scheduler diff --git a/swh/indexer/tests/test_utils.py b/swh/indexer/tests/test_utils.py --- a/swh/indexer/tests/test_utils.py +++ b/swh/indexer/tests/test_utils.py @@ -269,6 +269,7 @@ """ added_data = [] + revision_metadata = {} def indexer_configuration_add(self, tools): results = [] @@ -315,8 +316,15 @@ ('content_metadata', conflict_update, metadata)) def revision_metadata_add(self, metadata, conflict_update=None): + assert conflict_update self.added_data.append( ('revision_metadata', conflict_update, metadata)) + for item in metadata: + self.revision_metadata.setdefault(item['id'], []).append(item) + + def revision_metadata_get(self, ids): + for id_ in ids: + yield from self.revision_metadata.get(id_) def origin_intrinsic_metadata_add(self, metadata, conflict_update=None): self.added_data.append(