...

Code Block
languagejava
titleConstructor Code Addition for Users of ConsumerNetworkClient
if (channelBuilder instanceof SaslChannelBuilder)
    ((SaslChannelBuilder) channelBuilder)
            .authenticationRequestEnqueuer(client.authenticationRequestEnqueuer());

Another example is kafka.controller.ControllerChannelManager.  Instances of this class maintain one request queue per broker with one background thread allocated for each broker to monitor the queue and send the requests that appear.  The requests are sent in a synchronous fashion with respect to the thread when a request appears in the queue (i.e. unlike ConsumerNetworkClient, which is an async use case, this is a synchronous NetworkClient use case).  It is necessary to invoke sendRequest() to add a request to the queue for a broker and have it sent.  The code in the addNewBroker() method therefore looks like this:

Code Block
languagescala
titleControllerChannelManager.addNewBroker() Code Addition
if (channelBuilder.isInstanceOf[SaslChannelBuilder])
  channelBuilder.asInstanceOf[SaslChannelBuilder].authenticationRequestEnqueuer(
    new AuthenticationRequestEnqueuer() {
      def enqueueRequest(authenticationRequest: AuthenticationRequest): Unit =
          sendRequest(authenticationRequest.nodeId(), authenticationRequest.requestBuilder().apiKey(),
              authenticationRequest.requestBuilder(), null, authenticationRequest.authenticationRequestCompletionHandler())
    })

So the general approach is to identify how to inject requests and then set the appropriate AuthenticationRequestEnqueuer implementation on the SaslChannelBuilder.  In addition to the above examples, the PR also includes implementations for org.apache.kafka.clients.admin.KafkaAdminClient, org.apache.kafka.clients.producer.KafkaProducer, and kafka.coordinator.transaction.TransactionMarkerChannelManager.
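
The injection pattern itself can be illustrated in isolation.  The following is a minimal sketch, not the PR's code: AuthRequest, RequestEnqueuer, ChannelBuilderStub and QueueingClient are hypothetical stand-ins for AuthenticationRequest, AuthenticationRequestEnqueuer, SaslChannelBuilder and a client that owns a request queue.  It only shows that the builder never sends anything itself; it hands requests to whatever enqueuer the owning client injected.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class EnqueuerInjectionSketch {
    /** Hypothetical stand-in for AuthenticationRequest. */
    static final class AuthRequest {
        final String nodeId;
        final String apiName;

        AuthRequest(String nodeId, String apiName) {
            this.nodeId = nodeId;
            this.apiName = apiName;
        }
    }

    /** Hypothetical stand-in for AuthenticationRequestEnqueuer. */
    interface RequestEnqueuer {
        void enqueueRequest(AuthRequest request);
    }

    /** Hypothetical stand-in for SaslChannelBuilder; it only holds the injected enqueuer. */
    static final class ChannelBuilderStub {
        private RequestEnqueuer enqueuer;

        void authenticationRequestEnqueuer(RequestEnqueuer enqueuer) {
            this.enqueuer = enqueuer;
        }

        /** Starts re-authentication by handing the first request to the injected enqueuer. */
        void startReauthentication(String nodeId) {
            if (enqueuer == null)
                throw new IllegalStateException("no enqueuer injected");
            enqueuer.enqueueRequest(new AuthRequest(nodeId, "ApiVersions"));
        }
    }

    /** Hypothetical client that owns the outbound request queue. */
    static final class QueueingClient {
        final Queue<AuthRequest> outbound = new ArrayDeque<>();

        /** Exposes the client's own way of injecting requests. */
        RequestEnqueuer authenticationRequestEnqueuer() {
            return outbound::add;
        }
    }

    public static void main(String[] args) {
        QueueingClient client = new QueueingClient();
        ChannelBuilderStub builder = new ChannelBuilderStub();
        // Same wiring step as in the constructor snippets above.
        builder.authenticationRequestEnqueuer(client.authenticationRequestEnqueuer());
        builder.startReauthentication("broker-1");
        System.out.println(client.outbound.size() + " request(s) queued for " + client.outbound.peek().nodeId);
    }
}
```

Each client type would supply a different enqueuer (an async queue, a per-broker synchronous queue, and so on) while the builder side stays unchanged.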

...

  • kafka.admin.AdminClient (this class is deprecated)
  • kafka.tools.ReplicaVerificationTool (an edge case; no need to support the option here)
  • kafka.server.KafkaServer (a NetworkClient instance is only instantiated for shutdown and is therefore not long-lived enough to require re-authentication)

  • org.apache.kafka.trogdor.workload.ConnectionStressWorker (this class simply connects and doesn’t do anything else with the connection)

The PR does not yet have unit or integration tests; I will add these if/when the community verifies that the general implementation approach is appropriate.  I have stepped through the code in the debugger, and I can see from this exercise and from the emitted log messages that the implementation does in fact work.

The final issue to describe is how/when a KafkaChannel instance (each of which corresponds to a unique network connection) is told to re-authenticate.

We define the class org.apache.kafka.common.security.expiring.internals.ClientChannelCredentialTracker to keep track of the various KafkaChannel instances and the ExpiringCredential instances that they authenticated with.  The following code in SaslChannelBuilder adds an instance of ClientChannelCredentialTracker to the private credentials of the Subject associated with the SASL mechanism when the feature is enabled:

Code Block
languagejava
titleSaslChannelBuilder Code to Enable the Feature
LoginManager loginManager = LoginManager.acquireLoginManager(entry.getValue(), mechanism, defaultLoginClass, configs);
loginManagers.put(mechanism, loginManager);
Subject subject = loginManager.subject();
if (mode == Mode.CLIENT) {
    if (saslLoginRefreshReauthenticateEnable()) {
        log.info("SASL Login Refresh Re-Authenticate ENABLED");
        if (subject.getPrivateCredentials(ClientChannelCredentialTracker.class).isEmpty())
            subject.getPrivateCredentials().add(new ClientChannelCredentialTracker());
    } else
        log.info("SASL Login Refresh Re-Authenticate DISABLED");
}
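
The mechanism of carrying a shared object in a Subject's private credentials can be tried with the standard JAAS API alone.  This is a minimal sketch under the assumption that a plain marker class (the hypothetical Tracker below) stands in for ClientChannelCredentialTracker; the findTracker helper is likewise illustrative and is not taken from the PR.

```java
import java.util.Set;
import javax.security.auth.Subject;

public class SubjectTrackerSketch {
    /** Hypothetical stand-in for ClientChannelCredentialTracker. */
    static final class Tracker { }

    /** Adds a tracker only if the Subject does not already hold one. */
    static void ensureTracker(Subject subject) {
        if (subject.getPrivateCredentials(Tracker.class).isEmpty())
            subject.getPrivateCredentials().add(new Tracker());
    }

    /** Returns the tracker, or null when none was ever added to this Subject. */
    static Tracker findTracker(Subject subject) {
        Set<Tracker> trackers = subject.getPrivateCredentials(Tracker.class);
        return trackers.isEmpty() ? null : trackers.iterator().next();
    }

    public static void main(String[] args) {
        Subject subject = new Subject();
        ensureTracker(subject);
        ensureTracker(subject); // second call is a no-op because a tracker is already present
        System.out.println(subject.getPrivateCredentials(Tracker.class).size()); // prints 1
        System.out.println(findTracker(subject) != null);                        // prints true
    }
}
```

Because any code holding the Subject can perform this lookup, the tracker needs no additional wiring between the login and channel classes.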

The KafkaChannel and ExpiringCredentialRefreshingLogin classes can easily retrieve the ClientChannelCredentialTracker instance and tell it when various events of interest occur (a channel is initially authenticated, a credential is refreshed, a channel is disconnected).  The events are enqueued as they are reported, and a background thread processes them to decide which KafkaChannel instances need to re-authenticate.  There is a method KafkaChannel.initiateReauthentication(Time) that instantiates an instance of SaslClientAuthenticator and tells it to re-authenticate via the method SaslClientAuthenticator.initiateReauthentication(Time, AuthenticationSuccessOrFailureReceiver).  The implementation of AuthenticationSuccessOrFailureReceiver that is sent takes care of cleaning up the state in KafkaChannel related to the multi-step re-authentication process and notifying the ClientChannelCredentialTracker instance; it looks something like this:

Code Block
languagejava
titleAuthenticationSuccessOrFailureReceiver
new AuthenticationSuccessOrFailureReceiver() {
    @Override
    public void reauthenticationFailed(RetryIndication retry, String errorMessage) {
        log.warn(errorMessage);
        // discard the authenticator that failed to re-authenticate
        Utils.closeQuietly(notYetAuthenticatedAuthenticator, "notYetAuthenticatedAuthenticator");
        notYetAuthenticatedAuthenticator = null;
        clientChannelCredentialTracker().offer(ClientChannelCredentialEvent
                .channelFailedReauthentication(time, KafkaChannel.this, errorMessage, retry));
    }

    @Override
    public void reauthenticationSucceeded() {
        // report the new credential before swapping authenticators
        clientChannelCredentialTracker().offer(ClientChannelCredentialEvent
                .channelReauthenticated(time, KafkaChannel.this, notYetAuthenticatedSaslClientAuthenticator
                    .expiringCredential()));
        replaceAnyPreviouslyAuthenticatedAuthenticatorWithNewOne();
    }
}

// KafkaChannel helper invoked from reauthenticationSucceeded()
private void replaceAnyPreviouslyAuthenticatedAuthenticatorWithNewOne() {
    // swap the old authenticator (if any) for the new one
    Authenticator tmp = authenticatedAuthenticator;
    authenticatedAuthenticator = notYetAuthenticatedAuthenticator;
    notYetAuthenticatedAuthenticator = null;
    if (tmp != null)
        Utils.closeQuietly(tmp, "previousAuthenticatedAuthenticator");
}
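
The event-queue design described before the receiver code (events are enqueued as they are reported, and a background thread decides which channels must re-authenticate) can be sketched roughly as follows.  Everything here is hypothetical: the Event, EventType and Tracker names are illustrative, channels are reduced to string ids, and the sketch assumes a single credential so that a refresh simply marks every currently authenticated channel, whereas the real tracker associates each channel with the ExpiringCredential it authenticated with.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

public class EventTrackerSketch {
    enum EventType { CHANNEL_AUTHENTICATED, CREDENTIAL_REFRESHED, CHANNEL_DISCONNECTED, STOP }

    /** Hypothetical event; channelId is null for events not tied to one channel. */
    static final class Event {
        final EventType type;
        final String channelId;

        Event(EventType type, String channelId) {
            this.type = type;
            this.channelId = channelId;
        }
    }

    /** Hypothetical, heavily simplified stand-in for ClientChannelCredentialTracker. */
    static final class Tracker {
        private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
        // Touched only by the worker thread, so no extra synchronization is needed.
        private final Set<String> authenticated = new LinkedHashSet<>();
        // Channels the worker decided should re-authenticate, in decision order.
        final List<String> reauthenticationRequests = new CopyOnWriteArrayList<>();
        private final Thread worker = new Thread(this::processEvents, "tracker-sketch");

        void start() {
            worker.setDaemon(true);
            worker.start();
        }

        /** Called by reporters; never blocks because the queue is unbounded. */
        void offer(Event event) {
            events.offer(event);
        }

        /** Enqueues a STOP marker and waits until all earlier events are processed. */
        void stopAndWait() {
            events.offer(new Event(EventType.STOP, null));
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        private void processEvents() {
            try {
                while (true) {
                    Event event = events.take();
                    switch (event.type) {
                        case CHANNEL_AUTHENTICATED:
                            authenticated.add(event.channelId);
                            break;
                        case CHANNEL_DISCONNECTED:
                            authenticated.remove(event.channelId);
                            break;
                        case CREDENTIAL_REFRESHED:
                            // Simplification: one credential shared by all channels.
                            reauthenticationRequests.addAll(authenticated);
                            break;
                        case STOP:
                            return;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        Tracker tracker = new Tracker();
        tracker.start();
        tracker.offer(new Event(EventType.CHANNEL_AUTHENTICATED, "ch-1"));
        tracker.offer(new Event(EventType.CHANNEL_AUTHENTICATED, "ch-2"));
        tracker.offer(new Event(EventType.CHANNEL_DISCONNECTED, "ch-2"));
        tracker.offer(new Event(EventType.CREDENTIAL_REFRESHED, null));
        tracker.stopAndWait();
        System.out.println(tracker.reauthenticationRequests); // prints [ch-1]
    }
}
```

Processing events on a single thread keeps the tracker's bookkeeping free of locks, at the cost of decisions lagging slightly behind the reported events.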

When the KafkaChannel instance invokes the SaslClientAuthenticator.initiateReauthentication(Time, AuthenticationSuccessOrFailureReceiver) method, it is up to the SaslClientAuthenticator instance to use its AuthenticationRequestEnqueuer instance to enqueue an ApiVersionsRequest along with an appropriate callback that either fails the re-authentication if the ApiVersionsRequest fails or continues it with a SaslHandshakeRequest.  Here is what the code looks like:

...