Status

Current state: Under Discussion

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: here [Change the link from KAFKA-1 to your own ticket]

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The adoption of KIP-255: OAuth Authentication via SASL/OAUTHBEARER in release 2.0.0 creates the possibility of using information in the bearer token to make authorization decisions.  Kafka connections are long-lived, however, and there is currently no way to change the bearer token associated with a particular connection.  Additionally, the use of short-lived tokens is a common OAuth security recommendation, but issuing a short-lived token to a Kafka client (or a broker when OAUTHBEARER is the inter-broker protocol) currently has no benefit: once a client is connected to a broker the client is never challenged again, so the connection may remain intact beyond the token expiration time (and may remain intact indefinitely under perfect circumstances).  This KIP proposes adding the ability for clients (and brokers when OAUTHBEARER is the inter-broker protocol) to re-authenticate their connections to remote brokers and have the new bearer token appear on their session rather than the old one.

The implementation is designed in such a way that it does not preclude adding support for re-authentication of other SASL mechanisms (e.g. PLAIN, SCRAM-related, and GSSAPI), but doing so is explicitly out-of-scope for this proposal.  Also explicitly out-of-scope for this proposal is the ability for brokers to close connections that continue to use expired credentials.  This ability is a natural next step, but it will be addressed via a separate KIP if/when this one is adopted.

Public Interfaces

This KIP proposes the addition of a single interface to the API and a single additional configuration option to enable the feature (the option value defaults to false, of course, so there is no change to existing behavior in the absence of an explicit opt-in).  Specifically, the interface it proposes to add is as follows:

org.apache.kafka.common.security.expiring.ExpiringCredential
/**
 * A credential that expires and that can potentially be refreshed; such a
 * refreshed credential can also potentially be used to re-authenticate an
 * existing connection.
 * <p>
 * The parameters that impact how the refresh algorithm operates are specified
 * as part of the producer/consumer/broker configuration and are as follows. See
 * the documentation for these properties elsewhere for details.
 * <table>
 * <tr>
 * <th>Producer/Consumer/Broker Configuration Property</th>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.window.factor}</td>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.window.jitter}</td>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.min.period.seconds}</td>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.buffer.seconds}</td>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.reauthenticate.enable}</td>
 * </tr>
 * </table>
 * <p>
 * This interface was introduced in 2.1.0 and, while it feels stable, it could
 * evolve. We will try to evolve the API in a compatible manner, but we reserve
 * the right to make breaking changes in minor releases, if necessary. We will
 * update the {@code InterfaceStability} annotation and this notice once the API
 * is considered stable.
 * 
 * @see SaslConfigs#SASL_LOGIN_REFRESH_WINDOW_FACTOR_DOC
 * @see SaslConfigs#SASL_LOGIN_REFRESH_WINDOW_JITTER_DOC
 * @see SaslConfigs#SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS_DOC
 * @see SaslConfigs#SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC
 * @see SaslConfigs#SASL_LOGIN_REFRESH_REAUTHENTICATE_ENABLE_DOC
 */
@InterfaceStability.Evolving
public interface ExpiringCredential {
    /**
     * The value to use when asking a {@code SaslClient} instance for the
     * {@code ExpiringCredential} that was used to authenticate, if any
     */
    public static final String SASL_CLIENT_NEGOTIATED_PROPERTY_NAME = "Expiring.Credential";

    /**
     * The name of the principal to which this credential applies (used only for
     * logging)
     * 
     * @return the always non-null/non-empty principal name
     */
    String principalName();

    /**
     * When the credential became valid, in terms of the number of milliseconds
     * since the epoch, if known, otherwise null. An expiring credential may not
     * necessarily indicate when it was created -- just when it expires -- so we
     * need to support a null return value here.
     * 
     * @return the time when the credential became valid, in terms of the number of
     *         milliseconds since the epoch, if known, otherwise null
     */
    Long startTimeMs();

    /**
     * When the credential expires, in terms of the number of milliseconds since the
     * epoch. All expiring credentials by definition must indicate their expiration
     * time -- thus, unlike other methods, we do not support a null return value
     * here.
     * 
     * @return the time when the credential expires, in terms of the number of
     *         milliseconds since the epoch
     */
    long expireTimeMs();

    /**
     * The point after which the credential can no longer be refreshed, in terms of
     * the number of milliseconds since the epoch, if any, otherwise null. Some
     * expiring credentials can be refreshed over and over again without limit, so
     * we support a null return value here.
     * 
     * @return the point after which the credential can no longer be refreshed, in
     *         terms of the number of milliseconds since the epoch, if any,
     *         otherwise null
     */
    Long absoluteLastRefreshTimeMs();
}
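
As an illustration of what implementing the interface might involve, here is a minimal sketch of a credential with a fixed lifetime and no refresh limit.  The names ExpiringCredentialSketch and FixedLifetimeCredential are hypothetical; the first is a local stand-in that mirrors the four proposed methods so that the example compiles on its own, and neither is part of this proposal.

```java
// Local stand-in mirroring the methods of the proposed ExpiringCredential
// interface (hypothetical name; used only so this sketch is self-contained).
interface ExpiringCredentialSketch {
    String principalName();
    Long startTimeMs();
    long expireTimeMs();
    Long absoluteLastRefreshTimeMs();
}

// Hypothetical credential that knows its expiration time, may or may not
// know its start time, and can be refreshed without limit.
final class FixedLifetimeCredential implements ExpiringCredentialSketch {
    private final String principalName;
    private final Long startTimeMs;
    private final long expireTimeMs;

    FixedLifetimeCredential(String principalName, Long startTimeMs, long expireTimeMs) {
        if (principalName == null || principalName.isEmpty())
            throw new IllegalArgumentException("principalName must be non-null and non-empty");
        if (startTimeMs != null && startTimeMs > expireTimeMs)
            throw new IllegalArgumentException("startTimeMs must not be after expireTimeMs");
        this.principalName = principalName;
        this.startTimeMs = startTimeMs;
        this.expireTimeMs = expireTimeMs;
    }

    @Override
    public String principalName() {
        return principalName;
    }

    @Override
    public Long startTimeMs() {
        return startTimeMs; // may be null when the start time is unknown
    }

    @Override
    public long expireTimeMs() {
        return expireTimeMs;
    }

    @Override
    public Long absoluteLastRefreshTimeMs() {
        return null; // null means there is no point after which refresh is impossible
    }
}
```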


The configuration option this KIP proposes to add is 'sasl.login.refresh.reauthenticate.enable'.  It defaults to false; when explicitly set to true, the feature is enabled for any SASL connection that generates a private credential implementing the above ExpiringCredential interface.  This will be the case with OAUTHBEARER by default because the Login implementation that is used by default for OAUTHBEARER automatically wraps any OAuthBearerToken instance to make it implement ExpiringCredential if it does not already do so.
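
For illustration, opting in from a client might look roughly like the following sketch.  It only builds a java.util.Properties object; the class name ReauthConfigSketch is hypothetical, the choice of SASL_SSL is an assumption, and the remaining client settings (bootstrap servers, JAAS configuration, and so on) are omitted.

```java
import java.util.Properties;

// Hypothetical helper showing the opt-in described above.
final class ReauthConfigSketch {
    static Properties clientProperties() {
        Properties props = new Properties();
        // Assumed transport for this example; SASL_PLAINTEXT would also use SASL.
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "OAUTHBEARER");
        // The option proposed by this KIP; it defaults to false.
        props.put("sasl.login.refresh.reauthenticate.enable", "true");
        return props;
    }
}
```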

From a behavior perspective, when the option is enabled and a private credential implements ExpiringCredential, the existing configuration options sasl.login.refresh.{window.factor, window.jitter, min.period.seconds, buffer.seconds} define when the org.apache.kafka.common.security.auth.Login implementation on the client must generate a new private credential (e.g. when it must retrieve a new bearer token).  The default configuration values result in a new credential being retrieved sometime between 75% and 85% of the current credential's lifetime (e.g. sometime between 45 and 51 minutes after a token with a one-hour lifetime is initially retrieved).  At that time, when the new token is retrieved, every connection that authenticated with the old credential will be told to re-authenticate using the new credential.  After successful re-authentication, when the new credential is again refreshed (after another 45-51 minutes), the connections will be told once again to re-authenticate, etc.
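
The arithmetic behind the 45-51 minute window can be sketched as follows.  This is not the refresh algorithm itself: the class and method names are hypothetical, the window factor of 0.8 and jitter of 0.05 are assumed values, and the jitter is assumed to be applied symmetrically around the factor so that the result matches the 75%-85% range stated above.  The min.period.seconds and buffer.seconds constraints are ignored.

```java
import java.util.Random;

// Hypothetical illustration of picking a refresh time inside the window.
final class RefreshTimeSketch {
    static long refreshTimeMs(long startMs, long expireMs,
                              double windowFactor, double windowJitter, Random random) {
        long lifetimeMs = expireMs - startMs;
        // Assumption: jitter is uniform in [-windowJitter, +windowJitter).
        double jitter = (2 * random.nextDouble() - 1) * windowJitter;
        return startMs + (long) (lifetimeMs * (windowFactor + jitter));
    }

    public static void main(String[] args) {
        long oneHourMs = 60L * 60L * 1000L;
        long refreshMs = refreshTimeMs(0L, oneHourMs, 0.8, 0.05, new Random());
        System.out.println("Refresh after " + (refreshMs / 60000.0) + " minutes");
    }
}
```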

Note that the class that implements the refresh logic (org.apache.kafka.common.security.expiring.internals.ExpiringCredentialRefreshingLogin) is not considered part of the public API.  This means that while it is up to the org.apache.kafka.common.security.auth.Login implementation for a particular mechanism to implement the logic, and that implementation can delegate to ExpiringCredentialRefreshingLogin to do so (as org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerRefreshingLogin does), this is something that could only be done for the built-in SASL mechanisms (e.g. PLAIN, SCRAM-related, and GSSAPI).  There is no intent to allow non-builtin mechanisms to generate credentials that can be refreshed and re-authenticated.

If a re-authentication attempt fails, the connection is told to retry after a delay that depends on how many retries have been attempted so far: after some small amount of time for the first retry (e.g. 1 minute), double that for the second retry, and 4 times the initial delay for every retry thereafter.  Retry attempts generally occur indefinitely until either one of them succeeds or the connection is closed.  There are certain errors that result in retries not being attempted (e.g. if some internal state doesn't make sense, which generally should not happen), and there are certain other errors that will only result in a retry if the existing token is still active (generally errors like an IOException, for example).
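
The retry schedule described above can be sketched as a small function.  The class and method names are hypothetical, and the one-minute initial delay is just the example value from the text.

```java
// Hypothetical illustration of the capped retry schedule.
final class ReauthRetryBackoffSketch {
    /**
     * @param retryNumber 1 for the first retry, 2 for the second, and so on
     * @param initialDelayMs the delay before the first retry
     * @return the delay to wait before the given retry
     */
    static long retryDelayMs(int retryNumber, long initialDelayMs) {
        if (retryNumber < 1)
            throw new IllegalArgumentException("retryNumber must be at least 1");
        if (retryNumber == 1)
            return initialDelayMs;      // first retry: the initial delay
        if (retryNumber == 2)
            return 2 * initialDelayMs;  // second retry: double the initial delay
        return 4 * initialDelayMs;      // every later retry: 4 times the initial delay
    }

    public static void main(String[] args) {
        for (int retry = 1; retry <= 5; retry++)
            System.out.println("retry " + retry + ": " + retryDelayMs(retry, 60_000L) + " ms");
    }
}
```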

No metrics are affected/created by this proposal.  In particular, re-authentications are not counted as authentications, and no metrics related to re-authentication are created.

Although there is no technical reason preventing it, we arbitrarily decide to disallow changing identities upon re-authentication.  For example, if a connection originally authenticates as USER:user1, an attempt to re-authenticate as anything else (e.g. USER:user2) will fail.  Retry is allowed indefinitely in this case.

Proposed Changes

The description of this KIP is actually quite straightforward from a functionality perspective: turn the feature on with the configuration option and it just works for OAUTHBEARER; for other mechanisms, use a custom LoginModule to create credentials implementing ExpiringCredential and delegate to org.apache.kafka.common.security.expiring.internals.ExpiringCredentialRefreshingLogin, and it will work for those mechanisms, too.  From an implementation perspective, though, the KIP is not so straightforward; it therefore includes a pull request with a proposed implementation.  Here is a high-level description of how the proposed implementation generally works.  Note that this description applies to the implementation only; none of this is part of the public API.

First, at a high level, we recognize that org.apache.kafka.clients.NetworkClient can be used in either an asynchronous or synchronous manner; in async mode the NetworkClient.poll() method is repeatedly called directly in a run loop, whereas in synchronous mode the NetworkClientUtils.sendAndReceive() method is used whenever something has to be sent (this method in turn calls NetworkClient.poll(), but otherwise poll() is not invoked).  The fact that these two separate use cases exist is important because the requests related to re-authentication (ApiVersionsRequest, SaslHandshakeRequest, and SaslAuthenticateRequest) cannot simply be injected into a queue that NetworkClient owns; we don't know when poll() will be called in the synchronous case, for example – it is only invoked when something needs to be sent, and that might take a while (if it occurs at all).  So a single queue approach won't work.

The approach we take is to define an interface org.apache.kafka.common.security.authenticator.AuthenticationRequestEnqueuer that declares an enqueueRequest(AuthenticationRequest) method.  We leave it to the owners of the NetworkClient instances to define how to inject such requests by providing an implementation of the interface to the SaslChannelBuilder, which in turn provides it to the SaslClientAuthenticator.
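
To make the shape of this contract concrete, here is a simplified, self-contained sketch.  All of the type names are hypothetical stand-ins: the request type carries only a node id and an API name, whereas the AuthenticationRequest in the PR also carries a request builder and a completion handler.  The point is only that the owner of the network client decides where an enqueued request goes.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified stand-in for AuthenticationRequest (hypothetical).
final class AuthRequestSketch {
    final int nodeId;
    final String apiName;

    AuthRequestSketch(int nodeId, String apiName) {
        this.nodeId = nodeId;
        this.apiName = apiName;
    }
}

// Stand-in for the AuthenticationRequestEnqueuer interface (hypothetical).
interface AuthRequestEnqueuerSketch {
    void enqueueRequest(AuthRequestSketch request);
}

// Hypothetical owner of a network client that keeps its own send queue and
// hands out an enqueuer that feeds that queue.
final class QueueOwningClientSketch {
    private final Queue<AuthRequestSketch> pending = new ArrayDeque<>();

    AuthRequestEnqueuerSketch authenticationRequestEnqueuer() {
        return pending::add;
    }

    int pendingCount() {
        return pending.size();
    }

    // Called by the owner's own send loop; returns null when nothing is queued.
    AuthRequestSketch nextToSend() {
        return pending.poll();
    }
}
```

The authenticator only ever sees the enqueuer, so it does not need to know whether the owner polls in a run loop or sends synchronously.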

For example, the org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient class provides the authenticationRequestEnqueuer() method that yields an instance that invokes ConsumerNetworkClient.send(Node, AbstractRequest.Builder<?>); it is then a simple matter of adding the following code to the constructors of the org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.connect.runtime.distributed.WorkerGroupMember classes, which use ConsumerNetworkClient for their connectivity:


Constructor Code Addition for Users of ConsumerNetworkClient
if (channelBuilder instanceof SaslChannelBuilder)
    ((SaslChannelBuilder) channelBuilder).authenticationRequestEnqueuer(client.authenticationRequestEnqueuer());

Another example is kafka.controller.ControllerChannelManager.  Instances of this class maintain one request queue per broker with one background thread allocated for each broker to monitor the queue and send the requests that appear.  The requests are sent in a synchronous fashion with respect to the thread when a request appears in the queue (i.e. unlike ConsumerNetworkClient, which is an async use case, this is a synchronous NetworkClient use case).  It is necessary to invoke sendRequest() to add a request to the queue for a broker and have it sent.  The code in the addNewBroker() method therefore looks like this:

ControllerChannelManager.addNewBroker() Code Addition
if (channelBuilder.isInstanceOf[SaslChannelBuilder])
  channelBuilder.asInstanceOf[SaslChannelBuilder].authenticationRequestEnqueuer(
    new AuthenticationRequestEnqueuer() {
      def enqueueRequest(authenticationRequest: AuthenticationRequest): Unit =
          sendRequest(authenticationRequest.nodeId(), authenticationRequest.requestBuilder().apiKey(),
              authenticationRequest.requestBuilder(), null, authenticationRequest.authenticationRequestCompletionHandler())
})

So the general approach is to identify how to inject requests and then set the appropriate AuthenticationRequestEnqueuer implementation on the SaslChannelBuilder.  In addition to the above examples the PR also includes implementations for org.apache.kafka.clients.admin.KafkaAdminClient and org.apache.kafka.clients.producer.KafkaProducer.

The PR does not yet have implementations for these classes:

  • kafka.coordinator.transaction.TransactionMarkerChannelManager

  • kafka.server.ReplicaFetcherBlockingSend

  • kafka.common.InterBrokerSendThread

The PR also does not yet have unit or integration tests – I will add these if/when the general implementation approach is verified by the community as being appropriate.  I have stepped through the code in the debugger, and I can see from this exercise as well as from the emitted log messages that it does work as currently implemented.

The final issue to describe is how/when a KafkaChannel instance (each of which corresponds to a unique network connection) is told to re-authenticate.

We define the class org.apache.kafka.common.security.expiring.internals.ClientChannelCredentialTracker to keep track of the various KafkaChannel instances and the ExpiringCredential instances that they authenticated with.  When the feature is enabled we add an instance of this class to the private credentials of the Subject associated with the SASL mechanism using this code in SaslChannelBuilder:

SaslChannelBuilder Code to Enable the Feature
LoginManager loginManager = LoginManager.acquireLoginManager(entry.getValue(), mechanism, defaultLoginClass, configs);
loginManagers.put(mechanism, loginManager);
Subject subject = loginManager.subject();
if (mode == Mode.CLIENT) {
    if (saslLoginRefreshReauthenticateEnable()) {
        log.info("SASL Login Refresh Re-Authenticate ENABLED");
        if (subject.getPrivateCredentials(ClientChannelCredentialTracker.class).isEmpty())
            subject.getPrivateCredentials().add(new ClientChannelCredentialTracker());
    } else
        log.info("SASL Login Refresh Re-Authenticate DISABLED");
}

The KafkaChannel and ExpiringCredentialRefreshingLogin classes can easily retrieve the ClientChannelCredentialTracker instance and tell it when various events of interest occur (a channel is initially authenticated, a credential is refreshed, a channel is disconnected).  The events are enqueued as they are reported, and a background thread processes them to decide which KafkaChannel instances need to re-authenticate.  There is a method KafkaChannel.initiateReauthentication(Time) that instantiates an instance of SaslClientAuthenticator and tells it to re-authenticate via the method SaslClientAuthenticator.initiateReauthentication(Time, AuthenticationSuccessOrFailureReceiver).  The implementation of AuthenticationSuccessOrFailureReceiver that is sent takes care of notifying the ClientChannelCredentialTracker instance; it looks something like this:
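
The bookkeeping involved can be sketched roughly as follows.  This is a deliberately simplified, synchronous illustration with hypothetical names: channels and credentials are identified by plain strings, and events are handled immediately by the caller rather than being enqueued for a background thread as in the proposed implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, synchronous stand-in for the tracking logic.
final class CredentialTrackerSketch {
    // channel id -> id of the credential the channel last authenticated with
    private final Map<String, String> credentialByChannel = new TreeMap<>();

    // A channel authenticated (or successfully re-authenticated) with a credential.
    void channelAuthenticated(String channelId, String credentialId) {
        credentialByChannel.put(channelId, credentialId);
    }

    // A channel was disconnected and no longer needs tracking.
    void channelDisconnected(String channelId) {
        credentialByChannel.remove(channelId);
    }

    // A credential was refreshed: return the channels that still use the old
    // one and therefore need to be told to re-authenticate.
    List<String> credentialRefreshed(String oldCredentialId) {
        List<String> toReauthenticate = new ArrayList<>();
        for (Map.Entry<String, String> entry : credentialByChannel.entrySet()) {
            if (entry.getValue().equals(oldCredentialId))
                toReauthenticate.add(entry.getKey());
        }
        return toReauthenticate;
    }
}
```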

AuthenticationSuccessOrFailureReceiver
new AuthenticationSuccessOrFailureReceiver() {
    @Override
    public void reauthenticationFailed(RetryIndication retry, String errorMessage) {
        log.warn(errorMessage);
        Utils.closeQuietly(notYetAuthenticatedAuthenticator, "notYetAuthenticatedAuthenticator");
        notYetAuthenticatedAuthenticator = null;
        clientChannelCredentialTracker().offer(ClientChannelCredentialEvent
                .channelFailedReauthentication(time, KafkaChannel.this, errorMessage, retry));
    }

    @Override
    public void reauthenticationSucceeded() {
        clientChannelCredentialTracker().offer(ClientChannelCredentialEvent
                .channelReauthenticated(time, KafkaChannel.this, notYetAuthenticatedSaslClientAuthenticator
                    .expiringCredential()));
        replaceAnyPreviouslyAuthenticatedAuthenticatorWithNewOne();
    }
}

When the method SaslClientAuthenticator.initiateReauthentication(Time, AuthenticationSuccessOrFailureReceiver) is invoked it is up to the SaslClientAuthenticator instance to use its AuthenticationRequestEnqueuer instance to enqueue an ApiVersionsRequest along with an appropriate callback that either fails the re-authentication if the ApiVersionsRequest fails or continues it with a SaslHandshakeRequest.  Here is what the code looks like:

SaslClientAuthenticator.initiateReauthentication()
    public void initiateReauthentication(Time time,
            AuthenticationSuccessOrFailureReceiver authenticationSuccessOrFailureReceiver) {
        if (saslState != SaslState.SEND_APIVERSIONS_REQUEST) {
            notifyFailureDueToUnexpectedState(SaslState.SEND_APIVERSIONS_REQUEST,
                    authenticationSuccessOrFailureReceiver);
            return;
        }
        authenticationRequestEnqueuer().enqueueRequest(new AuthenticationRequest(this.node, new ApiVersionsRequest.Builder((short) 0),
                authenticationRequestCompletionHandlerForApiVersionsRequest(time,
                        authenticationSuccessOrFailureReceiver)));
    }

    private void notifyFailureDueToUnexpectedState(SaslState expectedSaslState,
            AuthenticationSuccessOrFailureReceiver authenticationSuccessOrFailureReceiver) {
        // it is possible that the connection was closed; don't retry
        SaslState receivedSaslState = saslState;
        if (saslState != SaslState.CLOSED)
            saslState = SaslState.FAILED;
        authenticationSuccessOrFailureReceiver.reauthenticationFailed(RetryIndication.DO_NOT_RETRY, String.format(
                "Re-authentication was in process but could not proceed because authenticator state is incorrect (expected %s): %s",
                expectedSaslState, receivedSaslState));
    }

And of course the method authenticationRequestCompletionHandlerForApiVersionsRequest(Time, AuthenticationSuccessOrFailureReceiver) looks similar but deals with sending a SaslHandshakeRequest (see the PR for details).
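
The overall flow across the three requests can be summarized with the following simplified sketch.  It is synchronous and uses hypothetical names throughout; in the PR each step is instead driven by the completion handler of the previous request, and failures also carry a retry indication.  A step is modeled here as a predicate that reports whether the request succeeded.

```java
import java.util.function.Predicate;

// Hypothetical, synchronous illustration of the three-request sequence.
final class ReauthSequenceSketch {
    enum Step { API_VERSIONS, SASL_HANDSHAKE, SASL_AUTHENTICATE }

    // Simplified stand-in for AuthenticationSuccessOrFailureReceiver.
    interface Receiver {
        void reauthenticationSucceeded();
        void reauthenticationFailed(String errorMessage);
    }

    // Sends each request in order; the first failure ends the attempt.
    static void run(Predicate<Step> sendAndCheck, Receiver receiver) {
        for (Step step : Step.values()) {
            if (!sendAndCheck.test(step)) {
                receiver.reauthenticationFailed(step + " failed");
                return;
            }
        }
        receiver.reauthenticationSucceeded();
    }
}
```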

Finally, we add methods on SaslServerAuthenticator to respond to the requests that arrive related to re-authentication – specifically, respondToReauthenticationSaslHandshakeRequest() and respondToReauthenticationSaslAuthenticateRequest() – and we add code to kafka.server.KafkaApis to route the received requests accordingly (as opposed to responding with an error, which is what currently happens).  I won't go into the details here; see the PR for the code.

Compatibility, Deprecation, and Migration Plan

There is no impact to existing installations because the default is for the feature to be turned off.

Rejected Alternatives

A single queue owned by org.apache.kafka.clients.NetworkClient does not reliably work in the synchronous case as described above; we cannot know how soon an injected request will be sent (if ever).

One alternative idea is to add two new request types: "ReceiveReauthenticationNonceRequest" and "ReauthenticateWithNonceRequest".  When re-authentication needs to occur the client would make a separate, new connection to the broker and send a "ReceiveReauthenticationNonceRequest" to the broker to have it associate a nonce with the authenticated credentials and return the nonce to the client.  Then the client would send a "ReauthenticateWithNonceRequest" with the returned nonce over the connection that it wishes to re-authenticate; the broker would then replace the credentials on that connection with the credentials it had previously associated with the nonce.  I don't know if this would work (might there be some issue with advertised vs. actual addresses and maybe the possibility of there being a load balancer?  Could we be guaranteed the ability to connect to the exact same broker as our existing connection?).  If it could work then it does have the advantage of requiring the injection of just a single request over an existing connection that would return very quickly, rather than 3 separate requests of which at least one might take a while to return (to potentially retrieve a public key for token signature validation, for example; the validation itself isn't exactly free, either, even if the public key is already cached).  One disadvantage of the alternative, nonce-based approach is that it requires the creation of a separate connection, including TLS negotiation, and that is very expensive compared to sending 3 requests over an existing connection (which of course already has TLS negotiated).
