Comment: Adopted in 2.2

Status

Current state: Adopted (in 2.2)

Discussion thread: here

JIRA: KAFKA-7352

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The adoption of KIP-255: OAuth Authentication via SASL/OAUTHBEARER in release 2.0.0 creates the possibility of using information in the bearer token to make authorization decisions.  Unfortunately, Kafka connections are long-lived, so there is no ability to change the bearer token associated with a particular connection.  Allowing SASL connections to periodically re-authenticate would resolve this.

In addition to this motivation there are two others that are security-related.  First, to eliminate access to Kafka for connected clients, the current requirement is to remove all authorizations (i.e. remove all ACLs).  This is necessary because of the long-lived nature of the connections.  It is operationally simpler to shut off access at the point of authentication, and with the release of KIP-86: Configurable SASL Callback Handlers it is becoming increasingly likely that installations will authenticate users against external directories (e.g. via LDAP).  The ability to stop Kafka access by simply disabling an account in an LDAP directory (for example) is desirable.

The second security-related motivation is that the use of short-lived tokens is a common OAuth security recommendation, but issuing a short-lived token to a Kafka client (or a broker when OAUTHBEARER is the inter-broker protocol) currently has no benefit: once a client is connected to a broker the client is never challenged again, and the connection may remain intact beyond the token expiration time (indefinitely, under perfect circumstances).

This KIP proposes adding the ability for SASL clients (and brokers when a SASL mechanism is the inter-broker protocol) to re-authenticate their connections to brokers and have the new bearer token appear on their session rather than the old one.

The implementation is designed in such a way that it does not preclude adding support for re-authentication of other SASL mechanisms (e.g. PLAIN, SCRAM-related, and GSSAPI), but doing so is explicitly out-of-scope for this proposal.  Also explicitly out-of-scope for this proposal is the ability for brokers to close connections that continue to use expired credentials.  This ability is a natural next step, but it will be addressed via a separate KIP if/when this one is adopted.

The revised proposal also adds the ability for brokers to close connections that continue to use expired sessions.

This KIP has no impact on non-SASL connections (e.g. connections that use the PLAINTEXT or SSL security protocols) – no such connection will be re-authenticated, and no such connection will be closed.

Public Interfaces

This KIP proposes the addition of a configuration option to enable the server-side expired-connection-kill feature (the option default results in no functionality change).  It also proposes the addition of a single interface to the API and a single additional configuration option to enable the re-authentication feature (the option value defaults to false, so there is no change to existing behavior in the absence of an explicit opt-in).  Specifically, the interface it proposes to add is as follows:

Code Block
language: java
title: org.apache.kafka.common.security.expiring.ExpiringCredential
/**
 * A credential that expires and that can potentially be refreshed; such a
 * refreshed credential can also potentially be used to re-authenticate an
 * existing connection.
 * <p>
 * The parameters that impact how the refresh algorithm operates are specified
 * as part of the producer/consumer/broker configuration and are as follows. See
 * the documentation for these properties elsewhere for details.
 * <table>
 * <tr>
 * <th>Producer/Consumer/Broker Configuration Property</th>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.window.factor}</td>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.window.jitter}</td>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.min.period.seconds}</td>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.buffer.seconds}</td>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.reauthenticate.enable}</td>
 * </tr>
 * </table>
 * <p>
 * This interface was introduced in 2.1.0 and, while it feels stable, it could
 * evolve. We will try to evolve the API in a compatible manner, but we reserve
 * the right to make breaking changes in minor releases, if necessary. We will
 * update the {@code InterfaceStability} annotation and this notice once the API
 * is considered stable.
 * 
 * @see SaslConfigs#SASL_LOGIN_REFRESH_WINDOW_FACTOR_DOC
 * @see SaslConfigs#SASL_LOGIN_REFRESH_WINDOW_JITTER_DOC
 * @see SaslConfigs#SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS_DOC
 * @see SaslConfigs#SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC
 * @see SaslConfigs#SASL_LOGIN_REFRESH_REAUTHENTICATE_ENABLE_DOC
 */
@InterfaceStability.Evolving
public interface ExpiringCredential {
    /**
     * The value to use when asking a {@code SaslClient} instance for the
     * {@code ExpiringCredential} that was used to authenticate, if any
     */
    public static final String SASL_CLIENT_NEGOTIATED_PROPERTY_NAME = "Expiring.Credential";

    /**
     * The name of the principal to which this credential applies (used only for
     * logging)
     * 
     * @return the always non-null/non-empty principal name
     */
    String principalName();

    /**
     * When the credential became valid, in terms of the number of milliseconds
     * since the epoch, if known, otherwise null. An expiring credential may not
     * necessarily indicate when it was created -- just when it expires -- so we
     * need to support a null return value here.
     * 
     * @return the time when the credential became valid, in terms of the number of
     *         milliseconds since the epoch, if known, otherwise null
     */
    Long startTimeMs();

    /**
     * When the credential expires, in terms of the number of milliseconds since the
     * epoch. All expiring credentials by definition must indicate their expiration
     * time -- thus, unlike other methods, we do not support a null return value
     * here.
     * 
     * @return the time when the credential expires, in terms of the number of
     *         milliseconds since the epoch
     */
    long expireTimeMs();

    /**
     * The point after which the credential can no longer be refreshed, in terms of
     * the number of milliseconds since the epoch, if any, otherwise null. Some
     * expiring credentials can be refreshed over and over again without limit, so
     * we support a null return value here.
     * 
     * @return the point after which the credential can no longer be refreshed, in
     *         terms of the number of milliseconds since the epoch, if any,
     *         otherwise null
     */
    Long absoluteLastRefreshTimeMs();
}

The configuration option this KIP proposes to add is 'sasl.login.refresh.reauthenticate.enable' – it defaults to false, and when explicitly set to true the feature will be enabled for any SASL connection that generates a private credential that implements the above ExpiringCredential interface (this will be the case with OAUTHBEARER by default because the Login implementation that is used by default for OAUTHBEARER will automatically wrap any OAuthBearerToken instance to make it implement ExpiringCredential if it does not already do so).

From a behavior perspective, when the option is enabled and a private credential implements ExpiringCredential, the existing configuration options sasl.login.refresh.{window.factor, window.jitter, min.period.seconds, buffer.seconds} define when the org.apache.kafka.common.security.auth.Login implementation on the client must generate a new private credential (e.g. when it must retrieve a new bearer token).  The default configuration values result in a new credential being retrieved sometime between 75% and 85% of the current credential's lifetime (e.g. sometime between 45 and 51 minutes after a token with a one-hour lifetime is initially retrieved).  When the new token is retrieved, every connection that authenticated with the old credential will be told to re-authenticate using the new credential.  After successful re-authentication, when the new credential is in turn refreshed (after another 45-51 minutes), the connections will be told once again to re-authenticate, and so on.
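
The 45-51 minute figure can be checked with a small calculation.  The following is a minimal sketch, not the logic in ExpiringCredentialRefreshingLogin: the class and method names are hypothetical, the factor (0.8) and jitter (0.05) are assumed defaults chosen to match the 75%-85% range quoted above, and the jitter is assumed to apply symmetrically around the factor.  The min.period and buffer settings are ignored.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper; illustrates the refresh window arithmetic only. */
final class RefreshWindowSketch {
    // Assumed defaults, chosen to reproduce the 75%-85% range in the text.
    static final double WINDOW_FACTOR = 0.8;
    static final double WINDOW_JITTER = 0.05;

    /** Returns {earliest, latest} refresh times in ms since the epoch. */
    static long[] refreshWindowMs(long startMs, long expireMs) {
        long lifetimeMs = expireMs - startMs;
        long earliest = startMs + Math.round(lifetimeMs * (WINDOW_FACTOR - WINDOW_JITTER));
        long latest = startMs + Math.round(lifetimeMs * (WINDOW_FACTOR + WINDOW_JITTER));
        return new long[] {earliest, latest};
    }

    /** Picks a random refresh time inside the window (inclusive bounds). */
    static long pickRefreshMs(long startMs, long expireMs) {
        long[] window = refreshWindowMs(startMs, expireMs);
        return ThreadLocalRandom.current().nextLong(window[0], window[1] + 1);
    }
}
```

For a token with a one-hour lifetime starting at time 0, refreshWindowMs(0, 3_600_000) gives 2,700,000 ms and 3,060,000 ms, i.e. 45 and 51 minutes.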

Note that the class that implements the refresh logic (org.apache.kafka.common.security.expiring.internals.ExpiringCredentialRefreshingLogin) is not considered part of the public API.  This means that while it is up to the org.apache.kafka.common.security.auth.Login implementation for a particular mechanism to implement the logic, and that implementation can delegate to ExpiringCredentialRefreshingLogin to do so (as org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerRefreshingLogin does), this is something that could only be done for the built-in SASL mechanisms (e.g. PLAIN, SCRAM-related, and GSSAPI).  There is no intent to support an ability to generate credentials that can be refreshed and re-authenticated for non-builtin mechanisms – nothing would prevent anyone from implementing such mechanisms and delegating to the class, of course, but it would be an unsupported use of an internal class that is not a public API.

If a re-authentication attempt fails then the connection will be told to retry after a delay that depends on how many retries have been attempted so far: some small amount of time for the first retry (e.g. 1 minute), double that for the second retry, and 4 times the initial delay for every retry thereafter.  Retry attempts generally continue indefinitely until either one of them succeeds or the connection is closed.  Certain errors result in retries not being attempted (e.g. if some internal state doesn't make sense, which generally should not happen), and certain other errors will only result in a retry if the existing token is still active (generally errors like an IOException, for example).
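
The retry schedule described above can be sketched as a small function.  This is an illustration only: the class and method names are hypothetical, and the 1-minute initial delay is the example value from the text rather than a confirmed default.

```java
/** Hypothetical helper; models the capped back-off described in the text. */
final class ReauthRetryBackoffSketch {
    /**
     * Delay before the given retry: the initial delay for the first retry,
     * double that for the second, and four times the initial delay thereafter.
     *
     * @param retryNumber 1 for the first retry, 2 for the second, and so on
     */
    static long retryDelayMs(int retryNumber, long initialDelayMs) {
        if (retryNumber < 1)
            throw new IllegalArgumentException("retryNumber must be >= 1");
        if (retryNumber == 1)
            return initialDelayMs;
        if (retryNumber == 2)
            return 2 * initialDelayMs;
        return 4 * initialDelayMs; // capped from the third retry onward
    }
}
```

With an initial delay of 60,000 ms the delays are 1, 2, 4, 4, ... minutes.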

No metrics are affected/created by this proposal.  In particular, re-authentications are not counted as authentications, and no metrics related to re-authentication are created.

Although there is no technical reason preventing it, we arbitrarily decide to disallow changing identities upon re-authentication.  For example, if a connection originally authenticates as User:user1, an attempt to re-authenticate as anything else (e.g. User:user2) will fail.  Retry is allowed indefinitely in this case.

Proposed Changes

The description of this KIP is actually quite straightforward from a functionality perspective – turn the feature on with the configuration option and it just works for OAUTHBEARER; use a custom LoginModule for other mechanisms to create credentials implementing ExpiringCredential and delegate to org.apache.kafka.common.security.expiring.internals.ExpiringCredentialRefreshingLogin and it will work for those mechanisms, too.  From an implementation perspective, though, the KIP is not so straightforward; it therefore includes a pull request with a proposed implementation.  Here is a high-level description of how the proposed implementation generally works.  Note that this description applies to the implementation only – none of this is part of the public API.

First, at a high level, we recognize that org.apache.kafka.clients.NetworkClient can be used in either an asynchronous or synchronous manner; in async mode the NetworkClient.poll() method is repeatedly called directly in a run loop, whereas in synchronous mode the NetworkClientUtils.sendAndReceive() method is used whenever something has to be sent (this method in turn calls NetworkClient.poll(), but otherwise poll() is not invoked).  The fact that these two separate use cases exist is important because the requests related to re-authentication (ApiVersionsRequest, SaslHandshakeRequest, and SaslAuthenticateRequest) cannot simply be injected into a queue that NetworkClient owns; we don't know when poll() will be called in the synchronous case, for example – it is only invoked when something needs to be sent, and that might take a while (if it occurs at all).  So a single queue approach won't work.

The approach we take is to define an interface org.apache.kafka.common.security.authenticator.AuthenticationRequestEnqueuer that declares an enqueueRequest(AuthenticationRequest) method.  We leave it to the owners of the NetworkClient instances to define how to inject such requests by providing an implementation of the interface to the SaslChannelBuilder, which in turn provides it to the SaslClientAuthenticator.

For example, the org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient class provides the authenticationRequestEnqueuer() method that yields an instance that invokes ConsumerNetworkClient.send(Node, AbstractRequest.Builder<?>); it is then a simple matter of adding the following code to the constructors of the org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.connect.runtime.distributed.WorkerGroupMember classes, which use ConsumerNetworkClient for their connectivity:

Code Block
language: java
title: Constructor Code Addition for Users of ConsumerNetworkClient
if (channelBuilder instanceof SaslChannelBuilder)
    ((SaslChannelBuilder) channelBuilder)
            .authenticationRequestEnqueuer(client.authenticationRequestEnqueuer());

Another example is kafka.controller.ControllerChannelManager.  Instances of this class maintain one request queue per broker with one background thread allocated for each broker to monitor the queue and send the requests that appear.  The requests are sent in a synchronous fashion with respect to the thread when a request appears in the queue (i.e. unlike ConsumerNetworkClient, which is an async use case, this is a synchronous NetworkClient use case).  It is necessary to invoke sendRequest() to add a request to the queue for a broker and have it sent.  The code in the addNewBroker() method therefore looks like this:

Code Block
language: scala
title: ControllerChannelManager.addNewBroker() Code Addition
if (channelBuilder.isInstanceOf[SaslChannelBuilder])
  channelBuilder.asInstanceOf[SaslChannelBuilder].authenticationRequestEnqueuer(
    new AuthenticationRequestEnqueuer() {
      def enqueueRequest(authenticationRequest: AuthenticationRequest): Unit =
          sendRequest(authenticationRequest.nodeId(), authenticationRequest.requestBuilder().apiKey(),
              authenticationRequest.requestBuilder(), null, authenticationRequest.authenticationRequestCompletionHandler())
    })

So the general approach is to identify how to inject requests and then set the appropriate AuthenticationRequestEnqueuer implementation on the SaslChannelBuilder.  Including the above examples, the PR adds re-authentication support for the following:

  • org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient
    • org.apache.kafka.clients.consumer.KafkaConsumer
    • org.apache.kafka.connect.runtime.distributed.WorkerGroupMember
  • kafka.controller.ControllerChannelManager
  • org.apache.kafka.clients.admin.KafkaAdminClient
  • org.apache.kafka.clients.producer.KafkaProducer
  • kafka.coordinator.transaction.TransactionMarkerChannelManager
  • kafka.server.ReplicaFetcherBlockingSend (kafka.server.ReplicaFetcherThread)

There is no intent to include code in the PR to re-authenticate connections originating from these classes:

  • kafka.admin.AdminClient (this class is deprecated)
  • kafka.tools.ReplicaVerificationTool (an edge case; no need to support the option here)
  • kafka.server.KafkaServer (a NetworkClient instance is only instantiated for shutdown and is therefore not long-lived enough to require re-authentication)
  • org.apache.kafka.trogdor.workload.ConnectionStressWorker (this class simply connects and doesn’t do anything else with the connection)

The PR does not yet have unit or integration tests – I will add these if/when the general implementation approach is verified by the community as being appropriate.  I have stepped through the code in the debugger, and I can see from this exercise and from the emitted log messages that the implementation does in fact work.  For example, connections are re-authenticated a bit more frequently than once a minute with this JAAS configuration:

Code Block
language: text
title: JAAS Config: Re-Authenticate About Once a Minute
KafkaClient {
  org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
    unsecuredLoginLifetimeSeconds="60"
    unsecuredLoginStringClaim_sub="admin";
};

The final issue to describe is how/when a KafkaChannel instance (each of which corresponds to a unique network connection) is told to re-authenticate.

We define the class org.apache.kafka.common.security.expiring.internals.ClientChannelCredentialTracker to keep track of the various KafkaChannel instances and the ExpiringCredential instances that they authenticated with.  The following code in SaslChannelBuilder adds an instance of ClientChannelCredentialTracker to the private credentials of the Subject associated with the SASL mechanism when the feature is enabled:

Code Block
language: java
title: SaslChannelBuilder Code to Enable the Feature
LoginManager loginManager = LoginManager.acquireLoginManager(entry.getValue(), mechanism, defaultLoginClass, configs);
loginManagers.put(mechanism, loginManager);
Subject subject = loginManager.subject();
if (mode == Mode.CLIENT) {
    if (saslLoginRefreshReauthenticateEnable()) {
        log.info("SASL Login Refresh Re-Authenticate ENABLED");
        if (subject.getPrivateCredentials(ClientChannelCredentialTracker.class).isEmpty())
            subject.getPrivateCredentials().add(new ClientChannelCredentialTracker());
    } else
        log.info("SASL Login Refresh Re-Authenticate DISABLED");
}

The KafkaChannel and ExpiringCredentialRefreshingLogin classes can easily retrieve the ClientChannelCredentialTracker instance and tell it when various events of interest occur (a channel is initially authenticated, a credential is refreshed, a channel is disconnected).  The events are enqueued as they are reported, and a background thread processes them to decide which KafkaChannel instances need to re-authenticate.  There is a method KafkaChannel.initiateReauthentication(Time) that instantiates an instance of SaslClientAuthenticator and tells it to re-authenticate via the method SaslClientAuthenticator.initiateReauthentication(Time, AuthenticationSuccessOrFailureReceiver).  The implementation of AuthenticationSuccessOrFailureReceiver that is sent takes care of cleaning up the state in KafkaChannel related to the multi-step re-authentication process and notifying the ClientChannelCredentialTracker instance; it looks something like this:

Code Block
language: java
title: AuthenticationSuccessOrFailureReceiver
new AuthenticationSuccessOrFailureReceiver() {
    @Override
    public void reauthenticationFailed(RetryIndication retry, String errorMessage) {
        log.warn(errorMessage);
        Utils.closeQuietly(notYetAuthenticatedAuthenticator, "notYetAuthenticatedAuthenticator");
        notYetAuthenticatedAuthenticator = null;
        clientChannelCredentialTracker().offer(ClientChannelCredentialEvent
                .channelFailedReauthentication(time, KafkaChannel.this, errorMessage, retry));
    }

    @Override
    public void reauthenticationSucceeded() {
        // report the success together with the credential the new authenticator used
        clientChannelCredentialTracker().offer(ClientChannelCredentialEvent
                .channelReauthenticated(time, KafkaChannel.this, notYetAuthenticatedSaslClientAuthenticator
                    .expiringCredential()));
        replaceAnyPreviouslyAuthenticatedAuthenticatorWithNewOne();
    }

    private void replaceAnyPreviouslyAuthenticatedAuthenticatorWithNewOne() {
        // swap the old authenticator (if any) for the new one
        Authenticator tmp = authenticatedAuthenticator;
        authenticatedAuthenticator = notYetAuthenticatedAuthenticator;
        notYetAuthenticatedAuthenticator = null;
        if (tmp != null)
            Utils.closeQuietly(tmp, "previousAuthenticatedAuthenticator");
    }
}

When the KafkaChannel instance invokes the SaslClientAuthenticator.initiateReauthentication(Time, AuthenticationSuccessOrFailureReceiver) method it is up to the SaslClientAuthenticator instance to use its AuthenticationRequestEnqueuer instance to enqueue an ApiVersionsRequest along with an appropriate callback that either fails the re-authentication if the ApiVersionsRequest fails or continues it with a SaslHandshakeRequest.  Here is what the code looks like:

Code Block
language: java
title: SaslClientAuthenticator.initiateReauthentication()
    public void initiateReauthentication(Time time,
            AuthenticationSuccessOrFailureReceiver authenticationSuccessOrFailureReceiver) {
        if (saslState != SaslState.SEND_APIVERSIONS_REQUEST) {
            notifyFailureDueToUnexpectedState(SaslState.SEND_APIVERSIONS_REQUEST,
                    authenticationSuccessOrFailureReceiver);
            return;
        }
        authenticationRequestEnqueuer().enqueueRequest(new AuthenticationRequest(this.node, new ApiVersionsRequest.Builder((short) 0),
                authenticationRequestCompletionHandlerForApiVersionsRequest(time,
                        authenticationSuccessOrFailureReceiver)));
    }

    private void notifyFailureDueToUnexpectedState(SaslState expectedSaslState,
            AuthenticationSuccessOrFailureReceiver authenticationSuccessOrFailureReceiver) {
        // it is possible that the connection was closed; don't retry
        SaslState receivedSaslState = saslState;
        if (saslState != SaslState.CLOSED)
            saslState = SaslState.FAILED;
        authenticationSuccessOrFailureReceiver.reauthenticationFailed(RetryIndication.DO_NOT_RETRY, String.format(
                "Re-authentication was in process but could not proceed because authenticator state is incorrect (expected %s): %s",
                expectedSaslState, receivedSaslState));
    }

And of course the method authenticationRequestCompletionHandlerForApiVersionsRequest(Time, AuthenticationSuccessOrFailureReceiver) looks similar but deals with sending a SaslHandshakeRequest (see the PR for details).

Finally, we add methods on SaslServerAuthenticator to respond to the requests that arrive related to re-authentication – specifically, respondToReauthenticationSaslHandshakeRequest() and respondToReauthenticationSaslAuthenticateRequest() – and we add code to kafka.server.KafkaApis to route the received requests accordingly (as opposed to responding with an error, which is what currently happens).  I won't go into the details here; see the PR for the code.

Compatibility, Deprecation, and Migration Plan

There is no impact to existing installations because the default is for the feature to be turned off.

Rejected Alternatives

A single queue owned by org.apache.kafka.clients.NetworkClient does not reliably work in the synchronous case as described above; we cannot know how soon an injected request will be sent (if ever).

This KIP also proposes bumping the version number for the SASL_AUTHENTICATE API to 1 (with a change in wire format) so that brokers can indicate the session expiration time to clients via an additional value on the last round-trip response.  Clients can use the max SASL_AUTHENTICATE version number supported by the server to determine if they are connected to a broker that supports re-authentication (true if version > 0).  This KIP also adds new metrics as described below.

The configuration option this KIP proposes to add to enable server-side expired-connection-kill is 'connections.max.reauth.ms' (not required to be prefixed with listener prefix or SASL mechanism name – such a value would be used across the cluster – but it may be as mentioned below). For example, "connections.max.reauth.ms=3600000".  The value represents the maximum value that could potentially be communicated as part of the new V1 SaslAuthenticateResponse.  The default value is 0, which means there is effectively no maximum communicated (0 will be sent, meaning "none"), server-side kill of expired connections is disabled, clients are not required to re-authenticate, and whether clients re-authenticate or not and at what interval is entirely up to them.  Existing SASL clients upgraded to v2.2.0 will be coded to not re-authenticate in this scenario.  The default value of 0 therefore results in no change whatsoever.

When 'connections.max.reauth.ms' is explicitly set to a positive number the server will disconnect any SASL connection that does not re-authenticate and subsequently uses the connection for any purpose other than re-authentication at any point beyond the communicated expiration point (which will not exceed the configured maximum value).  For example, if the configured value is 3600000 (1 hour) and the remaining lifetime of a bearer token presented at the time of authentication is 45 minutes, then 45 minutes is communicated back to the client and the server would kill the connection if it is not re-authenticated within 45 minutes and it is then actively used for anything other than re-authentication.  As a further example, if the configured value is 3600000 and the credential lifetime is either unspecified or greater than 1 hour then 1 hour would be communicated to the client and the server would kill the connection if it is not re-authenticated within 1 hour and it is then actively used for anything other than re-authentication.
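
The two examples above amount to taking the smaller of the configured maximum and the credential's remaining lifetime.  A minimal sketch of that rule follows; the class and method names are hypothetical, a null remaining lifetime stands for "unspecified", and a return value of 0 stands for "none" as described for the default configuration.

```java
/** Hypothetical helper; models the session lifetime the broker communicates. */
final class SessionLifetimeSketch {
    /**
     * @param connectionsMaxReauthMs configured 'connections.max.reauth.ms' (0 = disabled)
     * @param credentialRemainingMs  remaining credential lifetime, or null if unspecified
     * @return the session lifetime in ms to send to the client, or 0 for "none"
     */
    static long sessionLifetimeMs(long connectionsMaxReauthMs, Long credentialRemainingMs) {
        if (connectionsMaxReauthMs <= 0)
            return 0L; // feature disabled: nothing is enforced
        if (credentialRemainingMs == null)
            return connectionsMaxReauthMs; // no credential lifetime: use the maximum
        return Math.min(connectionsMaxReauthMs, credentialRemainingMs);
    }
}
```

With a configured value of 3600000, a token with 45 minutes remaining yields 2,700,000 ms, while an unspecified or two-hour lifetime yields 3,600,000 ms.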

Older clients will of course not have the session expiration time communicated to them since they will use a version 0 SaslAuthenticateRequest and will receive the existing version 0 SaslAuthenticateResponse.  The broker will disconnect these connections upon session expiration regardless of the fact that the client is an older one.  Such connections will be captured via a metric (described below) to help with migration.

The 'connections.max.reauth.ms' configuration option will not be dynamically changeable; restarts will be required if the value is to be changed.  However, if a new listener is dynamically added, the value could be set for that listener at that time (and the configuration key would be prefixed in that case); this dynamic capability will be addressed as a separate ticket and may not be delivered with the initial KIP implementation.

From a behavior perspective on the client side (including the broker when it is acting as an inter-broker client), when a v2.2.0-or-later SASL client connects to a v2.2.0 or later broker that supports re-authentication, the broker will communicate the session expiration time as part of the final SASL_AUTHENTICATE response.  If this value is positive, then the client will automatically re-authenticate before anything else unrelated to re-authentication is sent beyond that expiration point.  If the re-authentication attempt fails then the connection will be closed by the broker; retries are not supported.  If re-authentication succeeds then any received responses that queued up during re-authentication along with the Send that triggered the re-authentication to occur in the first place will subsequently be able to flow through (back to the client and along to the broker, respectively), and eventually the connection will re-authenticate again, etc.  Note also that the client cannot queue up additional send requests beyond the one that triggers re-authentication to occur until re-authentication succeeds and the triggering one is sent.

From a behavior perspective on the server (broker) side, when the expired-connection-kill feature is enabled with a positive value the broker will communicate a session time via SASL_AUTHENTICATE and will close a connection when the connection is used past the expiration time and the specific API request is not directly related to re-authentication (SaslHandshakeRequest and SaslAuthenticateRequest).  In other words, if a connection sits idle, it will not be closed – something unrelated to re-authentication must traverse the connection before a disconnect will occur.
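
The broker-side rule can be expressed as a single predicate.  The sketch below is illustrative only: the enum and all names are hypothetical stand-ins rather than the broker's actual request types, and a null expiration time stands for a session with no communicated lifetime.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical helper; models when a broker would close an expired session. */
final class ExpiredSessionCheckSketch {
    /** Stand-in for the request types that matter here. */
    enum Api { SASL_HANDSHAKE, SASL_AUTHENTICATE, PRODUCE, FETCH }

    // Requests that are part of re-authentication never trigger a disconnect.
    private static final Set<Api> REAUTH_APIS =
            EnumSet.of(Api.SASL_HANDSHAKE, Api.SASL_AUTHENTICATE);

    /**
     * @param sessionExpirationMs absolute expiration time, or null if none applies
     * @param nowMs               time at which the request arrives
     * @param requestApi          type of the arriving request
     */
    static boolean shouldClose(Long sessionExpirationMs, long nowMs, Api requestApi) {
        if (sessionExpirationMs == null)
            return false; // feature disabled or no lifetime communicated
        if (nowMs <= sessionExpirationMs)
            return false; // session still valid
        return !REAUTH_APIS.contains(requestApi);
    }
}
```

Because the check runs only when a request arrives, an idle connection is never closed by it, which matches the behavior described above.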

Metrics documenting re-authentications will be maintained in the Selector code.  Some will mirror existing metrics that document authentications, for example failed-reauthentication-{rate,total} and successful-reauthentication-{rate,total}.  There will also be separate successful-authentication-no-reauth-{rate,total} metrics to indicate the subset of clients that successfully authenticate with a V0 SaslAuthenticateRequest (or no such request, which can happen with very old clients).  These metrics are helpful during migration (see Migration Plan for details) as they will identify if/when all clients are properly upgraded before server-side expired-connection-kill functionality is enabled: the rate metric will be zero across all brokers when it is appropriate to enable the feature, and the total metric will be unchanging (zero after a restart).  There will also be reauthentication-latency-{avg,max} metrics that document the latency imposed by re-authentication.  It is unclear if this latency will be problematic, and the data collected via these metrics may be useful as we consider this issue in the future.

An additional metric ExpiredConnectionsKilledCount will be created and maintained by the server-side Processor code to count the number of such events.  It should remain at zero if all clients and brokers are upgraded to v2.2.0 or later.  If it is non-zero then either an older client is connecting (which would be evidenced in the successful-authentication-no-reauth metrics mentioned above) or a newer client is not re-authenticating correctly (which would indicate a bug).

Proposed Changes

Implementation Overview

The description of this KIP is actually quite straightforward from a behavior perspective – turn the feature on with the configuration option in the broker and it just works.  From an implementation perspective, though, the KIP is not so straightforward; a description of how it works therefore follows below.  Note that this description applies to the implementation only – none of this is part of the public API.

This implementation works at a very low level in the Kafka stack, at the level of the network code.  It is therefore transparent to all clients – it just works with no knowledge or accommodation required on their part.  When a client makes a request to a broker the request is intercepted at the level of the Selector class and a check is done to see if re-authentication is enabled; if it is, and the broker supports re-authentication, then the connection is re-authenticated at that point before the request is allowed to flow through.  The solution is elegant because it re-uses existing code paths while requiring no code changes higher up in the stack.

This KIP transparently adds re-authentication support for all uses, which at this point includes the following:

  • org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient
    • org.apache.kafka.clients.consumer.KafkaConsumer
    • org.apache.kafka.connect.runtime.distributed.WorkerGroupMember
  • kafka.controller.ControllerChannelManager
  • org.apache.kafka.clients.admin.KafkaAdminClient
  • org.apache.kafka.clients.producer.KafkaProducer
  • kafka.coordinator.transaction.TransactionMarkerChannelManager
  • kafka.server.ReplicaFetcherBlockingSend (kafka.server.ReplicaFetcherThread)
  • kafka.admin.AdminClient
  • kafka.tools.ReplicaVerificationTool
  • kafka.server.KafkaServer
  • org.apache.kafka.trogdor.workload.ConnectionStressWorker

The final issue to describe is how/when a KafkaChannel instance (each of which corresponds to a unique network connection) is told to re-authenticate.  Each KafkaChannel instance will remember the session expiration time communicated during (re-)authentication (if any); the code in the Selector class will check to see if that time has passed and will start the re-authentication process if so.
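The expiration check described above can be sketched roughly as follows.  This is a minimal illustration and not the actual Kafka code: the class ChannelSessionState and its methods are hypothetical names, and representing time as plain nanosecond values passed in by the caller is an assumption made to keep the example self-contained.

```java
/**
 * Hypothetical sketch of per-connection state that remembers when the
 * session expires. Illustrative only; not part of Kafka.
 */
final class ChannelSessionState {
    // null means no session expiration time was communicated
    private Long sessionExpirationNanos;

    /**
     * Record the session lifetime communicated during (re-)authentication.
     * A lifetime of 0 or less is treated as "no expiration communicated".
     */
    void onAuthenticated(long nowNanos, long sessionLifetimeMs) {
        sessionExpirationNanos = sessionLifetimeMs > 0
                ? Long.valueOf(nowNanos + sessionLifetimeMs * 1_000_000L)
                : null;
    }

    /** True only when an expiration time was recorded and has passed. */
    boolean shouldReauthenticate(long nowNanos) {
        return sessionExpirationNanos != null
                && nowNanos - sessionExpirationNanos >= 0;
    }
}
```

In this sketch the caller (standing in for the Selector) would invoke shouldReauthenticate before letting a request flow through, and start the re-authentication exchange when it returns true.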

Compatibility, Deprecation, and Migration Plan

With respect to compatibility, there is no impact to existing installations because the default is for the server-side connection kill feature to be turned off, older clients never try to re-authenticate because they don't support it, and newer clients that connect to older brokers will know that the broker does not support re-authentication and will therefore not attempt it.

With respect to migration, the approach would be as follows:

  1. Upgrade all brokers to v2.2.0 or later at whatever rate is desired with 'connections.max.reauth.ms' allowed to default to 0.  If SASL is used for the inter-broker protocol then brokers will check the SASL_AUTHENTICATE API version and use a V1 request when communicating to a broker that has been upgraded to 2.2.0, but the client will see the "0" session max lifetime and will not re-authenticate.  Their connections will not be killed.
  2. In parallel with (1) above, upgrade non-broker clients to v2.2.0 or later at whatever rate is desired.  SASL clients will check the SASL_AUTHENTICATE API version and use a V1 request when communicating to a broker that has been upgraded to 2.2.0, but the client will see the "0" session max lifetime and will not re-authenticate.  Their connections will not be killed.
  3. After (1) and (2) are complete, perform a rolling restart of all brokers and check the metrics successful-authentication-no-reauth-{rate,total} to confirm that they remain at zero.  This gives confidence that (1) and (2) are indeed complete.
  4. Update 'connections.max.reauth.ms' to a positive value and perform a rolling restart of brokers again. 
  5. Monitor the successful-authentication-no-reauth-{rate,total} metrics – they will remain at 0 unless an older client connects to the broker.
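The client-side behavior that makes this migration safe can be summarized in a small sketch.  The class and method names below (ReauthCompatibility, saslAuthenticateVersion, willReauthenticate) are hypothetical and exist only for illustration; the logic assumes, as described above, that a client uses a V1 SASL_AUTHENTICATE request only when the broker supports it and re-authenticates only when the broker returns a positive session lifetime.

```java
/**
 * Hypothetical sketch of the client-side compatibility decision.
 * Illustrative only; not part of Kafka.
 */
final class ReauthCompatibility {
    /** Use V1 only if the broker advertises support for it; otherwise fall back to V0. */
    static short saslAuthenticateVersion(short brokerMaxVersion) {
        return (short) Math.min(brokerMaxVersion, 1);
    }

    /**
     * A client re-authenticates only when V1 is in use and the broker
     * returned a positive session lifetime (0 means the feature is off).
     */
    static boolean willReauthenticate(short brokerMaxVersion, long sessionLifetimeMs) {
        return saslAuthenticateVersion(brokerMaxVersion) >= 1 && sessionLifetimeMs > 0;
    }
}
```

Under these assumptions, steps (1) and (2) never trigger re-authentication because the session lifetime stays at 0, and only step (4) turns it on.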

Rejected Alternatives

Delaying Support for Brokers Killing Connections

It was initially proposed that we defer adding the ability for brokers to kill connections using expired credentials to a future KIP.  This functionality is actually easier to add than re-authentication, and re-authentication without this feature doesn't really improve security (because it can't be enforced).  Adding the ability to kill connections using an expired bearer token without the ability for the client to re-authenticate also does not make sense as a general feature – it forces the client to essentially "recover" from what looks like a network error on a periodic basis.  So we will implement both features at the same time.

Delaying Support for non-OAUTHBEARER SASL Mechanisms

It was initially proposed that we defer adding the ability for SASL clients to re-authenticate when using a non-OAUTHBEARER mechanism (e.g. PLAIN, GSSAPI, and SCRAM-related).  We were able to identify how all mechanisms could be readily supported, so there was no reason to defer.

Creating a Configuration Option to Disable Client-side Re-Authentication

It was initially proposed that we would create a client-side configuration option to disable the use of re-authentication on the client.  This may have been necessary when we were contemplating not including support for non-OAUTHBEARER SASL mechanisms and/or when we had not decided to bump the SASL_AUTHENTICATE version number, but it became unnecessary given these decisions.  There is no need to disable the feature if the client and the server both support it.

Validating the Token Lifetime as Part of Re-Authentication

It was initially proposed that we would have the broker reject (re-)authentications that occurred with a credential having a lifetime longer than the maximum allowed.  This was decided to be unnecessary here because the same thing can be done as part of token validation.

Highest-Level Approach: Inserting Requests into Clients' Queues

The original implementation injected requests directly into both asynchronous and synchronous I/O clients' queues.  This implementation was too high up in the stack and required significantly more work and code than any other solution.  It also was much harder to maintain because it became entwined in the implementation of every client.

High-Level Approach: One-Size-Fits-All With Additional KafkaClient Methods

One implementation injected requests into the KafkaClient via a new method (shown below).  This one-size-fits-all approach worked as long as synchronous I/O clients periodically checked for and sent injected requests related to re-authentication (via a new method, also shown below).  This implementation was harder to maintain than the chosen approach because it crossed existing module boundaries related to security code, networking code, and code higher up in the stack – it imposed requirements (however slight) on code higher up in the stack in order for re-authentication to work.  This violation of existing modularity caused concern.  The code was also 2-3 times bigger (at least) relative to the accepted implementation, and it was incrementally (though not dramatically) more difficult to test.  It did create the possibility of interleaving requests related to re-authentication with requests that the client was otherwise sending, which minimized latency spikes due to re-authentication, but that advantage was difficult to quantify and therefore did not tip the balance in favor of this option.

org.apache.kafka.clients.KafkaClient additions (Java):
    /**
     * Return true if any node has a re-authentication request either enqueued and
     * waiting to be sent or already in-flight. A call to {@link #poll(long, long)}
     * is required to send and receive/process the results of such requests. <b>An
     * owner of this instance that does not implement a run loop to repeatedly call
     * {@link #poll(long, long)} but instead only sends requests synchronously
     * on-demand to a single node must call this method periodically -- and invoke
     * {@link #poll(long, long)} if the return value is {@code true} -- to ensure
     * that any re-authentication requests that have been injected are sent and
     * processed in a timely fashion.</b>
     * <p>
     * Example code to re-authenticate a connection across several
     * requests/responses is as follows:
     * 
     * <pre>
     * // Send multiple requests related to re-authentication in the synchronous
     * // use case, completing the re-authentication exchange.
     * while (kafkaClient.hasReauthenticationRequest())
     *     // Returns an empty list in synchronous use case.
     *     kafkaClient.poll(Long.MAX_VALUE, time.milliseconds());
     * // The connection is ready for use, and if there originally was a
     * // re-authentication request then as many requests as required to
     * // complete the exchange have been sent.
     * </pre>
     * 
     * Alternatively, to only send one re-authentication request and receive its
     * response (which allows us to interleave other requests to the single node to
     * which we are connected before subsequent requests related to the multi-step
     * re-authentication exchange are sent):
     * 
     * <pre>
     * // Send a single request related to re-authentication in the synchronous
     * // use case, potentially (but not necessarily) completing the
     * // re-authentication exchange.
     * while (kafkaClient.hasReauthenticationRequest()) {
     *     // Returns an empty list in synchronous use case.
     *     kafkaClient.poll(Long.MAX_VALUE, time.milliseconds());
     *     if (!kafkaClient.hasInFlightRequests())
     *         break; // Response has been received.
     * }
     * // The connection is ready for use, and if there was a
     * // re-authentication request then either the exchange is finished or
     * // there is another re-authentication request available to be sent.
     * </pre>
     * 
     * @return if any node has a re-authentication request either enqueued and
     *         waiting to be sent or already in-flight
     * @see #enqueueAuthenticationRequest(ClientRequest)
     */
    default boolean hasReauthenticationRequest() {
        return false;
    }


    /**
     * Enqueue the given request related to re-authenticating a connection. This
     * method is guaranteed to be thread-safe even if the class implementing this
     * interface is generally not.
     * 
     * @param clientRequest
     *            the request to enqueue
     * @see #hasReauthenticationRequest()
     */
    default void enqueueAuthenticationRequest(ClientRequest clientRequest) {
        // empty
    }

Adding an ExpiringCredential Public API

It was initially proposed that we make an existing, non-public ExpiringCredential interface part of the public API and leverage the background login refresh thread's refresh event to kick-start re-authentication on the client side for the refreshed credential.  This is unnecessary due to a couple of factors.  First, the server (broker) indicates to the client what the expiration time is, and the low-level mechanism we have chosen on the client side can insert itself into the flow at the correct time – it does not need an external mechanism; and second, the server will choose the token expiration time as the session expiration time if it does not exceed the maximum allowable value, which means the refresh thread on the client side will have already refreshed the token (or, if it hasn't, the client can't make new connections anyway).  We had at one time considered that the server rejecting tokens whose remaining lifetime exceeds the maximum allowable session time was a third factor, but that functionality was rejected because it can be done as part of token validation as mentioned above.
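The server-side choice of session expiration time mentioned above can be illustrated with a short sketch.  The class SessionLifetime and its method are hypothetical names.  The sketch also assumes that the configured maximum is used when the token would otherwise outlive it, and that a maximum of 0 yields a session lifetime of 0 (feature off); the paragraph above does not spell out either case.

```java
/**
 * Hypothetical sketch of how a broker might pick the session lifetime.
 * Illustrative only; not part of Kafka.
 */
final class SessionLifetime {
    /**
     * @param nowMs             current time in milliseconds
     * @param tokenExpirationMs token expiration time in milliseconds; assumed
     *                          to be later than nowMs (unexpired token)
     * @param maxReauthMs       configured maximum session lifetime; 0 or less
     *                          is assumed to mean the feature is disabled
     * @return session lifetime in milliseconds, or 0 if the feature is off
     */
    static long sessionLifetimeMs(long nowMs, long tokenExpirationMs, long maxReauthMs) {
        if (maxReauthMs <= 0) {
            return 0L; // feature off: clients see 0 and do not re-authenticate
        }
        long tokenRemainingMs = tokenExpirationMs - nowMs;
        // Token expiration wins unless it exceeds the configured maximum.
        return Math.min(tokenRemainingMs, maxReauthMs);
    }
}
```

Because the session never outlives the token in this sketch, the client-side refresh thread would normally have obtained a fresh token before re-authentication is due.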

Authenticating a Separate Connection and Transferring Credentials

One alternative idea is to add two new request types: "ReceiveReauthenticationNonceRequest" and "ReauthenticateWithNonceRequest".  When re-authentication needs to occur the client would make a separate, new connection to the broker and send a "ReceiveReauthenticationNonceRequest" to the broker to have it associate a nonce with the authenticated credentials and return the nonce to the client.  Then the client would send a "ReauthenticateWithNonceRequest" with the returned nonce over the connection that it wishes to re-authenticate; the broker would then replace the credentials on that connection with the credentials it had previously associated with the nonce.  I don't know if this would work (might there be some issue with advertised vs. actual addresses and maybe the possibility of there being a load balancer?  Could we be guaranteed the ability to connect to the exact same broker as our existing connection?).  If it could work then it does have the advantage of requiring the injection of just a single request over an existing connection that would return very quickly rather than 3 separate requests of which at least one might take a while to return (to potentially retrieve a public key for token signature validation, for example; the validation itself isn't exactly free, either, even if the public key is already cached).  One disadvantage of the alternative, nonce-based approach is that it requires the creation of a separate connection, including TLS negotiation, and that is very expensive compared to sending 3 requests over an existing connection (which of course already has TLS negotiated).

Brute-Force Client-Side Kill

A brute-force alternative is to simply kill the connection on the client side when the background login thread refreshes the credential.  The advantage is that we don't need a code path for re-authentication – the client simply connects again to replace the connection that was killed.  There are many disadvantages, though.  The approach is harsh – having connections pulled out from underneath the client will introduce latency while the client reconnects; it introduces non-trivial resource utilization on both the client and server as TLS is renegotiated; and it forces the client to periodically "recover" from what essentially looks like a failure scenario.  While these are significant disadvantages, the most significant disadvantage of all is that killing connections on the client side adds no security – trusting the client to kill its connection in a timely fashion is a blind and unjustifiable trust.

Brute-Force Server-Side Kill

We could kill the connection from the server side instead, when the token expires.  But in this case, if there is no ability for the client to re-authenticate to avoid the killing of the connection in the first place, then we still have all of the harsh approach disadvantages mentioned above.