Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cli.py
# Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
# WARNING: do not import unnecessary things here to keep cli startup time under | # WARNING: do not import unnecessary things here to keep cli startup time under | ||||
# control | # control | ||||
import logging | import logging | ||||
import os | import os | ||||
from typing import Dict, Optional | from typing import Dict, Optional | ||||
import click | import click | ||||
from swh.core.cli import CONTEXT_SETTINGS | from swh.core.cli import CONTEXT_SETTINGS | ||||
from swh.core.cli import swh as swh_cli_group | from swh.core.cli import swh as swh_cli_group | ||||
from swh.storage.replay import ModelObjectDeserializer | |||||
try: | try: | ||||
from systemd.daemon import notify | from systemd.daemon import notify | ||||
except ImportError: | except ImportError: | ||||
notify = None | notify = None | ||||
@swh_cli_group.group(name="storage", context_settings=CONTEXT_SETTINGS) | @swh_cli_group.group(name="storage", context_settings=CONTEXT_SETTINGS) | ||||
▲ Show 20 Lines • Show All 175 Lines • ▼ Show 20 Lines | def replay(ctx, stop_after_objects, object_types): | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.storage.replay import process_replay_objects | from swh.storage.replay import process_replay_objects | ||||
ensure_check_config(ctx.obj["config"], ctx.obj["check_config"], "write") | ensure_check_config(ctx.obj["config"], ctx.obj["check_config"], "write") | ||||
conf = ctx.obj["config"] | conf = ctx.obj["config"] | ||||
storage = get_storage(**conf.pop("storage")) | storage = get_storage(**conf.pop("storage")) | ||||
if "error_reporter" in conf: | |||||
from redis import Redis | |||||
reporter = Redis(**conf["error_reporter"]).set | |||||
else: | |||||
reporter = None | |||||
validate = conf.get("privileged", False) | |||||
if not validate and reporter: | |||||
ctx.fail( | |||||
"Invalid configuration: you cannot have 'error_reporter' set if " | |||||
"'privileged' is False; we cannot validate anonymized objects." | |||||
) | |||||
deserializer = ModelObjectDeserializer(reporter=reporter, validate=validate) | |||||
client_cfg = conf.pop("journal_client") | client_cfg = conf.pop("journal_client") | ||||
client_cfg["value_deserializer"] = deserializer.convert | |||||
if object_types: | if object_types: | ||||
client_cfg["object_types"] = object_types | client_cfg["object_types"] = object_types | ||||
if stop_after_objects: | if stop_after_objects: | ||||
client_cfg["stop_after_objects"] = stop_after_objects | client_cfg["stop_after_objects"] = stop_after_objects | ||||
try: | try: | ||||
client = get_journal_client(**client_cfg) | client = get_journal_client(**client_cfg) | ||||
except ValueError as exc: | except ValueError as exc: | ||||
ctx.fail(exc) | ctx.fail(exc) | ||||
worker_fn = functools.partial(process_replay_objects, storage=storage) | worker_fn = functools.partial(process_replay_objects, storage=storage) | ||||
if notify: | if notify: | ||||
Show All 37 Lines |