...

First, at a high level, we recognize that org.apache.kafka.clients.NetworkClient can be used in either an asynchronous or a synchronous manner. In asynchronous mode the NetworkClient.poll() method is called repeatedly and directly in a run loop, whereas in synchronous mode the NetworkClientUtils.sendAndReceive() method is used whenever something has to be sent (this method in turn calls NetworkClient.poll(), but otherwise poll() is not invoked).  The existence of these two separate use cases is important because the requests related to re-authentication (ApiVersionsRequest, SaslHandshakeRequest, and SaslAuthenticateRequest) cannot simply be injected into a queue that NetworkClient owns: in the synchronous case we don't know when poll() will be called, since it is invoked only when something needs to be sent, and that might take a while (if it occurs at all).  So a single-queue approach won't work.
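The difference between the two modes can be illustrated with a toy model. This is only a sketch: ToyClient and its methods are hypothetical stand-ins, not the real NetworkClient or NetworkClientUtils API, and requests are represented as plain strings. It shows that a request placed on a client-owned queue goes out promptly when a run loop keeps calling poll(), but stays queued when poll() is reached only through a send that may never happen.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class PollModesSketch {
    /** Hypothetical toy client; it only models "queued requests are sent during poll()". */
    static class ToyClient {
        private final Queue<String> pending = new ArrayDeque<>();
        final List<String> sent = new ArrayList<>();

        /** Places a request on the client-owned queue without sending it. */
        void enqueue(String request) {
            pending.add(request);
        }

        /** Sends everything currently queued; nothing leaves the queue unless this runs. */
        void poll() {
            while (!pending.isEmpty()) {
                sent.add(pending.remove());
            }
        }

        /** Synchronous style: poll() is reached only because the caller has something to send. */
        void sendAndReceive(String request) {
            enqueue(request);
            poll();
        }
    }

    /** Asynchronous style: a run loop calls poll(), so an injected request is sent. */
    static List<String> asyncMode() {
        ToyClient client = new ToyClient();
        client.enqueue("SaslHandshakeRequest");
        client.poll(); // one iteration of the run loop
        return client.sent;
    }

    /** Synchronous style with no outgoing traffic: the injected request is never sent. */
    static List<String> syncModeWithoutTraffic() {
        ToyClient client = new ToyClient();
        client.enqueue("SaslHandshakeRequest");
        // No sendAndReceive() call happens here, so poll() is never invoked.
        return client.sent;
    }

    public static void main(String[] args) {
        System.out.println("async sent: " + asyncMode());
        System.out.println("sync sent:  " + syncModeWithoutTraffic());
    }
}
```

In the synchronous case the request would only go out as a side effect of some later, unrelated sendAndReceive() call, which is the delay the paragraph above describes.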

The approach we take is to define an interface, org.apache.kafka.common.security.authenticator.AuthenticationRequestEnqueuer, that declares three separate enqueueRequest() methods (one for each of the request types related to re-authentication).  We leave it to the owner of each NetworkClient instance to decide how the requests are injected: the owner provides an implementation of the interface to the SaslChannelBuilder, which in turn provides it to the SaslClientAuthenticator.
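A minimal sketch of the shape this could take follows. The interface name and the enqueueRequest() method name come from the description above; everything else is an assumption made for illustration, including the parameter lists, the *Stub request types, and the RecordingOwner class that stands in for an owner of a NetworkClient. The actual signatures may differ.

```java
import java.util.ArrayList;
import java.util.List;

class EnqueuerSketch {
    // Hypothetical stand-ins for the three re-authentication request types.
    static class ApiVersionsRequestStub {}
    static class SaslHandshakeRequestStub {}
    static class SaslAuthenticateRequestStub {}

    /** One overload per request type; the parameter lists are assumed, not taken from the source. */
    interface AuthenticationRequestEnqueuer {
        void enqueueRequest(String nodeId, ApiVersionsRequestStub request);
        void enqueueRequest(String nodeId, SaslHandshakeRequestStub request);
        void enqueueRequest(String nodeId, SaslAuthenticateRequestStub request);
    }

    /** Hypothetical client owner: it decides how requests are injected (here, by logging a send). */
    static class RecordingOwner {
        final List<String> sendLog = new ArrayList<>();

        private void send(String nodeId, Object request) {
            sendLog.add(nodeId + ":" + request.getClass().getSimpleName());
        }

        /** Yields an enqueuer that routes every request type through this owner's send path. */
        AuthenticationRequestEnqueuer authenticationRequestEnqueuer() {
            return new AuthenticationRequestEnqueuer() {
                @Override
                public void enqueueRequest(String nodeId, ApiVersionsRequestStub request) {
                    send(nodeId, request);
                }

                @Override
                public void enqueueRequest(String nodeId, SaslHandshakeRequestStub request) {
                    send(nodeId, request);
                }

                @Override
                public void enqueueRequest(String nodeId, SaslAuthenticateRequestStub request) {
                    send(nodeId, request);
                }
            };
        }
    }

    /** Plays the authenticator's role: it only sees the interface, never the owner. */
    static List<String> demo() {
        RecordingOwner owner = new RecordingOwner();
        AuthenticationRequestEnqueuer enqueuer = owner.authenticationRequestEnqueuer();
        enqueuer.enqueueRequest("node-1", new ApiVersionsRequestStub());
        enqueuer.enqueueRequest("node-1", new SaslHandshakeRequestStub());
        enqueuer.enqueueRequest("node-1", new SaslAuthenticateRequestStub());
        return owner.sendLog;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the indirection is that the authenticator never needs to know whether the owning client runs a poll() loop or sends synchronously; that choice stays with whoever implements the interface.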

For example, the org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient class provides the authenticationRequestEnqueuer() method that yields an instance that invokes ConsumerNetworkClient.send(Node, AbstractRequest.Builder<?>); it is then a simple matter of adding the following code to the constructors of the org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.connect.runtime.distributed.WorkerGroupMember classes:


Constructor Code Addition (Java):
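// Give the SASL channel builder the client's enqueuer so re-authentication requests can be injected.
if (channelBuilder instanceof SaslChannelBuilder) {
    ((SaslChannelBuilder) channelBuilder).authenticationRequestEnqueuer(client.authenticationRequestEnqueuer());
}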



Compatibility, Deprecation, and Migration Plan

...