Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Added "Summary for Production Use" section and clarified that two refresh-related parameters are ignored if their sum exceeds the token lifetime.

...

Producer/Consumer/Broker Configuration PropertyDocumentation
sasl.login.refresh.window.factorLogin refresh thread will sleep until the specified window factor relative to the credential's
lifetime has been reached, at which time it will try to refresh the credential. Legal values
are between 0.5 (50%) and 1.0 (100%) inclusive; a default value of 0.8 (80%) is used if
no value is specified. Currently applies only to OAUTHBEARER.
sasl.login.refresh.window.jitterThe maximum amount of random jitter relative to the credential's lifetime that is added to
the login refresh thread's sleep time. Legal values are between 0 and 0.25 (25%) inclusive;
a default value of 0.05 (5%) is used if no value is specified. Currently applies only to
OAUTHBEARER.
sasl.login.refresh.min.period.seconds

The desired minimum time for the login refresh thread to wait before refreshing a credential,
in seconds. Legal values are between 0 and 900 (15 minutes); a default value of 60
(1 minute) is used if no value is specified.
This value and
sasl.login.refresh.buffer.seconds are both ignored if their sum exceeds the
remaining lifetime of a credential. Currently applies only to OAUTHBEARER.

sasl.login.refresh.buffer.secondsThe amount of buffer time before credential expiration to maintain when refreshing a
credential, in seconds. If a refresh would otherwise occur closer to expiration than the
number of buffer seconds then the refresh will be moved up to maintain as much of the
buffer time as possible, overriding all other considerations. Legal values are between 0 and
3600 (1 hour); a default value of 300 (5 minutes) is used if no value is specified. This value
and sasl.login.refresh.min.period.seconds are both ignored if their sum exceeds
the remaining lifetime of a credential.
Currently
applies only to OAUTHBEARER.

...

Code Block
languagejava
titleorg.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback
collapsetrue
package org.apache.kafka.common.security.oauthbearer;

/**
 * A {@code Callback} for use by the {@code SaslServer} implementation when it
 * needs to provide an OAuth 2 bearer token compact serialization for
 * validation. Callback handlers should use the
 * {@link #error(String, String, String)} method to communicate errors back to
 * the SASL Client as per
 * <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
 * 2.0 Authorization Framework</a> and the <a href=
 * "https://www.iana.org/assignments/oauth-parameters/oauth-parameters.xhtml#extensions-error">IANA
 * OAuth Extensions Error Registry</a>. Callback handlers should communicate
 * other problems by raising an {@code IOException}.
 */
public class OAuthBearerValidatorCallback implements Callback {
    private final String tokenValue;
    private OAuthBearerToken token = null;
    private String errorStatus = null;
    private String errorScope = null;
    private String errorOpenIDConfiguration = null;

    /**
     * Constructor
     *
     * @param tokenValue
     *            the mandatory/non-blank token value
     */
    public OAuthBearerValidatorCallback(String tokenValue) {
        if (Objects.requireNonNull(tokenValue).isEmpty())
            throw new IllegalArgumentException("token value must not be empty");
        this.tokenValue = tokenValue;
    }

    /**
     * Return the (always non-null) token value
     *
     * @return the (always non-null) token value
     */
    public String tokenValue() {
        return tokenValue;
    }

    /**
     * Return the (potentially null) token
     *
     * @return the (potentially null) token
     */
    public OAuthBearerToken token() {
        return token;
    }

    /**
     * Return the (potentially null) error status value as per
     * <a href="https://tools.ietf.org/html/rfc7628#section-3.2.2">RFC 7628: A Set
     * of Simple Authentication and Security Layer (SASL) Mechanisms for OAuth</a>
     * and the <a href=
     * "https://www.iana.org/assignments/oauth-parameters/oauth-parameters.xhtml#extensions-error">IANA
     * OAuth Extensions Error Registry</a>.
     *
     * @return the (potentially null) error status value
     */
    public String errorStatus() {
        return errorStatus;
    }

    /**
     * Return the (potentially null) error scope value as per
     * <a href="https://tools.ietf.org/html/rfc7628#section-3.2.2">RFC 7628: A Set
     * of Simple Authentication and Security Layer (SASL) Mechanisms for OAuth</a>.
     *
     * @return the (potentially null) error scope value
     */
    public String errorScope() {
        return errorScope;
    }

    /**
     * Return the (potentially null) error openid-configuration value as per
     * <a href="https://tools.ietf.org/html/rfc7628#section-3.2.2">RFC 7628: A Set
     * of Simple Authentication and Security Layer (SASL) Mechanisms for OAuth</a>.
     *
     * @return the (potentially null) error openid-configuration value
     */
    public String errorOpenIDConfiguration() {
        return errorOpenIDConfiguration;
    }

    /**
     * Set the token. The token value is unchanged and is expected to match the
     * provided token's value. All error values are cleared.
     *
     * @param token
     *            the mandatory token to set
     */
    public void token(OAuthBearerToken token) {
        this.token = Objects.requireNonNull(token);
        this.errorStatus = null;
        this.errorScope = null;
        this.errorOpenIDConfiguration = null;
    }

