...

Another example is kafka.controller.ControllerChannelManager.  Instances of this class maintain one request queue per broker, with one background thread allocated to each broker to monitor its queue and send the requests that appear.  Each thread sends requests synchronously, and only when a request appears in its queue (i.e. unlike ConsumerNetworkClient, which is an async use case, this is a synchronous use case).  To have a request sent to a broker, it is necessary to invoke sendRequest(), which adds the request to that broker's queue.  The code in the addNewBroker() method therefore looks like this:

ControllerChannelManager.addNewBroker() code (Scala):
      if (channelBuilder.isInstanceOf[SaslChannelBuilder])
        channelBuilder.asInstanceOf[SaslChannelBuilder].authenticationRequestEnqueuer(
          new AuthenticationRequestEnqueuer() {
            def enqueueRequest(nodeId: String, apiVersionsRequestBuilder: ApiVersionsRequest.Builder,
              callback: AuthenticationRequestCompletionHandler): Unit =
                sendRequest(nodeId.toInt, apiVersionsRequestBuilder.apiKey(), apiVersionsRequestBuilder, null, callback)

            def enqueueRequest(nodeId: String, saslHandshakeRequestBuilder: SaslHandshakeRequest.Builder,
              callback: AuthenticationRequestCompletionHandler): Unit =
                sendRequest(nodeId.toInt, saslHandshakeRequestBuilder.apiKey(), saslHandshakeRequestBuilder, null, callback)

            def enqueueRequest(nodeId: String, saslAuthenticateRequestBuilder: SaslAuthenticateRequest.Builder,
              callback: AuthenticationRequestCompletionHandler): Unit =
                sendRequest(nodeId.toInt, saslAuthenticateRequestBuilder.apiKey(), saslAuthenticateRequestBuilder, null, callback)
          })

So the general approach is to identify how to inject requests and then set the appropriate AuthenticationRequestEnqueuer implementation on the SaslChannelBuilder.  In addition to the above examples, the PR also includes an implementation for org.apache.kafka.clients.admin.KafkaAdminClient.
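To make the "inject requests through the existing send path" idea concrete, here is a minimal, self-contained sketch of the pattern. It is not the PR's code: Request, CompletionHandler, RequestEnqueuer, and PerBrokerChannelManager are hypothetical stand-ins for the real Kafka types, the three overloads are collapsed into a single method, and "sending" is simulated by invoking the callback on the broker's thread.

```scala
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}

object EnqueuerSketch {
  // Hypothetical stand-ins for Kafka's request builders and completion handler.
  final case class Request(apiName: String)
  trait CompletionHandler { def onComplete(nodeId: Int, request: Request): Unit }

  // Hypothetical simplification of the enqueuer: one method instead of three overloads.
  trait RequestEnqueuer {
    def enqueueRequest(nodeId: String, request: Request, callback: CompletionHandler): Unit
  }

  private final case class QueueItem(request: Request, callback: CompletionHandler)

  final class PerBrokerChannelManager {
    // One request queue per broker, keyed by broker id.
    private val queues = new ConcurrentHashMap[Int, LinkedBlockingQueue[QueueItem]]()

    // Creates the broker's queue and the single background thread that drains it.
    def addNewBroker(brokerId: Int): Unit = {
      val queue = new LinkedBlockingQueue[QueueItem]()
      if (queues.putIfAbsent(brokerId, queue) == null) {
        val thread = new Thread(new Runnable {
          def run(): Unit = while (true) {
            val item = queue.take() // blocks until a request appears
            // A real implementation would do a blocking network send here.
            item.callback.onComplete(brokerId, item.request)
          }
        }, s"send-thread-$brokerId")
        thread.setDaemon(true)
        thread.start()
      }
    }

    // The only way a request reaches a broker: put it on that broker's queue.
    def sendRequest(brokerId: Int, request: Request, callback: CompletionHandler): Unit = {
      val queue = queues.get(brokerId)
      if (queue == null) throw new IllegalStateException(s"Unknown broker $brokerId")
      queue.put(QueueItem(request, callback))
    }

    // The enqueuer handed to the channel builder simply delegates to sendRequest().
    val enqueuer: RequestEnqueuer = new RequestEnqueuer {
      def enqueueRequest(nodeId: String, request: Request, callback: CompletionHandler): Unit =
        sendRequest(nodeId.toInt, request, callback)
    }
  }
}
```

The point of the sketch is the last member: the enqueuer owns no networking logic of its own and only adapts the node id before handing the request to whatever send path the owning class already has.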

The PR does not yet have implementations for produce-related classes.  For example:

  • org.apache.kafka.clients.producer.KafkaProducer

  • kafka.coordinator.transaction.TransactionMarkerChannelManager

  • kafka.server.ReplicaFetcherBlockingSend

  • kafka.common.InterBrokerSendThread

The PR also does not yet have unit or integration tests; I will add these if/when the community confirms that the general implementation approach is appropriate.  I have stepped through the code in the debugger, and it does work.

The final issue to describe is how/when a KafkaChannel instance (each of which corresponds to a unique network connection) is told to re-authenticate.

Compatibility, Deprecation, and Migration Plan

...