objstorages = {'dst': <swh.objstorage.backends.in_memory.InMemoryObjStorage object at 0x7ff9df5a56d8>, 'src': <swh.objstorage.backends.in_memory.InMemoryObjStorage object at 0x7ff9df5a5780>}
kafka_prefix = 'wupzewqpqk', kafka_consumer_group = 'test-consumer-wupzewqpqk'
kafka_server = '127.0.0.1:38817'
caplog = <_pytest.logging.LogCaptureFixture object at 0x7ff9df27b400>
@_patch_objstorages(["src", "dst"])
def test_replay_content_static_group_id(
objstorages,
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: Tuple[Popen, int],
caplog,
):
"""Check the content replayer in normal conditions
with KAFKA_GROUP_INSTANCE_ID set
"""
contents = _fill_objstorage_and_kafka(
kafka_server, kafka_prefix, objstorages["src"]
)
# Setup log capture to fish the consumer settings out of the log messages
caplog.set_level(logging.DEBUG, "swh.journal.client")
result = invoke(
"replay",
"--stop-after-objects",
str(NUM_CONTENTS),
env={"KAFKA_GROUP_INSTANCE_ID": "static-group-instance-id"},
journal_client={
"cls": "kafka",
"brokers": kafka_server,
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
consumer_settings: Optional[Dict[str, Any]] = None
for record in caplog.records:
print(record.message, record.args)
if "Consumer settings" in record.message:
consumer_settings = {}
elif consumer_settings is not None and len(record.args) == 2:
key, value = record.args
consumer_settings[key] = value
assert consumer_settings is not None, (
"Failed to get consumer settings out of the consumer log. "
"See log capture for details."
)
print(consumer_settings)
> assert consumer_settings["group.instance.id"] == "static-group-instance-id"
E KeyError: 'group.instance.id'
.tox/py3/lib/python3.7/site-packages/swh/objstorage/replayer/tests/test_cli.py:263: KeyError
TEST RESULT
TEST RESULT
- Run At
- Oct 27 2022, 4:31 PM