Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-7352
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The adoption of KIP-255: OAuth Authentication via SASL/OAUTHBEARER in release 2.0.0 creates the possibility of using information in the bearer token to make authorization decisions. Unfortunately, however, Kafka connections are long-lived, so there is no ability to change the bearer token associated with a particular connection. Allowing SASL connections to periodically re-authenticate would resolve this. In addition to this motivation there are two others that are security-related. First, to eliminate access to Kafka for connected clients, the current requirement is to remove all authorizations (i.e. remove all ACLs). This is necessary because of the long-lived nature of the connections. It is operationally simpler to shut off access at the point of authentication, and with the release of KIP-86: Configurable SASL Callback Handlers it is going to become more and more likely that installations will authenticate users against external directories (e.g. via LDAP). The ability to stop Kafka access by simply disabling an account in an LDAP directory (for example) is desirable. The second motivating factor for re-authentication related to security is that the use of short-lived tokens is a common OAuth security recommendation, but issuing a short-lived token to a Kafka client (or a broker when OAUTHBEARER is the inter-broker protocol) currently has no benefit because once a client is connected to a broker the client is never challenged again and the connection may remain intact beyond the token expiration time (and may remain intact indefinitely under perfect circumstances). This KIP proposes adding the ability for clients (and brokers when OAUTHBEARER is the inter-broker protocol) to re-authenticate their connections to brokers and have the new bearer token appear on their session rather than the old one.
The implementation is designed in such a way that it does not preclude adding support for re-authentication of other SASL mechanisms (e.g. PLAIN, SCRAM-related, and GSSAPI), but doing so is explicitly out-of-scope for this proposal (a section below discusses how this could be done). Also explicitly out-of-scope for this proposal is the ability for brokers to close connections that continue to use expired credentials. This ability is a natural next step, but it will be addressed via a separate KIP if/when this one is adopted.
Public Interfaces
This KIP proposes the addition of a single interface to the API and a single additional configuration option to enable the feature (the option value defaults to false, of course, so there is no change to existing behavior in the absence of an explicit opt-in). Specifically, the interface it proposes to add is as follows:
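As a rough illustration, an expiring-credential interface of this kind might look like the following sketch (the method names and signatures here are assumptions for illustration, not necessarily the KIP's exact API):

```java
// Sketch only: a credential that advertises its lifetime so that generic
// refresh/re-authentication logic can schedule work against it.
// Method names are assumptions, not necessarily the KIP's exact API.
interface ExpiringCredential {
    String principalName();           // principal this credential authenticates
    Long startTimeMs();               // when the credential became valid (null if unknown)
    long expireTimeMs();              // when the credential expires
    Long absoluteLastRefreshTimeMs(); // optional hard limit on refreshing (null if none)
}
```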
The configuration option this KIP proposes to add is 'sasl.login.refresh.reauthenticate.enable' – it defaults to false, and when explicitly set to true the feature will be enabled for any SASL connection that generates a private credential that implements the above ExpiringCredential interface (this will be the case with OAUTHBEARER by default because the Login implementation that is used by default for OAUTHBEARER will automatically wrap any OAuthBearerToken instance to make it implement ExpiringCredential if it does not already do so).
From a behavior perspective, when the option is enabled and a private credential implements ExpiringCredential, the existing configuration options sasl.login.refresh.{window.factor, window.jitter, min.period.seconds, min.buffer.seconds} define when the org.apache.kafka.common.security.auth.Login implementation on the client must generate a new private credential (e.g. when it must retrieve a new bearer token). The default configuration values result in a new credential being retrieved sometime between 75% and 85% of the current credential's lifetime (e.g. sometime between 45 and 51 minutes after a token with an hour lifetime is initially retrieved). At that time, when the new token is retrieved, every connection that authenticated with the old credential will be told to re-authenticate using the new credential. After successful re-authentication, when the new credential is again refreshed (after another 45-51 minutes), the connections will be told once again to re-authenticate, etc.
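The window arithmetic can be sketched as follows (a simplified illustration, not Kafka's actual refresh code; the min.period.seconds and min.buffer.seconds constraints are omitted, and the real refresh logic picks a random point within the window):

```java
// Simplified illustration of the refresh scheduling described above.
class RefreshWindow {
    // Returns {earliestMs, latestMs} after credential acquisition at which
    // a refresh is scheduled, per window.factor +/- window.jitter.
    static long[] refreshWindowMs(long lifetimeMs, double windowFactor, double windowJitter) {
        long earliest = (long) (lifetimeMs * (windowFactor - windowJitter));
        long latest = (long) (lifetimeMs * (windowFactor + windowJitter));
        return new long[] { earliest, latest };
    }
}
```

With a window factor of 0.80 and jitter of 0.05 (matching the 75%-85% behavior described above), a one-hour token yields a 45-51 minute refresh window.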
Note that the class that implements the refresh logic (org.apache.kafka.common.security.expiring.internals.ExpiringCredentialRefreshingLogin) is not considered part of the public API. This means that while it is up to the org.apache.kafka.common.security.auth.Login implementation for a particular mechanism to implement the logic, and that implementation can delegate to ExpiringCredentialRefreshingLogin to do so (as org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerRefreshingLogin does), this is something that could only be done for the built-in SASL mechanisms (e.g. PLAIN, SCRAM-related, and GSSAPI). There is no intent to support an ability to generate credentials that can be refreshed and re-authenticated for non-builtin mechanisms – nothing would prevent anyone from implementing such mechanisms and delegating to the class, of course, but it would be an unsupported use of an internal class that is not a public API.
If a re-authentication attempt should fail then the connection will be told to retry after some delay depending on how many retries have been attempted so far: after some small amount of time for the first retry (e.g. 1 minute), double that for the second retry, and 4 times the initial delay for every retry thereafter. Retry attempts generally occur at least until the current credential expires (but not indefinitely – and of course they won't continue if one of them actually succeeds). There are certain errors that result in retries not being attempted (i.e. if some internal state doesn't make sense, which generally should not happen). A retry is helpful when re-authentication fails on the server side due to some temporary outage (for example, the re-authentication will fail through no fault of the client if the token endpoint is unavailable and the broker has not yet cached the public key required to validate the token signature).
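The delay schedule just described can be sketched as (the 1-minute initial delay is the example value from the text, not a mandated constant):

```java
// Sketch of the retry delay schedule described above.
class ReauthRetrySchedule {
    static final long INITIAL_DELAY_MS = 60_000L; // example value from the text

    // attempt is 1-based: the initial delay for the first retry, double it
    // for the second, and 4x the initial delay for every retry thereafter
    // (retries continue, at most, until the current credential expires).
    static long retryDelayMs(int attempt) {
        if (attempt <= 1)
            return INITIAL_DELAY_MS;
        if (attempt == 2)
            return 2 * INITIAL_DELAY_MS;
        return 4 * INITIAL_DELAY_MS;
    }
}
```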
No metrics are affected/created by this proposal. In particular, re-authentications are not counted as authentications, and no metrics related to re-authentication are created.
Although there is no technical reason preventing it, we arbitrarily decide to disallow changing identities upon re-authentication. For example, if a connection originally authenticates as User:user1, an attempt to re-authenticate as anything else (e.g. User:user2) will fail. Retry is allowed in this case (still subject to the expiration of the original credential as described above).
Delaying the return of a response upon failed re-authentication attempts (a la KIP-306: Configuration for Delaying Response to Failed Authentication) is not necessary since re-authentication attempts do not require TLS negotiation and are therefore considerably cheaper than initial authentication attempts.
Proposed Changes
The description of this KIP is actually quite straightforward from a functionality perspective – turn the feature on with the configuration option and it just works for OAUTHBEARER; use a custom LoginModule for other mechanisms to create credentials implementing ExpiringCredential and delegate to org.apache.kafka.common.security.expiring.internals.ExpiringCredentialRefreshingLogin and it will work for those mechanisms, too. From an implementation perspective, though, the KIP is not so straightforward; it therefore includes a pull request with a proposed implementation. Here is a high-level description of how the proposed implementation generally works. Note that this description applies to the implementation only – none of this is part of the public API.
First, at a high level, we recognize that org.apache.kafka.clients.NetworkClient can be used in either an asynchronous or synchronous manner; in async mode the NetworkClient.poll() method is repeatedly called directly in a run loop, whereas in synchronous mode the NetworkClientUtils.sendAndReceive() method is used whenever something has to be sent (this method in turn calls NetworkClient.poll(), but otherwise poll() is not invoked). The fact that these two separate use cases exist is important because the requests related to re-authentication (ApiVersionsRequest, SaslHandshakeRequest, and SaslAuthenticateRequest) cannot simply be injected into a queue that NetworkClient owns; we don't know when poll() will be called in the synchronous case, for example – it is only invoked when something needs to be sent, and that might take a while (if it occurs at all). So a single-queue approach won't work.
The approach we take is to define an interface org.apache.kafka.common.security.authenticator.AuthenticationRequestEnqueuer that declares an enqueueRequest(AuthenticationRequest) method. We leave it to the owners of the NetworkClient instances to define how to inject such requests by providing an implementation of the interface to the SaslChannelBuilder, which in turn provides it to the SaslClientAuthenticator.
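The shape of this interface, plus one possible owner-side implementation, can be sketched as follows (AuthenticationRequest is stubbed to just a node id here; per the examples that follow, the real class also carries a request builder and a completion handler):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hedged sketch of the injection point for re-authentication requests.
// AuthenticationRequest is deliberately simplified for illustration.
class AuthenticationRequest {
    private final int nodeId;
    AuthenticationRequest(int nodeId) { this.nodeId = nodeId; }
    int nodeId() { return nodeId; }
}

interface AuthenticationRequestEnqueuer {
    void enqueueRequest(AuthenticationRequest authenticationRequest);
}

// Each NetworkClient owner supplies its own implementation; an async
// run-loop owner might simply queue the request for its next poll() pass.
class RunLoopOwnerEnqueuer implements AuthenticationRequestEnqueuer {
    final Queue<AuthenticationRequest> injected = new ArrayDeque<>();

    @Override
    public void enqueueRequest(AuthenticationRequest request) {
        injected.add(request); // drained and sent on the next run-loop iteration
    }
}
```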
For example, the org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient class provides the authenticationRequestEnqueuer() method that yields an instance that invokes ConsumerNetworkClient.send(Node, AbstractRequest.Builder<?>); it is then a simple matter of adding the following code to the constructors of the org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.connect.runtime.distributed.WorkerGroupMember classes, which use ConsumerNetworkClient for their connectivity:
if (channelBuilder instanceof SaslChannelBuilder)
    ((SaslChannelBuilder) channelBuilder)
        .authenticationRequestEnqueuer(client.authenticationRequestEnqueuer());
Another example is kafka.controller.ControllerChannelManager. Instances of this class maintain one request queue per broker, with one background thread allocated for each broker to monitor the queue and send the requests that appear. The requests are sent in a synchronous fashion with respect to the thread when a request appears in the queue (i.e. unlike ConsumerNetworkClient, which is an async use case that uses a run loop, this is a synchronous NetworkClient use case that only sends requests when they appear). It is necessary to invoke sendRequest() to add a request to the queue for a broker and have it sent. The code in the addNewBroker() method therefore looks like this:
if (channelBuilder.isInstanceOf[SaslChannelBuilder])
  channelBuilder.asInstanceOf[SaslChannelBuilder].authenticationRequestEnqueuer(
    new AuthenticationRequestEnqueuer() {
      def enqueueRequest(authenticationRequest: AuthenticationRequest): Unit =
        sendRequest(authenticationRequest.nodeId(),
          authenticationRequest.requestBuilder().apiKey(),
          authenticationRequest.requestBuilder(),
          null,
          authenticationRequest.authenticationRequestCompletionHandler())
    })
So the general approach is to identify how to inject requests and then set the appropriate AuthenticationRequestEnqueuer implementation on the SaslChannelBuilder. This KIP adds re-authentication support for the following:
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient
org.apache.kafka.clients.consumer.KafkaConsumer
org.apache.kafka.connect.runtime.distributed.WorkerGroupMember
kafka.controller.ControllerChannelManager
org.apache.kafka.clients.admin.KafkaAdminClient
org.apache.kafka.clients.producer.KafkaProducer
kafka.coordinator.transaction.TransactionMarkerChannelManager
kafka.server.ReplicaFetcherBlockingSend (kafka.server.ReplicaFetcherThread)
There is no intent to re-authenticate connections originating from these classes:
kafka.admin.AdminClient (this class is deprecated)
kafka.tools.ReplicaVerificationTool (an edge case; no need to support the option here)
kafka.server.KafkaServer (a NetworkClient instance is only instantiated for shutdown and is therefore not long-lived enough to require re-authentication)
org.apache.kafka.trogdor.workload.ConnectionStressWorker (this class simply connects and doesn't do anything else with the connection)
The PR does not yet have unit or integration tests – I will add these if/when the general implementation approach is verified by the community as being appropriate. I have stepped through the code in the debugger, and I can see from this exercise as well as from the emitted log messages during normal operation that the implementation does in fact work. For example, connections are repeatedly re-authenticated about once a minute with this JAAS configuration:
KafkaClient {
    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
        unsecuredLoginLifetimeSeconds="60"
        unsecuredLoginStringClaim_sub="admin";
};
The final issue to describe is how/when a KafkaChannel instance (each of which corresponds to a unique network connection) is told to re-authenticate.
We define the class org.apache.kafka.common.security.expiring.internals.ClientChannelCredentialTracker to keep track of the various KafkaChannel instances and the ExpiringCredential instances that they authenticated with. The following code in SaslChannelBuilder adds an instance of ClientChannelCredentialTracker to the private credentials of the Subject associated with the SASL mechanism when the feature is enabled:
LoginManager loginManager = LoginManager.acquireLoginManager(entry.getValue(), mechanism,
        defaultLoginClass, configs);
loginManagers.put(mechanism, loginManager);
Subject subject = loginManager.subject();
if (mode == Mode.CLIENT) {
    if (saslLoginRefreshReauthenticateEnable()) {
        log.info("SASL Login Refresh Re-Authenticate ENABLED");
        if (subject.getPrivateCredentials(ClientChannelCredentialTracker.class).isEmpty())
            subject.getPrivateCredentials().add(new ClientChannelCredentialTracker());
    } else
        log.info("SASL Login Refresh Re-Authenticate DISABLED");
}
The KafkaChannel and ExpiringCredentialRefreshingLogin classes can easily retrieve the ClientChannelCredentialTracker instance and tell it when various events of interest occur (a channel is initially authenticated, a credential is refreshed, a channel is disconnected). The events are enqueued as they are reported, and a background thread processes them to decide which KafkaChannel instances need to re-authenticate. There is a method KafkaChannel.initiateReauthentication(Time) that instantiates an instance of SaslClientAuthenticator and tells it to re-authenticate via the method SaslClientAuthenticator.initiateReauthentication(Time, AuthenticationSuccessOrFailureReceiver). The implementation of AuthenticationSuccessOrFailureReceiver that is sent takes care of cleaning up the state in KafkaChannel related to the multi-step re-authentication process and notifying the ClientChannelCredentialTracker instance of the result; it looks something like this:
new AuthenticationSuccessOrFailureReceiver() {
    @Override
    public void reauthenticationFailed(RetryIndication retry, String errorMessage) {
        log.warn(errorMessage);
        Utils.closeQuietly(notYetAuthenticatedAuthenticator, "notYetAuthenticatedAuthenticator");
        notYetAuthenticatedAuthenticator = null;
        clientChannelCredentialTracker().offer(ClientChannelCredentialEvent
                .channelFailedReauthentication(time, KafkaChannel.this, errorMessage, retry));
    }

    @Override
    public void reauthenticationSucceeded() {
        clientChannelCredentialTracker().offer(ClientChannelCredentialEvent
                .channelReauthenticated(time, KafkaChannel.this,
                        notYetAuthenticatedSaslClientAuthenticator.expiringCredential()));
        replaceAnyPreviouslyAuthenticatedAuthenticatorWithNewOne();
    }

    private void replaceAnyPreviouslyAuthenticatedAuthenticatorWithNewOne() {
        // swap the old authenticator (if any) for the new one
        Authenticator tmp = authenticatedAuthenticator;
        authenticatedAuthenticator = notYetAuthenticatedAuthenticator;
        notYetAuthenticatedAuthenticator = null;
        if (tmp != null)
            Utils.closeQuietly(tmp, "previousAuthenticatedAuthenticator");
    }
}
When the KafkaChannel instance invokes the SaslClientAuthenticator.initiateReauthentication(Time, AuthenticationSuccessOrFailureReceiver) method it is up to the SaslClientAuthenticator instance to use its AuthenticationRequestEnqueuer instance to enqueue an ApiVersionsRequest along with an appropriate callback that either fails the re-authentication if the ApiVersionsRequest fails or continues it with a SaslHandshakeRequest. Here is what the code looks like:
public void initiateReauthentication(Time time,
        AuthenticationSuccessOrFailureReceiver authenticationSuccessOrFailureReceiver) {
    if (saslState != SaslState.SEND_APIVERSIONS_REQUEST) {
        notifyFailureDueToUnexpectedState(SaslState.SEND_APIVERSIONS_REQUEST,
                authenticationSuccessOrFailureReceiver);
        return;
    }
    authenticationRequestEnqueuer().enqueueRequest(new AuthenticationRequest(this.node,
            new ApiVersionsRequest.Builder((short) 0),
            authenticationRequestCompletionHandlerForApiVersionsRequest(time,
                    authenticationSuccessOrFailureReceiver)));
}

private void notifyFailureDueToUnexpectedState(SaslState expectedSaslState,
        AuthenticationSuccessOrFailureReceiver authenticationSuccessOrFailureReceiver) {
    // it is possible that the connection was closed; don't retry
    SaslState receivedSaslState = saslState;
    if (saslState != SaslState.CLOSED)
        saslState = SaslState.FAILED;
    authenticationSuccessOrFailureReceiver.reauthenticationFailed(RetryIndication.DO_NOT_RETRY,
            String.format("Re-authentication was in process but could not proceed because "
                    + "authenticator state is incorrect (expected %s): %s",
                    expectedSaslState, receivedSaslState));
}
And of course the method authenticationRequestCompletionHandlerForApiVersionsRequest(Time, AuthenticationSuccessOrFailureReceiver) looks similar but deals with sending a SaslHandshakeRequest (see the PR for details).
Finally, we add methods on SaslServerAuthenticator to respond to the requests that arrive related to re-authentication – specifically, respondToReauthenticationSaslHandshakeRequest() and respondToReauthenticationSaslAuthenticateRequest() – and we add code to kafka.server.KafkaApis to route the received requests accordingly (as opposed to responding with an error, which is what currently happens). I won't go into the details here; see the PR for the code.
How To Support Re-Authentication for Other SASL Mechanisms
It was mentioned above that the design supports adding re-authentication to PLAIN, GSSAPI, and SCRAM-related SASL mechanisms but that doing so is out of scope for this KIP. It is helpful to describe how this could be done in a bit more detail.
There are two requirements: the credential that the Login mechanism adds to the Subject's private credentials on the client side must implement the ExpiringCredential interface; and the client and server sides have to agree on the expiration time of each credential so that the client will know when to refresh the credential and the server will know when the presented credential expires (assuming we implement the ability for a server to close "expired" connections).
Implementing the ExpiringCredential interface is pretty easily done by wrapping the data that the LoginModule currently creates and adjusting the SASL client callback handler accordingly. For example, the org.apache.kafka.common.security.plain.PlainLoginModule class currently adds a String to the private credentials for the password and a String to the public credentials for the username; it would instead store an instance of a class that implements ExpiringCredential and includes an additional password() method, and the org.apache.kafka.common.security.authenticator.SaslClientCallbackHandler class would be amended to pull the data from that object if it exists (rather than from the pair of String values, which it would fall back to if no ExpiringCredential existed). Similar adjustments could be made for the SCRAM-related mechanisms as well. The GSSAPI mechanism would be a bit different because we don't control all the code (we reuse an existing LoginModule implementation, for example); in such cases we would simply use our own implementation that delegates to the existing one and augments it where necessary (e.g. in the commit() method of the LoginModule implementation).
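As a hedged sketch of the PLAIN wrapping just described (the class name, accessor names, and the idea of passing the expiration time into the constructor are all illustrative assumptions, not the KIP's actual types):

```java
// Hypothetical sketch: the username/password pair that PlainLoginModule
// currently stores as two Strings, wrapped in one object that also
// advertises an expiration time.
class ExpiringPlainCredential {
    private final String username;
    private final String password;
    private final long expireTimeMs; // an agreed-upon lifetime, per the discussion below

    ExpiringPlainCredential(String username, String password, long expireTimeMs) {
        this.username = username;
        this.password = password;
        this.expireTimeMs = expireTimeMs;
    }

    String principalName() { return username; }     // ExpiringCredential-style accessor
    long expireTimeMs()    { return expireTimeMs; } // ExpiringCredential-style accessor
    String password()      { return password; }     // the additional method mentioned above
}
```

The callback handler would then prefer this object over the raw String pair, falling back to the Strings when no such credential exists.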
Getting the client and server to agree on a credential expiration time may sometimes be a challenge. It is easy with OAUTHBEARER because bearer tokens advertise their expiration times (either as part of the JWT or via a call to the token endpoint). GSSAPI also is likely to be straightforward because Kerberos tickets have an expiration time. What about PLAIN and the SCRAM-related SASL mechanisms? These present challenges. The SCRAM-related SASL mechanisms support SASL extensions, so we could leverage this to have the client send an agreed-upon name/value pair to the server indicating the credential lifetime it desires; the server could fail the authentication if the requested lifetime exceeds its maximum allowed value, but otherwise it would accept the lifetime that the client wants and proceed as it does today. The PLAIN SASL mechanism does not support extensions, so we would have to come up with another option (which we could also use for the SCRAM-related mechanisms if we wanted to be consistent). We could add a new API call so that clients could ask servers for the lifetime they use, or we could extend the SaslHandshakeRequest/Response API call to include that information in the server's response – the client would then adopt that value. Or we could just configure the values on the client and server and require the configurations to be consistent with each other (i.e. the client-side configuration value must not exceed the server-side configuration value).
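The SCRAM-extension option could work roughly as follows (a sketch; the extension name desiredLifetimeMs is invented here purely for illustration and is not specified anywhere):

```java
import java.util.Map;

// Hypothetical sketch of the SCRAM-extension option: the client proposes a
// credential lifetime via a SASL extension and the server caps it.
class LifetimeNegotiation {
    static long agreedLifetimeMs(Map<String, String> clientExtensions, long serverMaxMs) {
        String requested = clientExtensions.get("desiredLifetimeMs"); // invented name
        if (requested == null)
            return serverMaxMs; // client expressed no preference
        long requestedMs = Long.parseLong(requested);
        if (requestedMs > serverMaxMs)
            throw new IllegalArgumentException("requested lifetime " + requestedMs
                    + "ms exceeds server maximum " + serverMaxMs + "ms");
        return requestedMs; // server accepts the client's desired lifetime
    }
}
```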
Getting the client and server to agree on the credential lifetime is the challenge; discussing the options, documenting the solution, and implementing it would all be done as part of a KIP.
Compatibility, Deprecation, and Migration Plan
There is no impact to existing installations because the default is for the feature to be turned off.
Rejected Alternatives
A single queue owned by org.apache.kafka.clients.NetworkClient does not reliably work in the synchronous case as described above; we cannot know how soon an injected request will be sent (if ever).
One alternative idea is to add two new request types: "ReceiveReauthenticationNonceRequest" and "ReauthenticateWithNonceRequest". When re-authentication needs to occur the client would make a separate, new connection to the broker and send a "ReceiveReauthenticationNonceRequest" to the broker to have it associate a nonce with the authenticated credentials and return the nonce to the client. Then the client would send a "ReauthenticateWithNonceRequest" with the returned nonce over the connection that it wishes to re-authenticate; the broker would then replace the credentials on that connection with the credentials it had previously associated with the nonce. I don't know if this would work (might there be some issue with advertised vs. actual addresses and maybe the possibility of there being a load balancer? Could we be guaranteed the ability to connect to the exact same broker as our existing connection?). If it could work then it does have the advantage of requiring the injection of just a single request over an existing connection that would return very quickly rather than 3 separate requests of which at least one might take a while to return (to potentially retrieve a public key for token signature validation, for example; the validation itself isn't exactly free, either, even if the public key is already cached). One disadvantage of the alternative, nonce-based approach is that it requires the creation of a separate connection, including TLS negotiation, and that is very expensive compared to sending 3 requests over an existing connection (which of course already has TLS negotiated).
A brute-force alternative is to simply kill the connection on the client side when the background login thread refreshes the credential. The advantage is that we don't need a code path for re-authentication – the client simply connects again to replace the connection that was killed. There are many disadvantages, though. The approach is harsh – having connections pulled out from underneath the client will introduce latency while the client reconnects; it introduces non-trivial resource utilization on both the client and server as TLS is renegotiated; and it forces the client to periodically "recover" from what essentially looks like a failure scenario. While these are significant disadvantages, the most significant disadvantage of all is that killing connections on the client side adds no security – trusting the client to kill its connection in a timely fashion is a blind and unjustifiable trust.
We could kill the connection from the server side instead, when the token expires. But in this case, if there is no ability for the client to re-authenticate to avoid the killing of the connection in the first place, then we still have all of the harsh approach disadvantages mentioned above.