Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/tests/tasks.py
| from celery import current_app as app | from celery import current_app as app | ||||
| from swh.indexer.metadata import OriginMetadataIndexer, RevisionMetadataIndexer | from swh.indexer.metadata import DirectoryMetadataIndexer, OriginMetadataIndexer | ||||
| from .test_metadata import ContentMetadataTestIndexer | from .test_metadata import ContentMetadataTestIndexer | ||||
| from .utils import BASE_TEST_CONFIG | from .utils import BASE_TEST_CONFIG | ||||
| class RevisionMetadataTestIndexer(RevisionMetadataIndexer): | class DirectoryMetadataTestIndexer(DirectoryMetadataIndexer): | ||||
| """Specific indexer whose configuration is enough to satisfy the | """Specific indexer whose configuration is enough to satisfy the | ||||
| indexing tests. | indexing tests. | ||||
| """ | """ | ||||
| ContentMetadataIndexer = ContentMetadataTestIndexer | ContentMetadataIndexer = ContentMetadataTestIndexer | ||||
| def parse_config_file(self, *args, **kwargs): | def parse_config_file(self, *args, **kwargs): | ||||
| return { | return { | ||||
| **BASE_TEST_CONFIG, | **BASE_TEST_CONFIG, | ||||
| "tools": { | "tools": { | ||||
| "name": "swh-metadata-detector", | "name": "swh-metadata-detector", | ||||
| "version": "0.0.2", | "version": "0.0.2", | ||||
| "configuration": {"type": "local", "context": "NpmMapping"}, | "configuration": {"type": "local", "context": "NpmMapping"}, | ||||
| }, | }, | ||||
| } | } | ||||
| class OriginMetadataTestIndexer(OriginMetadataIndexer): | class OriginMetadataTestIndexer(OriginMetadataIndexer): | ||||
| def parse_config_file(self, *args, **kwargs): | def parse_config_file(self, *args, **kwargs): | ||||
| return {**BASE_TEST_CONFIG, "tools": []} | return {**BASE_TEST_CONFIG, "tools": []} | ||||
| def _prepare_sub_indexers(self): | def _prepare_sub_indexers(self): | ||||
| self.revision_metadata_indexer = RevisionMetadataTestIndexer() | self.directory_metadata_indexer = DirectoryMetadataTestIndexer() | ||||
| @app.task | @app.task | ||||
| def revision_intrinsic_metadata(*args, **kwargs): | def directory_intrinsic_metadata(*args, **kwargs): | ||||
| indexer = RevisionMetadataTestIndexer() | indexer = DirectoryMetadataTestIndexer() | ||||
| indexer.run(*args, **kwargs) | indexer.run(*args, **kwargs) | ||||
| print("REV RESULT=", indexer.results) | print("REV RESULT=", indexer.results) | ||||
| @app.task | @app.task | ||||
| def origin_intrinsic_metadata(*args, **kwargs): | def origin_intrinsic_metadata(*args, **kwargs): | ||||
| indexer = OriginMetadataTestIndexer() | indexer = OriginMetadataTestIndexer() | ||||
| indexer.run(*args, **kwargs) | indexer.run(*args, **kwargs) |