...
Aims to add a corresponding validator to a configuration where a validator is missing.
so that the program finds these incorrect configurations during initialization.
Public Interfaces
...
Public classes that involve changes
- ConfigDef
Add a new validatorNonNullAndEmptyString
to the ConfigDef class. - SslClientAuth
SslClientAuth provides a static methodnames()
to get a list of all enumeration constant lowercase names. - BrokerSecurityConfigs
Provides a new validatorSaslEnabledMechanismsValidator
for validating thesasl.enabled.mechanisms
parameter. - CompressionType
CompressionType provides a static methodnames()
to get a list of all enumeration constant lowercase names
...
AdminClientConfig
config validator security.protocol in(SecurityProtocol.names().toArray(newString[0])) ConsumerConfig
config validator group.instance.id
newConfigDef.NonEmptyString() key.deserializer
newConfigDef.NonNullValidator() value.deserializer
newConfigDef.NonNullValidator() security.protocol in(SecurityProtocol.names().toArray(newString[0])) ProducerConfig
config validator key.serializer
newConfigDef.NonNullValidator() value.serializer
newConfigDef.NonNullValidator() security.protocol in(SecurityProtocol.names().toArray(newString[0])) compression.type in(CompressionType.names().toArray(newString[0])) SaslConfigs
config validator sasl.mechanism newConfigDef.NonNullAndEmptyString() MirrorClientConfig
config validator security.protocol in(SecurityProtocol.names().toArray(newString[0])) MirrorConnectorConfig
config validator security.protocol in(SecurityProtocol.names().toArray(newString[0])) MirrorMakerConfig
config validator security.protocol in(SecurityProtocol.names().toArray(newString[0])) WorkerConfig
config validator ssl.client.auth in(SslClientAuth.names().toArray(newString[0])) DistributedConfig
config validator security.protocol in(SecurityProtocol.names().toArray(newString[0])) KafkaConfig
config validator sasl.mechanism.controller.protocol newConfigDef.NonNullAndEmptyString() authorizer.class.name new ConfigDef.NonNullValidator() security.inter.broker.protocol in(SecurityProtocol.names().toArray(new Array[String](0)):_*) sasl.mechanism.inter.broker.protocol newConfigDef.NonNullAndEmptyString() sasl.enabled.mechanisms newBrokerSecurityConfigs.SaslEnabledMechanismsValidator() compression.type in(BrokerCompressionCodec.brokerCompressionOptions:_*) StreamsConfig
config validator security.protocol in(SecurityProtocol.names().toArray(newString[0]))
Proposed Changes
1.ConfigDef.NonNullAndEmptyString
...
For all the configurations mentioned above, if the value set by the user is legal, the user will not perceive any changes.
If the value set by the user is invalid, the corresponding validator will throw a ConfigException
when the ConfigDef.parse(Map<?, ?> props)
method is executed. The corresponding Kafka program will fail to initialize.
...
I had hoped to add another validator for the following configuration: ValidList.in("GSSAPI", "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512", "OAUTHBEARER")
.
Code Block |
---|
sasl.mechanism sasl.enabled.mechanisms sasl.mechanism.controller.protocol sasl.mechanism.inter.broker.protocol |
...