Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The adoption of KIP-255: OAuth Authentication via SASL/OAUTHBEARER in release 2.0.0 creates the possibility of using information in the bearer token to make authorization decisions.  However, Kafka connections are long-lived, and there is currently no way to change the bearer token associated with a particular connection.  In addition, the use of short-lived tokens is a common OAuth security recommendation, but issuing a short-lived token to a Kafka client (or to a broker when OAUTHBEARER is the inter-broker protocol) currently has no benefit: once a client is connected to a broker it is never challenged again, so the connection may remain intact beyond the token expiration time (and may remain intact indefinitely under perfect circumstances).  This KIP proposes adding the ability for clients (and brokers when OAUTHBEARER is the inter-broker protocol) to re-authenticate their connections to remote brokers so that the new bearer token replaces the old one on the session.

The implementation is designed so that it does not preclude adding support for re-authentication of other SASL mechanisms (e.g. PLAIN, SCRAM-related, and GSSAPI), but doing so is explicitly out-of-scope for this proposal.  Also explicitly out-of-scope for this proposal is the ability for brokers to close connections that continue to use expired credentials.  This ability is a natural next step, but it will be addressed via a separate KIP if/when this one is adopted.

Public Interfaces

This KIP proposes the addition of a single interface to the API and a single additional configuration option to enable the feature (the option value defaults to false, of course, so there is no change to existing behavior in the absence of an explicit opt-in).  Specifically, the interface it proposes to add is as follows:

org.apache.kafka.common.security.expiring.ExpiringCredential
/**
 * A credential that expires and that can potentially be refreshed; such a
 * refreshed credential can also potentially be used to re-authenticate an
 * existing connection.
 * <p>
 * The parameters that impact how the refresh algorithm operates are specified
 * as part of the producer/consumer/broker configuration and are as follows. See
 * the documentation for these properties elsewhere for details.
 * <table>
 * <tr>
 * <th>Producer/Consumer/Broker Configuration Property</th>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.window.factor}</td>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.window.jitter}</td>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.min.period.seconds}</td>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.buffer.seconds}</td>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.reauthenticate.enable}</td>
 * </tr>
 * </table>
 * <p>
 * This interface was introduced in 2.1.0 and, while it feels stable, it could
 * evolve. We will try to evolve the API in a compatible manner, but we reserve
 * the right to make breaking changes in minor releases, if necessary. We will
 * update the {@code InterfaceStability} annotation and this notice once the API
 * is considered stable.
 * 
 * @see OAuthBearerLoginModule
 * @see SaslConfigs#SASL_LOGIN_REFRESH_WINDOW_FACTOR_DOC
 * @see SaslConfigs#SASL_LOGIN_REFRESH_WINDOW_JITTER_DOC
 * @see SaslConfigs#SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS_DOC
 * @see SaslConfigs#SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC
 * @see SaslConfigs#SASL_LOGIN_REFRESH_REAUTHENTICATE_ENABLE_DOC
 */
@InterfaceStability.Evolving
public interface ExpiringCredential {
    /**
     * The value to use when asking a {@code SaslClient} instance for the
     * {@code ExpiringCredential} that was used to authenticate, if any
     */
    public static final String SASL_CLIENT_NEGOTIATED_PROPERTY_NAME = "Expiring.Credential";

    /**
     * The name of the principal to which this credential applies (used only for
     * logging)
     * 
     * @return the always non-null/non-empty principal name
     */
    String principalName();

    /**
     * When the credential became valid, in terms of the number of milliseconds
     * since the epoch, if known, otherwise null. An expiring credential may not
     * necessarily indicate when it was created -- just when it expires -- so we
     * need to support a null return value here.
     * 
     * @return the time when the credential became valid, in terms of the number of
     *         milliseconds since the epoch, if known, otherwise null
     */
    Long startTimeMs();

    /**
     * When the credential expires, in terms of the number of milliseconds since the
     * epoch. All expiring credentials by definition must indicate their expiration
     * time -- thus, unlike other methods, we do not support a null return value
     * here.
     * 
     * @return the time when the credential expires, in terms of the number of
     *         milliseconds since the epoch
     */
    long expireTimeMs();

    /**
     * The point after which the credential can no longer be refreshed, in terms of
     * the number of milliseconds since the epoch, if any, otherwise null. Some
     * expiring credentials can be refreshed over and over again without limit, so
     * we support a null return value here.
     * 
     * @return the point after which the credential can no longer be refreshed, in
     *         terms of the number of milliseconds since the epoch, if any,
     *         otherwise null
     */
    Long absoluteLastRefreshTimeMs();
}
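
As an illustration of the contract above, here is a minimal sketch of a credential with a known expiration time, an unknown start time, and no refresh limit.  To keep the sketch compilable without Kafka on the classpath it uses a local stand-in interface (ExpiringCredentialSketch) that mirrors the four methods of the proposed interface; both the stand-in and the FixedLifetimeCredential class are hypothetical names and are not part of this KIP or the PR.

```java
// Local stand-in mirroring the methods of the proposed ExpiringCredential
// interface (an assumption made only so this sketch compiles on its own).
interface ExpiringCredentialSketch {
    String principalName();
    Long startTimeMs();
    long expireTimeMs();
    Long absoluteLastRefreshTimeMs();
}

// Hypothetical credential that only knows who it belongs to and when it expires.
final class FixedLifetimeCredential implements ExpiringCredentialSketch {
    private final String principalName;
    private final long expireTimeMs;

    FixedLifetimeCredential(String principalName, long expireTimeMs) {
        // The contract requires a non-null, non-empty principal name.
        if (principalName == null || principalName.isEmpty())
            throw new IllegalArgumentException("principalName must be non-empty");
        this.principalName = principalName;
        this.expireTimeMs = expireTimeMs;
    }

    @Override
    public String principalName() {
        return principalName;
    }

    @Override
    public Long startTimeMs() {
        return null; // start time unknown, which the contract allows
    }

    @Override
    public long expireTimeMs() {
        return expireTimeMs; // always required, so a primitive is returned
    }

    @Override
    public Long absoluteLastRefreshTimeMs() {
        return null; // no upper bound on how long refreshes may continue
    }
}
```

A real implementation would implement org.apache.kafka.common.security.expiring.ExpiringCredential directly and would typically derive these values from the underlying token.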


The configuration option this KIP proposes to add is 'sasl.login.refresh.reauthenticate.enable'.  It defaults to false; when explicitly set to true, the feature is enabled for any SASL connection that generates a private credential implementing the above ExpiringCredential interface.  This is the case for OAUTHBEARER by default, because the default Login implementation for OAUTHBEARER automatically wraps any OAuthBearerToken instance that does not already implement ExpiringCredential.
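
For illustration, a client could opt in along the following lines.  This is a sketch only: the ReauthConfigExample class name is hypothetical, the SASL_SSL security protocol is an assumed choice, and the JAAS configuration and other required client settings are omitted.

```java
import java.util.Properties;

// Hypothetical helper that builds client properties with the proposed option enabled.
final class ReauthConfigExample {
    static Properties clientProperties() {
        Properties props = new Properties();
        // Existing Kafka settings; SASL_SSL is an assumed choice for this sketch.
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "OAUTHBEARER");
        // Option proposed by this KIP; it defaults to false, so this is an explicit opt-in.
        props.put("sasl.login.refresh.reauthenticate.enable", "true");
        return props;
    }
}
```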

Proposed Changes

From a functionality perspective this KIP is quite straightforward: turn the feature on with the configuration option and it just works for OAUTHBEARER; for other mechanisms, use a custom LoginModule that creates credentials implementing ExpiringCredential and it will work for those mechanisms, too.  From an implementation perspective, though, the KIP is not so straightforward; it therefore includes a pull request with a proposed implementation.  Here is a high-level description of how the proposed implementation generally works.  Note that this description applies to the implementation only; none of this is part of the public API.

First, at a high level, we recognize that org.apache.kafka.clients.NetworkClient can be used in either an asynchronous or a synchronous manner.  In asynchronous mode the NetworkClient.poll() method is repeatedly called directly in a run loop, whereas in synchronous mode the NetworkClientUtils.sendAndReceive() method is used whenever something has to be sent (this method in turn calls NetworkClient.poll(), but otherwise poll() is not invoked).  The fact that these two separate use cases exist is important because the requests related to re-authentication (ApiVersionsRequest, SaslHandshakeRequest, and SaslAuthenticateRequest) cannot simply be injected into a queue that NetworkClient owns: in the synchronous case we don't know when poll() will be called, since it is only invoked when something needs to be sent, and that might take a while (if it occurs at all).  So a single-queue approach won't work.

The approach we take is to define an interface org.apache.kafka.common.security.authenticator.AuthenticationRequestEnqueuer that declares 3 separate enqueueRequest() methods (one for each of the request types related to re-authentication).  We leave it to the owners of the NetworkClient instances to define how to inject the requests by providing an implementation of the interface to the SaslChannelBuilder, which in turn provides it to the SaslClientAuthenticator.
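
The shape of such an interface can be sketched as follows.  This is not the code from the PR: the builder and callback types are empty stand-ins for ApiVersionsRequest.Builder, SaslHandshakeRequest.Builder, SaslAuthenticateRequest.Builder, and AuthenticationRequestCompletionHandler so that the sketch compiles on its own, the onComplete() method on the callback stand-in is an assumption, and RecordingEnqueuer is a hypothetical toy owner that "sends" a request by recording it and completing immediately.

```java
import java.util.ArrayList;
import java.util.List;

// Empty stand-ins for the three Kafka request builder types.
final class ApiVersionsBuilderStub {}
final class SaslHandshakeBuilderStub {}
final class SaslAuthenticateBuilderStub {}

// Stand-in for the completion handler; its single method is hypothetical.
interface CompletionHandlerStub {
    void onComplete(String nodeId);
}

// One overload per re-authentication request type, as described above.
interface AuthenticationRequestEnqueuerSketch {
    void enqueueRequest(String nodeId, ApiVersionsBuilderStub builder, CompletionHandlerStub callback);
    void enqueueRequest(String nodeId, SaslHandshakeBuilderStub builder, CompletionHandlerStub callback);
    void enqueueRequest(String nodeId, SaslAuthenticateBuilderStub builder, CompletionHandlerStub callback);
}

// Toy owner: records each request in order and invokes the callback at once.
final class RecordingEnqueuer implements AuthenticationRequestEnqueuerSketch {
    final List<String> sent = new ArrayList<>();

    private void send(String nodeId, String requestName, CompletionHandlerStub callback) {
        sent.add(nodeId + ":" + requestName);
        callback.onComplete(nodeId);
    }

    @Override
    public void enqueueRequest(String nodeId, ApiVersionsBuilderStub builder, CompletionHandlerStub callback) {
        send(nodeId, "ApiVersions", callback);
    }

    @Override
    public void enqueueRequest(String nodeId, SaslHandshakeBuilderStub builder, CompletionHandlerStub callback) {
        send(nodeId, "SaslHandshake", callback);
    }

    @Override
    public void enqueueRequest(String nodeId, SaslAuthenticateBuilderStub builder, CompletionHandlerStub callback) {
        send(nodeId, "SaslAuthenticate", callback);
    }
}
```

The point of the indirection is that each owner of a NetworkClient substitutes its own send mechanism for the send() helper, while the authenticator only ever sees the three enqueueRequest() overloads.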

For example, the org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient class provides the authenticationRequestEnqueuer() method that yields an instance that invokes ConsumerNetworkClient.send(Node, AbstractRequest.Builder<?>); it is then a simple matter of adding the following code to the constructors of the org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.connect.runtime.distributed.WorkerGroupMember classes:


Constructor Code Addition for Users of ConsumerNetworkClient
if (channelBuilder instanceof SaslChannelBuilder)
    ((SaslChannelBuilder) channelBuilder).authenticationRequestEnqueuer(client.authenticationRequestEnqueuer());

Another example is kafka.controller.ControllerChannelManager.  Instances of this class maintain one request queue per broker with one background thread allocated for each broker to monitor the queue and send the requests that appear.  The requests are sent in a synchronous fashion with respect to the thread only when a request appears in the queue (i.e. unlike ConsumerNetworkClient, which is an async use case, this is a synchronous use case).  It is necessary to invoke sendRequest() to add a request to the queue for a broker and have it sent.  The code in the addNewBroker() method therefore looks like this:

ControllerChannelManager.addNewBroker() Code Addition
      if (channelBuilder.isInstanceOf[SaslChannelBuilder])
        channelBuilder.asInstanceOf[SaslChannelBuilder].authenticationRequestEnqueuer(
          new AuthenticationRequestEnqueuer() {
            def enqueueRequest(nodeId: String, apiVersionsRequestBuilder: ApiVersionsRequest.Builder,
              callback: AuthenticationRequestCompletionHandler): Unit =
                sendRequest(nodeId.toInt, apiVersionsRequestBuilder.apiKey(), apiVersionsRequestBuilder, null, callback)

            def enqueueRequest(nodeId: String, saslHandshakeRequestBuilder: SaslHandshakeRequest.Builder,
              callback: AuthenticationRequestCompletionHandler): Unit =
                sendRequest(nodeId.toInt, saslHandshakeRequestBuilder.apiKey(), saslHandshakeRequestBuilder, null, callback)

            def enqueueRequest(nodeId: String, saslAuthenticateRequestBuilder: SaslAuthenticateRequest.Builder,
              callback: AuthenticationRequestCompletionHandler): Unit =
                sendRequest(nodeId.toInt, saslAuthenticateRequestBuilder.apiKey(), saslAuthenticateRequestBuilder, null, callback)
          })

So the general approach is to identify how to inject requests and then set the appropriate AuthenticationRequestEnqueuer implementation on the SaslChannelBuilder.  In addition to the above examples the PR also includes an implementation for org.apache.kafka.clients.admin.KafkaAdminClient.

The PR does not yet have implementations for produce-related classes.  For example:

  • org.apache.kafka.clients.producer.KafkaProducer

  • kafka.coordinator.transaction.TransactionMarkerChannelManager

  • kafka.server.ReplicaFetcherBlockingSend

  • kafka.common.InterBrokerSendThread

The PR also does not yet have unit or integration tests – I will add these if/when the general implementation approach is verified by the community as being appropriate.  I have stepped through the code in the debugger, and it does work.

The final issue to describe is how/when a KafkaChannel instance (each of which corresponds to a unique network connection) is told to re-authenticate.

We define the class org.apache.kafka.common.security.expiring.internals.ClientChannelCredentialTracker to keep track of the various KafkaChannel instances and the ExpiringCredential instances that they authenticated with.  When the feature is enabled we add an instance of this class to the private credentials of the Subject associated with the SASL mechanism using this code in SaslChannelBuilder:

SaslChannelBuilder Code to Enable the Feature
LoginManager loginManager = LoginManager.acquireLoginManager(entry.getValue(), mechanism, defaultLoginClass, configs);
loginManagers.put(mechanism, loginManager);
Subject subject = loginManager.subject();
if (mode == Mode.CLIENT) {
    if (saslLoginRefreshReauthenticateEnable()) {
        log.info("SASL Login Refresh Re-Authenticate ENABLED");
        if (subject.getPrivateCredentials(ClientChannelCredentialTracker.class).isEmpty())
            subject.getPrivateCredentials().add(new ClientChannelCredentialTracker());
    } else
        log.info("SASL Login Refresh Re-Authenticate DISABLED");
}

The KafkaChannel and ExpiringCredentialRefreshingLogin classes can easily retrieve the ClientChannelCredentialTracker instance and tell it when various events of interest occur (a channel is initially authenticated, a credential is refreshed, a channel is closed).  The events are enqueued as they are reported, and a background thread processes them to decide which KafkaChannel instances need to re-authenticate.
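
The enqueue-then-process pattern described above can be sketched roughly as follows.  This is not the ClientChannelCredentialTracker code from the PR: the class name, event types, string identifiers for channels and credentials, and the SHUTDOWN event are all assumptions made for illustration, and the rule used here (a channel needs to re-authenticate if it authenticated with anything other than the most recently refreshed credential) is a simplification.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical tracker: reporters enqueue events, one thread processes them.
final class CredentialTrackerSketch implements Runnable {
    enum EventType { CHANNEL_AUTHENTICATED, CREDENTIAL_REFRESHED, CHANNEL_CLOSED, SHUTDOWN }

    static final class Event {
        final EventType type;
        final String channelId;    // null for CREDENTIAL_REFRESHED and SHUTDOWN
        final String credentialId; // null for CHANNEL_CLOSED and SHUTDOWN

        Event(EventType type, String channelId, String credentialId) {
            this.type = type;
            this.channelId = channelId;
            this.credentialId = credentialId;
        }
    }

    private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
    // The two collections below are touched only by the processing thread.
    private final Map<String, String> credentialByChannel = new HashMap<>();
    private final Set<String> needReauth = new TreeSet<>();

    // Called by reporters from any thread; never blocks on processing.
    void report(Event event) {
        events.add(event);
    }

    // Processing loop; returns when a SHUTDOWN event is taken from the queue.
    @Override
    public void run() {
        try {
            while (true) {
                Event e = events.take();
                switch (e.type) {
                    case CHANNEL_AUTHENTICATED:
                        credentialByChannel.put(e.channelId, e.credentialId);
                        needReauth.remove(e.channelId);
                        break;
                    case CREDENTIAL_REFRESHED:
                        // Channels still on any other credential are now stale.
                        for (Map.Entry<String, String> entry : credentialByChannel.entrySet())
                            if (!entry.getValue().equals(e.credentialId))
                                needReauth.add(entry.getKey());
                        break;
                    case CHANNEL_CLOSED:
                        credentialByChannel.remove(e.channelId);
                        needReauth.remove(e.channelId);
                        break;
                    case SHUTDOWN:
                        return;
                }
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }

    // Snapshot; in this sketch it is safe to call only after run() has returned.
    Set<String> channelsNeedingReauth() {
        return new TreeSet<>(needReauth);
    }
}
```

In use, the tracker would run on its own thread (for example via new Thread(tracker).start()) while channels and the login refresh logic call report() as events occur.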

Compatibility, Deprecation, and Migration Plan

There is no impact to existing installations because the default is for the feature to be turned off.

Rejected Alternatives

An alternative idea is to add two new request types: "receive re-authentication nonce" and "re-authenticate with nonce".  When re-authentication needs to occur the client would make a separate, new connection to the broker and then send the "receive re-authentication nonce" request, causing the broker to associate a nonce with the credentials and return the nonce to the client.  The client would then send the "re-authenticate with nonce" request with the returned nonce over the connection that it wishes to re-authenticate, and the broker would replace the credentials on that connection with the credentials it had previously associated with the nonce.  I don't know if this would work: might there be some issue with advertised vs. actual addresses, or with the possibility of there being a load balancer?  Could we be guaranteed the ability to connect to the exact same broker as our existing connection?  If it could work, it has the advantage of requiring the injection of just a single request over an existing connection that would return very quickly, rather than 3 separate requests of which at least one might take a while to return (to potentially retrieve a public key for token signature validation, for example; the validation itself isn't exactly free, either).  One disadvantage of the alternative approach is that it requires the creation of a separate connection, including TLS negotiation, which is very expensive compared to sending 3 requests over an existing connection (which of course already has TLS negotiated).
