...

Code Block (Java): org.apache.kafka.common.security.expiring.ExpiringCredential
/**
 * A credential that expires and that can potentially be refreshed; such a
 * refreshed credential can also potentially be used to re-authenticate an
 * existing connection.
 * <p>
 * The parameters that impact how the refresh algorithm operates are specified
 * as part of the producer/consumer/broker configuration and are as follows. See
 * the documentation for these properties elsewhere for details.
 * <table>
 * <tr>
 * <th>Producer/Consumer/Broker Configuration Property</th>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.window.factor}</td>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.window.jitter}</td>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.min.period.seconds}</td>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.min.buffer.seconds}</td>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.reauthenticate.enable}</td>
 * </tr>
 * </table>
 * <p>
 * This interface was introduced in 2.1.0 and, while it feels stable, it could
 * evolve. We will try to evolve the API in a compatible manner, but we reserve
 * the right to make breaking changes in minor releases, if necessary. We will
 * update the {@code InterfaceStability} annotation and this notice once the API
 * is considered stable.
 * 
 * @see OAuthBearerLoginModule
 * @see SaslConfigs#SASL_LOGIN_REFRESH_WINDOW_FACTOR_DOC
 * @see SaslConfigs#SASL_LOGIN_REFRESH_WINDOW_JITTER_DOC
 * @see SaslConfigs#SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS_DOC
 * @see SaslConfigs#SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC
 * @see SaslConfigs#SASL_LOGIN_REFRESH_REAUTHENTICATE_ENABLE_DOC
 */
@InterfaceStability.Evolving
public interface ExpiringCredential {
    /**
     * The value to use when asking a {@code SaslClient} instance for the
     * {@code ExpiringCredential} that was used to authenticate, if any
     */
    public static final String SASL_CLIENT_NEGOTIATED_PROPERTY_NAME = "Expiring.Credential";

    /**
     * The name of the principal to which this credential applies (used only for
     * logging)
     * 
     * @return the always non-null/non-empty principal name
     */
    String principalName();

    /**
     * When the credential became valid, in terms of the number of milliseconds
     * since the epoch, if known, otherwise null. An expiring credential may not
     * necessarily indicate when it was created -- just when it expires -- so we
     * need to support a null return value here.
     * 
     * @return the time when the credential became valid, in terms of the number of
     *         milliseconds since the epoch, if known, otherwise null
     */
    Long startTimeMs();

    /**
     * When the credential expires, in terms of the number of milliseconds since the
     * epoch. All expiring credentials by definition must indicate their expiration
     * time -- thus, unlike other methods, we do not support a null return value
     * here.
     * 
     * @return the time when the credential expires, in terms of the number of
     *         milliseconds since the epoch
     */
    long expireTimeMs();

    /**
     * The point after which the credential can no longer be refreshed, in terms of
     * the number of milliseconds since the epoch, if any, otherwise null. Some
     * expiring credentials can be refreshed over and over again without limit, so
     * we support a null return value here.
     * 
     * @return the point after which the credential can no longer be refreshed, in
     *         terms of the number of milliseconds since the epoch, if any,
     *         otherwise null
     */
    Long absoluteLastRefreshTimeMs();
}
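
As an illustration of the null-handling contract documented above, here is a minimal sketch of an immutable implementation. The class name SimpleExpiringCredential and its constructor are hypothetical and not part of the proposal, and the interface is redeclared locally only so that the sketch compiles on its own.

```java
import java.util.Objects;

// Local stand-in mirroring the interface above so this sketch compiles on its own;
// real code would import org.apache.kafka.common.security.expiring.ExpiringCredential.
interface ExpiringCredential {
    String principalName();
    Long startTimeMs();
    long expireTimeMs();
    Long absoluteLastRefreshTimeMs();
}

// Hypothetical immutable implementation; name and constructor are illustrative only.
final class SimpleExpiringCredential implements ExpiringCredential {
    private final String principalName;
    private final Long startTimeMs;               // null when the start time is unknown
    private final long expireTimeMs;              // always required, never null
    private final Long absoluteLastRefreshTimeMs; // null when refresh has no hard limit

    SimpleExpiringCredential(String principalName, Long startTimeMs, long expireTimeMs,
            Long absoluteLastRefreshTimeMs) {
        this.principalName = Objects.requireNonNull(principalName, "principalName");
        if (principalName.isEmpty())
            throw new IllegalArgumentException("principalName must not be empty");
        if (startTimeMs != null && startTimeMs > expireTimeMs)
            throw new IllegalArgumentException("startTimeMs must not be after expireTimeMs");
        this.startTimeMs = startTimeMs;
        this.expireTimeMs = expireTimeMs;
        this.absoluteLastRefreshTimeMs = absoluteLastRefreshTimeMs;
    }

    @Override
    public String principalName() {
        return principalName;
    }

    @Override
    public Long startTimeMs() {
        return startTimeMs;
    }

    @Override
    public long expireTimeMs() {
        return expireTimeMs;
    }

    @Override
    public Long absoluteLastRefreshTimeMs() {
        return absoluteLastRefreshTimeMs;
    }
}
```

A credential that only knows its expiration would be created as new SimpleExpiringCredential("user1", null, expireTimeMs, null). The validation rules in the constructor are assumptions drawn from the Javadoc wording, not behavior specified by the proposal.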

...

No metrics are affected/created by this proposal.  In particular, re-authentications are not counted as authentications, and no metrics related to re-authentication are created.

Although there is no technical reason preventing it, we arbitrarily decide to disallow changing identities upon re-authentication.  For example, if a connection originally authenticates as USER:user1, an attempt to re-authenticate as anything else (e.g. USER:user2) will fail.  Retry is allowed in this case.
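
The identity rule above could be enforced with a check along the following lines. This is only a sketch: the class ReauthenticationIdentityCheck and its method are hypothetical names, and principals are represented as plain strings for simplicity.

```java
import java.util.Objects;

// Hypothetical helper; not part of the proposal. Principals are modeled as plain strings.
final class ReauthenticationIdentityCheck {
    private ReauthenticationIdentityCheck() {
    }

    /**
     * Returns true only when the principal presented at re-authentication is exactly
     * the principal the connection originally authenticated as.
     */
    static boolean sameIdentity(String originalPrincipal, String reauthenticatedPrincipal) {
        Objects.requireNonNull(originalPrincipal, "originalPrincipal");
        return originalPrincipal.equals(reauthenticatedPrincipal);
    }
}
```

With this check, a connection that authenticated as USER:user1 and then presents USER:user2 is rejected, while presenting USER:user1 again succeeds.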

...

The approach we take is to define an interface org.apache.kafka.common.security.authenticator.AuthenticationRequestEnqueuer that declares an enqueueRequest(AuthenticationRequest) method.  We leave it to the owners of the NetworkClient instances to define how to inject such requests by providing an implementation of the interface to the SaslChannelBuilder, which in turn provides it to the SaslClientAuthenticator.

For example, the org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient class provides the authenticationRequestEnqueuer() method that yields an instance that invokes ConsumerNetworkClient.send(Node, AbstractRequest.Builder<?>); it is then a simple matter of adding the following code to the constructors of the org.apache.kafka.clients.consumer.KafkaConsumer and org.apache.kafka.connect.runtime.distributed.WorkerGroupMember classes, which use ConsumerNetworkClient for their connectivity:

...

Code Block (Scala): ControllerChannelManager.addNewBroker() Code Addition
      if (channelBuilder.isInstanceOf[SaslChannelBuilder])
        channelBuilder.asInstanceOf[SaslChannelBuilder].authenticationRequestEnqueuer(
          new AuthenticationRequestEnqueuer() {
            def enqueueRequest(authenticationRequest: AuthenticationRequest): Unit =
              sendRequest(authenticationRequest.nodeId(), authenticationRequest.requestBuilder().apiKey(),
                authenticationRequest.requestBuilder(), null, authenticationRequest.authenticationRequestCompletionHandler())
          })

So the general approach is to identify how to inject requests and then set the appropriate AuthenticationRequestEnqueuer implementation on the SaslChannelBuilder.  In addition to the above examples, the PR also includes an implementation for org.apache.kafka.clients.admin.KafkaAdminClient.
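
The pattern can be sketched independently of Kafka as follows. The types here are simplified stand-ins rather than the classes in the PR: AuthenticationRequest is reduced to a node id, a request name, and a Runnable completion handler, and QueueingClient is a hypothetical client whose queue plays the role of its network send path.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified stand-in for the AuthenticationRequest described above. The real class carries
// a request builder and a completion handler; a String and a Runnable are assumed here.
final class AuthenticationRequest {
    private final String nodeId;
    private final String requestName;
    private final Runnable completionHandler;

    AuthenticationRequest(String nodeId, String requestName, Runnable completionHandler) {
        this.nodeId = nodeId;
        this.requestName = requestName;
        this.completionHandler = completionHandler;
    }

    String nodeId() { return nodeId; }
    String requestName() { return requestName; }
    Runnable completionHandler() { return completionHandler; }
}

// Single-method interface, matching the enqueueRequest(AuthenticationRequest) shape above.
interface AuthenticationRequestEnqueuer {
    void enqueueRequest(AuthenticationRequest authenticationRequest);
}

// Hypothetical client that owns its send path; a plain queue stands in for the network layer.
final class QueueingClient {
    private final Queue<AuthenticationRequest> pending = new ArrayDeque<>();

    // The owner decides how requests get injected; here they are appended to the queue.
    AuthenticationRequestEnqueuer authenticationRequestEnqueuer() {
        return request -> pending.add(request);
    }

    int pendingCount() { return pending.size(); }

    // Pretends every queued request completed and fires its handler; returns how many were sent.
    int drain() {
        int sent = 0;
        AuthenticationRequest request;
        while ((request = pending.poll()) != null) {
            request.completionHandler().run();
            sent++;
        }
        return sent;
    }
}
```

The authenticator only ever sees the AuthenticationRequestEnqueuer interface, so each client type can route re-authentication requests through whatever send mechanism it already has.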

...

Code Block (Java): SaslClientAuthenticator.initiateReauthentication()
    public void initiateReauthentication(Time time,
            AuthenticationSuccessOrFailureReceiver authenticationSuccessOrFailureReceiver) {
        if (saslState != SaslState.SEND_APIVERSIONS_REQUEST) {
            notifyFailureDueToUnexpectedState(SaslState.SEND_APIVERSIONS_REQUEST,
                    authenticationSuccessOrFailureReceiver);
            return;
        }
        authenticationRequestEnqueuer().enqueueRequest(new AuthenticationRequest(this.node, new ApiVersionsRequest.Builder((short) 0),
                authenticationRequestCompletionHandlerForApiVersionsRequest(time,
                        authenticationSuccessOrFailureReceiver)));
    }

    private void notifyFailureDueToUnexpectedState(SaslState expectedSaslState,
            AuthenticationSuccessOrFailureReceiver authenticationSuccessOrFailureReceiver) {
        // it is possible that the connection was closed; don't retry
        SaslState receivedSaslState = saslState;
        if (saslState != SaslState.CLOSED)
            saslState = SaslState.FAILED;
        authenticationSuccessOrFailureReceiver.reauthenticationFailed(RetryIndication.DO_NOT_RETRY, String.format(
                "Re-authentication was in process but could not proceed because authenticator state is incorrect (expected %s): %s",
                expectedSaslState, receivedSaslState));
    }
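
The state guard used above can be isolated into a small, self-contained sketch. ReauthStateGuard is a hypothetical class, and its enum is a stand-in containing only the SaslState values named in the code above plus one extra value (RECEIVE_APIVERSIONS_RESPONSE) assumed for illustration.

```java
// Hypothetical, simplified model of the guard in initiateReauthentication().
final class ReauthStateGuard {
    // Stand-in enum; RECEIVE_APIVERSIONS_RESPONSE is an assumed extra state for the example.
    enum SaslState { SEND_APIVERSIONS_REQUEST, RECEIVE_APIVERSIONS_RESPONSE, FAILED, CLOSED }

    private SaslState state;

    ReauthStateGuard(SaslState initialState) {
        this.state = initialState;
    }

    SaslState state() {
        return state;
    }

    /**
     * Returns null when the current state matches the expected one. Otherwise returns a
     * failure message and moves to FAILED, unless the state is CLOSED, which is preserved.
     */
    String requireState(SaslState expected) {
        if (state == expected)
            return null;
        SaslState received = state; // capture before overwriting, for the message
        if (state != SaslState.CLOSED)
            state = SaslState.FAILED;
        return String.format(
                "Re-authentication could not proceed because authenticator state is incorrect (expected %s): %s",
                expected, received);
    }
}
```

Capturing the received state before overwriting it keeps the failure message accurate, and leaving CLOSED untouched means a closed connection is never relabelled as failed.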

...