Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/cli.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from typing import Iterator | from typing import Callable, Dict, Iterator, List, Optional | ||||
# WARNING: do not import unnecessary things here to keep cli startup time under | # WARNING: do not import unnecessary things here to keep cli startup time under | ||||
# control | # control | ||||
import click | import click | ||||
from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup | from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup | ||||
from swh.core.cli import swh as swh_cli_group | from swh.core.cli import swh as swh_cli_group | ||||
▲ Show 20 Lines • Show All 193 Lines • ▼ Show 20 Lines | ): | ||||
origins = list_origins_by_producer(idx_storage, mappings, tool_ids) | origins = list_origins_by_producer(idx_storage, mappings, tool_ids) | ||||
kwargs = {"retries_left": 1} | kwargs = {"retries_left": 1} | ||||
schedule_origin_batches(scheduler, task_type, origins, origin_batch_size, kwargs) | schedule_origin_batches(scheduler, task_type, origins, origin_batch_size, kwargs) | ||||
@indexer_cli_group.command("journal-client") | @indexer_cli_group.command("journal-client") | ||||
@click.argument( | |||||
"indexer", | |||||
type=click.Choice(["origin-intrinsic-metadata", "*"]), | |||||
required=False | |||||
# TODO: remove required=False after we stop using it | |||||
) | |||||
@click.option("--scheduler-url", "-s", default=None, help="URL of the scheduler API") | @click.option("--scheduler-url", "-s", default=None, help="URL of the scheduler API") | ||||
@click.option( | @click.option( | ||||
"--origin-metadata-task-type", | "--origin-metadata-task-type", | ||||
default="index-origin-metadata", | default="index-origin-metadata", | ||||
help="Name of the task running the origin metadata indexer.", | help="Name of the task running the origin metadata indexer.", | ||||
) | ) | ||||
@click.option( | @click.option( | ||||
"--broker", "brokers", type=str, multiple=True, help="Kafka broker to connect to." | "--broker", "brokers", type=str, multiple=True, help="Kafka broker to connect to." | ||||
) | ) | ||||
@click.option( | @click.option( | ||||
"--prefix", type=str, default=None, help="Prefix of Kafka topic names to read from." | "--prefix", type=str, default=None, help="Prefix of Kafka topic names to read from." | ||||
) | ) | ||||
@click.option("--group-id", type=str, help="Consumer/group id for reading from Kafka.") | @click.option("--group-id", type=str, help="Consumer/group id for reading from Kafka.") | ||||
@click.option( | @click.option( | ||||
"--stop-after-objects", | "--stop-after-objects", | ||||
"-m", | "-m", | ||||
default=None, | default=None, | ||||
type=int, | type=int, | ||||
help="Maximum number of objects to replay. Default is to run forever.", | help="Maximum number of objects to replay. Default is to run forever.", | ||||
) | ) | ||||
@click.pass_context | @click.pass_context | ||||
def journal_client( | def journal_client( | ||||
ctx, | ctx, | ||||
scheduler_url, | indexer: Optional[str], | ||||
origin_metadata_task_type, | scheduler_url: str, | ||||
brokers, | origin_metadata_task_type: str, | ||||
prefix, | brokers: List[str], | ||||
group_id, | prefix: str, | ||||
stop_after_objects, | group_id: str, | ||||
stop_after_objects: Optional[int], | |||||
): | ): | ||||
"""Listens for new objects from the SWH Journal, and schedules tasks | """ | ||||
to run relevant indexers (currently, only origin-intrinsic-metadata) | Listens for new objects from the SWH Journal, and either: | ||||
on these new objects.""" | |||||
* runs the indexer with the name passed as argument, if any | |||||
* schedules tasks to run relevant indexers (currently, only | |||||
origin-intrinsic-metadata) on these new objects otherwise. | |||||
Passing '*' as indexer name runs all indexers. | |||||
""" | |||||
import functools | import functools | ||||
import warnings | |||||
from swh.indexer.indexer import ObjectsDict | |||||
from swh.indexer.journal_client import process_journal_objects | from swh.indexer.journal_client import process_journal_objects | ||||
from swh.journal.client import get_journal_client | from swh.journal.client import get_journal_client | ||||
from swh.scheduler import get_scheduler | from swh.scheduler import get_scheduler | ||||
cfg = ctx.obj["config"] | cfg = ctx.obj["config"] | ||||
journal_cfg = cfg.get("journal", {}) | journal_cfg = cfg.get("journal", {}) | ||||
scheduler = _get_api(get_scheduler, cfg, "scheduler", scheduler_url) | scheduler = _get_api(get_scheduler, cfg, "scheduler", scheduler_url) | ||||
brokers = brokers or journal_cfg.get("brokers") | brokers = brokers or journal_cfg.get("brokers") | ||||
if not brokers: | if not brokers: | ||||
raise ValueError("The brokers configuration is mandatory.") | raise ValueError("The brokers configuration is mandatory.") | ||||
prefix = prefix or journal_cfg.get("prefix") | prefix = prefix or journal_cfg.get("prefix") | ||||
group_id = group_id or journal_cfg.get("group_id") | group_id = group_id or journal_cfg.get("group_id") | ||||
origin_metadata_task_type = origin_metadata_task_type or journal_cfg.get( | origin_metadata_task_type = origin_metadata_task_type or journal_cfg.get( | ||||
"origin_metadata_task_type" | "origin_metadata_task_type" | ||||
) | ) | ||||
stop_after_objects = stop_after_objects or journal_cfg.get("stop_after_objects") | stop_after_objects = stop_after_objects or journal_cfg.get("stop_after_objects") | ||||
object_types = set() | |||||
worker_fns: List[Callable[[ObjectsDict], Dict]] = [] | |||||
if indexer is None: | |||||
warnings.warn( | |||||
"'swh indexer journal-client' with no argument creates scheduler tasks " | |||||
"to index, rather than index directly.", | |||||
DeprecationWarning, | |||||
) | |||||
object_types.add("origin_visit_status") | |||||
worker_fns.append( | |||||
functools.partial( | |||||
process_journal_objects, | |||||
scheduler=scheduler, | |||||
task_names={ | |||||
"origin_metadata": origin_metadata_task_type, | |||||
}, | |||||
) | |||||
) | |||||
if indexer in ("origin-intrinsic-metadata", "*"): | |||||
from swh.indexer.metadata import OriginMetadataIndexer | |||||
object_types.add("origin_visit_status") | |||||
idx = OriginMetadataIndexer() | |||||
idx.catch_exceptions = False # don't commit offsets if indexation failed | |||||
worker_fns.append(idx.process_journal_objects) | |||||
if not worker_fns: | |||||
raise click.ClickException(f"Unknown indexer: {indexer}") | |||||
client = get_journal_client( | client = get_journal_client( | ||||
cls="kafka", | cls="kafka", | ||||
brokers=brokers, | brokers=brokers, | ||||
prefix=prefix, | prefix=prefix, | ||||
group_id=group_id, | group_id=group_id, | ||||
object_types=["origin_visit_status"], | object_types=list(object_types), | ||||
stop_after_objects=stop_after_objects, | stop_after_objects=stop_after_objects, | ||||
) | ) | ||||
worker_fn = functools.partial( | def worker_fn(objects: ObjectsDict): | ||||
process_journal_objects, | for fn in worker_fns: | ||||
scheduler=scheduler, | fn(objects) | ||||
task_names={ | |||||
"origin_metadata": origin_metadata_task_type, | |||||
}, | |||||
) | |||||
try: | try: | ||||
client.process(worker_fn) | client.process(worker_fn) | ||||
except KeyboardInterrupt: | except KeyboardInterrupt: | ||||
ctx.exit(0) | ctx.exit(0) | ||||
else: | else: | ||||
print("Done.") | print("Done.") | ||||
finally: | finally: | ||||
client.close() | client.close() | ||||
Show All 26 Lines |