Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_client.py
Show First 20 Lines • Show All 235 Lines • ▼ Show 20 Lines | ): | ||||
with pytest.raises(ValueError): | with pytest.raises(ValueError): | ||||
JournalClient( | JournalClient( | ||||
brokers=[kafka_server_base], | brokers=[kafka_server_base], | ||||
group_id="whatever", | group_id="whatever", | ||||
prefix="wrong.prefix", | prefix="wrong.prefix", | ||||
stop_after_objects=1, | stop_after_objects=1, | ||||
object_types=["else"], | object_types=["else"], | ||||
) | ) | ||||
def test_client_subscriptions_with_anonymized_topics( | |||||
kafka_prefix: str, kafka_consumer_group: str, kafka_server_base: str | |||||
): | |||||
ardumont: Maybe mentions that this tests only the client subscription to anonymized topics (and… | |||||
Done Inline ActionsWill do (not sure yet which fix, either rename the test or consume all the topics). douardda: Will do (not sure yet which fix, either rename the test or consume all the topics). | |||||
producer = Producer( | |||||
{ | |||||
"bootstrap.servers": kafka_server_base, | |||||
"client.id": "test producer", | |||||
"acks": "all", | |||||
} | |||||
) | |||||
# Fill Kafka with revision object on both the regular prefix (normally for | |||||
# anonymized objects in this case) and privileged one | |||||
producer.produce( | |||||
topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV), | |||||
) | |||||
producer.produce( | |||||
topic=kafka_prefix + "_privileged.revision", | |||||
key=REV["id"], | |||||
value=value_to_kafka(REV), | |||||
) | |||||
producer.flush() | |||||
# without privileged "channels" activated on the client side | |||||
client = JournalClient( | |||||
brokers=[kafka_server_base], | |||||
group_id=kafka_consumer_group, | |||||
prefix=kafka_prefix, | |||||
stop_after_objects=1, | |||||
privileged=False, | |||||
) | |||||
# we only subscribed to "standard" topics | |||||
assert client.subscription == [kafka_prefix + ".revision"] | |||||
# with privileged "channels" activated on the client side | |||||
client = JournalClient( | |||||
brokers=[kafka_server_base], | |||||
group_id=kafka_consumer_group, | |||||
prefix=kafka_prefix, | |||||
stop_after_objects=1, | |||||
privileged=True, | |||||
) | |||||
# we only subscribed to "privileged" topics | |||||
assert client.subscription == [kafka_prefix + "_privileged.revision"] | |||||
def test_client_subscriptions_without_anonymized_topics( | |||||
kafka_prefix: str, kafka_consumer_group: str, kafka_server_base: str | |||||
): | |||||
producer = Producer( | |||||
{ | |||||
"bootstrap.servers": kafka_server_base, | |||||
"client.id": "test producer", | |||||
"acks": "all", | |||||
} | |||||
) | |||||
# Fill Kafka with revision objects only on the standard prefix | |||||
producer.produce( | |||||
topic=kafka_prefix + ".revision", key=REV["id"], value=value_to_kafka(REV), | |||||
) | |||||
producer.flush() | |||||
# without privileged channel activated on the client side | |||||
client = JournalClient( | |||||
brokers=[kafka_server_base], | |||||
group_id=kafka_consumer_group, | |||||
prefix=kafka_prefix, | |||||
stop_after_objects=1, | |||||
privileged=False, | |||||
) | |||||
# we only subscribed to the standard prefix | |||||
assert client.subscription == [kafka_prefix + ".revision"] | |||||
# with privileged channel activated on the client side | |||||
client = JournalClient( | |||||
brokers=[kafka_server_base], | |||||
group_id=kafka_consumer_group, | |||||
prefix=kafka_prefix, | |||||
stop_after_objects=1, | |||||
privileged=True, | |||||
) | |||||
Not Done Inline Actionscurious me, what's expected to happen if we require priviledged=True to something other than revision and release? Shouldn't it be tested as well? ardumont: curious me, what's expected to happen if we require priviledged=True to something other than… | |||||
Done Inline ActionsThe behavior of the client (which should be clear according to commit messages and docstrings, but...) is that if this config flag is set, it will subscribe to existing privileged topics rather than regular ones (if both privileged and regular exists in the advertised topics list), but will apply this logic for object_types one by one. So for other object types than anonymizable ones, it will subscribe to regular topics. douardda: The behavior of the client (which should be clear according to commit messages and docstrings… | |||||
# we also only subscribed to the standard prefix, since there is no priviled prefix | |||||
# on the kafka broker | |||||
assert client.subscription == [kafka_prefix + ".revision"] |
Maybe mentions that this tests only the client subscription to anonymized topics (and respectively priviledged topic for the next one).
maybe make that loop over the priviledged set of object types (revision, release)?