...

Code Block (java): AuthenticationSuccessOrFailureReceiver
new AuthenticationSuccessOrFailureReceiver() {
    @Override
    public void reauthenticationFailed(RetryIndication retry, String errorMessage) {
        log.warn(errorMessage);
        Utils.closeQuietly(notYetAuthenticatedAuthenticator, "notYetAuthenticatedAuthenticator");
        notYetAuthenticatedAuthenticator = null;
        clientChannelCredentialTracker.offer(ClientChannelCredentialEvent
                .channelFailedReauthentication(time, KafkaChannel.this, errorMessage, retry));
    }

    @Override
    public void reauthenticationSucceeded() {
        clientChannelCredentialTracker.offer(ClientChannelCredentialEvent
                .channelReauthenticated(time, KafkaChannel.this, notYetAuthenticatedSaslClientAuthenticator
                    .expiringCredential()));
        replaceAnyPreviouslyAuthenticatedAuthenticatorWithNewOne();
    }
}

When the method SaslClientAuthenticator.initiateReauthentication(Time, AuthenticationSuccessOrFailureReceiver) is invoked, the SaslClientAuthenticator instance uses its AuthenticationRequestEnqueuer instance to enqueue an ApiVersionsRequest together with a callback. The callback fails the re-authentication if the ApiVersionsRequest fails; otherwise it continues the exchange with a SaslHandshakeRequest. Here is what the code looks like:

Code Block (java): SaslClientAuthenticator.initiateReauthentication()
    public void initiateReauthentication(Time time,
            AuthenticationSuccessOrFailureReceiver authenticationSuccessOrFailureReceiver) {
        AuthenticationRequestEnqueuer authenticationRequestEnqueuer = authenticationRequestEnqueuer();
        if (saslState != SaslState.SEND_APIVERSIONS_REQUEST) {
            // it is possible that the connection was closed; don't retry
            if (saslState != SaslState.CLOSED)
                saslState = SaslState.FAILED;
            authenticationSuccessOrFailureReceiver.reauthenticationFailed(RetryIndication.DO_NOT_RETRY, String.format(
                    "Re-authentication initiated but authenticator initial state is incorrect (expected %s): %s",
                    SaslState.SEND_APIVERSIONS_REQUEST, saslState));
            return;
        }
        authenticationRequestEnqueuer.enqueueRequest(this.node, new ApiVersionsRequest.Builder((short) 0),
                authenticationRequestCompletionHandlerForApiVersionsRequest(time,
                        authenticationSuccessOrFailureReceiver));
    }

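The method authenticationRequestCompletionHandlerForApiVersionsRequest(Time, AuthenticationSuccessOrFailureReceiver) follows the same pattern: it either reports the failure to the AuthenticationSuccessOrFailureReceiver or enqueues the next request, a SaslHandshakeRequest.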
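
The callback chaining described above can be sketched in isolation. The following is only an illustration under stated assumptions, not the code from the PR: Enqueuer, Response, Receiver, and the handler methods are hypothetical stand-ins for the Kafka types, the retry indication is omitted, and the flow stops after the handshake, whereas a real exchange would presumably continue with SaslAuthenticateRequest round trips.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Illustrative stand-ins only: none of these types are the real Kafka classes.
class ReauthChainSketch {

    /** Simplified receiver; the retry indication from the real interface is omitted. */
    interface Receiver {
        void reauthenticationFailed(String errorMessage);
        void reauthenticationSucceeded();
    }

    /** Hypothetical response: a success flag plus an error text. */
    static final class Response {
        final boolean ok;
        final String error;
        Response(boolean ok, String error) {
            this.ok = ok;
            this.error = error;
        }
    }

    /** Hypothetical enqueuer: records each request name and its completion handler. */
    static final class Enqueuer {
        final List<String> sent = new ArrayList<>();
        private final Queue<Consumer<Response>> pending = new ArrayDeque<>();

        void enqueueRequest(String requestName, Consumer<Response> handler) {
            sent.add(requestName);
            pending.add(handler);
        }

        /** Simulates the broker answering the oldest outstanding request. */
        void complete(Response response) {
            pending.remove().accept(response);
        }
    }

    /** Step 1: enqueue the ApiVersionsRequest with a handler that decides what happens next. */
    static void initiateReauthentication(Enqueuer enqueuer, Receiver receiver) {
        enqueuer.enqueueRequest("ApiVersionsRequest", apiVersionsHandler(enqueuer, receiver));
    }

    /** Step 2: on failure notify the receiver; on success enqueue the handshake. */
    static Consumer<Response> apiVersionsHandler(Enqueuer enqueuer, Receiver receiver) {
        return response -> {
            if (!response.ok) {
                receiver.reauthenticationFailed("ApiVersionsRequest failed: " + response.error);
                return;
            }
            enqueuer.enqueueRequest("SaslHandshakeRequest", handshakeHandler(receiver));
        };
    }

    /** Step 3 (assumption): the sketch treats a successful handshake as the end of the flow. */
    static Consumer<Response> handshakeHandler(Receiver receiver) {
        return response -> {
            if (!response.ok) {
                receiver.reauthenticationFailed("SaslHandshakeRequest failed: " + response.error);
            } else {
                receiver.reauthenticationSucceeded();
            }
        };
    }

    public static void main(String[] args) {
        Enqueuer enqueuer = new Enqueuer();
        initiateReauthentication(enqueuer, new Receiver() {
            @Override
            public void reauthenticationFailed(String errorMessage) {
                System.out.println("re-authentication failed: " + errorMessage);
            }

            @Override
            public void reauthenticationSucceeded() {
                System.out.println("re-authentication succeeded");
            }
        });
        enqueuer.complete(new Response(true, null)); // ApiVersions response arrives
        enqueuer.complete(new Response(true, null)); // SaslHandshake response arrives
        System.out.println(enqueuer.sent);
    }
}
```

The point of the structure is that each completion handler owns exactly one decision: report the failure to the receiver, or enqueue the next request with the next handler. No step blocks while waiting for a response.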

Finally, we add methods on SaslServerAuthenticator to respond to the requests that arrive related to re-authentication – specifically, respondToReauthenticationSaslHandshakeRequest() and respondToReauthenticationSaslAuthenticateRequest() – and add code to kafka.server.KafkaApis to route the received requests accordingly (as opposed to responding with an error, which is what currently happens).  I won't go into the details here; see the PR for the code.
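
To make the routing idea concrete, here is a rough sketch of the dispatch described above. It is not the PR code: the real routing lives in kafka.server.KafkaApis (Scala), and the ServerAuthenticator interface, the String request and response types, and the method parameters below are assumptions made purely for illustration. Only the two method names come from the text above.

```java
// Illustrative stand-ins only; signatures and types here are hypothetical.
class ReauthRoutingSketch {

    /** Subset of request kinds relevant to the sketch. */
    enum ApiKey { SASL_HANDSHAKE, SASL_AUTHENTICATE, OTHER }

    /** Hypothetical stand-in for the two new SaslServerAuthenticator methods. */
    interface ServerAuthenticator {
        String respondToReauthenticationSaslHandshakeRequest(String request);
        String respondToReauthenticationSaslAuthenticateRequest(String request);
    }

    /**
     * Routes SASL requests that arrive on an already authenticated connection
     * to the authenticator instead of answering them with an error.
     */
    static String route(ApiKey key, String request, ServerAuthenticator authenticator) {
        switch (key) {
            case SASL_HANDSHAKE:
                return authenticator.respondToReauthenticationSaslHandshakeRequest(request);
            case SASL_AUTHENTICATE:
                return authenticator.respondToReauthenticationSaslAuthenticateRequest(request);
            default:
                // Every other request keeps its existing handling.
                return "existing handling: " + request;
        }
    }
}
```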

Compatibility, Deprecation, and Migration Plan

...