Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/tests/tasks.py
from celery import current_app as app | from celery import current_app as app | ||||
from swh.indexer.metadata import OriginMetadataIndexer, RevisionMetadataIndexer | from swh.indexer.metadata import DirectoryMetadataIndexer, OriginMetadataIndexer | ||||
from .test_metadata import ContentMetadataTestIndexer | from .test_metadata import ContentMetadataTestIndexer | ||||
from .utils import BASE_TEST_CONFIG | from .utils import BASE_TEST_CONFIG | ||||
class RevisionMetadataTestIndexer(RevisionMetadataIndexer): | class DirectoryMetadataTestIndexer(DirectoryMetadataIndexer): | ||||
"""Specific indexer whose configuration is enough to satisfy the | """Specific indexer whose configuration is enough to satisfy the | ||||
indexing tests. | indexing tests. | ||||
""" | """ | ||||
ContentMetadataIndexer = ContentMetadataTestIndexer | ContentMetadataIndexer = ContentMetadataTestIndexer | ||||
def parse_config_file(self, *args, **kwargs): | def parse_config_file(self, *args, **kwargs): | ||||
return { | return { | ||||
**BASE_TEST_CONFIG, | **BASE_TEST_CONFIG, | ||||
"tools": { | "tools": { | ||||
"name": "swh-metadata-detector", | "name": "swh-metadata-detector", | ||||
"version": "0.0.2", | "version": "0.0.2", | ||||
"configuration": {"type": "local", "context": "NpmMapping"}, | "configuration": {"type": "local", "context": "NpmMapping"}, | ||||
}, | }, | ||||
} | } | ||||
class OriginMetadataTestIndexer(OriginMetadataIndexer): | class OriginMetadataTestIndexer(OriginMetadataIndexer): | ||||
def parse_config_file(self, *args, **kwargs): | def parse_config_file(self, *args, **kwargs): | ||||
return {**BASE_TEST_CONFIG, "tools": []} | return {**BASE_TEST_CONFIG, "tools": []} | ||||
def _prepare_sub_indexers(self): | def _prepare_sub_indexers(self): | ||||
self.revision_metadata_indexer = RevisionMetadataTestIndexer() | self.directory_metadata_indexer = DirectoryMetadataTestIndexer() | ||||
@app.task | @app.task | ||||
def revision_intrinsic_metadata(*args, **kwargs): | def directory_intrinsic_metadata(*args, **kwargs): | ||||
indexer = RevisionMetadataTestIndexer() | indexer = DirectoryMetadataTestIndexer() | ||||
indexer.run(*args, **kwargs) | indexer.run(*args, **kwargs) | ||||
print("REV RESULT=", indexer.results) | print("REV RESULT=", indexer.results) | ||||
@app.task | @app.task | ||||
def origin_intrinsic_metadata(*args, **kwargs): | def origin_intrinsic_metadata(*args, **kwargs): | ||||
indexer = OriginMetadataTestIndexer() | indexer = OriginMetadataTestIndexer() | ||||
indexer.run(*args, **kwargs) | indexer.run(*args, **kwargs) |