...
Code Block | ||||
---|---|---|---|---|
| ||||
new AuthenticationSuccessOrFailureReceiver() {
    @Override
    public void reauthenticationFailed(RetryIndication retry, String errorMessage) {
        log.warn(errorMessage);
        Utils.closeQuietly(notYetAuthenticatedAuthenticator, "notYetAuthenticatedAuthenticator");
        notYetAuthenticatedAuthenticator = null;
        clientChannelCredentialTracker.offer(ClientChannelCredentialEvent
                .channelFailedReauthentication(time, KafkaChannel.this, errorMessage, retry));
    }
    @Override
    public void reauthenticationSucceeded() {
        clientChannelCredentialTracker.offer(ClientChannelCredentialEvent
                .channelReauthenticated(time, KafkaChannel.this,
                        notYetAuthenticatedSaslClientAuthenticator.expiringCredential()));
        replaceAnyPreviouslyAuthenticatedAuthenticatorWithNewOne();
    }
} |
When the method SaslClientAuthenticator.initiateReauthentication(Time, AuthenticationSuccessOrFailureReceiver) is invoked, it is up to the SaslClientAuthenticator instance to use its AuthenticationRequestEnqueuer instance to enqueue an ApiVersionsRequest along with an appropriate callback that either fails the re-authentication if the ApiVersionsRequest fails or continues it with a SaslHandshakeRequest. Here is what the code looks like:
Code Block | ||||
---|---|---|---|---|
| ||||
public void initiateReauthentication(Time time,
        AuthenticationSuccessOrFailureReceiver authenticationSuccessOrFailureReceiver) {
    AuthenticationRequestEnqueuer authenticationRequestEnqueuer = authenticationRequestEnqueuer();
    if (saslState != SaslState.SEND_APIVERSIONS_REQUEST) {
        // it is possible that the connection was closed; don't retry
        if (saslState != SaslState.CLOSED)
            saslState = SaslState.FAILED;
        authenticationSuccessOrFailureReceiver.reauthenticationFailed(RetryIndication.DO_NOT_RETRY, String.format(
                "Re-authentication initiated but authenticator initial state is incorrect (expected %s): %s",
                SaslState.SEND_APIVERSIONS_REQUEST, saslState));
        return;
    }
    authenticationRequestEnqueuer.enqueueRequest(this.node, new ApiVersionsRequest.Builder((short) 0),
            authenticationRequestCompletionHandlerForApiVersionsRequest(time,
                    authenticationSuccessOrFailureReceiver));
} |
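The method authenticationRequestCompletionHandlerForApiVersionsRequest(Time, AuthenticationSuccessOrFailureReceiver) looks similar: it either fails the re-authentication if the ApiVersionsRequest fails or continues it by enqueuing a SaslHandshakeRequest.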
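To make the callback chaining concrete, here is a minimal, self-contained sketch of the pattern. It is not the code from the PR: the types below (ReauthChainSketch, Receiver, Enqueuer) are simplified, hypothetical stand-ins, requests are represented by their names only, a null error string means the request succeeded, and RetryIndication.RETRY is an assumed value. For brevity the chain ends at the handshake, whereas the real flow would continue with the SASL authentication exchange.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Simplified, hypothetical stand-ins for the types discussed above; not the PR's code.
final class ReauthChainSketch {
    // RETRY is an assumed value; only DO_NOT_RETRY appears in the text above.
    enum RetryIndication { RETRY, DO_NOT_RETRY }

    interface Receiver {
        void reauthenticationFailed(RetryIndication retry, String errorMessage);
        void reauthenticationSucceeded();
    }

    /** Stand-in enqueuer: holds each request's completion handler until a response "arrives". */
    static final class Enqueuer {
        final Deque<Consumer<String>> pending = new ArrayDeque<>();
        final List<String> trace;

        Enqueuer(List<String> trace) { this.trace = trace; }

        void enqueueRequest(String requestName, Consumer<String> onComplete) {
            trace.add("enqueued " + requestName);
            pending.add(onComplete);
        }

        /** Completes the oldest pending request; a null error means it succeeded. */
        void complete(String error) { pending.remove().accept(error); }
    }

    /** Handler for the ApiVersionsRequest: fail the re-authentication, or continue with the handshake. */
    static Consumer<String> apiVersionsHandler(Enqueuer enqueuer, Receiver receiver) {
        return error -> {
            if (error != null) {
                receiver.reauthenticationFailed(RetryIndication.RETRY, "ApiVersionsRequest failed: " + error);
                return;
            }
            enqueuer.enqueueRequest("SaslHandshakeRequest", handshakeHandler(receiver));
        };
    }

    /** Handler for the SaslHandshakeRequest; the sketch reports the final outcome here. */
    static Consumer<String> handshakeHandler(Receiver receiver) {
        return error -> {
            if (error != null)
                receiver.reauthenticationFailed(RetryIndication.RETRY, "SaslHandshakeRequest failed: " + error);
            else
                receiver.reauthenticationSucceeded();
        };
    }

    /** Runs the chain once and returns a trace of what happened. */
    static List<String> simulate(String apiVersionsError) {
        List<String> trace = new ArrayList<>();
        Enqueuer enqueuer = new Enqueuer(trace);
        Receiver receiver = new Receiver() {
            @Override public void reauthenticationFailed(RetryIndication retry, String errorMessage) {
                trace.add("failed (" + retry + "): " + errorMessage);
            }
            @Override public void reauthenticationSucceeded() {
                trace.add("succeeded");
            }
        };
        enqueuer.enqueueRequest("ApiVersionsRequest", apiVersionsHandler(enqueuer, receiver));
        enqueuer.complete(apiVersionsError);   // response to the ApiVersionsRequest
        if (!enqueuer.pending.isEmpty())
            enqueuer.complete(null);           // response to the SaslHandshakeRequest
        return trace;
    }
}
```

Calling ReauthChainSketch.simulate(null) walks the success path through both requests, while ReauthChainSketch.simulate("timeout") stops after the first request and reports the failure to the receiver. Each handler only knows about the next step, so the chain can be extended without touching the earlier handlers.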
Finally, we add methods on SaslServerAuthenticator to respond to the re-authentication requests that arrive (specifically, respondToReauthenticationSaslHandshakeRequest() and respondToReauthenticationSaslAuthenticateRequest()) and add code to kafka.server.KafkaApis to route the received requests accordingly (as opposed to responding with an error, which is what currently happens). I won't go into the details here; see the PR for the code.
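As a rough illustration of the routing idea only, the sketch below dispatches on the request type and hands SASL requests to the authenticator instead of rejecting them. Everything except the two respondToReauthentication... method names is an assumption made for the example: the ApiKey enum, the ServerAuthenticator interface, the String return values, and the route method are hypothetical, and the real methods' signatures and the actual KafkaApis code are in the PR.

```java
// Hypothetical sketch of the server-side routing; not the KafkaApis code from the PR.
final class ReauthRoutingSketch {
    // Assumed, reduced set of request types for illustration.
    enum ApiKey { SASL_HANDSHAKE, SASL_AUTHENTICATE, OTHER }

    // Stand-in for SaslServerAuthenticator; parameter and return types are assumptions.
    interface ServerAuthenticator {
        String respondToReauthenticationSaslHandshakeRequest();
        String respondToReauthenticationSaslAuthenticateRequest();
    }

    /** Routes SASL requests on an authenticated connection to the authenticator. */
    static String route(ApiKey key, ServerAuthenticator authenticator) {
        switch (key) {
            case SASL_HANDSHAKE:
                return authenticator.respondToReauthenticationSaslHandshakeRequest();
            case SASL_AUTHENTICATE:
                return authenticator.respondToReauthenticationSaslAuthenticateRequest();
            default:
                return "normal request handling";
        }
    }
}
```

The point of the sketch is only that the two SASL request types gain a dedicated path to the authenticator, while every other request type is handled as before.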
Compatibility, Deprecation, and Migration Plan
...