Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/cli.py
Show First 20 Lines • Show All 209 Lines • ▼ Show 20 Lines | ): | ||||
kwargs = {"retries_left": 1} | kwargs = {"retries_left": 1} | ||||
schedule_origin_batches(scheduler, task_type, origins, origin_batch_size, kwargs) | schedule_origin_batches(scheduler, task_type, origins, origin_batch_size, kwargs) | ||||
@indexer_cli_group.command("journal-client") | @indexer_cli_group.command("journal-client") | ||||
@click.argument( | @click.argument( | ||||
"indexer", | "indexer", | ||||
type=click.Choice(["origin-intrinsic-metadata", "*"]), | type=click.Choice(["origin-intrinsic-metadata", "extrinsic-metadata", "*"]), | ||||
required=False | required=False | ||||
# TODO: remove required=False after we stop using it | # TODO: remove required=False after we stop using it | ||||
) | ) | ||||
@click.option("--scheduler-url", "-s", default=None, help="URL of the scheduler API") | @click.option("--scheduler-url", "-s", default=None, help="URL of the scheduler API") | ||||
@click.option( | @click.option( | ||||
"--origin-metadata-task-type", | "--origin-metadata-task-type", | ||||
default="index-origin-metadata", | default="index-origin-metadata", | ||||
help="Name of the task running the origin metadata indexer.", | help="Name of the task running the origin metadata indexer.", | ||||
Show All 30 Lines | ): | ||||
* schedules tasks to run relevant indexers (currently, only | * schedules tasks to run relevant indexers (currently, only | ||||
origin-intrinsic-metadata) on these new objects otherwise. | origin-intrinsic-metadata) on these new objects otherwise. | ||||
Passing '*' as indexer name runs all indexers. | Passing '*' as indexer name runs all indexers. | ||||
""" | """ | ||||
import functools | import functools | ||||
import warnings | import warnings | ||||
from swh.indexer.indexer import ObjectsDict | from swh.indexer.indexer import BaseIndexer, ObjectsDict | ||||
from swh.indexer.journal_client import process_journal_objects | from swh.indexer.journal_client import process_journal_objects | ||||
from swh.journal.client import get_journal_client | from swh.journal.client import get_journal_client | ||||
from swh.scheduler import get_scheduler | from swh.scheduler import get_scheduler | ||||
cfg = ctx.obj["config"] | cfg = ctx.obj["config"] | ||||
journal_cfg = cfg.get("journal", {}) | journal_cfg = cfg.get("journal", {}) | ||||
scheduler = _get_api(get_scheduler, cfg, "scheduler", scheduler_url) | scheduler = _get_api(get_scheduler, cfg, "scheduler", scheduler_url) | ||||
Show All 24 Lines | if indexer is None: | ||||
process_journal_objects, | process_journal_objects, | ||||
scheduler=scheduler, | scheduler=scheduler, | ||||
task_names={ | task_names={ | ||||
"origin_metadata": origin_metadata_task_type, | "origin_metadata": origin_metadata_task_type, | ||||
}, | }, | ||||
) | ) | ||||
) | ) | ||||
idx: Optional[BaseIndexer] = None | |||||
if indexer in ("origin-intrinsic-metadata", "*"): | if indexer in ("origin-intrinsic-metadata", "*"): | ||||
from swh.indexer.metadata import OriginMetadataIndexer | from swh.indexer.metadata import OriginMetadataIndexer | ||||
object_types.add("origin_visit_status") | object_types.add("origin_visit_status") | ||||
idx = OriginMetadataIndexer() | idx = OriginMetadataIndexer() | ||||
idx.catch_exceptions = False # don't commit offsets if indexation failed | idx.catch_exceptions = False # don't commit offsets if indexation failed | ||||
worker_fns.append(idx.process_journal_objects) | worker_fns.append(idx.process_journal_objects) | ||||
if indexer in ("extrinsic-metadata", "*"): | |||||
from swh.indexer.metadata import ExtrinsicMetadataIndexer | |||||
object_types.add("raw_extrinsic_metadata") | |||||
idx = ExtrinsicMetadataIndexer() | |||||
idx.catch_exceptions = False # don't commit offsets if indexation failed | |||||
worker_fns.append(idx.process_journal_objects) | |||||
if not worker_fns: | if not worker_fns: | ||||
raise click.ClickException(f"Unknown indexer: {indexer}") | raise click.ClickException(f"Unknown indexer: {indexer}") | ||||
client = get_journal_client( | client = get_journal_client( | ||||
cls="kafka", | cls="kafka", | ||||
brokers=brokers, | brokers=brokers, | ||||
prefix=prefix, | prefix=prefix, | ||||
group_id=group_id, | group_id=group_id, | ||||
▲ Show 20 Lines • Show All 42 Lines • Show Last 20 Lines |