Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The approach we take is to define an interface org.apache.kafka.common.security.authenticator.AuthenticationRequestEnqueuer that declares an enqueueRequest(AuthenticationRequest) method.  We leave it to the owners of the NetworkClient instances to define how to inject such requests by providing an implementation of the interface to the SaslChannelBuilder, which in turn provides it to the SaslClientAuthenticator.

For example, the org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient class provides the authenticationRequestEnqueuer() method that yields an instance that invokes ConsumerNetworkClient.send(Node, AbstractRequest.Builder<?>); it is then a simple matter of adding the following code to the constructors of the org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.connect.runtime.distributed.WorkerGroupMember classes, which use ConsumerNetworkClient for their connectivity:

...

Another example is kafka.controller.ControllerChannelManager.  Instances of this class maintain one request queue per broker with one background thread allocated for each broker to monitor the queue and send the requests that appear.  The requests are sent in a synchronous fashion with respect to the thread when a request appears in the queue (i.e. unlike ConsumerNetworkClient, which is an async use case that uses a run loop, this is a synchronous NetworkClient use case that only sends requests when they appear).  It is necessary to invoke sendRequest() to add a request to the queue for a broker and have it sent.  The code in the addNewBroker() method therefore looks like this:

...

So the general approach is to identify how to inject requests and then set the appropriate AuthenticationRequestEnqueuer implementation on the SaslChannelBuilderIncluding the above examples, the PR This KIP adds re-authentication support for the following:

  • org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient
    • org.apache.kafka.clients.consumer.KafkaConsumer
    • org.apache.kafka.connect.runtime.distributed.WorkerGroupMember
  • kafka.controller.ControllerChannelManager
  • org.apache.kafka.clients.admin.KafkaAdminClient
  • org.apache.kafka.clients.producer.KafkaProducer
  • kafka.coordinator.transaction.TransactionMarkerChannelManager
  • kafka.server.ReplicaFetcherBlockingSend (kafka.server.ReplicaFetcherThread)

There is no intent to include code in the PR to re-authenticate connections originating from these classes:

...

The PR does not yet have unit or integration tests – I will add these if/when the general implementation approach is verified by the community as being appropriate.  I have stepped through the code in the debugger, and I can see from this exercise and as well as from the emitted log messages during normal operation that the implementation does in fact work.  For example, connections are repeatedly re-authenticated a bit more frequently than about once a minute with this JAAS configuration:

...

The KafkaChannel and ExpiringCredentialRefreshingLogin classes can easily retrieve the ClientChannelCredentialTracker instance and tell it when various events of interest occur (a channel is initially authenticated, a credential is refreshed, a channel is disconnected).  The events are enqueued as they are reported, and a background thread processes them to decide which KafkaChannel instances need to re-authenticate.  There is a method KafkaChannel.initiateReauthentication(Time) that instantiates an instance of SaslClientAuthenticator and tells it to re-authenticate via the method SaslClientAuthenticator.initiateReauthentication(Time, AuthenticationSuccessOrFailureReceiver).  The implementation of AuthenticationSuccessOrFailureReceiver that is sent takes care of cleaning up the state in KafkaChannel related to the multi-step re-authentication process and notifying the ClientChannelCredentialTracker instance of the result; it looks something like this:

...