Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_cli.py
Show First 20 Lines • Show All 50 Lines • ▼ Show 20 Lines | storage_config = { | ||||
] | ] | ||||
} | } | ||||
storage = get_storage(**storage_config) | storage = get_storage(**storage_config) | ||||
with patch('swh.journal.cli.get_storage') as get_storage_mock: | with patch('swh.journal.cli.get_storage') as get_storage_mock: | ||||
get_storage_mock.return_value = storage | get_storage_mock.return_value = storage | ||||
yield storage | yield storage | ||||
def invoke(catch_exceptions, args): | def invoke(catch_exceptions, args, env=None): | ||||
runner = CliRunner() | runner = CliRunner() | ||||
with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd: | with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd: | ||||
config_fd.write(CLI_CONFIG) | config_fd.write(CLI_CONFIG) | ||||
config_fd.seek(0) | config_fd.seek(0) | ||||
args = ['-C' + config_fd.name] + args | args = ['-C' + config_fd.name] + args | ||||
result = runner.invoke(cli, args, obj={'log_level': logging.DEBUG}) | result = runner.invoke( | ||||
cli, args, obj={'log_level': logging.DEBUG}, env=env, | |||||
) | |||||
if not catch_exceptions and result.exception: | if not catch_exceptions and result.exception: | ||||
print(result.output) | print(result.output) | ||||
raise result.exception | raise result.exception | ||||
return result | return result | ||||
def test_replay( | def test_replay( | ||||
storage, | storage, | ||||
▲ Show 20 Lines • Show All 107 Lines • ▼ Show 20 Lines | def test_replay_content( | ||||
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | ||||
for (sha1, content) in contents.items(): | for (sha1, content) in contents.items(): | ||||
assert sha1 in objstorages['dst'], sha1 | assert sha1 in objstorages['dst'], sha1 | ||||
assert objstorages['dst'].get(sha1) == content | assert objstorages['dst'].get(sha1) == content | ||||
@_patch_objstorages(['src', 'dst']) | @_patch_objstorages(['src', 'dst']) | ||||
def test_replay_content_static_group_id( | |||||
objstorages, | |||||
storage, | |||||
kafka_prefix: str, | |||||
kafka_server: Tuple[Popen, int], | |||||
caplog): | |||||
(_, kafka_port) = kafka_server | |||||
kafka_prefix += '.swh.journal.objects' | |||||
contents = _fill_objstorage_and_kafka( | |||||
kafka_port, kafka_prefix, objstorages) | |||||
# Setup log capture to fish the consumer settings out of the log messages | |||||
caplog.set_level(logging.DEBUG, 'swh.journal.client') | |||||
result = invoke(False, [ | |||||
'content-replay', | |||||
'--broker', '127.0.0.1:%d' % kafka_port, | |||||
'--group-id', 'test-cli-consumer', | |||||
'--prefix', kafka_prefix, | |||||
'--max-messages', '10', | |||||
], {'KAFKA_GROUP_INSTANCE_ID': 'static-group-instance-id'}) | |||||
expected = r'Done.\n' | |||||
assert result.exit_code == 0, result.output | |||||
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | |||||
consumer_settings = None | |||||
for record in caplog.records: | |||||
if 'Consumer settings' in record.message: | |||||
consumer_settings = record.args | |||||
break | |||||
assert consumer_settings is not None, ( | |||||
'Failed to get consumer settings out of the consumer log. ' | |||||
'See log capture for details.' | |||||
) | |||||
assert consumer_settings['group.instance.id'] == 'static-group-instance-id' | |||||
assert consumer_settings['session.timeout.ms'] == 60 * 10 * 1000 | |||||
assert consumer_settings['max.poll.interval.ms'] == 90 * 10 * 1000 | |||||
for (sha1, content) in contents.items(): | |||||
assert sha1 in objstorages['dst'], sha1 | |||||
assert objstorages['dst'].get(sha1) == content | |||||
@_patch_objstorages(['src', 'dst']) | |||||
def test_replay_content_exclude( | def test_replay_content_exclude( | ||||
objstorages, | objstorages, | ||||
storage, | storage, | ||||
kafka_prefix: str, | kafka_prefix: str, | ||||
kafka_server: Tuple[Popen, int]): | kafka_server: Tuple[Popen, int]): | ||||
(_, kafka_port) = kafka_server | (_, kafka_port) = kafka_server | ||||
kafka_prefix += '.swh.journal.objects' | kafka_prefix += '.swh.journal.objects' | ||||
Show All 27 Lines |