Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Aims to add a corresponding validator to a configuration where a validator is missing.
so that the program finds these incorrect configurations during initialization.

Public Interfaces

...

Public classes that involve changes

  1. ConfigDef
    Add a new validator NonNullAndEmptyString  to the ConfigDef class.

  2. SslClientAuth
    SslClientAuth provides a static method names()  to get a list of all enumeration constant lowercase names.

  3. BrokerSecurityConfigs
    Provides a new validator SaslEnabledMechanismsValidator  for validating the sasl.enabled.mechanisms  parameter.

  4. CompressionType
    CompressionType provides a static method names()  to get a list of all enumeration constant lowercase names

...

  1. AdminClientConfig

    configvalidator
    security.protocolin(SecurityProtocol.names().toArray(newString[0]))



  2. ConsumerConfig

    configvalidator
    group.instance.id 
    newConfigDef.NonEmptyString()
    key.deserializer
    newConfigDef.NonNullValidator()
    value.deserializer
    newConfigDef.NonNullValidator()
    security.protocolin(SecurityProtocol.names().toArray(newString[0]))



  3. ProducerConfig

    configvalidator
    key.serializer
    newConfigDef.NonNullValidator()
    value.serializer
    newConfigDef.NonNullValidator()
    security.protocolin(SecurityProtocol.names().toArray(newString[0]))
    compression.typein(CompressionType.names().toArray(newString[0]))



  4. SaslConfigs

    configvalidator
    sasl.mechanismnewConfigDef.NonNullAndEmptyString()



  5. MirrorClientConfig

    configvalidator
    security.protocolin(SecurityProtocol.names().toArray(newString[0]))



  6. MirrorConnectorConfig

    configvalidator
    security.protocolin(SecurityProtocol.names().toArray(newString[0]))



  7. MirrorMakerConfig

    configvalidator
    security.protocolin(SecurityProtocol.names().toArray(newString[0]))



  8. WorkerConfig

    configvalidator
    ssl.client.authin(SslClientAuth.names().toArray(newString[0]))



  9. DistributedConfig

    configvalidator
    security.protocolin(SecurityProtocol.names().toArray(newString[0]))



  10. KafkaConfig

    configvalidator
    sasl.mechanism.controller.protocolnewConfigDef.NonNullAndEmptyString()
    authorizer.class.namenew ConfigDef.NonNullValidator()
    security.inter.broker.protocolin(SecurityProtocol.names().toArray(new Array[String](0)):_*)
    sasl.mechanism.inter.broker.protocol newConfigDef.NonNullAndEmptyString()
    sasl.enabled.mechanismsnewBrokerSecurityConfigs.SaslEnabledMechanismsValidator()
    compression.typein(BrokerCompressionCodec.brokerCompressionOptions:_*)



  11. StreamsConfig

    configvalidator
    security.protocolin(SecurityProtocol.names().toArray(newString[0]))



Proposed Changes

1.ConfigDef.NonNullAndEmptyString

...

For all the configurations mentioned above, if the value set by the user is legal, the user will not perceive any changes.
If the value set by the user is invalid, the corresponding validator will throw a ConfigException  when the ConfigDef.parse(Map<?, ?> props)  method is executed. The corresponding Kafka program will fail to initialize.

...

I had hoped to add another validator for the following configuration: ValidList.in("GSSAPI", "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512", "OAUTHBEARER") .

Code Block
sasl.mechanism
sasl.enabled.mechanisms
sasl.mechanism.controller.protocol
sasl.mechanism.inter.broker.protocol

...