...
The KafkaChannel
and ExpiringCredentialRefreshingLogin
classes can easily retrieve the ClientChannelCredentialTracker
instance and tell it when various events of interest occur (a channel is initially authenticated, a credential is refreshed, a channel is closeddisconnected). The events are enqueued as they are reported, and a background thread processes them to decide which KafkaChannel
instances need to re-authenticate. There is a method KafkaChannel.initiateReauthentication(Time)
that instantiates an instance of SaslClientAuthenticator
and tells it to re-authenticate via the method SaslClientAuthenticator.initiateReauthentication(Time, AuthenticationSuccessOrFailureReceiver)
. The implementation of AuthenticationSuccessOrFailureReceiver
that is sent looks something like this:
Code Block | ||||
---|---|---|---|---|
| ||||
new AuthenticationSuccessOrFailureReceiver() {
@Override
public void reauthenticationFailed(RetryIndication retry, String errorMessage) {
log.warn(errorMessage);
Utils.closeQuietly(notYetAuthenticatedAuthenticator, "notYetAuthenticatedAuthenticator");
notYetAuthenticatedAuthenticator = null;
clientChannelCredentialTracker.offer(ClientChannelCredentialEvent
.channelFailedReauthentication(time, KafkaChannel.this, errorMessage, retry));
}
@Override
public void reauthenticationSucceeded() {
clientChannelCredentialTracker.offer(ClientChannelCredentialEvent
.channelReauthenticated(time, KafkaChannel.this, notYetAuthenticatedSaslClientAuthenticator
.expiringCredential());
replaceAnyPreviouslyAuthenticatedAuthenticatorWithNewOne();
return;
}
} |
Compatibility, Deprecation, and Migration Plan
...