Changeset View
Changeset View
Standalone View
Standalone View
swh/search/cli.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import functools | import functools | ||||
import click | import click | ||||
from swh.core import config | from swh.core import config | ||||
from swh.core.cli import CONTEXT_SETTINGS | from swh.core.cli import CONTEXT_SETTINGS | ||||
from swh.journal.cli import get_journal_client | from swh.journal.client import get_journal_client | ||||
from . import get_search | from . import get_search | ||||
from .journal_client import process_journal_objects | from .journal_client import process_journal_objects | ||||
from .api.server import load_and_check_config, app | from .api.server import load_and_check_config, app | ||||
@click.group(name="search", context_settings=CONTEXT_SETTINGS) | @click.group(name="search", context_settings=CONTEXT_SETTINGS) | ||||
@click.option( | @click.option( | ||||
Show All 29 Lines | |||||
@journal_client.command("objects") | @journal_client.command("objects") | ||||
@click.option( | @click.option( | ||||
"--stop-after-objects", | "--stop-after-objects", | ||||
"-m", | "-m", | ||||
default=None, | default=None, | ||||
type=int, | type=int, | ||||
help="Maximum number of objects to replay. Default is to " "run forever.", | help="Maximum number of objects to replay. Default is to run forever.", | ||||
) | ) | ||||
@click.pass_context | @click.pass_context | ||||
def journal_client_objects(ctx, stop_after_objects): | def journal_client_objects(ctx, stop_after_objects): | ||||
"""Listens for new objects from the SWH Journal, and schedules tasks | """Listens for new objects from the SWH Journal, and schedules tasks | ||||
to run relevant indexers (currently, only origin) | to run relevant indexers (currently, only origin) | ||||
on these new objects.""" | on these new objects.""" | ||||
config = ctx.obj["config"] | |||||
journal_cfg = config["journal"] | |||||
client = get_journal_client( | client = get_journal_client( | ||||
ctx, | cls="kafka", | ||||
object_types=["origin", "origin_visit"], | object_types=["origin", "origin_visit"], | ||||
stop_after_objects=stop_after_objects, | stop_after_objects=stop_after_objects, | ||||
vlorentz: replace this with `**journal_cfg` | |||||
**journal_cfg, | |||||
) | ) | ||||
search = get_search(**ctx.obj["config"]["search"]) | search = get_search(**config["search"]) | ||||
worker_fn = functools.partial(process_journal_objects, search=search,) | worker_fn = functools.partial(process_journal_objects, search=search,) | ||||
nb_messages = 0 | nb_messages = 0 | ||||
try: | try: | ||||
Not Done Inline Actionsremove this vlorentz: remove this | |||||
nb_messages = client.process(worker_fn) | nb_messages = client.process(worker_fn) | ||||
print("Processed %d messages." % nb_messages) | print("Processed %d messages." % nb_messages) | ||||
except KeyboardInterrupt: | except KeyboardInterrupt: | ||||
ctx.exit(0) | ctx.exit(0) | ||||
else: | else: | ||||
print("Done.") | print("Done.") | ||||
finally: | finally: | ||||
client.close() | client.close() | ||||
Show All 16 Lines |
replace this with **journal_cfg