    /**
     * Set the error values as per
     * <a href="https://tools.ietf.org/html/rfc7628#section-3.2.2">RFC 7628: A Set
     * of Simple Authentication and Security Layer (SASL) Mechanisms for OAuth</a>.
     * Any token is cleared.
     *
     * @param errorStatus
     *            the mandatory error status value from the <a href=
     *            "https://www.iana.org/assignments/oauth-parameters/oauth-parameters.xhtml#extensions-error">IANA
     *            OAuth Extensions Error Registry</a> to set
     * @param errorScope
     *            the optional error scope value to set
     * @param errorStatus
     *            the optional error openid-configuration value to set
     */
    public void error(String errorStatus, String errorScope, String errorOpenIDConfiguration) {
        if (Objects.requireNonNull(errorStatus).isEmpty())
            throw new IllegalArgumentException("error status must not be empty");
        this.errorStatus = errorStatus;
        this.errorScope = errorScope;
        this.errorOpenIDConfiguration = errorOpenIDConfiguration;
        this.token = null;
    }
}

Summary of Configuration
Anchor
Summary of Configuration
Summary of Configuration

The following table summarizes the proposed configuration for Kafka's SASL/OAUTHBEARER implementation.

ConfigurationExample Value

Likelihood of Value Being Different

from the Example Value

Notes
JAAS Login Module

org.apache.kafka.common.security.oauthbearer.

OAuthBearerLoginModule

Low 

Non-Broker:

 sasl.login.callback.handler.class

Broker:

 listener.name.sasl_ssl.oauthbearer.

   sasl.login.callback.handler.class

 

org.example.MyLoginCallbackHandler

 

High

 

Default implementation creates an unsecured OAuth Bearer Token

listener.name.sasl_ssl.oauthbearer.

  sasl.server.callback.handler.class

org.example.MyValidatorCallbackHandler

HighDefault implementation validates an unsecured OAuth Bearer Token
JAAS Module OptionsVariedHigh

Used to configure the above sasl.login and sasl.server callback handlers.

producer/consumer/broker configs: sasl.login.refresh.*Varied, though defaults are reasonableLow 

Summary for Production Use

To use SASL/OAUTHBEARER in a production scenario it is necessary to write two separate callback handlers implementing org.apache.kafka.common.security.auth.AuthenticateCallbackHandler:

  • A login callback handler that can retrieve an OAuth 2 bearer token from the token endpoint and wrap that token as an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerToken. It must attempt to do this when it is asked to handle an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback.
  • A SASL Server callback handler that can validate an OAuth 2 bearer token compact serialization and convert that compact serialization to an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerToken.  It must attempt to do this when it is asked to handle an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback.  Note that this may entail secure retrieval (and perhaps caching) of a public key along with digital signature verification if the token is a JSON Web Signature (JWS); validation of various token claims such as the 'iat' (Issued At), 'nbf' (Not Before), 'exp' (Expiration Time), 'aud' (Audience), and 'iss' (Issuer) claims if the token is a JSON Web Token (JWT); and/or decryption if the token is encrypted as a JSON Web Encryption (JWE).

See Summary of Configuration for details on how to declare these two classes to Kafka.

It is likely that the implementations of the above two callback handlers will leverage an open source JOSE (Javascript Object Signing and Encryption) library.  See https://jwt.io/ for a list of many of the available libraries.

OAuth 2 is a flexible framework that allows different installations to do things differently, so the principal name in Kafka could come from any claim in a JWT.  Most of the time it would come from the 'sub' claim, but it could certainly come from another claim, or it could be only indirectly based on a claim value (maybe certain text would be trimmed or prefixed/suffixed).  Because the OAuth 2 framework is flexible, we need to accommodate that flexibility – and the ability to plugin arbitrary implementations of the above two callback handler classes gives us the required flexibility.  As an example, the SASL Server callback handler implementation could leverage an open source JOSE library to parse the compact serialization, retrieve the public key if it has not yet been retrieved, verify the digital signature, validate various token claims, and map the 'sub' claim to the OAuthBearerToken's principal name (which becomes the SASL authorization ID, which becomes the Kafka principal name).  By writing the callback handler code we have complete flexibility to meet the requirements of any particular OAuth 2 installation.

The following references may be helpful:

URLDescription
https://tools.ietf.org/html/rfc6749RFC 6749: The OAuth 2.0 Authorization Framework
https://tools.ietf.org/html/rfc6750RFC 6750: The OAuth 2.0 Authorization Framework: Bearer Token Usage
https://tools.ietf.org/html/rfc7519

RFC 7519: JSON Web Token (JWT)

https://tools.ietf.org/html/rfc7515RFC 7515: JSON Web Signature (JWS)
https://tools.ietf.org/html/rfc7516RFC 7516: JSON Web Encryption (JWE)
https://tools.ietf.org/html/rfc7628RFC 7628: A Set of Simple Authentication and Security Layer (SASL) Mechanisms for OAuth

Proposed Changes

Thanks to KIP-86: Configurable SASL callback handlers, no changes to existing public interfaces are required – all functionality represents additions rather than changes.  The only changes to the existing implementation are to define appropriate default callback handler/login classes for SASL/OAUTHBEARER.

...