diff --git a/swh/storage/cli.py b/swh/storage/cli.py --- a/swh/storage/cli.py +++ b/swh/storage/cli.py @@ -15,7 +15,6 @@ from swh.core.cli import swh as swh_cli_group from swh.storage.replay import ModelObjectDeserializer - try: from systemd.daemon import notify except ImportError: @@ -192,8 +191,37 @@ def replay(ctx, stop_after_objects, object_types): """Fill a Storage by reading a Journal. - There can be several 'replayers' filling a Storage as long as they use - the same `group-id`. + This is typically used for a mirror configuration, reading the Software + Heritage Kafka journal to retrieve objects from the Software Heritage main + storage to feed a replication storage. There can be several 'replayers' + filling a Storage as long as they use the same `group-id`. + + The expected configuration file should have 2 sections: + + - storage: the configuration of the storage in which to add objects + received from the Kafka journal, + + - journal_client: the configuration for accessing the Kafka journal. See the + documentation of `swh.journal` for more details on the possible + configuration entries in this section. + + https://docs.softwareheritage.org/devel/apidoc/swh.journal.client.html + + In addition to these 2 mandatory config sections, a third 'replayer' + section may be specified with an 'error_reporter' config entry giving the + Redis connection parameters used to report non-recoverable mirroring + errors, e.g.:: + + storage: + [...] + journal_client: + [...] 
+ replayer: + error_reporter: + host: redis.local + port: 6379 + db: 1 + """ import functools @@ -206,13 +234,16 @@ conf = ctx.obj["config"] storage = get_storage(**conf.pop("storage")) - if "error_reporter" in conf: + client_cfg = conf.pop("journal_client") + replayer_cfg = conf.pop("replayer", {}) + + if "error_reporter" in replayer_cfg: from redis import Redis - reporter = Redis(**conf["error_reporter"]).set + reporter = Redis(**replayer_cfg.get("error_reporter")).set else: reporter = None - validate = conf.get("privileged", False) + validate = client_cfg.get("privileged", False) if not validate and reporter: ctx.fail( @@ -222,7 +253,6 @@ deserializer = ModelObjectDeserializer(reporter=reporter, validate=validate) - client_cfg = conf.pop("journal_client") client_cfg["value_deserializer"] = deserializer.convert if object_types: client_cfg["object_types"] = object_types