Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/cli.py
Show First 20 Lines • Show All 211 Lines • ▼ Show 20 Lines | ): | ||||
schedule_origin_batches(scheduler, task_type, origins, origin_batch_size, kwargs) | schedule_origin_batches(scheduler, task_type, origins, origin_batch_size, kwargs) | ||||
@indexer_cli_group.command("journal-client") | @indexer_cli_group.command("journal-client") | ||||
@click.argument( | @click.argument( | ||||
"indexer", | "indexer", | ||||
type=click.Choice( | type=click.Choice( | ||||
[ | [ | ||||
"origin-intrinsic-metadata", | "origin_intrinsic_metadata", | ||||
"extrinsic-metadata", | "extrinsic_metadata", | ||||
"content-mimetype", | "content_mimetype", | ||||
"content-fossology-license", | "content_fossology_license", | ||||
"*", | "*", | ||||
] | ] | ||||
), | ), | ||||
required=False | required=False | ||||
# TODO: remove required=False after we stop using it | # TODO: remove required=False after we stop using it | ||||
) | ) | ||||
@click.option("--scheduler-url", "-s", default=None, help="URL of the scheduler API") | @click.option("--scheduler-url", "-s", default=None, help="URL of the scheduler API") | ||||
@click.option( | @click.option( | ||||
Show All 26 Lines | def journal_client( | ||||
group_id: str, | group_id: str, | ||||
stop_after_objects: Optional[int], | stop_after_objects: Optional[int], | ||||
): | ): | ||||
""" | """ | ||||
Listens for new objects from the SWH Journal, and either: | Listens for new objects from the SWH Journal, and either: | ||||
* runs the indexer with the name passed as argument, if any | * runs the indexer with the name passed as argument, if any | ||||
* schedules tasks to run relevant indexers (currently, only | * schedules tasks to run relevant indexers (currently, only | ||||
origin-intrinsic-metadata) on these new objects otherwise. | origin_intrinsic_metadata) on these new objects otherwise. | ||||
Passing '*' as indexer name runs all indexers. | Passing '*' as indexer name runs all indexers. | ||||
""" | """ | ||||
import functools | import functools | ||||
import warnings | import warnings | ||||
from swh.indexer.indexer import BaseIndexer, ObjectsDict | from swh.indexer.indexer import BaseIndexer, ObjectsDict | ||||
from swh.indexer.journal_client import process_journal_objects | from swh.indexer.journal_client import process_journal_objects | ||||
Show All 33 Lines | if indexer is None: | ||||
task_names={ | task_names={ | ||||
"origin_metadata": origin_metadata_task_type, | "origin_metadata": origin_metadata_task_type, | ||||
}, | }, | ||||
) | ) | ||||
) | ) | ||||
idx: Optional[BaseIndexer] = None | idx: Optional[BaseIndexer] = None | ||||
if indexer in ("origin-intrinsic-metadata", "*"): | if indexer in ("origin_intrinsic_metadata", "*"): | ||||
from swh.indexer.metadata import OriginMetadataIndexer | from swh.indexer.metadata import OriginMetadataIndexer | ||||
object_types.add("origin_visit_status") | object_types.add("origin_visit_status") | ||||
idx = OriginMetadataIndexer() | idx = OriginMetadataIndexer() | ||||
idx.catch_exceptions = False # don't commit offsets if indexation failed | idx.catch_exceptions = False # don't commit offsets if indexation failed | ||||
worker_fns.append(idx.process_journal_objects) | worker_fns.append(idx.process_journal_objects) | ||||
if indexer in ("extrinsic-metadata", "*"): | if indexer in ("extrinsic_metadata", "*"): | ||||
from swh.indexer.metadata import ExtrinsicMetadataIndexer | from swh.indexer.metadata import ExtrinsicMetadataIndexer | ||||
object_types.add("raw_extrinsic_metadata") | object_types.add("raw_extrinsic_metadata") | ||||
idx = ExtrinsicMetadataIndexer() | idx = ExtrinsicMetadataIndexer() | ||||
idx.catch_exceptions = False # don't commit offsets if indexation failed | idx.catch_exceptions = False # don't commit offsets if indexation failed | ||||
worker_fns.append(idx.process_journal_objects) | worker_fns.append(idx.process_journal_objects) | ||||
if indexer in ("content-mimetype", "*"): | if indexer in ("content_mimetype", "*"): | ||||
from swh.indexer.mimetype import MimetypeIndexer | from swh.indexer.mimetype import MimetypeIndexer | ||||
object_types.add("content") | object_types.add("content") | ||||
idx = MimetypeIndexer() | idx = MimetypeIndexer() | ||||
idx.catch_exceptions = False # don't commit offsets if indexation failed | idx.catch_exceptions = False # don't commit offsets if indexation failed | ||||
worker_fns.append(idx.process_journal_objects) | worker_fns.append(idx.process_journal_objects) | ||||
if indexer in ("content-fossology-license", "*"): | if indexer in ("content_fossology_license", "*"): | ||||
from swh.indexer.fossology_license import FossologyLicenseIndexer | from swh.indexer.fossology_license import FossologyLicenseIndexer | ||||
object_types.add("content") | object_types.add("content") | ||||
idx = FossologyLicenseIndexer() | idx = FossologyLicenseIndexer() | ||||
idx.catch_exceptions = False # don't commit offsets if indexation failed | idx.catch_exceptions = False # don't commit offsets if indexation failed | ||||
worker_fns.append(idx.process_journal_objects) | worker_fns.append(idx.process_journal_objects) | ||||
if not worker_fns: | if not worker_fns: | ||||
▲ Show 20 Lines • Show All 49 Lines • Show Last 20 Lines |