Changeset View
Changeset View
Standalone View
Standalone View
swh/counters/tests/test_cli.py
Show All 38 Lines | if not catch_exceptions and result.exception: | ||||
print(result.output) | print(result.output) | ||||
raise result.exception | raise result.exception | ||||
return result | return result | ||||
def test__journal_client__worker_function_invoked( | def test__journal_client__worker_function_invoked( | ||||
mocker, kafka_server, kafka_prefix, journal_config, local_redis_host | mocker, kafka_server, kafka_prefix, journal_config, local_redis_host | ||||
): | ): | ||||
mock = mocker.patch("swh.counters.journal_client.process_journal_messages_by_keys") | mock = mocker.patch("swh.counters.journal_client.process_journal_messages") | ||||
producer = Producer( | producer = Producer( | ||||
{ | { | ||||
"bootstrap.servers": kafka_server, | "bootstrap.servers": kafka_server, | ||||
"client.id": "test-producer", | "client.id": "test-producer", | ||||
"acks": "all", | "acks": "all", | ||||
} | } | ||||
) | ) | ||||
topic = f"{kafka_prefix}.content" | topic = f"{kafka_prefix}.content" | ||||
value = value_to_kafka({"key": "value"}) | value = value_to_kafka({"key": "value"}) | ||||
producer.produce(topic=topic, key=b"message1", value=value) | producer.produce(topic=topic, key=b"message1", value=value) | ||||
invoke( | invoke( | ||||
False, | False, | ||||
# Missing --object-types (and no config key) will make the cli raise | # Missing --object-types (and no config key) will make the cli raise | ||||
[ | ["journal-client", "--stop-after-objects", "1", "--object-type", "content",], | ||||
"journal-client", | |||||
"--stop-after-objects", | |||||
"1", | |||||
"--object-type", | |||||
"content", | |||||
"keys", | |||||
], | |||||
journal_config, | journal_config, | ||||
redis_host=local_redis_host, | redis_host=local_redis_host, | ||||
) | ) | ||||
assert mock.call_count == 1 | assert mock.call_count == 1 | ||||
def test__journal_client__missing_main_journal_config_key(local_redis_host): | def test__journal_client__missing_main_journal_config_key(local_redis_host): | ||||
"""Missing configuration on journal should raise""" | """Missing configuration on journal should raise""" | ||||
with pytest.raises(KeyError, match="journal"): | with pytest.raises(KeyError, match="journal"): | ||||
invoke( | invoke( | ||||
catch_exceptions=False, | catch_exceptions=False, | ||||
args=["journal-client", "--stop-after-objects", "1", "messages"], | args=["journal-client", "--stop-after-objects", "1"], | ||||
config="", # missing config will make it raise | config="", # missing config will make it raise | ||||
redis_host=local_redis_host, | redis_host=local_redis_host, | ||||
) | ) | ||||
def test__journal_client__missing_journal_config_keys(local_redis_host): | def test__journal_client__missing_journal_config_keys(local_redis_host): | ||||
"""Missing configuration on mandatory journal keys should raise""" | """Missing configuration on mandatory journal keys should raise""" | ||||
kafka_prefix = "swh.journal.objects" | kafka_prefix = "swh.journal.objects" | ||||
Show All 15 Lines | for key in journal_config["journal"].keys(): | ||||
args=[ | args=[ | ||||
"journal-client", | "journal-client", | ||||
"--stop-after-objects", | "--stop-after-objects", | ||||
"1", | "1", | ||||
"--prefix", | "--prefix", | ||||
kafka_prefix, | kafka_prefix, | ||||
"--object-type", | "--object-type", | ||||
"content", | "content", | ||||
"keys", | |||||
], | ], | ||||
config=yaml_cfg, # incomplete config will make the cli raise | config=yaml_cfg, # incomplete config will make the cli raise | ||||
redis_host=local_redis_host, | redis_host=local_redis_host, | ||||
) | ) | ||||
def test__journal_client__missing_prefix_config_key(kafka_server, local_redis_host): | def test__journal_client__missing_prefix_config_key(kafka_server, local_redis_host): | ||||
"""Missing configuration on mandatory prefix key should raise""" | """Missing configuration on mandatory prefix key should raise""" | ||||
Show All 14 Lines | with pytest.raises(ValueError, match="prefix"): | ||||
False, | False, | ||||
# Missing --prefix (and no config key) will make the cli raise | # Missing --prefix (and no config key) will make the cli raise | ||||
[ | [ | ||||
"journal-client", | "journal-client", | ||||
"--stop-after-objects", | "--stop-after-objects", | ||||
"1", | "1", | ||||
"--object-type", | "--object-type", | ||||
"content", | "content", | ||||
"messages", | |||||
], | ], | ||||
journal_cfg, | journal_cfg, | ||||
redis_host=local_redis_host, | redis_host=local_redis_host, | ||||
) | ) | ||||
def test__journal_client__missing_object_types_config_key( | def test__journal_client__missing_object_types_config_key( | ||||
kafka_server, local_redis_host | kafka_server, local_redis_host | ||||
): | ): | ||||
"""Missing configuration on mandatory object-types key should raise""" | """Missing configuration on mandatory object-types key should raise""" | ||||
journal_cfg = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( | journal_cfg = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( | ||||
broker=kafka_server, prefix="swh.journal.objects", group_id="test-consumer" | broker=kafka_server, prefix="swh.journal.objects", group_id="test-consumer" | ||||
) | ) | ||||
with pytest.raises(ValueError, match="object_types"): | with pytest.raises(ValueError, match="object_types"): | ||||
invoke( | invoke( | ||||
False, | False, | ||||
# Missing --object-types (and no config key) will make the cli raise | # Missing --object-types (and no config key) will make the cli raise | ||||
["journal-client", "--stop-after-objects", "1", "keys"], | ["journal-client", "--stop-after-objects", "1"], | ||||
journal_cfg, | journal_cfg, | ||||
redis_host=local_redis_host, | redis_host=local_redis_host, | ||||
) | ) | ||||
@pytest.mark.parametrize( | def test__journal_client__key_received(mocker, kafka_server, local_redis_host): | ||||
"message_handling, worker_fn", | mock = mocker.patch("swh.counters.journal_client.process_journal_messages") | ||||
[ | mock.return_value = 1 | ||||
("keys", "swh.counters.journal_client.process_journal_messages_by_keys"), | |||||
("messages", "swh.counters.journal_client.process_journal_messages"), | |||||
], | |||||
) | |||||
def test__journal_client__key_received( | |||||
mocker, kafka_server, local_redis_host, message_handling, worker_fn | |||||
): | |||||
mock = mocker.patch(worker_fn) | |||||
prefix = "swh.journal.objects" | prefix = "swh.journal.objects" | ||||
object_type = "content" | object_type = "content" | ||||
topic = prefix + "." + object_type | topic = prefix + "." + object_type | ||||
producer = Producer( | producer = Producer( | ||||
{"bootstrap.servers": kafka_server, "client.id": "testproducer", "acks": "all",} | {"bootstrap.servers": kafka_server, "client.id": "testproducer", "acks": "all",} | ||||
) | ) | ||||
Show All 15 Lines | result = invoke( | ||||
[ | [ | ||||
"journal-client", | "journal-client", | ||||
"--stop-after-objects", | "--stop-after-objects", | ||||
"1", | "1", | ||||
"--object-type", | "--object-type", | ||||
object_type, | object_type, | ||||
"--prefix", | "--prefix", | ||||
prefix, | prefix, | ||||
message_handling, | |||||
], | ], | ||||
journal_cfg, | journal_cfg, | ||||
redis_host=local_redis_host, | redis_host=local_redis_host, | ||||
) | ) | ||||
# Check the output | # Check the output | ||||
expected_output = "Processed 1 messages.\nDone.\n" | expected_output = "Processed 1 messages.\nDone.\n" | ||||
assert result.exit_code == 0, result.output | assert result.exit_code == 0, result.output | ||||
assert result.output == expected_output | assert result.output == expected_output | ||||
assert mock.called | assert mock.called | ||||
assert mock.call_args[0][0]["content"] | assert mock.call_args[0][0]["content"] | ||||
assert len(mock.call_args[0][0]) == 1 | assert len(mock.call_args[0][0]) == 1 | ||||
assert object_type in mock.call_args[0][0].keys() | assert object_type in mock.call_args[0][0].keys() | ||||
def test__journal_client__no_journal_type_argument_should_raise( | |||||
kafka_server, local_redis_host | |||||
): | |||||
journal_cfg = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format( | |||||
broker=kafka_server, prefix="prefix", group_id="test-consumer" | |||||
) | |||||
with pytest.raises(SystemExit): | |||||
invoke( | |||||
False, | |||||
[ | |||||
"journal-client", | |||||
"--stop-after-objects", | |||||
"1", | |||||
"--object-type", | |||||
"object_type", | |||||
"--prefix", | |||||
"prefix", | |||||
], | |||||
journal_cfg, | |||||
redis_host=local_redis_host, | |||||
) |