...
These are the configurations that Kafka passes onto Zookeeper clients:
{code: java}
def
...
zkClientConfigFromKafkaConfig(config:
...
KafkaConfig,
...
forceZkSslClientEnable:
...
Boolean
...
=
...
false):
...
ZKClientConfig
...
=
...
{
...
val
...
clientConfig
...
=
...
new
...
ZKClientConfig
...
if
...
(config.zkSslClientEnable
...
||
...
forceZkSslClientEnable)
...
{
...
KafkaConfig.setZooKeeperClientProperty(clientConfig,
...
KafkaConfig.ZkSslClientEnableProp,
...
"true")
...
config.zkClientCnxnSocketClassName.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig,
...
KafkaConfig.ZkClientCnxnSocketProp,
...
_))
...
config.zkSslKeyStoreLocation.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig,
...
KafkaConfig.ZkSslKeyStoreLocationProp,
...
_))
...
config.zkSslKeyStorePassword.foreach(x
...
=>
...
KafkaConfig.setZooKeeperClientProperty(clientConfig,
...
KafkaConfig.ZkSslKeyStorePasswordProp,
...
x.value))
...
config.zkSslKeyStoreType.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig,
...
KafkaConfig.ZkSslKeyStoreTypeProp,
...
_))
...
config.zkSslTrustStoreLocation.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig,
...
KafkaConfig.ZkSslTrustStoreLocationProp,
...
_))
...
config.zkSslTrustStorePassword.foreach(x
...
=>
...
KafkaConfig.setZooKeeperClientProperty(clientConfig,
...
KafkaConfig.ZkSslTrustStorePasswordProp,
...
x.value))
...
config.zkSslTrustStoreType.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig,
...
KafkaConfig.ZkSslTrustStoreTypeProp,
...
_))
...
KafkaConfig.setZooKeeperClientProperty(clientConfig,
...
KafkaConfig.ZkSslProtocolProp,
...
config.ZkSslProtocol)
...
config.ZkSslEnabledProtocols.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig,
...
KafkaConfig.ZkSslEnabledProtocolsProp,
...
_))
...
config.ZkSslCipherSuites.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig,
...
KafkaConfig.ZkSslCipherSuitesProp,
...
_))
...
KafkaConfig.setZooKeeperClientProperty(clientConfig,
...
KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp,
...
config.ZkSslEndpointIdentificationAlgorithm)
...
KafkaConfig.setZooKeeperClientProperty(clientConfig,
...
KafkaConfig.ZkSslCrlEnableProp,
...
config.ZkSslCrlEnable.toString)
...
KafkaConfig.setZooKeeperClientProperty(clientConfig,
...
KafkaConfig.ZkSslOcspEnableProp,
...
config.ZkSslOcspEnable.toString)
...
}
...
//
...
The
...
zk
...
sasl
...
is
...
enabled
...
by
...
default
...
so
...
it
...
can
...
produce
...
false
...
error
...
when
...
broker
...
does
...
not
...
intend
...
to
...
use
...
SASL.
...
if
...
(!JaasUtils.isZkSaslEnabled)
...
clientConfig.setProperty(JaasUtils.ZK_SASL_CLIENT,
...
"false")
...
clientConfig
}
{code}
Below is a list of changes to behaviours which Kafka uses to communicate with Zookeeper:
...