Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/replayer/tests/test_cli.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import Counter | from collections import Counter | ||||
import copy | import copy | ||||
import functools | import functools | ||||
import logging | import logging | ||||
▲ Show 20 Lines • Show All 47 Lines • ▼ Show 20 Lines | def decorator(f): | ||||
get_objstorage_mock.side_effect = get_mock_objstorage | get_objstorage_mock.side_effect = get_mock_objstorage | ||||
f(*args, objstorages=objstorages, **kwargs) | f(*args, objstorages=objstorages, **kwargs) | ||||
return newf | return newf | ||||
return decorator | return decorator | ||||
def invoke(*args, env=None, journal_config=None): | def invoke(*args, env=None, **kwargs): | ||||
config = copy.deepcopy(CLI_CONFIG) | config = copy.deepcopy(CLI_CONFIG) | ||||
if journal_config: | config.update(kwargs) | ||||
config["journal_client"] = journal_config | |||||
vlorentz: the conditional isn't needed | |||||
Done Inline Actionsindeed, thanks douardda: indeed, thanks | |||||
runner = CliRunner() | runner = CliRunner() | ||||
with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: | with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: | ||||
yaml.dump(config, config_fd) | yaml.dump(config, config_fd) | ||||
config_fd.seek(0) | config_fd.seek(0) | ||||
args = ["-C" + config_fd.name] + list(args) | args = ["-C" + config_fd.name] + list(args) | ||||
return runner.invoke( | return runner.invoke( | ||||
objstorage_cli_group, args, obj={"log_level": logging.DEBUG}, env=env, | objstorage_cli_group, args, obj={"log_level": logging.DEBUG}, env=env, | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 49 Lines • ▼ Show 20 Lines | ): | ||||
contents = _fill_objstorage_and_kafka( | contents = _fill_objstorage_and_kafka( | ||||
kafka_server, kafka_prefix, objstorages["src"] | kafka_server, kafka_prefix, objstorages["src"] | ||||
) | ) | ||||
result = invoke( | result = invoke( | ||||
"replay", | "replay", | ||||
"--stop-after-objects", | "--stop-after-objects", | ||||
str(NUM_CONTENTS), | str(NUM_CONTENTS), | ||||
journal_config={ | journal_client={ | ||||
"cls": "kafka", | |||||
"brokers": kafka_server, | "brokers": kafka_server, | ||||
"group_id": kafka_consumer_group, | "group_id": kafka_consumer_group, | ||||
"prefix": kafka_prefix, | "prefix": kafka_prefix, | ||||
}, | }, | ||||
) | ) | ||||
expected = r"Done.\n" | expected = r"Done.\n" | ||||
assert result.exit_code == 0, result.output | assert result.exit_code == 0, result.output | ||||
Show All 21 Lines | ): | ||||
caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") | caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") | ||||
expected_obj_ids = set(hash_to_hex(sha1) for sha1 in contents) | expected_obj_ids = set(hash_to_hex(sha1) for sha1 in contents) | ||||
result = invoke( | result = invoke( | ||||
"replay", | "replay", | ||||
"--stop-after-objects", | "--stop-after-objects", | ||||
str(NUM_CONTENTS), | str(NUM_CONTENTS), | ||||
journal_config={ | journal_client={ | ||||
"cls": "kafka", | |||||
"brokers": kafka_server, | "brokers": kafka_server, | ||||
"group_id": kafka_consumer_group, | "group_id": kafka_consumer_group, | ||||
"prefix": kafka_prefix, | "prefix": kafka_prefix, | ||||
}, | }, | ||||
) | ) | ||||
expected = r"Done.\n" | expected = r"Done.\n" | ||||
assert result.exit_code == 0, result.output | assert result.exit_code == 0, result.output | ||||
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | ||||
Show All 29 Lines | ): | ||||
# Setup log capture to fish the consumer settings out of the log messages | # Setup log capture to fish the consumer settings out of the log messages | ||||
caplog.set_level(logging.DEBUG, "swh.journal.client") | caplog.set_level(logging.DEBUG, "swh.journal.client") | ||||
result = invoke( | result = invoke( | ||||
"replay", | "replay", | ||||
"--stop-after-objects", | "--stop-after-objects", | ||||
str(NUM_CONTENTS), | str(NUM_CONTENTS), | ||||
env={"KAFKA_GROUP_INSTANCE_ID": "static-group-instance-id"}, | env={"KAFKA_GROUP_INSTANCE_ID": "static-group-instance-id"}, | ||||
journal_config={ | journal_client={ | ||||
"cls": "kafka", | |||||
"brokers": kafka_server, | "brokers": kafka_server, | ||||
"group_id": kafka_consumer_group, | "group_id": kafka_consumer_group, | ||||
"prefix": kafka_prefix, | "prefix": kafka_prefix, | ||||
}, | }, | ||||
) | ) | ||||
expected = r"Done.\n" | expected = r"Done.\n" | ||||
assert result.exit_code == 0, result.output | assert result.exit_code == 0, result.output | ||||
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | ||||
Show All 40 Lines | with tempfile.NamedTemporaryFile(mode="w+b") as fd: | ||||
fd.seek(0) | fd.seek(0) | ||||
result = invoke( | result = invoke( | ||||
"replay", | "replay", | ||||
"--stop-after-objects", | "--stop-after-objects", | ||||
str(NUM_CONTENTS), | str(NUM_CONTENTS), | ||||
"--exclude-sha1-file", | "--exclude-sha1-file", | ||||
fd.name, | fd.name, | ||||
journal_config={ | journal_client={ | ||||
"cls": "kafka", | |||||
"brokers": kafka_server, | "brokers": kafka_server, | ||||
"group_id": kafka_consumer_group, | "group_id": kafka_consumer_group, | ||||
"prefix": kafka_prefix, | "prefix": kafka_prefix, | ||||
}, | }, | ||||
) | ) | ||||
expected = r"Done.\n" | expected = r"Done.\n" | ||||
assert result.exit_code == 0, result.output | assert result.exit_code == 0, result.output | ||||
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | ||||
▲ Show 20 Lines • Show All 47 Lines • ▼ Show 20 Lines | ): | ||||
caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") | caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") | ||||
result = invoke( | result = invoke( | ||||
"replay", | "replay", | ||||
"--stop-after-objects", | "--stop-after-objects", | ||||
str(NUM_CONTENTS), | str(NUM_CONTENTS), | ||||
"--check-dst" if check_dst else "--no-check-dst", | "--check-dst" if check_dst else "--no-check-dst", | ||||
journal_config={ | journal_client={ | ||||
"cls": "kafka", | |||||
"brokers": kafka_server, | "brokers": kafka_server, | ||||
"group_id": kafka_consumer_group, | "group_id": kafka_consumer_group, | ||||
"prefix": kafka_prefix, | "prefix": kafka_prefix, | ||||
}, | }, | ||||
) | ) | ||||
expected = r"Done.\n" | expected = r"Done.\n" | ||||
assert result.exit_code == 0, result.output | assert result.exit_code == 0, result.output | ||||
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | ||||
▲ Show 20 Lines • Show All 92 Lines • ▼ Show 20 Lines | ): | ||||
objstorages["dst"] = FlakyObjStorage(state=orig_dst.state, failures=failures) | objstorages["dst"] = FlakyObjStorage(state=orig_dst.state, failures=failures) | ||||
caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") | caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") | ||||
result = invoke( | result = invoke( | ||||
"replay", | "replay", | ||||
"--check-dst", | "--check-dst", | ||||
"--stop-after-objects", | "--stop-after-objects", | ||||
str(NUM_CONTENTS), | str(NUM_CONTENTS), | ||||
journal_config={ | journal_client={ | ||||
"cls": "kafka", | |||||
"brokers": kafka_server, | "brokers": kafka_server, | ||||
"group_id": kafka_consumer_group, | "group_id": kafka_consumer_group, | ||||
"prefix": kafka_prefix, | "prefix": kafka_prefix, | ||||
}, | |||||
replayer={ | |||||
"error_reporter": {"host": redis_proc.host, "port": redis_proc.port}, | "error_reporter": {"host": redis_proc.host, "port": redis_proc.port}, | ||||
}, | }, | ||||
) | ) | ||||
expected = r"Done.\n" | expected = r"Done.\n" | ||||
assert result.exit_code == 0, result.output | assert result.exit_code == 0, result.output | ||||
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | ||||
# check that exactly NUM_CONTENTS_DST 'in' operations have failed once | # check that exactly NUM_CONTENTS_DST 'in' operations have failed once | ||||
▲ Show 20 Lines • Show All 78 Lines • ▼ Show 20 Lines | ): | ||||
) | ) | ||||
caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") | caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") | ||||
result = invoke( | result = invoke( | ||||
"replay", | "replay", | ||||
"--stop-after-objects", | "--stop-after-objects", | ||||
str(NUM_CONTENTS), | str(NUM_CONTENTS), | ||||
journal_config={ | journal_client={ | ||||
"cls": "kafka", | |||||
"brokers": kafka_server, | "brokers": kafka_server, | ||||
"group_id": kafka_consumer_group, | "group_id": kafka_consumer_group, | ||||
"prefix": kafka_prefix, | "prefix": kafka_prefix, | ||||
}, | |||||
replayer={ | |||||
"error_reporter": {"host": redis_proc.host, "port": redis_proc.port}, | "error_reporter": {"host": redis_proc.host, "port": redis_proc.port}, | ||||
}, | }, | ||||
) | ) | ||||
expected = r"Done.\n" | expected = r"Done.\n" | ||||
assert result.exit_code == 0, result.output | assert result.exit_code == 0, result.output | ||||
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | ||||
# check the logs looks as expected | # check the logs looks as expected | ||||
▲ Show 20 Lines • Show All 64 Lines • ▼ Show 20 Lines | for i, sha1 in enumerate(contents): | ||||
contents_deleted.add(hash_to_hex(sha1)) | contents_deleted.add(hash_to_hex(sha1)) | ||||
caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") | caplog.set_level(logging.DEBUG, "swh.objstorage.replayer.replay") | ||||
result = invoke( | result = invoke( | ||||
"replay", | "replay", | ||||
"--stop-after-objects", | "--stop-after-objects", | ||||
str(NUM_CONTENTS), | str(NUM_CONTENTS), | ||||
journal_config={ | journal_client={ | ||||
"cls": "kafka", | |||||
"brokers": kafka_server, | "brokers": kafka_server, | ||||
"group_id": kafka_consumer_group, | "group_id": kafka_consumer_group, | ||||
"prefix": kafka_prefix, | "prefix": kafka_prefix, | ||||
}, | }, | ||||
) | ) | ||||
expected = r"Done.\n" | expected = r"Done.\n" | ||||
assert result.exit_code == 0, result.output | assert result.exit_code == 0, result.output | ||||
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | ||||
Show All 27 Lines |
the conditional isn't needed