Changeset View
Changeset View
Standalone View
Standalone View
swh/search/cli.py
Show First 20 Lines • Show All 53 Lines • ▼ Show 20 Lines | @click.option( | ||||
default=None, | default=None, | ||||
type=int, | type=int, | ||||
help="Maximum number of objects to replay. Default is to run forever.", | help="Maximum number of objects to replay. Default is to run forever.", | ||||
) | ) | ||||
@click.option( | @click.option( | ||||
"--object-type", | "--object-type", | ||||
"-o", | "-o", | ||||
multiple=True, | multiple=True, | ||||
default=["origin", "origin_visit", "origin_visit_status"], | |||||
help="Default list of object types to subscribe to", | help="Default list of object types to subscribe to", | ||||
) | ) | ||||
@click.option( | @click.option( | ||||
"--prefix", | "--prefix", "-p", help="Topic prefix to use (e.g swh.journal.indexed)", | ||||
"-p", | |||||
default="swh.journal.objects", | |||||
help="Topic prefix to use (e.g swh.journal.indexed)", | |||||
) | ) | ||||
@click.pass_context | @click.pass_context | ||||
def journal_client_objects(ctx, stop_after_objects, object_type, prefix): | def journal_client_objects(ctx, stop_after_objects, object_type, prefix): | ||||
"""Listens for new objects from the SWH Journal, and schedules tasks | """Listens for new objects from the SWH Journal, and schedules tasks | ||||
to run relevant indexers (currently, origin and origin_visit) | to run relevant indexers (currently, origin and origin_visit) | ||||
on these new objects. | on these new objects. | ||||
""" | """ | ||||
import functools | import functools | ||||
from swh.journal.client import get_journal_client | from swh.journal.client import get_journal_client | ||||
from . import get_search | from . import get_search | ||||
from .journal_client import process_journal_objects | from .journal_client import process_journal_objects | ||||
config = ctx.obj["config"] | config = ctx.obj["config"] | ||||
journal_cfg = config["journal"] | journal_cfg = config["journal"] | ||||
object_types = object_type or journal_cfg.get("object_types") | journal_cfg["object_types"] = object_type or journal_cfg.get("object_types", []) | ||||
prefix = prefix or journal_cfg.get("prefix") | journal_cfg["prefix"] = prefix or journal_cfg.get("prefix") | ||||
journal_cfg["stop_after_objects"] = stop_after_objects or journal_cfg.get( | |||||
client = get_journal_client( | "stop_after_objects" | ||||
cls="kafka", | |||||
object_types=object_types, | |||||
stop_after_objects=stop_after_objects, | |||||
**journal_cfg, | |||||
) | ) | ||||
if len(journal_cfg["object_types"]) == 0: | |||||
raise ValueError("'object_types' must be specified by cli or configuration") | |||||
if journal_cfg["prefix"] is None: | |||||
raise ValueError("'prefix' must be specified by cli or configuration") | |||||
client = get_journal_client(cls="kafka", **journal_cfg,) | |||||
search = get_search(**config["search"]) | search = get_search(**config["search"]) | ||||
worker_fn = functools.partial(process_journal_objects, search=search,) | worker_fn = functools.partial(process_journal_objects, search=search,) | ||||
nb_messages = 0 | nb_messages = 0 | ||||
try: | try: | ||||
nb_messages = client.process(worker_fn) | nb_messages = client.process(worker_fn) | ||||
print("Processed %d messages." % nb_messages) | print("Processed %d messages." % nb_messages) | ||||
except KeyboardInterrupt: | except KeyboardInterrupt: | ||||
Show All 23 Lines |