diff --git a/swh/indexer/__init__.py b/swh/indexer/__init__.py --- a/swh/indexer/__init__.py +++ b/swh/indexer/__init__.py @@ -21,6 +21,8 @@ 'ctags': 'swh.indexer.tasks.SWHCtagsTask', 'fossology_license': 'swh.indexer.tasks.SWHContentFossologyLicenseTask', 'rehash': 'swh.indexer.tasks.SWHRecomputeChecksumsTask', + 'revision_metadata_task': 'swh.indexer.tasks.SWHRevisionMetadataTask', + 'origin_metadata_task': 'swh.indexer.tasks.SWHOriginMetadataTask', } diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py --- a/swh/indexer/indexer.py +++ b/swh/indexer/indexer.py @@ -319,7 +319,7 @@ pass @abc.abstractmethod - def run(self, ids, policy_update): + def run(self, ids, policy_update, **kwargs): """Given a list of ids: - retrieves the data from the storage @@ -330,6 +330,7 @@ ids ([bytes]): id's identifier list policy_update ([str]): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them + **kwargs: passed to the `index` method """ pass @@ -346,7 +347,7 @@ """ - def run(self, ids, policy_update): + def run(self, ids, policy_update, **kwargs): """Given a list of ids: - retrieve the content from the storage @@ -358,6 +359,7 @@ policy_update ([str]): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them + **kwargs: passed to the `index` method """ results = [] @@ -369,12 +371,13 @@ self.log.warn('Content %s not found in objstorage' % hashutil.hash_to_hex(sha1)) continue - res = self.index(sha1, raw_content) + res = self.index(sha1, raw_content, **kwargs) if res: # If no results, skip it results.append(res) self.persist_index_computations(results, policy_update) - self.next_step(results) + self.results = results + return self.next_step(results) except Exception: self.log.exception( 'Problem when reading contents metadata.') @@ -393,7 +396,7 @@ class. 
""" - def run(self, ids, policy_update, parse_ids=False): + def run(self, ids, policy_update, parse_ids=False, **kwargs): """Given a list of origin ids: - retrieve origins from storage @@ -408,6 +411,7 @@ them parse_ids ([bool]: If `True`, will try to convert `ids` from a human input to the valid type. + **kwargs: passed to the `index` method """ if parse_ids: @@ -426,20 +430,22 @@ elif isinstance(id_, int): params = {'id': id_} else: - raise TypeError('Invalid value for "ids": %r' % id_) + raise TypeError('Invalid value in "ids": %r' % id_) origin = self.storage.origin_get(params) if not origin: self.log.warn('Origins %s not found in storage' % list(ids)) continue try: - res = self.index(origin) + res = self.index(origin, **kwargs) - if origin: # If no results, skip it + if res: # If no results, skip it results.append(res) except Exception: self.log.exception( 'Problem when processing origin %s' % id_) self.persist_index_computations(results, policy_update) + self.results = results + return self.next_step(results) class RevisionIndexer(BaseIndexer): @@ -482,3 +488,5 @@ self.log.exception( 'Problem when processing revision') self.persist_index_computations(results, policy_update) + self.results = results + return self.next_step(results) diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py --- a/swh/indexer/metadata.py +++ b/swh/indexer/metadata.py @@ -5,7 +5,7 @@ import click import logging -from swh.indexer.indexer import ContentIndexer, RevisionIndexer +from swh.indexer.indexer import ContentIndexer, RevisionIndexer, OriginIndexer from swh.indexer.metadata_dictionary import compute_metadata from swh.indexer.metadata_detector import detect_metadata from swh.indexer.metadata_detector import extract_minimal_metadata_dict @@ -266,6 +266,36 @@ return min_metadata +class OriginMetadataIndexer(OriginIndexer): + def filter(self, ids): + return ids + + def run(self, revisions_metadata, policy_update, *, origin_head_pairs): + """Expected to be called with the result of 
RevisionMetadataIndexer + as 
first argument; ie. not a list of ids as other indexers would.""" + origin_head_map = {pair['origin_id']: pair['revision_id'] + for pair in origin_head_pairs} + return super().run(ids=list(origin_head_map), + policy_update=policy_update, + revisions_metadata=revisions_metadata, + origin_head_map=origin_head_map) + + def index(self, origin, *, revisions_metadata, origin_head_map): + revision_id = origin_head_map[origin['id']] + for revision_metadata in revisions_metadata: + if revision_metadata['id'] == revision_id: + return revision_metadata + # If you get this KeyError with a message like this: + # 'foo' not in [b'foo'] + # you should check you're not using JSON as task serializer + raise KeyError('%r not in %r' % + (revision_id, [r['id'] for r in revisions_metadata])) + + def persist_index_computations(self, results, policy_update): + self.idx_storage.origin_metadata_add( + results, conflict_update=(policy_update == 'update-dups')) + + @click.command() @click.option('--revs', '-i', help='Default sha1_git to lookup', multiple=True) diff --git a/swh/indexer/orchestrator.py b/swh/indexer/orchestrator.py --- a/swh/indexer/orchestrator.py +++ b/swh/indexer/orchestrator.py @@ -93,6 +93,7 @@ self.tasks = tasks def run(self, ids): + all_results = [] for name, (idx_class, filtering, batch_size) in self.indexers.items(): if filtering: policy_update = 'ignore-dups' @@ -111,10 +112,12 @@ policy_update=policy_update) celery_tasks.append(celery_task) - self._run_tasks(celery_tasks) + all_results.append(self._run_tasks(celery_tasks)) + + return all_results def _run_tasks(self, celery_tasks): - group(celery_tasks).delay() + return group(celery_tasks).delay() class OrchestratorAllContentsIndexer(BaseOrchestratorIndexer): diff --git a/swh/indexer/origin_head.py b/swh/indexer/origin_head.py --- a/swh/indexer/origin_head.py +++ b/swh/indexer/origin_head.py @@ -7,7 +7,10 @@ import click import logging +from celery import chain + from swh.indexer.indexer import OriginIndexer +from 
swh.indexer.tasks import SWHOriginMetadataTask, SWHRevisionMetadataTask class OriginHeadIndexer(OriginIndexer): @@ -26,6 +29,11 @@ }), } + CONFIG_BASE_FILENAME = 'indexer/origin_head' + + revision_metadata_task = SWHRevisionMetadataTask() + origin_metadata_task = SWHOriginMetadataTask() + def filter(self, ids): yield from ids @@ -34,6 +42,30 @@ should only be piped to another indexer via the orchestrator.""" pass + def next_step(self, results): + """Once the head is found, call the RevisionMetadataIndexer + on these revisions, then call the OriginMetadataIndexer with + both the origin_id and the revision metadata, so it can copy the + revision metadata to the origin's metadata. + + Args: + results (Iterable[dict]): Iterable of return values from `index`. + + """ + if self.revision_metadata_task is None or \ + self.origin_metadata_task is None: + return + assert self.revision_metadata_task is not None + assert self.origin_metadata_task is not None + return chain( + self.revision_metadata_task.s( + ids=[res['revision_id'] for res in results], + policy_update='update-dups'), + self.origin_metadata_task.s( + origin_head_pairs=results, + policy_update='update-dups'), + )() + # Dispatch def index(self, origin): diff --git a/swh/indexer/tasks.py b/swh/indexer/tasks.py --- a/swh/indexer/tasks.py +++ b/swh/indexer/tasks.py @@ -5,7 +5,7 @@ import logging -from swh.scheduler.task import Task +import celery from .orchestrator import OrchestratorAllContentsIndexer from .orchestrator import OrchestratorTextContentsIndexer @@ -14,10 +14,18 @@ from .ctags import CtagsIndexer from .fossology_license import ContentFossologyLicenseIndexer from .rehash import RecomputeChecksums +from .metadata import RevisionMetadataIndexer, OriginMetadataIndexer logging.basicConfig(level=logging.INFO) +class Task(celery.Task): + def run_task(self, *args, **kwargs): + indexer = self.Indexer() + indexer.run(*args, **kwargs) + return getattr(indexer, 'results', None) + + class 
SWHOrchestratorAllContentsTask(Task): 
"""Main task in charge of reading batch contents (of any type) and broadcasting them back to other tasks. @@ -25,8 +33,7 @@ """ task_queue = 'swh_indexer_orchestrator_content_all' - def run_task(self, *args, **kwargs): - OrchestratorAllContentsIndexer().run(*args, **kwargs) + Indexer = OrchestratorAllContentsIndexer class SWHOrchestratorTextContentsTask(Task): @@ -36,8 +43,21 @@ """ task_queue = 'swh_indexer_orchestrator_content_text' - def run_task(self, *args, **kwargs): - OrchestratorTextContentsIndexer().run(*args, **kwargs) + Indexer = OrchestratorTextContentsIndexer + + +class SWHRevisionMetadataTask(Task): + task_queue = 'swh_indexer_revision_metadata' + + serializer = 'msgpack' + + Indexer = RevisionMetadataIndexer + + +class SWHOriginMetadataTask(Task): + task_queue = 'swh_indexer_origin_metadata' + + Indexer = OriginMetadataIndexer class SWHContentMimetypeTask(Task): @@ -46,8 +66,7 @@ """ task_queue = 'swh_indexer_content_mimetype' - def run_task(self, *args, **kwargs): - ContentMimetypeIndexer().run(*args, **kwargs) + Indexer = ContentMimetypeIndexer class SWHContentLanguageTask(Task): @@ -66,8 +85,7 @@ """ task_queue = 'swh_indexer_content_ctags' - def run_task(self, *args, **kwargs): - CtagsIndexer().run(*args, **kwargs) + Indexer = CtagsIndexer class SWHContentFossologyLicenseTask(Task): @@ -76,8 +94,7 @@ """ task_queue = 'swh_indexer_content_fossology_license' - def run_task(self, *args, **kwargs): - ContentFossologyLicenseIndexer().run(*args, **kwargs) + Indexer = ContentFossologyLicenseIndexer class SWHRecomputeChecksumsTask(Task): @@ -86,5 +103,4 @@ """ task_queue = 'swh_indexer_content_rehash' - def run_task(self, *args, **kwargs): - RecomputeChecksums().run(*args, **kwargs) + Indexer = RecomputeChecksums diff --git a/swh/indexer/tests/__init__.py b/swh/indexer/tests/__init__.py --- a/swh/indexer/tests/__init__.py +++ b/swh/indexer/tests/__init__.py @@ -1,2 +1,28 @@ +import os from os import path + +from celery import shared_task +from 
celery.contrib.testing.worker import _start_worker_thread +from celery import current_app + +__all__ = ['start_worker_thread'] + DATA_DIR = path.join(path.dirname(__file__), 'data') + +os.environ['CELERY_BROKER_URL'] = 'memory://' +os.environ['CELERY_RESULT_BACKEND'] = 'redis://localhost' +os.environ['CELERY_TASK_SERIALIZER'] = 'msgpack' +current_app.conf.update( + task_serializer='msgpack', + ) + + +def start_worker_thread(): + return _start_worker_thread(current_app, loglevel='DEBUG') + + +# Needed to pass an assertion, see +# https://github.com/celery/celery/pull/5111 +@shared_task(name='celery.ping') +def ping(): + return 'pong' diff --git a/swh/indexer/tests/test_metadata.py b/swh/indexer/tests/test_metadata.py --- a/swh/indexer/tests/test_metadata.py +++ b/swh/indexer/tests/test_metadata.py @@ -86,6 +86,7 @@ 'context': 'npm' } } + MockIndexerStorage.added_data = [] def test_compute_metadata_none(self): """ @@ -209,9 +210,9 @@ # when metadata_indexer.run(sha1s, policy_update='ignore-dups') - results = metadata_indexer.idx_storage.state + results = metadata_indexer.idx_storage.added_data - expected_results = [{ + expected_results = [('content_metadata', False, [{ 'indexer_configuration_id': 30, 'translated_metadata': { 'other': {}, @@ -270,7 +271,7 @@ 'indexer_configuration_id': 30, 'translated_metadata': None, 'id': '02fb2c89e14f7fab46701478c83779c7beb7b069' - }] + }])] # The assertion below returns False sometimes because of nested lists self.assertEqual(expected_results, results) @@ -318,9 +319,9 @@ ] metadata_indexer.run(sha1_gits, 'update-dups') - results = metadata_indexer.idx_storage.state + results = metadata_indexer.idx_storage.added_data - expected_results = [{ + expected_results = [('revision_metadata', True, [{ 'id': b'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f', 'translated_metadata': { 'identifier': None, @@ -354,6 +355,6 @@ 'email': None }, 'indexer_configuration_id': 7 - }] + }])] # then self.assertEqual(expected_results, results) diff --git 
a/swh/indexer/tests/test_orchestrator.py b/swh/indexer/tests/test_orchestrator.py --- a/swh/indexer/tests/test_orchestrator.py +++ b/swh/indexer/tests/test_orchestrator.py @@ -5,13 +5,16 @@ import unittest +import celery + from swh.indexer.orchestrator import BaseOrchestratorIndexer -from swh.indexer.indexer import RevisionIndexer -from swh.indexer.tests.test_utils import MockIndexerStorage -from swh.scheduler.task import Task +from swh.indexer.indexer import BaseIndexer +from swh.indexer.tests.test_utils import MockIndexerStorage, MockStorage + +from . import start_worker_thread -class BaseTestIndexer(RevisionIndexer): +class BaseTestIndexer(BaseIndexer): ADDITIONAL_CONFIG = { 'tools': ('dict', { 'name': 'foo', @@ -22,16 +25,20 @@ def prepare(self): self.idx_storage = MockIndexerStorage() + self.storage = MockStorage() def check(self): pass def filter(self, ids): - self.filtered = ids + self.filtered.append(ids) return ids + def run(self, ids, policy_update): + return self.index(ids) + def index(self, ids): - self.indexed = ids + self.indexed.append(ids) return [id_ + '_indexed_by_' + self.__class__.__name__ for id_ in ids] @@ -40,37 +47,49 @@ class Indexer1(BaseTestIndexer): + filtered = [] + indexed = [] + def filter(self, ids): return super().filter([id_ for id_ in ids if '1' in id_]) class Indexer2(BaseTestIndexer): + filtered = [] + indexed = [] + def filter(self, ids): return super().filter([id_ for id_ in ids if '2' in id_]) class Indexer3(BaseTestIndexer): + filtered = [] + indexed = [] + def filter(self, ids): return super().filter([id_ for id_ in ids if '3' in id_]) -class Indexer1Task(Task): - pass +@celery.task +def indexer1_task(*args, **kwargs): + return Indexer1().run(*args, **kwargs) -class Indexer2Task(Task): - pass +@celery.task +def indexer2_task(*args, **kwargs): + return Indexer2().run(*args, **kwargs) -class Indexer3Task(Task): - pass +@celery.task +def indexer3_task(*args, **kwargs): + return Indexer3().run(*args, **kwargs) class 
TestOrchestrator12(BaseOrchestratorIndexer): TASK_NAMES = { - 'indexer1': 'swh.indexer.tests.test_orchestrator.Indexer1Task', - 'indexer2': 'swh.indexer.tests.test_orchestrator.Indexer2Task', - 'indexer3': 'swh.indexer.tests.test_orchestrator.Indexer3Task', + 'indexer1': 'swh.indexer.tests.test_orchestrator.indexer1_task', + 'indexer2': 'swh.indexer.tests.test_orchestrator.indexer2_task', + 'indexer3': 'swh.indexer.tests.test_orchestrator.indexer3_task', } INDEXER_CLASSES = { @@ -98,63 +117,84 @@ } self.prepare_tasks() + +class MockedTestOrchestrator12(TestOrchestrator12): def _run_tasks(self, celery_tasks): self.running_tasks.extend(celery_tasks) class OrchestratorTest(unittest.TestCase): + def test_orchestrator_filter(self): + return + with start_worker_thread(): + o = TestOrchestrator12() + o.prepare() + promises = o.run(['id12', 'id2']) + results = [] + for promise in promises: + results.append(promise.get(timeout=10)) + self.assertCountEqual( + results, + [[['id12_indexed_by_Indexer1']], + [['id12_indexed_by_Indexer2', + 'id2_indexed_by_Indexer2']]]) + self.assertEqual(Indexer2.indexed, [['id12', 'id2']]) + self.assertEqual(Indexer1.indexed, [['id12']]) + + +class MockedOrchestratorTest(unittest.TestCase): maxDiff = None - def test_orchestrator_filter(self): - o = TestOrchestrator12() + def test_mocked_orchestrator_filter(self): + o = MockedTestOrchestrator12() o.prepare() o.run(['id12', 'id2']) self.assertCountEqual(o.running_tasks, [ - {'args': (), - 'chord_size': None, - 'immutable': False, - 'kwargs': {'ids': ['id12'], - 'policy_update': 'ignore-dups'}, - 'options': {}, - 'subtask_type': None, - 'task': 'swh.indexer.tests.test_orchestrator.Indexer1Task'}, - {'args': (), - 'chord_size': None, - 'immutable': False, - 'kwargs': {'ids': ['id12', 'id2'], - 'policy_update': 'ignore-dups'}, - 'options': {}, - 'subtask_type': None, - 'task': 'swh.indexer.tests.test_orchestrator.Indexer2Task'}, - ]) - - def test_orchestrator_batch(self): - o = TestOrchestrator12() 
+ {'args': (), + 'chord_size': None, + 'immutable': False, + 'kwargs': {'ids': ['id12'], + 'policy_update': 'ignore-dups'}, + 'options': {}, + 'subtask_type': None, + 'task': 'swh.indexer.tests.test_orchestrator.indexer1_task'}, + {'args': (), + 'chord_size': None, + 'immutable': False, + 'kwargs': {'ids': ['id12', 'id2'], + 'policy_update': 'ignore-dups'}, + 'options': {}, + 'subtask_type': None, + 'task': 'swh.indexer.tests.test_orchestrator.indexer2_task'}, + ]) + + def test_mocked_orchestrator_batch(self): + o = MockedTestOrchestrator12() o.prepare() o.run(['id12', 'id2a', 'id2b', 'id2c']) self.assertCountEqual(o.running_tasks, [ - {'args': (), - 'chord_size': None, - 'immutable': False, - 'kwargs': {'ids': ['id12'], - 'policy_update': 'ignore-dups'}, - 'options': {}, - 'subtask_type': None, - 'task': 'swh.indexer.tests.test_orchestrator.Indexer1Task'}, - {'args': (), - 'chord_size': None, - 'immutable': False, - 'kwargs': {'ids': ['id12', 'id2a'], - 'policy_update': 'ignore-dups'}, - 'options': {}, - 'subtask_type': None, - 'task': 'swh.indexer.tests.test_orchestrator.Indexer2Task'}, - {'args': (), - 'chord_size': None, - 'immutable': False, - 'kwargs': {'ids': ['id2b', 'id2c'], - 'policy_update': 'ignore-dups'}, - 'options': {}, - 'subtask_type': None, - 'task': 'swh.indexer.tests.test_orchestrator.Indexer2Task'}, - ]) + {'args': (), + 'chord_size': None, + 'immutable': False, + 'kwargs': {'ids': ['id12'], + 'policy_update': 'ignore-dups'}, + 'options': {}, + 'subtask_type': None, + 'task': 'swh.indexer.tests.test_orchestrator.indexer1_task'}, + {'args': (), + 'chord_size': None, + 'immutable': False, + 'kwargs': {'ids': ['id12', 'id2a'], + 'policy_update': 'ignore-dups'}, + 'options': {}, + 'subtask_type': None, + 'task': 'swh.indexer.tests.test_orchestrator.indexer2_task'}, + {'args': (), + 'chord_size': None, + 'immutable': False, + 'kwargs': {'ids': ['id2b', 'id2c'], + 'policy_update': 'ignore-dups'}, + 'options': {}, + 'subtask_type': None, + 'task': 
'swh.indexer.tests.test_orchestrator.indexer2_task'}, + ]) diff --git a/swh/indexer/tests/test_origin_head.py b/swh/indexer/tests/test_origin_head.py --- a/swh/indexer/tests/test_origin_head.py +++ b/swh/indexer/tests/test_origin_head.py @@ -8,133 +8,17 @@ from nose.tools import istest from swh.indexer.origin_head import OriginHeadIndexer -from swh.indexer.tests.test_utils import MockIndexerStorage - -ORIGINS = [ - { - 'id': 52189575, - 'lister': None, - 'project': None, - 'type': 'git', - 'url': 'https://github.com/SoftwareHeritage/swh-storage'}, - { - 'id': 4423668, - 'lister': None, - 'project': None, - 'type': 'ftp', - 'url': 'rsync://ftp.gnu.org/gnu/3dldf'}, - { - 'id': 77775770, - 'lister': None, - 'project': None, - 'type': 'deposit', - 'url': 'https://forge.softwareheritage.org/source/jesuisgpl/'}, - { - 'id': 85072327, - 'lister': None, - 'project': None, - 'type': 'pypi', - 'url': 'https://pypi.org/project/limnoria/'}, - { - 'id': 49908349, - 'lister': None, - 'project': None, - 'type': 'svn', - 'url': 'http://0-512-md.googlecode.com/svn/'}, - ] - -SNAPSHOTS = { - 52189575: { - 'branches': { - b'refs/heads/add-revision-origin-cache': { - 'target': b'L[\xce\x1c\x88\x8eF\t\xf1"\x19\x1e\xfb\xc0' - b's\xe7/\xe9l\x1e', - 'target_type': 'revision'}, - b'HEAD': { - 'target': b'8K\x12\x00d\x03\xcc\xe4]bS\xe3\x8f{\xd7}' - b'\xac\xefrm', - 'target_type': 'revision'}, - b'refs/tags/v0.0.103': { - 'target': b'\xb6"Im{\xfdLb\xb0\x94N\xea\x96m\x13x\x88+' - b'\x0f\xdd', - 'target_type': 'release'}, - }}, - 4423668: { - 'branches': { - b'3DLDF-1.1.4.tar.gz': { - 'target': b'dJ\xfb\x1c\x91\xf4\x82B%]6\xa2\x90|\xd3\xfc' - b'"G\x99\x11', - 'target_type': 'revision'}, - b'3DLDF-2.0.2.tar.gz': { - 'target': b'\xb6\x0e\xe7\x9e9\xac\xaa\x19\x9e=' - b'\xd1\xc5\x00\\\xc6\xfc\xe0\xa6\xb4V', - 'target_type': 'revision'}, - b'3DLDF-2.0.3-examples.tar.gz': { - 'target': b'!H\x19\xc0\xee\x82-\x12F1\xbd\x97' - b'\xfe\xadZ\x80\x80\xc1\x83\xff', - 'target_type': 'revision'}, - 
b'3DLDF-2.0.3.tar.gz': { - 'target': b'\x8e\xa9\x8e/\xea}\x9feF\xf4\x9f\xfd\xee' - b'\xcc\x1a\xb4`\x8c\x8by', - 'target_type': 'revision'}, - b'3DLDF-2.0.tar.gz': { - 'target': b'F6*\xff(?\x19a\xef\xb6\xc2\x1fv$S\xe3G' - b'\xd3\xd1m', - b'target_type': 'revision'} - }}, - 77775770: { - 'branches': { - b'master': { - 'target': b'\xe7n\xa4\x9c\x9f\xfb\xb7\xf76\x11\x08{' - b'\xa6\xe9\x99\xb1\x9e]q\xeb', - 'target_type': 'revision'} - }, - 'id': b"h\xc0\xd2a\x04\xd4~'\x8d\xd6\xbe\x07\xeda\xfa\xfbV" - b"\x1d\r "}, - 85072327: { - 'branches': { - b'HEAD': { - 'target': b'releases/2018.09.09', - 'target_type': 'alias'}, - b'releases/2018.09.01': { - 'target': b'<\xee1(\xe8\x8d_\xc1\xc9\xa6rT\xf1\x1d' - b'\xbb\xdfF\xfdw\xcf', - 'target_type': 'revision'}, - b'releases/2018.09.09': { - 'target': b'\x83\xb9\xb6\xc7\x05\xb1%\xd0\xfem\xd8k' - b'A\x10\x9d\xc5\xfa2\xf8t', - 'target_type': 'revision'}}, - 'id': b'{\xda\x8e\x84\x7fX\xff\x92\x80^\x93V\x18\xa3\xfay' - b'\x12\x9e\xd6\xb3'}, - 49908349: { - 'branches': { - b'master': { - 'target': b'\xe4?r\xe1,\x88\xab\xec\xe7\x9a\x87\xb8' - b'\xc9\xad#.\x1bw=\x18', - 'target_type': 'revision'}}, - 'id': b'\xa1\xa2\x8c\n\xb3\x87\xa8\xf9\xe0a\x8c\xb7' - b'\x05\xea\xb8\x1f\xc4H\xf4s'}, - } - - -class MockStorage: - def origin_get(self, id_): - for origin in ORIGINS: - if origin['type'] == id_['type'] and origin['url'] == id_['url']: - return origin - assert False, id_ - - def snapshot_get_latest(self, origin_id): - if origin_id in SNAPSHOTS: - return SNAPSHOTS[origin_id] - else: - assert False, origin_id +from swh.indexer.tests.test_utils import MockIndexerStorage, MockStorage class TestOriginHeadIndexer(OriginHeadIndexer): """Specific indexer whose configuration is enough to satisfy the indexing tests. 
""" + + revision_metadata_task = None + origin_metadata_task = None + def prepare(self): self.config = { 'tools': { diff --git a/swh/indexer/tests/test_origin_metadata.py b/swh/indexer/tests/test_origin_metadata.py new file mode 100644 --- /dev/null +++ b/swh/indexer/tests/test_origin_metadata.py @@ -0,0 +1,117 @@ +# Copyright (C) 2018 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging +import unittest +from celery import task + +from swh.indexer.metadata import OriginMetadataIndexer +from swh.indexer.tests.test_utils import MockObjStorage, MockStorage +from swh.indexer.tests.test_utils import MockIndexerStorage +from swh.indexer.tests.test_origin_head import TestOriginHeadIndexer +from swh.indexer.tests.test_metadata import TestRevisionMetadataIndexer + +from . import start_worker_thread + + +class TestOriginMetadataIndexer(OriginMetadataIndexer): + def prepare(self): + self.config = { + 'storage': { + 'cls': 'remote', + 'args': { + 'url': 'http://localhost:9999', + } + }, + 'tools': { + 'name': 'origin-metadata', + 'version': '0.0.1', + 'configuration': {} + } + } + self.storage = MockStorage() + self.idx_storage = MockIndexerStorage() + self.log = logging.getLogger('swh.indexer') + self.objstorage = MockObjStorage() + self.destination_task = None + self.tools = self.register_tools(self.config['tools']) + self.tool = self.tools[0] + self.results = [] + + +@task +def test_revision_metadata_task(*args, **kwargs): + indexer = TestRevisionMetadataIndexer() + indexer.run(*args, **kwargs) + return indexer.results + + +@task +def test_origin_metadata_task(*args, **kwargs): + indexer = TestOriginMetadataIndexer() + indexer.run(*args, **kwargs) + return indexer.results + + +class TestOriginHeadIndexer(TestOriginHeadIndexer): + revision_metadata_task = test_revision_metadata_task + 
origin_metadata_task = test_origin_metadata_task + + +class TestOriginMetadata(unittest.TestCase): + def setUp(self): + self.maxDiff = None + MockIndexerStorage.added_data = [] + + def test_pipeline(self): + indexer = TestOriginHeadIndexer() + with start_worker_thread(): + promise = indexer.run( + ["git+https://github.com/moranegg/metadata_test"], + policy_update='update-dups', + parse_ids=True) + promise.get() + + metadata = { + 'id': b'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f', + 'translated_metadata': { + 'identifier': None, + 'maintainer': None, + 'url': [ + 'https://github.com/librariesio/yarn-parser#readme' + ], + 'codeRepository': [{ + 'type': 'git', + 'url': 'git+https://github.com/librariesio/yarn-parser.git' + }], + 'author': ['Andrew Nesbitt'], + 'license': ['AGPL-3.0'], + 'version': ['1.0.0'], + 'description': [ + 'Tiny web service for parsing yarn.lock files' + ], + 'relatedLink': None, + 'developmentStatus': None, + 'operatingSystem': None, + 'issueTracker': [{ + 'url': 'https://github.com/librariesio/yarn-parser/issues' + }], + 'softwareRequirements': [{ + 'express': '^4.14.0', + 'yarn': '^0.21.0', + 'body-parser': '^1.15.2' + }], + 'name': ['yarn-parser'], + 'keywords': [['yarn', 'parse', 'lock', 'dependencies']], + 'email': None + }, + 'indexer_configuration_id': 7 + } + expected_results = [ + ('origin_metadata', True, [metadata]), + ('revision_metadata', True, [metadata])] + + results = list(indexer.idx_storage.added_data) + self.assertCountEqual(expected_results, results) diff --git a/swh/indexer/tests/test_utils.py b/swh/indexer/tests/test_utils.py --- a/swh/indexer/tests/test_utils.py +++ b/swh/indexer/tests/test_utils.py @@ -6,6 +6,123 @@ from swh.objstorage.exc import ObjNotFoundError +ORIGINS = [ + { + 'id': 52189575, + 'lister': None, + 'project': None, + 'type': 'git', + 'url': 'https://github.com/SoftwareHeritage/swh-storage'}, + { + 'id': 4423668, + 'lister': None, + 'project': None, + 'type': 'ftp', + 'url': 
'rsync://ftp.gnu.org/gnu/3dldf'}, + { + 'id': 77775770, + 'lister': None, + 'project': None, + 'type': 'deposit', + 'url': 'https://forge.softwareheritage.org/source/jesuisgpl/'}, + { + 'id': 85072327, + 'lister': None, + 'project': None, + 'type': 'pypi', + 'url': 'https://pypi.org/project/limnoria/'}, + { + 'id': 49908349, + 'lister': None, + 'project': None, + 'type': 'svn', + 'url': 'http://0-512-md.googlecode.com/svn/'}, + { + 'id': 424242, + 'lister': None, + 'project': None, + 'type': 'git', + 'url': 'https://github.com/moranegg/metadata_test'}, + ] + +SNAPSHOTS = { + 52189575: { + 'branches': { + b'refs/heads/add-revision-origin-cache': { + 'target': b'L[\xce\x1c\x88\x8eF\t\xf1"\x19\x1e\xfb\xc0' + b's\xe7/\xe9l\x1e', + 'target_type': 'revision'}, + b'HEAD': { + 'target': b'8K\x12\x00d\x03\xcc\xe4]bS\xe3\x8f{\xd7}' + b'\xac\xefrm', + 'target_type': 'revision'}, + b'refs/tags/v0.0.103': { + 'target': b'\xb6"Im{\xfdLb\xb0\x94N\xea\x96m\x13x\x88+' + b'\x0f\xdd', + 'target_type': 'release'}, + }}, + 4423668: { + 'branches': { + b'3DLDF-1.1.4.tar.gz': { + 'target': b'dJ\xfb\x1c\x91\xf4\x82B%]6\xa2\x90|\xd3\xfc' + b'"G\x99\x11', + 'target_type': 'revision'}, + b'3DLDF-2.0.2.tar.gz': { + 'target': b'\xb6\x0e\xe7\x9e9\xac\xaa\x19\x9e=' + b'\xd1\xc5\x00\\\xc6\xfc\xe0\xa6\xb4V', + 'target_type': 'revision'}, + b'3DLDF-2.0.3-examples.tar.gz': { + 'target': b'!H\x19\xc0\xee\x82-\x12F1\xbd\x97' + b'\xfe\xadZ\x80\x80\xc1\x83\xff', + 'target_type': 'revision'}, + b'3DLDF-2.0.3.tar.gz': { + 'target': b'\x8e\xa9\x8e/\xea}\x9feF\xf4\x9f\xfd\xee' + b'\xcc\x1a\xb4`\x8c\x8by', + 'target_type': 'revision'}, + b'3DLDF-2.0.tar.gz': { + 'target': b'F6*\xff(?\x19a\xef\xb6\xc2\x1fv$S\xe3G' + b'\xd3\xd1m', + b'target_type': 'revision'} + }}, + 77775770: { + 'branches': { + b'master': { + 'target': b'\xe7n\xa4\x9c\x9f\xfb\xb7\xf76\x11\x08{' + b'\xa6\xe9\x99\xb1\x9e]q\xeb', + 'target_type': 'revision'} + }, + 'id': b"h\xc0\xd2a\x04\xd4~'\x8d\xd6\xbe\x07\xeda\xfa\xfbV" + b"\x1d\r "}, + 
85072327: { + 'branches': { + b'HEAD': { + 'target': b'releases/2018.09.09', + 'target_type': 'alias'}, + b'releases/2018.09.01': { + 'target': b'<\xee1(\xe8\x8d_\xc1\xc9\xa6rT\xf1\x1d' + b'\xbb\xdfF\xfdw\xcf', + 'target_type': 'revision'}, + b'releases/2018.09.09': { + 'target': b'\x83\xb9\xb6\xc7\x05\xb1%\xd0\xfem\xd8k' + b'A\x10\x9d\xc5\xfa2\xf8t', + 'target_type': 'revision'}}, + 'id': b'{\xda\x8e\x84\x7fX\xff\x92\x80^\x93V\x18\xa3\xfay' + b'\x12\x9e\xd6\xb3'}, + 49908349: { + 'branches': { + b'master': { + 'target': b'\xe4?r\xe1,\x88\xab\xec\xe7\x9a\x87\xb8' + b'\xc9\xad#.\x1bw=\x18', + 'target_type': 'revision'}}, + 'id': b'\xa1\xa2\x8c\n\xb3\x87\xa8\xf9\xe0a\x8c\xb7' + b'\x05\xea\xb8\x1f\xc4H\xf4s'}, + 424242: { + 'branches': { + b'HEAD': { + 'target': b'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f', + 'target_type': 'revision'}}} + } + class MockObjStorage: """Mock an swh-objstorage objstorage with predefined contents. @@ -120,6 +237,8 @@ """Mock an swh-indexer storage. """ + added_data = [] + def indexer_configuration_add(self, tools): tool = tools[0] if tool['tool_name'] == 'swh-metadata-translator': @@ -156,12 +275,16 @@ yield from [] def content_metadata_add(self, metadata, conflict_update=None): - self.state = metadata - self.conflict_update = conflict_update + self.added_data.append( + ('content_metadata', conflict_update, metadata)) def revision_metadata_add(self, metadata, conflict_update=None): - self.state = metadata - self.conflict_update = conflict_update + self.added_data.append( + ('revision_metadata', conflict_update, metadata)) + + def origin_metadata_add(self, metadata, conflict_update=None): + self.added_data.append( + ('origin_metadata', conflict_update, metadata)) def content_metadata_get(self, sha1s): return [{ @@ -210,6 +333,23 @@ outputs. """ + def origin_get(self, id_): + for origin in ORIGINS: + for (k, v) in id_.items(): + if origin[k] != v: + break + else: + # This block is run iff we didn't break, ie. 
if all supplied + # parts of the id are set to the expected value. + return origin + assert False, id_ + + def snapshot_get_latest(self, origin_id): + if origin_id in SNAPSHOTS: + return SNAPSHOTS[origin_id] + else: + assert False, origin_id + def revision_get(self, revisions): return [{ 'id': b'8dbb6aeb036e7fd80664eb8bfd1507881af1ba9f',