...
The KafkaChannel
and ExpiringCredentialRefreshingLogin
classes can easily retrieve the ClientChannelCredentialTracker
instance and tell it when various events of interest occur (a channel is initially authenticated, a credential is refreshed, a channel is disconnected). The events are enqueued as they are reported, and a background thread processes them to decide which KafkaChannel
instances need to re-authenticate. There is a method KafkaChannel.initiateReauthentication(Time)
that instantiates an instance of SaslClientAuthenticator
and tells it to re-authenticate via the method SaslClientAuthenticator.initiateReauthentication(Time, AuthenticationSuccessOrFailureReceiver)
. The implementation of AuthenticationSuccessOrFailureReceiver
that is sent takes care of notifying the ClientChannelCredentialTracker
instance; it looks something like this:
Code Block | ||||
---|---|---|---|---|
| ||||
new AuthenticationSuccessOrFailureReceiver() { @Override public void reauthenticationFailed(RetryIndication retry, String errorMessage) { log.warn(errorMessage); Utils.closeQuietly(notYetAuthenticatedAuthenticator, "notYetAuthenticatedAuthenticator"); notYetAuthenticatedAuthenticator = null; clientChannelCredentialTracker().offer(ClientChannelCredentialEvent .channelFailedReauthentication(time, KafkaChannel.this, errorMessage, retry)); } @Override public void reauthenticationSucceeded() { clientChannelCredentialTracker().offer(ClientChannelCredentialEvent .channelReauthenticated(time, KafkaChannel.this, notYetAuthenticatedSaslClientAuthenticator .expiringCredential()); replaceAnyPreviouslyAuthenticatedAuthenticatorWithNewOne(); return; } } |
...