...
First, at a high level, we recognize that org.apache.kafka.clients.NetworkClient can be used in either an asynchronous or a synchronous manner. In asynchronous mode the NetworkClient.poll() method is called repeatedly and directly in a run loop, whereas in synchronous mode the NetworkClientUtils.sendAndReceive() method is used whenever something has to be sent (this method in turn calls NetworkClient.poll(), but otherwise poll() is not invoked). The fact that these two separate use cases exist is important because the requests related to re-authentication (ApiVersionsRequest, SaslHandshakeRequest, and SaslAuthenticateRequest) cannot simply be injected into a queue that NetworkClient owns. In the synchronous case, for example, we don't know when poll() will be called: it is only invoked when something needs to be sent, and that might take a while (if it occurs at all). So a single-queue approach won't work.
The approach we take is to define an interface, org.apache.kafka.common.security.authenticator.AuthenticationRequestEnqueuer, that declares three separate enqueueRequest() methods (one for each of the request types related to re-authentication). We leave it to the owners of the NetworkClient instances to define how to inject the requests by providing an implementation of the interface to the SaslChannelBuilder, which in turn provides it to the SaslClientAuthenticator.
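As a rough illustration of the shape of such an interface, the sketch below declares one enqueueRequest() overload per request type. The three request classes are empty stand-ins for the real Kafka types, and the nodeId parameter and the RecordingEnqueuer class are assumptions made for this example; the sketch does not reproduce the signatures proposed here.

```java
import java.util.ArrayList;
import java.util.List;

// Empty stand-ins for the real Kafka request types (assumption for this sketch).
final class ApiVersionsRequest {}
final class SaslHandshakeRequest {}
final class SaslAuthenticateRequest {}

// One enqueueRequest() overload per re-authentication request type.
// The nodeId parameter is hypothetical.
interface AuthenticationRequestEnqueuer {
    void enqueueRequest(String nodeId, ApiVersionsRequest request);
    void enqueueRequest(String nodeId, SaslHandshakeRequest request);
    void enqueueRequest(String nodeId, SaslAuthenticateRequest request);
}

// Hypothetical implementation that only records what was enqueued.
// A real owner of a NetworkClient would hand the request to its own send path.
final class RecordingEnqueuer implements AuthenticationRequestEnqueuer {
    private final List<String> enqueued = new ArrayList<>();

    @Override
    public void enqueueRequest(String nodeId, ApiVersionsRequest request) {
        enqueued.add(nodeId + ":ApiVersionsRequest");
    }

    @Override
    public void enqueueRequest(String nodeId, SaslHandshakeRequest request) {
        enqueued.add(nodeId + ":SaslHandshakeRequest");
    }

    @Override
    public void enqueueRequest(String nodeId, SaslAuthenticateRequest request) {
        enqueued.add(nodeId + ":SaslAuthenticateRequest");
    }

    // Requests in the order they were enqueued.
    List<String> enqueued() {
        return enqueued;
    }
}
```

Using overloads rather than a single generic method keeps each request type visible in the interface, at the cost of three near-identical methods in every implementation.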
For example, the org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient class provides the authenticationRequestEnqueuer() method, which yields an instance that invokes ConsumerNetworkClient.send(Node, AbstractRequest.Builder<?>). It is then a simple matter of adding the following code to the constructors of the org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.connect.runtime.distributed.WorkerGroupMember classes:
```java
// Hand the client's enqueuer to the SASL channel builder so that
// re-authentication requests can be injected through the client.
if (channelBuilder instanceof SaslChannelBuilder)
    ((SaslChannelBuilder) channelBuilder).authenticationRequestEnqueuer(client.authenticationRequestEnqueuer());
```
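To illustrate why the injection strategy is left to each owner, here is a toy sketch with two hypothetical owners: one whose run loop calls poll() regularly and can therefore queue a request for the next poll, and one that only polls when it sends and so sends the request immediately. AsyncOwner, SyncOwner, and the single-method RequestEnqueuer are invented for this example (the interface is collapsed to one method for brevity), and the sketch does not describe how any Kafka class actually behaves.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Collapsed to a single method for brevity (assumption for this sketch).
interface RequestEnqueuer {
    void enqueueRequest(String nodeId, String requestName);
}

// Hypothetical owner whose run loop calls poll() repeatedly:
// it can queue the request and rely on the next poll() to send it.
final class AsyncOwner {
    private final Queue<String> pending = new ArrayDeque<>();
    final List<String> sent = new ArrayList<>();

    RequestEnqueuer authenticationRequestEnqueuer() {
        return (nodeId, requestName) -> pending.add(nodeId + ":" + requestName);
    }

    // Drains everything queued since the previous call.
    void poll() {
        String next;
        while ((next = pending.poll()) != null) {
            sent.add(next);
        }
    }
}

// Hypothetical owner that only polls when it has something to send:
// a queued request could wait indefinitely, so it is sent right away.
final class SyncOwner {
    final List<String> sent = new ArrayList<>();

    RequestEnqueuer authenticationRequestEnqueuer() {
        return (nodeId, requestName) -> sendAndReceive(nodeId, requestName);
    }

    // Stand-in for a blocking send; here it only records the request.
    private void sendAndReceive(String nodeId, String requestName) {
        sent.add(nodeId + ":" + requestName);
    }
}
```

With the first owner, an enqueued request is not sent until poll() runs; with the second, it is sent as part of the enqueueRequest() call itself. Both satisfy the same interface, which is the point of letting the owner supply the implementation.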
Compatibility, Deprecation, and Migration Plan
...