diff --git a/swh/objstorage/replayer/cli.py b/swh/objstorage/replayer/cli.py
--- a/swh/objstorage/replayer/cli.py
+++ b/swh/objstorage/replayer/cli.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2016-2020 The Software Heritage developers
+# Copyright (C) 2016-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -61,6 +61,40 @@
     ``--check-dst`` sets whether the replayer should check in the destination
     ObjStorage before copying an object. You can turn that off if you know
     you're copying to an empty ObjStorage.
+
+    The expected configuration file should have 3 sections:
+
+    - objstorage: the source object storage from which to retrieve objects to
+      copy; this objstorage can (and should) be a read-only objstorage,
+
+      https://docs.softwareheritage.org/devel/apidoc/swh.objstorage.html
+
+    - objstorage_dst: the destination objstorage in which objects will be
+      written into,
+
+    - journal_client: the configuration of the kafka journal from which the
+      `content` topic will be consumed to get the list of content objects to
+      copy from the source objstorage to the destination one.
+
+      https://docs.softwareheritage.org/devel/apidoc/swh.journal.client.html
+
+    In addition to these 3 mandatory sections, an optional 'replayer' section
+    can be provided with an 'error_reporter' config entry allowing to specify a
+    Redis connection parameter set that will be used to report objects that
+    could not be copied, eg.::
+
+      objstorage:
+        [...]
+      objstorage_dst:
+        [...]
+      journal_client:
+        [...]
+      replayer:
+        error_reporter:
+          host: redis.local
+          port: 6379
+          db: 1
+
     """
     import functools
     import mmap
@@ -100,12 +134,14 @@
     else:
         exclude_fn = None
-    journal_cfg = conf["journal_client"]
-    journal_cfg.setdefault("cls", "kafka")
-    if "error_reporter" in journal_cfg:
+    journal_cfg = conf.pop("journal_client")
+    replayer_cfg = conf.pop("replayer", {})
+    if "error_reporter" in replayer_cfg:
         from redis import Redis
+
         from swh.objstorage.replayer import replay
-        replay.REPORTER = Redis(**journal_cfg.pop("error_reporter")).set
+
+        replay.REPORTER = Redis(**replayer_cfg.get("error_reporter")).set
     client = get_journal_client(
         **journal_cfg,
         stop_after_objects=stop_after_objects,
         object_types=("content",),
diff --git a/swh/objstorage/replayer/tests/test_cli.py b/swh/objstorage/replayer/tests/test_cli.py
--- a/swh/objstorage/replayer/tests/test_cli.py
+++ b/swh/objstorage/replayer/tests/test_cli.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019-2020 The Software Heritage developers
+# Copyright (C) 2019-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -62,10 +62,9 @@
     return decorator


-def invoke(*args, env=None, journal_config=None):
+def invoke(*args, env=None, **kwargs):
     config = copy.deepcopy(CLI_CONFIG)
-    if journal_config:
-        config["journal_client"] = journal_config
+    config.update(kwargs)

     runner = CliRunner()
     with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
@@ -132,7 +131,8 @@
         "replay",
         "--stop-after-objects",
         str(NUM_CONTENTS),
-        journal_config={
+        journal_client={
+            "cls": "kafka",
             "brokers": kafka_server,
             "group_id": kafka_consumer_group,
             "prefix": kafka_prefix,
@@ -170,7 +170,8 @@
         "replay",
         "--stop-after-objects",
         str(NUM_CONTENTS),
-        journal_config={
+        journal_client={
+            "cls": "kafka",
             "brokers": kafka_server,
             "group_id": kafka_consumer_group,
             "prefix": kafka_prefix,
@@ -216,7 +217,8 @@
         "--stop-after-objects",
         str(NUM_CONTENTS),
         env={"KAFKA_GROUP_INSTANCE_ID": "static-group-instance-id"},
-        journal_config={
+        journal_client={
+            "cls": "kafka",
             "brokers": kafka_server,
             "group_id": kafka_consumer_group,
             "prefix": kafka_prefix,
@@ -273,7 +275,8 @@
         str(NUM_CONTENTS),
         "--exclude-sha1-file",
         fd.name,
-        journal_config={
+        journal_client={
+            "cls": "kafka",
             "brokers": kafka_server,
             "group_id": kafka_consumer_group,
             "prefix": kafka_prefix,
@@ -337,7 +340,8 @@
         "--stop-after-objects",
         str(NUM_CONTENTS),
         "--check-dst" if check_dst else "--no-check-dst",
-        journal_config={
+        journal_client={
+            "cls": "kafka",
             "brokers": kafka_server,
             "group_id": kafka_consumer_group,
             "prefix": kafka_prefix,
@@ -446,10 +450,13 @@
         "--check-dst",
         "--stop-after-objects",
         str(NUM_CONTENTS),
-        journal_config={
+        journal_client={
+            "cls": "kafka",
             "brokers": kafka_server,
             "group_id": kafka_consumer_group,
             "prefix": kafka_prefix,
+        },
+        replayer={
             "error_reporter": {"host": redis_proc.host, "port": redis_proc.port},
         },
     )
@@ -544,10 +551,13 @@
         "replay",
         "--stop-after-objects",
         str(NUM_CONTENTS),
-        journal_config={
+        journal_client={
+            "cls": "kafka",
             "brokers": kafka_server,
             "group_id": kafka_consumer_group,
             "prefix": kafka_prefix,
+        },
+        replayer={
             "error_reporter": {"host": redis_proc.host, "port": redis_proc.port},
         },
     )
@@ -628,7 +638,8 @@
         "replay",
         "--stop-after-objects",
         str(NUM_CONTENTS),
-        journal_config={
+        journal_client={
+            "cls": "kafka",
             "brokers": kafka_server,
             "group_id": kafka_consumer_group,
             "prefix": kafka_prefix,