...
Code Block | ||||
---|---|---|---|---|
| ||||
if (channelBuilder instanceof SaslChannelBuilder)
    ((SaslChannelBuilder) channelBuilder)
        .authenticationRequestEnqueuer(client.authenticationRequestEnqueuer()); |
Another example is kafka.controller.ControllerChannelManager. Instances of this class maintain one request queue per broker, with one background thread allocated for each broker to monitor the queue and send the requests that appear. The requests are sent in a synchronous fashion with respect to the thread when a request appears in the queue (i.e. unlike ConsumerNetworkClient, which is an async use case, this is a synchronous NetworkClient use case). It is necessary to invoke sendRequest() to add a request to the queue for a broker and have it sent. The code in the addNewBroker() method therefore looks like this:
Code Block | ||||
---|---|---|---|---|
| ||||
if (channelBuilder.isInstanceOf[SaslChannelBuilder])
channelBuilder.asInstanceOf[SaslChannelBuilder].authenticationRequestEnqueuer(
new AuthenticationRequestEnqueuer() {
def enqueueRequest(authenticationRequest: AuthenticationRequest): Unit =
sendRequest(authenticationRequest.nodeId(), authenticationRequest.requestBuilder().apiKey(),
authenticationRequest.requestBuilder(), null, authenticationRequest.authenticationRequestCompletionHandler())
}) |
So the general approach is to identify how to inject requests and then set the appropriate AuthenticationRequestEnqueuer implementation on the SaslChannelBuilder. In addition to the above examples, the PR also includes implementations for org.apache.kafka.clients.admin.KafkaAdminClient, org.apache.kafka.clients.producer.KafkaProducer, and kafka.coordinator.transaction.TransactionMarkerChannelManager.
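The injection pattern described above can be illustrated with a minimal, self-contained sketch. Every type here is a hypothetical stand-in and not the PR's actual code: `RequestEnqueuer` plays the role of AuthenticationRequestEnqueuer, `ChannelBuilderSketch` the role of SaslChannelBuilder, and `QueueingClientSketch` the role of a client that already knows how to queue requests per node. Requests are plain strings purely for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EnqueuerInjectionSketch {
    /** Hypothetical stand-in for AuthenticationRequestEnqueuer. */
    interface RequestEnqueuer {
        void enqueueRequest(String nodeId, String request);
    }

    /** Hypothetical stand-in for SaslChannelBuilder: it only holds the injected enqueuer. */
    static class ChannelBuilderSketch {
        private RequestEnqueuer enqueuer;

        void requestEnqueuer(RequestEnqueuer enqueuer) {
            this.enqueuer = enqueuer;
        }

        // Called when a channel wants to start re-authentication (assumed trigger).
        void startReauthentication(String nodeId) {
            if (enqueuer == null)
                throw new IllegalStateException("no enqueuer has been injected");
            enqueuer.enqueueRequest(nodeId, "ApiVersionsRequest");
        }
    }

    /** Hypothetical client that keeps one list of pending requests per node. */
    static class QueueingClientSketch {
        final Map<String, List<String>> pending = new HashMap<>();

        void sendRequest(String nodeId, String request) {
            pending.computeIfAbsent(nodeId, k -> new ArrayList<>()).add(request);
        }
    }

    public static void main(String[] args) {
        QueueingClientSketch client = new QueueingClientSketch();
        ChannelBuilderSketch builder = new ChannelBuilderSketch();
        // Step 1: identify how this client injects requests (here: sendRequest).
        // Step 2: hand that mechanism to the builder as its enqueuer.
        builder.requestEnqueuer(client::sendRequest);
        builder.startReauthentication("broker-1");
        System.out.println(client.pending);
    }
}
```

The point of the indirection is that the channel builder never needs to know which kind of client it is attached to; each client supplies whatever request-injection mechanism it already has.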
...
- kafka.admin.AdminClient (this class is deprecated)
- kafka.tools.ReplicaVerificationTool (an edge case; no need to support the option here)
- kafka.server.KafkaServer (a NetworkClient instance is only instantiated for shutdown and is therefore not long-lived enough to require re-authentication)
- org.apache.kafka.trogdor.workload.ConnectionStressWorker (this class simply connects and doesn’t do anything else with the connection)
The PR does not yet have unit or integration tests; I will add these if/when the general implementation approach is verified by the community as being appropriate. I have stepped through the code in the debugger, and I can see from this exercise and from the emitted log messages that the implementation does in fact work.
The final issue to describe is how/when a KafkaChannel
instance (each of which corresponds to a unique network connection) is told to re-authenticate.
We define the class org.apache.kafka.common.security.expiring.internals.ClientChannelCredentialTracker to keep track of the various KafkaChannel instances and the ExpiringCredential instances that they authenticated with. The following code in SaslChannelBuilder adds an instance of ClientChannelCredentialTracker to the private credentials of the Subject associated with the SASL mechanism when the feature is enabled:
Code Block | ||||
---|---|---|---|---|
| ||||
LoginManager loginManager = LoginManager.acquireLoginManager(entry.getValue(), mechanism, defaultLoginClass, configs);
loginManagers.put(mechanism, loginManager);
Subject subject = loginManager.subject();
if (mode == Mode.CLIENT) {
    if (saslLoginRefreshReauthenticateEnable()) {
        log.info("SASL Login Refresh Re-Authenticate ENABLED");
        if (subject.getPrivateCredentials(ClientChannelCredentialTracker.class).isEmpty())
            subject.getPrivateCredentials().add(new ClientChannelCredentialTracker());
    } else
        log.info("SASL Login Refresh Re-Authenticate DISABLED");
} |
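The add-once-then-look-up idiom used with the Subject above relies only on the standard `javax.security.auth.Subject` API, so it can be tried in isolation. The following is a minimal sketch under that assumption; `TrackerSketch` and `ensureTracker` are hypothetical names introduced for illustration and are not part of the PR.

```java
import java.util.Set;
import javax.security.auth.Subject;

public class SubjectTrackerSketch {
    /** Hypothetical stand-in for ClientChannelCredentialTracker. */
    static final class TrackerSketch { }

    // Returns the tracker stored in the Subject's private credentials,
    // adding a new one only if none is present yet.
    static TrackerSketch ensureTracker(Subject subject) {
        Set<TrackerSketch> existing = subject.getPrivateCredentials(TrackerSketch.class);
        if (!existing.isEmpty())
            return existing.iterator().next();
        TrackerSketch tracker = new TrackerSketch();
        subject.getPrivateCredentials().add(tracker);
        return tracker;
    }

    public static void main(String[] args) {
        Subject subject = new Subject();
        TrackerSketch first = ensureTracker(subject);
        TrackerSketch second = ensureTracker(subject);
        // Both calls should yield the same instance, stored exactly once.
        System.out.println(first == second);
        System.out.println(subject.getPrivateCredentials(TrackerSketch.class).size());
    }
}
```

Because any code holding the Subject can perform the same typed lookup, the tracker does not have to be threaded through constructors or configuration.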
The KafkaChannel and ExpiringCredentialRefreshingLogin classes can easily retrieve the ClientChannelCredentialTracker instance and tell it when various events of interest occur (a channel is initially authenticated, a credential is refreshed, a channel is disconnected). The events are enqueued as they are reported, and a background thread processes them to decide which KafkaChannel instances need to re-authenticate. There is a method KafkaChannel.initiateReauthentication(Time) that creates an instance of SaslClientAuthenticator and tells it to re-authenticate via the method SaslClientAuthenticator.initiateReauthentication(Time, AuthenticationSuccessOrFailureReceiver). The implementation of AuthenticationSuccessOrFailureReceiver that is sent takes care of cleaning up the state in KafkaChannel related to the multi-step re-authentication process and notifying the ClientChannelCredentialTracker instance; it looks something like this:
Code Block | ||||
---|---|---|---|---|
| ||||
new AuthenticationSuccessOrFailureReceiver() {
    @Override
    public void reauthenticationFailed(RetryIndication retry, String errorMessage) {
        log.warn(errorMessage);
        // discard the authenticator that failed to re-authenticate
        Utils.closeQuietly(notYetAuthenticatedAuthenticator, "notYetAuthenticatedAuthenticator");
        notYetAuthenticatedAuthenticator = null;
        clientChannelCredentialTracker().offer(ClientChannelCredentialEvent
            .channelFailedReauthentication(time, KafkaChannel.this, errorMessage, retry));
    }
    @Override
    public void reauthenticationSucceeded() {
        clientChannelCredentialTracker().offer(ClientChannelCredentialEvent
            .channelReauthenticated(time, KafkaChannel.this,
                notYetAuthenticatedSaslClientAuthenticator.expiringCredential()));
        replaceAnyPreviouslyAuthenticatedAuthenticatorWithNewOne();
    }
}
// helper method on the enclosing KafkaChannel class
private void replaceAnyPreviouslyAuthenticatedAuthenticatorWithNewOne() {
    // swap the old authenticator (if any) for the new one
    Authenticator tmp = authenticatedAuthenticator;
    authenticatedAuthenticator = notYetAuthenticatedAuthenticator;
    notYetAuthenticatedAuthenticator = null;
    if (tmp != null)
        Utils.closeQuietly(tmp, "previousAuthenticatedAuthenticator");
} |
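The event handling that the receiver above feeds into (events are enqueued as they are reported, and a background thread decides which channels need to re-authenticate) can be sketched roughly as follows. This is an illustration under simplifying assumptions, not the tracker from the PR: the event types, the `SHUTDOWN` marker, the string channel ids, and the rule "on refresh, every live channel re-authenticates" are all hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

public class CredentialEventQueueSketch {
    // Hypothetical event kinds; SHUTDOWN exists only so the sketch can stop cleanly.
    enum EventType { CHANNEL_AUTHENTICATED, CREDENTIAL_REFRESHED, CHANNEL_DISCONNECTED, SHUTDOWN }

    static final class Event {
        final EventType type;
        final String channelId; // null for events not tied to a single channel

        Event(EventType type, String channelId) {
            this.type = type;
            this.channelId = channelId;
        }
    }

    static final class TrackerSketch {
        private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
        // Touched only by the worker thread, so no locking is needed.
        private final Set<String> liveChannels = new LinkedHashSet<>();
        // Channels the worker decided should re-authenticate, in decision order.
        final List<String> reauthRequested = new CopyOnWriteArrayList<>();
        private final Thread worker = new Thread(this::processEvents, "tracker-sketch");

        void start() {
            worker.setDaemon(true);
            worker.start();
        }

        // Reporters only enqueue; they never run the decision logic themselves.
        void offer(Event event) {
            events.add(event);
        }

        private void processEvents() {
            try {
                while (true) {
                    Event event = events.take();
                    switch (event.type) {
                        case CHANNEL_AUTHENTICATED: liveChannels.add(event.channelId); break;
                        case CHANNEL_DISCONNECTED: liveChannels.remove(event.channelId); break;
                        case CREDENTIAL_REFRESHED: reauthRequested.addAll(liveChannels); break;
                        case SHUTDOWN: return;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        // Enqueues the stop marker and waits until all earlier events are processed.
        void shutdownAndWait() {
            offer(new Event(EventType.SHUTDOWN, null));
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        TrackerSketch tracker = new TrackerSketch();
        tracker.start();
        tracker.offer(new Event(EventType.CHANNEL_AUTHENTICATED, "channel-1"));
        tracker.offer(new Event(EventType.CHANNEL_AUTHENTICATED, "channel-2"));
        tracker.offer(new Event(EventType.CHANNEL_DISCONNECTED, "channel-2"));
        tracker.offer(new Event(EventType.CREDENTIAL_REFRESHED, null));
        tracker.shutdownAndWait();
        System.out.println(tracker.reauthRequested);
    }
}
```

Funnelling every report through one queue keeps the reporting threads (network and login-refresh threads in the real design) from contending over the tracker's internal state.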
When the KafkaChannel instance invokes the method SaslClientAuthenticator.initiateReauthentication(Time, AuthenticationSuccessOrFailureReceiver), it is up to the SaslClientAuthenticator instance to use its AuthenticationRequestEnqueuer instance to enqueue an ApiVersionsRequest along with an appropriate callback that either fails the re-authentication if the ApiVersionsRequest fails or continues it with a SaslHandshakeRequest. Here is what the code looks like:
...