Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/cli.py
Show First 20 Lines • Show All 282 Lines • ▼ Show 20 Lines | ): | ||||
from swh.journal.client import get_journal_client | from swh.journal.client import get_journal_client | ||||
from swh.scheduler import get_scheduler | from swh.scheduler import get_scheduler | ||||
cfg = ctx.obj["config"] | cfg = ctx.obj["config"] | ||||
journal_cfg = cfg.get("journal", {}) | journal_cfg = cfg.get("journal", {}) | ||||
scheduler = _get_api(get_scheduler, cfg, "scheduler", scheduler_url) | scheduler = _get_api(get_scheduler, cfg, "scheduler", scheduler_url) | ||||
brokers = brokers or journal_cfg.get("brokers") | if brokers: | ||||
if not brokers: | journal_cfg["brokers"] = brokers | ||||
if not journal_cfg.get("brokers"): | |||||
raise ValueError("The brokers configuration is mandatory.") | raise ValueError("The brokers configuration is mandatory.") | ||||
prefix = prefix or journal_cfg.get("prefix") | if prefix: | ||||
group_id = group_id or journal_cfg.get("group_id") | journal_cfg["prefix"] = prefix | ||||
if group_id: | |||||
journal_cfg["group_id"] = group_id | |||||
origin_metadata_task_type = origin_metadata_task_type or journal_cfg.get( | origin_metadata_task_type = origin_metadata_task_type or journal_cfg.get( | ||||
"origin_metadata_task_type" | "origin_metadata_task_type" | ||||
) | ) | ||||
stop_after_objects = stop_after_objects or journal_cfg.get("stop_after_objects") | if stop_after_objects: | ||||
batch_size = batch_size or journal_cfg.get("batch_size", 200) | journal_cfg["stop_after_objects"] = stop_after_objects | ||||
if batch_size: | |||||
journal_cfg["batch_size"] = batch_size | |||||
object_types = set() | object_types = set() | ||||
worker_fns: List[Callable[[ObjectsDict], Dict]] = [] | worker_fns: List[Callable[[ObjectsDict], Dict]] = [] | ||||
if indexer is None: | if indexer is None: | ||||
warnings.warn( | warnings.warn( | ||||
"'swh indexer journal-client' with no argument creates scheduler tasks " | "'swh indexer journal-client' with no argument creates scheduler tasks " | ||||
"to index, rather than index directly.", | "to index, rather than index directly.", | ||||
▲ Show 20 Lines • Show All 44 Lines • ▼ Show 20 Lines | if indexer in ("content_fossology_license", "*"): | ||||
idx.catch_exceptions = False # don't commit offsets if indexation failed | idx.catch_exceptions = False # don't commit offsets if indexation failed | ||||
worker_fns.append(idx.process_journal_objects) | worker_fns.append(idx.process_journal_objects) | ||||
if not worker_fns: | if not worker_fns: | ||||
raise click.ClickException(f"Unknown indexer: {indexer}") | raise click.ClickException(f"Unknown indexer: {indexer}") | ||||
client = get_journal_client( | client = get_journal_client( | ||||
cls="kafka", | cls="kafka", | ||||
brokers=brokers, | |||||
prefix=prefix, | |||||
group_id=group_id, | |||||
object_types=list(object_types), | object_types=list(object_types), | ||||
stop_after_objects=stop_after_objects, | **journal_cfg, | ||||
batch_size=batch_size, | |||||
) | ) | ||||
def worker_fn(objects: ObjectsDict): | def worker_fn(objects: ObjectsDict): | ||||
for fn in worker_fns: | for fn in worker_fns: | ||||
fn(objects) | fn(objects) | ||||
try: | try: | ||||
client.process(worker_fn) | client.process(worker_fn) | ||||
Show All 32 Lines |