
...

This KIP proposes the addition of a single interface to the API and two additional configuration options to enable the client-side re-authentication and server-side expired-connection-kill features (both options default to disabled, so there is no change to existing behavior in the absence of an explicit opt-in).  This KIP also proposes bumping the version number for the SASL_HANDSHAKE API and adjusting the wire format so that clients can determine whether the broker they are connected to supports re-authentication and can learn what maximum credential lifetime the broker supports (see below for details).  It also adds new metrics as described below.

The interface this KIP proposes to add is as follows:

Code Block
languagejava
titleorg.apache.kafka.common.security.expiring.ExpiringCredential
collapsetrue
/**
 * A credential that expires and that can potentially be refreshed; such a
 * refreshed credential can also potentially be used to re-authenticate an
 * existing connection.
 * <p>
 * The parameters that impact how the refresh algorithm operates are specified
 * as part of the producer/consumer/broker configuration and are as follows. See
 * the documentation for these properties elsewhere for details.
 * <table>
 * <tr>
 * <th>Producer/Consumer/Broker Configuration Property</th>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.window.factor}</td>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.window.jitter}</td>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.min.period.seconds}</td>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.min.buffer.seconds}</td>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.reauthenticate.enable}</td>
 * </tr>
 * </table>
 * <p>
 * This interface was introduced in 2.1.0 and, while it feels stable, it could
 * evolve. We will try to evolve the API in a compatible manner, but we reserve
 * the right to make breaking changes in minor releases, if necessary. We will
 * update the {@code InterfaceStability} annotation and this notice once the API
 * is considered stable.
 * 
 * @see SaslConfigs#SASL_LOGIN_REFRESH_WINDOW_FACTOR_DOC
 * @see SaslConfigs#SASL_LOGIN_REFRESH_WINDOW_JITTER_DOC
 * @see SaslConfigs#SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS_DOC
 * @see SaslConfigs#SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC
 * @see SaslConfigs#SASL_LOGIN_REFRESH_REAUTHENTICATE_ENABLE_DOC
 */
@InterfaceStability.Evolving
public interface ExpiringCredential {
    /**
     * The value to use when asking a {@code SaslClient} instance for the
     * {@code ExpiringCredential} that was used to authenticate, if any
     */
    public static final String SASL_CLIENT_NEGOTIATED_PROPERTY_NAME = "Expiring.Credential";

    /**
     * The name of the principal to which this credential applies (used only for
     * logging)
     * 
     * @return the always non-null/non-empty principal name
     */
    String principalName();

    /**
     * When the credential became valid, in terms of the number of milliseconds
     * since the epoch, if known, otherwise null. An expiring credential may not
     * necessarily indicate when it was created -- just when it expires -- so we
     * need to support a null return value here.
     * 
     * @return the time when the credential became valid, in terms of the number of
     *         milliseconds since the epoch, if known, otherwise null
     */
    Long startTimeMs();

    /**
     * When the credential expires, in terms of the number of milliseconds since the
     * epoch. All expiring credentials by definition must indicate their expiration
     * time -- thus, unlike other methods, we do not support a null return value
     * here.
     * 
     * @return the time when the credential expires, in terms of the number of
     *         milliseconds since the epoch
     */
    long expireTimeMs();

    /**
     * The point after which the credential can no longer be refreshed, in terms of
     * the number of milliseconds since the epoch, if any, otherwise null. Some
     * expiring credentials can be refreshed over and over again without limit, so
     * we support a null return value here.
     * 
     * @return the point after which the credential can no longer be refreshed, in
     *         terms of the number of milliseconds since the epoch, if any,
     *         otherwise null
     */
    Long absoluteLastRefreshTimeMs();
}

The configuration option this KIP proposes to add to enable client-side re-authentication is 'sasl.login.refresh.reauthenticate.enable'.  It defaults to false; when explicitly set to true in the context of a client (including a broker when it acts as an inter-broker client), the client-side re-authentication feature will be enabled for any SASL connection that uses a private credential implementing the above ExpiringCredential interface.  This will be the case with OAUTHBEARER by default because the Login implementation used by default for OAUTHBEARER will be amended as part of this KIP to automatically wrap any OAuthBearerToken instance so that it implements ExpiringCredential if it does not already do so.  When the configuration option is set to true in the context of a broker (i.e. on the server side of the connection), connections leveraging SASL/OAUTHBEARER will be closed after the token expires, as described below.  It will typically make sense to ensure that the configuration option is the same on both the client and server sides of a connection (i.e. both false or both true).  The option will not be dynamically changeable.
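The OAUTHBEARER wrapping mentioned above might look roughly like the following sketch.  Both interfaces are reduced to self-contained stand-ins so the example compiles on its own; the real OAuthBearerToken lives in org.apache.kafka.common.security.oauthbearer, and (if memory of its javadoc serves) its lifetimeMs() reports the expiration time in milliseconds since the epoch.  The class name OAuthBearerTokenWrapper is invented for illustration.

```java
// Sketch of adapting an OAUTHBEARER token to the proposed ExpiringCredential
// interface. Both interfaces here are simplified stand-ins.
interface OAuthBearerToken {
    String principalName();
    Long startTimeMs();   // may be null: issuers need not say when a token became valid
    long lifetimeMs();    // expiration time, ms since the epoch
}

interface ExpiringCredential {
    String principalName();
    Long startTimeMs();
    long expireTimeMs();
    Long absoluteLastRefreshTimeMs();
}

public class OAuthBearerTokenWrapper implements ExpiringCredential {
    private final OAuthBearerToken token;

    public OAuthBearerTokenWrapper(OAuthBearerToken token) {
        this.token = token;
    }

    @Override public String principalName() { return token.principalName(); }
    @Override public Long startTimeMs() { return token.startTimeMs(); }
    @Override public long expireTimeMs() { return token.lifetimeMs(); }
    @Override public Long absoluteLastRefreshTimeMs() { return null; } // tokens can be re-acquired without limit

    public static void main(String[] args) {
        final long now = System.currentTimeMillis();
        OAuthBearerToken token = new OAuthBearerToken() {
            @Override public String principalName() { return "admin"; }
            @Override public Long startTimeMs() { return now; }
            @Override public long lifetimeMs() { return now + 3_600_000L; }
        };
        ExpiringCredential credential = new OAuthBearerTokenWrapper(token);
        System.out.println(credential.principalName() + " expires at " + credential.expireTimeMs());
    }
}
```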

Although there is no technical reason preventing it, we arbitrarily decide to disallow changing identities upon re-authentication.  For example, if a connection originally authenticates as User:user1, an attempt to re-authenticate as anything else (e.g. User:user2) will fail.

Delaying the return of a response upon failed re-authentication attempts (a la KIP-306: Configuration for Delaying Response to Failed Authentication) is not necessary since re-authentication attempts do not require TLS negotiation and are therefore considerably cheaper than initial authentication attempts.


Proposed Changes

The description of this KIP is actually quite straightforward from a functionality perspective – turn the feature on with the configuration option in both the client and the broker and it just works for OAUTHBEARER; use a custom LoginModule for other mechanisms to create credentials implementing ExpiringCredential and delegate to org.apache.kafka.common.security.expiring.internals.ExpiringCredentialRefreshingLogin, and it can be made to work on the client for those mechanisms, too (see "How To Support Re-Authentication for Other SASL Mechanisms" for details).  From an implementation perspective, though, the KIP is not so straightforward; it therefore includes a pull request with a proposed implementation.  Here is a high-level description of how the proposed implementation generally works.  Note that this description applies to the implementation only – none of this is part of the public API.

First, at a high level, we recognize that org.apache.kafka.clients.NetworkClient can be used in either an asynchronous or synchronous manner; in async mode the NetworkClient.poll() method is repeatedly called directly in a run loop, whereas in synchronous mode the NetworkClientUtils.sendAndReceive() method is used whenever something has to be sent (this method in turn calls NetworkClient.poll(), but otherwise poll() is not invoked).  The fact that these two separate use cases exist is important because the requests related to re-authentication (ApiVersionsRequest, SaslHandshakeRequest, and SaslAuthenticateRequest) cannot simply be injected into a queue that NetworkClient owns; we don't know when poll() will be called in the synchronous case, for example – it is only invoked when something needs to be sent, and that might take a while (if it occurs at all).  So at first glance a single one-size-fits-all approach at the level of NetworkClient won't work.

We can actually make a one-size-fits-all approach work if we expose an ability for synchronous I/O clients to manually cause requests related to re-authentication to be sent.  These clients implement a run loop – they just don't call poll() repeatedly.  We can add methods to the org.apache.kafka.clients.KafkaClient interface (which is not part of the public API) as shown below, and a synchronous I/O client can call hasReauthenticationRequest() periodically as part of its run loop regardless of whether it has anything to send to its destination node.  The enqueueAuthenticationRequest() method that we add is leveraged by the SaslClientAuthenticator instance to submit requests and move the re-authentication process forward.

Code Block
languagejava
titleorg.apache.kafka.clients.KafkaClient additions
    /**
     * Return true if any node has a re-authentication request either enqueued and
     * waiting to be sent or already in-flight. A call to {@link #poll(long, long)}
     * is required to send and receive/process the results of such requests. <b>An
     * owner of this instance that does not implement a run loop to repeatedly call
     * {@link #poll(long, long)} but instead only sends requests synchronously
     * on-demand to a single node must call this method periodically -- and invoke
     * {@link #poll(long, long)} if the return value is {@code true} -- to ensure
     * that any re-authentication requests that have been injected are sent and
     * processed in a timely fashion.</b>
     * <p>
     * Example code to re-authenticate a connection across several
     * requests/responses is as follows:
     * 
     * <pre>
     * // Send multiple requests related to re-authentication in the synchronous
     * // use case, completing the re-authentication exchange.
     * while (kafkaClient.hasReauthenticationRequest())
     *     // Returns an empty list in synchronous use case.
     *     kafkaClient.poll(Long.MAX_VALUE, time.milliseconds());
     * // The connection is ready for use, and if there originally was a
     * // re-authentication request then as many requests as required to
     * // complete the exchange have been sent.
     * </pre>
     * 
     * Alternatively, to only send one re-authentication request and receive its
     * response (which allows us to interleave other requests to the single node to
     * which we are connected before subsequent requests related to the multi-step
     * re-authentication exchange are sent):
     * 
     * <pre>
     * // Send a single request related to re-authentication in the synchronous
     * // use case, potentially (but not necessarily) completing the
     * // re-authentication exchange.
     * while (kafkaClient.hasReauthenticationRequest()) {
     *     // Returns an empty list in synchronous use case.
     *     kafkaClient.poll(Long.MAX_VALUE, time.milliseconds());
     *     if (!kafkaClient.hasInFlightRequests())
     *         break; // Response has been received.
     * }
     * // The connection is ready for use, and if there was a
     * // re-authentication request then either the exchange is finished or
     * // there is another re-authentication request available to be sent.
     * </pre>
     * 
     * @return if any node has a re-authentication request either enqueued and
     *         waiting to be sent or already in-flight
     * @see #enqueueAuthenticationRequest(ClientRequest)
     */
    default boolean hasReauthenticationRequest() {
        return false;
    }


    /**
     * Enqueue the given request related to re-authenticating a connection. This
     * method is guaranteed to be thread-safe even if the class implementing this
     * interface is generally not.
     * 
     * @param clientRequest
     *            the request to enqueue
     * @see #hasReauthenticationRequest()
     */
    default void enqueueAuthenticationRequest(ClientRequest clientRequest) {
        // empty
    }

It becomes relatively easy to add re-authentication support: we have to identify the KafkaClient instance to the SaslChannelBuilder, which in turn provides it to the SaslClientAuthenticator.  Asynchronous use cases simply work at that point; for synchronous I/O use cases we just have to insert periodic calls to hasReauthenticationRequest() in the run loop, which is not difficult to do (and there are currently only 2 such cases).
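The synchronous-use-case contract above can be exercised with a runnable toy model.  ToyKafkaClient and the request names below are stand-ins, not Kafka code: poll() is reduced to "send one pending re-authentication request and process its response", which is enough to show why the periodic hasReauthenticationRequest() check drives the exchange to completion.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Runnable toy model of the synchronous use case described in the KafkaClient
// javadoc above. ToyKafkaClient is NOT Kafka code.
public class SyncReauthLoop {

    public static class ToyKafkaClient {
        private final Queue<String> reauthRequests = new ArrayDeque<>();
        public int polls = 0;

        public void enqueueAuthenticationRequest(String request) {
            reauthRequests.add(request);
        }

        public boolean hasReauthenticationRequest() {
            return !reauthRequests.isEmpty();
        }

        public void poll(long timeoutMs, long nowMs) {
            polls++;
            reauthRequests.poll(); // send the head request, receive its response
        }
    }

    // The loop a synchronous I/O client inserts into its run loop: poll until
    // no re-authentication request is enqueued or in-flight.
    public static int drain(ToyKafkaClient client) {
        while (client.hasReauthenticationRequest())
            client.poll(Long.MAX_VALUE, System.currentTimeMillis());
        return client.polls;
    }

    public static void main(String[] args) {
        ToyKafkaClient client = new ToyKafkaClient();
        client.enqueueAuthenticationRequest("ApiVersionsRequest");
        client.enqueueAuthenticationRequest("SaslHandshakeRequest");
        client.enqueueAuthenticationRequest("SaslAuthenticateRequest");
        System.out.println(drain(client)); // prints 3
    }
}
```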

As mentioned above, the SASL_HANDSHAKE API will have its version number bumped (and its wire format changed as described below) so that any client with the above configuration value set to true will not try to re-authenticate to a broker that has not been upgraded to the necessary version and therefore does not support responding to such requests.

The configuration option this KIP proposes to add to enable server-side expired-connection-kill is 'connections.max.reauth.ms'; it must be prefixed with the listener prefix and SASL mechanism name in lower case (for example, "sasl_ssl.oauthbearer.connections.max.reauth.ms=3600000").  The value defaults to 0.  When explicitly set to a non-zero value, the server will reject any authentication or re-authentication attempt from a client that presents a bearer token whose lifetime extends beyond the time at which the (re-)authentication occurs plus the absolute value of the configured value (for example, the token lifetime must not exceed one hour if the configured value is either -3600000 or +3600000).  When explicitly set to a positive value, the server will disconnect any connection that does not re-authenticate by the expiration point and is subsequently used for any purpose other than re-authentication.  For example, if the configured value is 3600000 and the remaining token lifetime at the time of authentication is 45 minutes, the server will kill the connection if it is not re-authenticated within 45 minutes and is then actively used for anything other than re-authentication.  As a further example, if the configured value is 3600000 and the mechanism is not OAUTHBEARER, the server will kill the connection as soon as it is used more than 1 hour after the time of (re-)authentication (assuming the mechanism does not support re-authentication, which it will not when this KIP is first delivered – though support could be added later, as described below in "How To Support Re-Authentication for Other SASL Mechanisms").

We support positive and negative values to facilitate migration (see Migration Plan for details).
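The connections.max.reauth.ms rules above can be distilled into a small hypothetical helper (this is illustrative, not the KIP's actual broker code): any non-zero value caps the acceptable credential lifetime at (re-)authentication time, while only a positive value arms the expired-connection-kill behavior.

```java
// Hypothetical helper distilling the connections.max.reauth.ms rules described
// above. All times are in milliseconds since the epoch; maxReauthMs is the
// configured value (0, positive, or negative).
public final class ReauthPolicy {
    private ReauthPolicy() {}

    // Any non-zero value (positive or negative) rejects (re-)authentication
    // with a credential that outlives now + |maxReauthMs|.
    public static boolean rejectAuthentication(long maxReauthMs, long nowMs, long tokenExpiryMs) {
        if (maxReauthMs == 0)
            return false; // feature disabled: any credential lifetime is acceptable
        return tokenExpiryMs > nowMs + Math.abs(maxReauthMs);
    }

    // Only a positive value arms the kill: past this point, any use of the
    // connection other than re-authentication gets it closed. A null token
    // expiry models a mechanism whose credential does not expire.
    public static long sessionExpiryMs(long maxReauthMs, long authTimeMs, Long tokenExpiryMs) {
        long limit = authTimeMs + maxReauthMs;
        return tokenExpiryMs == null ? limit : Math.min(tokenExpiryMs, limit);
    }

    public static void main(String[] args) {
        // 45-minute token, one-hour maximum: the session expires with the token.
        System.out.println(sessionExpiryMs(3_600_000L, 0L, 2_700_000L)); // prints 2700000
    }
}
```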

Neither of the above configuration options will be dynamically changeable, so restarts will be required if the value is to be changed.

From a behavior perspective on the client side (again, including the broker when it is acting as an inter-broker client), when the client-side re-authentication option is enabled and a private credential implements ExpiringCredential, the existing configuration options sasl.login.refresh.{window.factor, window.jitter, min.period.seconds, min.buffer.seconds} define when the org.apache.kafka.common.security.auth.Login implementation on the client must generate a new private credential (e.g. when it must retrieve a new bearer token).  The default configuration values result in a new credential being retrieved sometime between 75% and 85% of the current credential's lifetime (e.g. sometime between 45 and 51 minutes after a token with an hour lifetime is initially retrieved).  At that time, when the new credential is retrieved, every connection that authenticated with the old credential and is connected to a broker that supports re-authentication will be told to re-authenticate.  Re-authentication will begin as soon as the connection is used.  If the re-authentication attempt fails then the connection will be closed by the broker; retries are not supported.  If re-authentication succeeds then any requests that queued up during re-authentication will subsequently be able to flow through, and eventually the connection will be told to re-authenticate again, etc.
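A rough sketch of how the four sasl.login.refresh.* settings could combine to pick a refresh time follows.  This is illustrative only: the actual scheduling lives inside the Login implementation and handles more edge cases; the method name here is invented.

```java
import java.util.concurrent.ThreadLocalRandom;

// Illustrative sketch (not the actual refresh code) of picking the next
// refresh time for a credential valid on [startMs, expireMs].
public final class RefreshSchedule {
    private RefreshSchedule() {}

    public static long nextRefreshMs(long startMs, long expireMs,
            double windowFactor, double windowJitter,
            long minPeriodSeconds, long minBufferSeconds) {
        long lifetimeMs = expireMs - startMs;
        // Land somewhere in the window [factor, factor + jitter] of the lifetime.
        double fraction = windowFactor + windowJitter * ThreadLocalRandom.current().nextDouble();
        long candidate = startMs + (long) (fraction * lifetimeMs);
        // Never refresh sooner than the minimum period after the start...
        candidate = Math.max(candidate, startMs + minPeriodSeconds * 1000);
        // ...and leave at least the minimum buffer before expiration.
        return Math.min(candidate, expireMs - minBufferSeconds * 1000);
    }

    public static void main(String[] args) {
        // One-hour credential with factor 0.8 and jitter 0.05: lands in [80%, 85%) of the hour.
        System.out.println(nextRefreshMs(0L, 3_600_000L, 0.8, 0.05, 60L, 300L));
    }
}
```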

From a behavior perspective on the server (broker) side, when the broker-side expired-connection-kill feature is fully enabled with a positive value the broker will close a connection authenticated via the indicated SASL mechanism when the connection is used past the expiration time and the specific API request is not directly related to re-authentication (ApiVersionsRequest, SaslHandshakeRequest, and SaslAuthenticateRequest).  In other words, if a connection sits idle, it will not be closed – something unrelated to re-authentication must traverse the connection before a disconnect will occur.

Metrics documenting re-authentications will be maintained.  They will mirror existing metrics that document authentications.  For example: failed-reauthentication-{rate,total} and successful-reauthentication-{rate,total}.

A broker metric will be created that documents the number of API requests unrelated to re-authentication that are made over a connection that is considered expired.  This metric may be non-zero only when the configuration value is negative.  It helps operators ensure that all clients are properly upgraded and re-authenticating before fully turning on server-side expired-connection-kill functionality (by changing the negative configuration value to its absolute value): the metric will be unchanging across all brokers when it is safe to fully enable the feature with the absolute value.

A broker metric will be created that documents the number of connections killed by the server-side expired-connection-kill functionality.  This metric may be non-zero only when the configuration value is positive, and it indicates that a client is connecting to the broker with re-authentication either unavailable (i.e. an older client) or disabled.

A client metric will be created that documents the latency imposed by re-authentication.  It is unclear if this latency will be problematic, and the data collected via this metric may be useful as we consider this issue in the future.

Note that the class that implements the refresh logic on the client side (org.apache.kafka.common.security.expiring.internals.ExpiringCredentialRefreshingLogin) is not considered part of the public API.  This means that while it is up to the org.apache.kafka.common.security.auth.Login implementation for a particular mechanism to implement the logic, and that implementation can delegate to ExpiringCredentialRefreshingLogin to do so (as org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerRefreshingLogin does), this is something that could only be done in a supported way for the built-in SASL mechanisms (e.g. PLAIN, SCRAM-related, and GSSAPI).  There is no intent to formally support an ability for non-builtin mechanisms to generate credentials that can be refreshed and re-authenticated – nothing would prevent anyone from implementing such mechanisms and delegating to the class, of course, but it would be an unsupported use of an internal class that is not part of the public API.

Implementation Overview


This implementation works at a very low level in the Kafka stack, at the level of the network code.  It is therefore transparent to all clients – it just works with no knowledge or accommodation required on their part.  When a client makes a request to a broker the request is intercepted at the level of the Selector class and a check is done to see if re-authentication is enabled; if it is, and the broker supports re-authentication, then the connection is re-authenticated at that point before the request is allowed to flow through.  The solution is elegant because it re-uses existing code paths.


This KIP transparently adds re-authentication support for all uses, which at this point includes the following:

  • org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient
    • org.apache.kafka.clients.consumer.KafkaConsumer
    • org.apache.kafka.connect.runtime.distributed.WorkerGroupMember
  • kafka.controller.ControllerChannelManager
  • org.apache.kafka.clients.admin.KafkaAdminClient
  • org.apache.kafka.clients.producer.KafkaProducer
  • kafka.coordinator.transaction.TransactionMarkerChannelManager
  • kafka.server.ReplicaFetcherBlockingSend (kafka.server.ReplicaFetcherThread)

...

  • ReplicaFetcherBlockingSend (kafka.server.ReplicaFetcherThread)
  • kafka.admin.AdminClient (this class is deprecated)
  • kafka.tools.ReplicaVerificationTool (an edge case; no need to support the option here)
  • kafka.server.KafkaServer

  • org.apache.kafka.trogdor.workload.ConnectionStressWorker

...

The PR does not yet have unit or integration tests – I will add these if/when the general implementation approach is verified by the community as being appropriate.  I have stepped through the code in the debugger, and I can see from this exercise as well as from the emitted log messages during normal operation that the implementation does in fact work (both for re-authentication and server-side kill of credential-expired connections).  For example, connections are repeatedly re-authenticated about once a minute with this JAAS configuration if we turn the feature on at the client:

Code Block
languagetext
titleJAAS Config: Re-Authenticate About Once a Minute
KafkaClient {
  org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
    unsecuredLoginLifetimeSeconds="60"
    unsecuredLoginStringClaim_sub="admin";
};

If we don't turn the feature on at the client but we do turn it on at the broker, then console consumer connections are disconnected every minute and automatically reconnect once the credential refreshes on the client; a console producer is disconnected only at the point where it tries to publish with an expired credential, and it likewise reconnects and publishes once the client credential is refreshed.

The final issue to describe is how/when a KafkaChannel instance (each of which corresponds to a unique network connection) is told to re-authenticate.

We define the class org.apache.kafka.common.security.expiring.internals.ClientChannelCredentialTracker to keep track of the various KafkaChannel instances and the ExpiringCredential instances that they authenticated with.  The following code in SaslChannelBuilder adds an instance of ClientChannelCredentialTracker to the private credentials of the Subject associated with the SASL mechanism when the feature is enabled:

Code Block
languagejava
titleSaslChannelBuilder Code to Enable the Feature
LoginManager loginManager = LoginManager.acquireLoginManager(entry.getValue(), mechanism, defaultLoginClass, configs);
loginManagers.put(mechanism, loginManager);
Subject subject = loginManager.subject();
if (mode == Mode.CLIENT) {
    if (saslLoginRefreshReauthenticateEnable()) {
        log.info("SASL Login Refresh Re-Authenticate ENABLED");
        if (subject.getPrivateCredentials(ClientChannelCredentialTracker.class).isEmpty())
            subject.getPrivateCredentials().add(new ClientChannelCredentialTracker());
    } else
        log.info("SASL Login Refresh Re-Authenticate DISABLED");
}

The KafkaChannel and ExpiringCredentialRefreshingLogin classes can easily retrieve the ClientChannelCredentialTracker instance and tell it when various events of interest occur (a channel is initially authenticated, a credential is refreshed, a channel is disconnected).  The events are enqueued as they are reported, and a background thread processes them to decide which KafkaChannel instances need to re-authenticate.  There is a method KafkaChannel.initiateReauthentication(Time) that instantiates an instance of SaslClientAuthenticator and tells it to re-authenticate via the method SaslClientAuthenticator.initiateReauthentication(Time, AuthenticationSuccessOrFailureReceiver).  The implementation of AuthenticationSuccessOrFailureReceiver that is sent takes care of cleaning up the state in KafkaChannel related to the multi-step re-authentication process and notifying the ClientChannelCredentialTracker instance of the result; it looks something like this:
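The bookkeeping described above can be illustrated with a toy model.  This is not the real ClientChannelCredentialTracker (which queues these events and processes them on a background thread), and the method names are simplified; it only shows the outcome that matters: which channels become stale when a credential is refreshed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the tracker's bookkeeping: map each live channel to the
// credential it authenticated with, and report stale channels on refresh.
public class CredentialTrackerModel {
    private final Map<String, String> credentialByChannel = new HashMap<>();

    public void channelAuthenticated(String channel, String credentialId) {
        credentialByChannel.put(channel, credentialId);
    }

    public void channelDisconnected(String channel) {
        credentialByChannel.remove(channel);
    }

    // When a credential is refreshed, every live channel still associated with
    // the old credential must be told to re-authenticate.
    public List<String> credentialRefreshed(String oldCredentialId) {
        List<String> stale = new ArrayList<>();
        for (Map.Entry<String, String> entry : credentialByChannel.entrySet())
            if (entry.getValue().equals(oldCredentialId))
                stale.add(entry.getKey());
        return stale;
    }

    public static void main(String[] args) {
        CredentialTrackerModel tracker = new CredentialTrackerModel();
        tracker.channelAuthenticated("channel-1", "token-A");
        tracker.channelAuthenticated("channel-2", "token-A");
        tracker.channelDisconnected("channel-2"); // disconnected channels are forgotten
        System.out.println(tracker.credentialRefreshed("token-A")); // prints [channel-1]
    }
}
```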

Code Block
languagejava
titleAuthenticationSuccessOrFailureReceiver
new AuthenticationSuccessOrFailureReceiver() {
    @Override
    public void reauthenticationFailed(RetryIndication retry, String errorMessage) {
        log.warn(errorMessage);
        Utils.closeQuietly(notYetAuthenticatedAuthenticator, "notYetAuthenticatedAuthenticator");
        notYetAuthenticatedAuthenticator = null;
        clientChannelCredentialTracker().offer(ClientChannelCredentialEvent
                .channelFailedReauthentication(time, KafkaChannel.this, errorMessage, retry));
    }

    @Override
    public void reauthenticationSucceeded() {
        clientChannelCredentialTracker().offer(ClientChannelCredentialEvent
                .channelReauthenticated(time, KafkaChannel.this, notYetAuthenticatedSaslClientAuthenticator
                    .expiringCredential()));
        replaceAnyPreviouslyAuthenticatedAuthenticatorWithNewOne();
    }

    private void replaceAnyPreviouslyAuthenticatedAuthenticatorWithNewOne() {
        // swap the old authenticator (if any) for the new one
        Authenticator tmp = authenticatedAuthenticator;
        authenticatedAuthenticator = notYetAuthenticatedAuthenticator;
        notYetAuthenticatedAuthenticator = null;
        if (tmp != null)
            Utils.closeQuietly(tmp, "previousAuthenticatedAuthenticator");
    }
}

When the KafkaChannel instance invokes the SaslClientAuthenticator.initiateReauthentication(Time, AuthenticationSuccessOrFailureReceiver) method it is up to the SaslClientAuthenticator instance to use its KafkaClient instance to enqueue an ApiVersionsRequest along with an appropriate callback that either fails the re-authentication if the ApiVersionsRequest fails or continues it with a SaslHandshakeRequest.  Here is what the code looks like:

Code Block
languagejava
titleSaslClientAuthenticator.initiateReauthentication()
    public void initiateReauthentication(Time time,
            AuthenticationSuccessOrFailureReceiver authenticationSuccessOrFailureReceiver) {
        if (saslState != SaslState.SEND_APIVERSIONS_REQUEST) {
            notifyFailureDueToUnexpectedState(SaslState.SEND_APIVERSIONS_REQUEST,
                    authenticationSuccessOrFailureReceiver);
            return;
        }
        kafkaClientSupplier.get().enqueueAuthenticationRequest(kafkaClient.newClientRequest(this.node,
                new ApiVersionsRequest.Builder((short) 0), time.milliseconds(), true, 1000,
                completionHandlerForApiVersionsRequest(time, authenticationSuccessOrFailureReceiver)));
    }

    private void notifyFailureDueToUnexpectedState(SaslState expectedSaslState,
            AuthenticationSuccessOrFailureReceiver authenticationSuccessOrFailureReceiver) {
        // it is possible that the connection was closed; don't retry
        SaslState receivedSaslState = saslState;
        if (saslState != SaslState.CLOSED)
            saslState = SaslState.FAILED;
        authenticationSuccessOrFailureReceiver.reauthenticationFailed(RetryIndication.DO_NOT_RETRY, String.format(
                "Re-authentication was in progress but could not proceed because the authenticator state is incorrect (expected %s): %s",
                expectedSaslState, receivedSaslState));
    }

And of course the method completionHandlerForApiVersionsRequest(Time, AuthenticationSuccessOrFailureReceiver) looks similar but deals with sending a SaslHandshakeRequest (see the PR for details).

Finally, we add methods on SaslServerAuthenticator to respond to the requests that arrive related to re-authentication – specifically, respondToReauthenticationSaslHandshakeRequest() and respondToReauthenticationSaslAuthenticateRequest() – and we add code to kafka.server.KafkaApis to route the received requests accordingly (as opposed to responding with an error, which is what currently happens).  The kafka.server.KafkaApis class is also where we check for expired connections.  I won't go into the details here; see the PR for the code.
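The expired-connection check can be illustrated with a small stand-alone sketch. Everything below (class name, method names, the way the session limit is stored) is invented for illustration and is not the actual kafka.server.KafkaApis code; it only models the decision described above: a session ends at the earlier of the credential expiration and the broker's configured maximum, and requests that are themselves part of a re-authentication exchange are never rejected.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model only -- not the actual KafkaApis implementation.
public class ExpiredConnectionCheckSketch {

    // connection id -> session expiration time in ms (absent = no enforced limit)
    private final Map<String, Long> sessionExpirationMs = new HashMap<>();

    /** Record the session limit when a connection (re-)authenticates. */
    public void recordAuthentication(String connectionId, long credentialExpirationMs,
                                     long maxReauthMs, long nowMs) {
        // The session ends at the earlier of the credential expiration
        // and the broker's configured cap.
        sessionExpirationMs.put(connectionId,
                Math.min(credentialExpirationMs, nowMs + maxReauthMs));
    }

    /** @return true if the request arrived over an expired connection and should cause a kill */
    public boolean shouldKill(String connectionId, boolean isReauthenticationRequest, long nowMs) {
        Long expiration = sessionExpirationMs.get(connectionId);
        if (expiration == null || isReauthenticationRequest)
            return false; // no limit recorded, or the client is re-authenticating
        return nowMs > expiration;
    }

    public static void main(String[] args) {
        ExpiredConnectionCheckSketch sketch = new ExpiredConnectionCheckSketch();
        sketch.recordAuthentication("conn-1", 5_000L, 10_000L, 0L);
        System.out.println(sketch.shouldKill("conn-1", false, 6_000L)); // prints "true"
    }
}
```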

...


Low-Level Approach

...

internals.ClientChannelCredentialTracker to keep track of the various KafkaChannel instances and the ExpiringCredential instances that they authenticated with.  The following code in SaslChannelBuilder adds an instance of ClientChannelCredentialTracker to the private credentials of the Subject associated with the SASL mechanism when the feature is enabled:

Code Block
languagejava
titleSaslChannelBuilder Code to Enable the Feature
LoginManager loginManager = LoginManager.acquireLoginManager(entry.getValue(), mechanism, defaultLoginClass, configs);
loginManagers.put(mechanism, loginManager);
Subject subject = loginManager.subject();
if (mode == Mode.CLIENT) {
    if (saslLoginRefreshReauthenticateEnable()) {
        log.info("SASL Login Refresh Re-Authenticate ENABLED");
        if (subject.getPrivateCredentials(ClientChannelCredentialTracker.class).isEmpty())
            subject.getPrivateCredentials().add(new ClientChannelCredentialTracker());
    } else
        log.info("SASL Login Refresh Re-Authenticate DISABLED");
}

The KafkaChannel and ExpiringCredentialRefreshingLogin classes can easily retrieve the ClientChannelCredentialTracker instance and tell it when various events of interest occur (a channel is initially authenticated, a credential is refreshed, a channel successfully re-authenticates, a channel is disconnected).  The events are enqueued as they are reported, and a background thread processes them to decide which KafkaChannel instances need to re-authenticate.  There is a method KafkaChannel.readyForReauthentication() that marks the instance as requiring re-authentication so the Selector will know.
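The event-queue pattern just described can be sketched in isolation. The class below is a simplified model, not the actual internals.ClientChannelCredentialTracker code: event producers enqueue cheaply, a background thread drains the queue, and a credential refresh marks every channel still authenticated with the old credential as needing re-authentication.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative model only -- not the actual ClientChannelCredentialTracker code.
public class CredentialEventTrackerSketch {

    public enum EventType { CHANNEL_AUTHENTICATED, CREDENTIAL_REFRESHED,
                            CHANNEL_REAUTHENTICATED, CHANNEL_DISCONNECTED }

    public static final class Event {
        public final EventType type;
        public final String channelId; // null for credential-level events

        public Event(EventType type, String channelId) {
            this.type = type;
            this.channelId = channelId;
        }
    }

    private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
    private final Set<String> authenticatedChannels = new LinkedHashSet<>();
    private final Set<String> channelsNeedingReauthentication = new LinkedHashSet<>();

    /** Called by channels and the refreshing login as events occur; cheap and non-blocking. */
    public void offer(Event event) {
        events.offer(event);
    }

    /** Run by a background thread: drain pending events and decide who must re-authenticate. */
    public void processPendingEvents() {
        for (Event e; (e = events.poll()) != null; ) {
            switch (e.type) {
                case CHANNEL_AUTHENTICATED:
                case CHANNEL_REAUTHENTICATED:
                    authenticatedChannels.add(e.channelId);
                    channelsNeedingReauthentication.remove(e.channelId);
                    break;
                case CHANNEL_DISCONNECTED:
                    authenticatedChannels.remove(e.channelId);
                    channelsNeedingReauthentication.remove(e.channelId);
                    break;
                case CREDENTIAL_REFRESHED:
                    // Channels still authenticated with the old credential must re-authenticate.
                    channelsNeedingReauthentication.addAll(authenticatedChannels);
                    break;
            }
        }
    }

    /** Plays the role of marking a channel for the Selector, as readyForReauthentication() does. */
    public boolean requiresReauthentication(String channelId) {
        return channelsNeedingReauthentication.contains(channelId);
    }
}
```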

How To Support Re-Authentication for Other SASL Mechanisms
Anchor
HowTo
HowTo

It was mentioned above that the design supports adding re-authentication to PLAIN, GSSAPI, and SCRAM-related SASL mechanisms but that doing so is out of scope for this KIP.  It is helpful to describe how this could be done in a bit more detail.

There are two requirements: the credential that the Login mechanism adds to the Subject's private credentials on the client side must implement the ExpiringCredential interface; and the client and server sides have to agree on the expiration time of each credential so that the client will know when to refresh the credential and the server will know when the presented credential expires.  The new broker configuration 'connections.max.reauth.ms' takes care of the second requirement.

Implementing the ExpiringCredential interface is easily done by wrapping the data that the LoginModule currently creates and adjusting the SASL client callback handler accordingly.  For example, the org.apache.kafka.common.security.plain.PlainLoginModule class currently adds a String to the private credentials for the password and a String to the public credentials for the username; it would instead store an instance of a class that implements ExpiringCredential and includes an additional password() method, and the org.apache.kafka.common.security.authenticator.SaslClientCallbackHandler class would be amended to pull the data from that object if it exists (falling back to the pair of String values if no ExpiringCredential existed).  Similar adjustments could be made for the SCRAM-related mechanisms.  The GSSAPI mechanism would be a bit different because we don't control all of the code (we reuse an existing LoginModule implementation, for example); in such cases we would simply use our own implementation that delegates to the existing one and augments it where necessary (e.g. in the commit() method of the LoginModule implementation).
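A wrapper of the kind just described for the PLAIN mechanism might look like the sketch below. Both the abbreviated ExpiringCredential stand-in (only two methods are shown) and the ExpiringPlainCredential class are hypothetical illustrations, not existing Kafka classes:

```java
/** Simplified stand-in for the ExpiringCredential interface (method set abbreviated). */
interface ExpiringCredential {
    String principalName();
    long expireTimeMs();
}

/**
 * Hypothetical credential that PlainLoginModule could add to the Subject's
 * private credentials in place of the bare password String; the SASL client
 * callback handler would prefer this object and fall back to the String pair
 * when it is absent.
 */
public class ExpiringPlainCredential implements ExpiringCredential {
    private final String username;
    private final String password;
    private final long expireTimeMs;

    public ExpiringPlainCredential(String username, String password, long expireTimeMs) {
        this.username = username;
        this.password = password;
        this.expireTimeMs = expireTimeMs;
    }

    @Override
    public String principalName() {
        return username;
    }

    @Override
    public long expireTimeMs() {
        return expireTimeMs;
    }

    /** The additional accessor mentioned above, used by the SASL client callback handler. */
    public String password() {
        return password;
    }
}
```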

Compatibility, Deprecation, and Migration Plan
Anchor
MigrationPlan
MigrationPlan

With respect to compatibility, there is no impact to existing installations because the default is for the feature to be completely turned off on both the client and server.

With respect to migration, the approach would be as follows:

1) Upgrade all brokers to v2.1.0 or later

2) After (1) is complete, turn on re-authentication for brokers (acting as inter-broker clients, via 'sasl.login.refresh.reauthenticate.enable') at whatever rate is desired; eventually the client-side feature must be turned on for all brokers so that every inter-broker connection is re-authenticating.

3) After (2) is complete, partially enable the server-side kill functionality by setting a negative value for 'connections.max.reauth.ms' on all brokers.  The metric documenting the number of API requests made over expired connections will begin to increase (but no connections will be killed).

4) In parallel with (1), (2), and (3) above, upgrade non-broker clients to v2.1.0 or later and turn their re-authentication feature on.  Clients will check the API version and only re-authenticate to a broker that has also been upgraded (note that the ability of a broker to respond to a re-authentication cannot be turned off -- it is always on beginning with version 2.1.0, and it just sits there doing nothing if it isn't exercised by an enabled client).

5) After (3) and (4) are complete, check the broker metric documenting the number of API requests made over expired connections to confirm that it is no longer increasing.  Once you are satisfied that (1), (2), (3), and (4) are indeed complete, you can fully enable the server-side expired-connection-kill feature on each broker by changing the 'connections.max.reauth.ms' value from its negative value to its absolute value and restarting the broker.

6) Monitor the metric that documents the number of killed connections – it will remain at 0 unless an older client or one that does not have re-authentication enabled connects.
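For concreteness, the configuration touched by the steps above might look as follows on an upgraded installation.  This is a sketch with illustrative values; the two 'connections.max.reauth.ms' settings correspond to the intermediate state of step (3) and the final state of step (5), respectively:

```properties
# Clients and brokers acting as inter-broker clients, steps (2)/(4):
# opt in to client-side re-authentication
sasl.login.refresh.reauthenticate.enable=true

# Brokers, step (3): a negative value causes API requests made over expired
# connections to be counted in the metric without any connections being
# killed (1 hour shown)
connections.max.reauth.ms=-3600000

# Brokers, step (5): flip to the absolute value to actually kill connections
# that exceed their session lifetime without re-authenticating
connections.max.reauth.ms=3600000
```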

Rejected Alternatives

Delaying Support for Brokers Killing Connections

It was initially proposed that we defer adding the ability for brokers to kill connections using expired credentials to a future KIP.  This functionality is actually easier to add than re-authentication, and re-authentication without this feature doesn't really improve security (because it can't be enforced).  Adding the ability to kill connections using an expired bearer token without the ability for the client to re-authenticate also does not make sense as a general feature – it forces the client to essentially "recover" from what looks like a network error on a periodic basis.  So we will implement both features at the same time.

Highest-Level Approach: Inserting Requests into Clients' Queues

The original implementation injected requests directly into both asynchronous and synchronous I/O clients' queues.  This implementation was too high up in the stack and required significantly more work and code than any other solution.  It also was much harder to maintain because it became entwined in the implementation of every client.

High-Level Approach: One-Size-Fits-All With Additional KafkaClient Methods

One implementation injected requests into the KafkaClient via a new method (shown below).  This one-size-fits-all approach worked as long as synchronous I/O clients periodically checked for and sent injected requests related to re-authentication (via a new method, also shown below).  This implementation was harder to maintain than the chosen approach because it crossed existing module boundaries related to security code, networking code, and code higher up in the stack – it imposed requirements (however slight) on code higher up in the stack in order for re-authentication to work.  This violation of existing modularity caused concern.  The code was also 2-3 times bigger (at least) relative to the accepted implementation, and it was incrementally (though not dramatically) more difficult to test.  It did create the possibility of interleaving requests related to re-authentication with requests that the client was otherwise sending, which minimized latency spikes due to re-authentication, but that advantage was difficult to quantify and therefore did not tip the balance in favor of this option.

Code Block
languagejava
titleorg.apache.kafka.clients.KafkaClient additions
    /**
     * Return true if any node has a re-authentication request either enqueued and
     * waiting to be sent or already in-flight. A call to {@link #poll(long, long)}
     * is required to send and receive/process the results of such requests. <b>An
     * owner of this instance that does not implement a run loop to repeatedly call
     * {@link #poll(long, long)} but instead only sends requests synchronously
     * on-demand to a single node must call this method periodically -- and invoke
     * {@link #poll(long, long)} if the return value is {@code true} -- to ensure
     * that any re-authentication requests that have been injected are sent and
     * processed in a timely fashion.</b>
     * <p>
     * Example code to re-authenticate a connection across several
     * requests/responses is as follows:
     * 
     * <pre>
     * // Send multiple requests related to re-authentication in the synchronous
     * // use case, completing the re-authentication exchange.
     * while (kafkaClient.hasReauthenticationRequest())
     *     // Returns an empty list in synchronous use case.
     *     kafkaClient.poll(Long.MAX_VALUE, time.milliseconds());
     * // The connection is ready for use, and if there originally was a
     * // re-authentication request then as many requests as required to
     * // complete the exchange have been sent.
     * </pre>
     * 
     * Alternatively, to only send one re-authentication request and receive its
     * response (which allows us to interleave other requests to the single node to
     * which we are connected before subsequent requests related to the multi-step
     * re-authentication exchange are sent):
     * 
     * <pre>
     * // Send a single request related to re-authentication in the synchronous
     * // use case, potentially (but not necessarily) completing the
     * // re-authentication exchange.
     * while (kafkaClient.hasReauthenticationRequest()) {
     *     // Returns an empty list in synchronous use case.
     *     kafkaClient.poll(Long.MAX_VALUE, time.milliseconds());
     *     if (!kafkaClient.hasInFlightRequests())
     *         break; // Response has been received.
     * }
     * // The connection is ready for use, and if there was a
     * // re-authentication request then either the exchange is finished or
     * // there is another re-authentication request available to be sent.
     * </pre>
     * 
     * @return if any node has a re-authentication request either enqueued and
     *         waiting to be sent or already in-flight
     * @see #enqueueAuthenticationRequest(ClientRequest)
     */
    default boolean hasReauthenticationRequest() {
        return false;
    }


    /**
     * Enqueue the given request related to re-authenticating a connection. This
     * method is guaranteed to be thread-safe even if the class implementing this
     * interface is generally not.
     * 
     * @param clientRequest
     *            the request to enqueue
     * @see #hasReauthenticationRequest()
     */
    default void enqueueAuthenticationRequest(ClientRequest clientRequest) {
        // empty
    }

Authenticating a Separate Connection and Transferring Credentials

...