Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Adopted in v2.0

Table of Contents

Status

Current state: AcceptedAdopted (in 2.0)

Discussion thread:  here

JIRA: KAFKA-6562

...

  • Allow clients (both brokers when SASL/OAUTHBEARER is the inter-broker protocol as well as non-broker clients) to flexibly retrieve an access token from an OAuth 2 authorization server based on the declaration of a custom login CallbackHandler implementation and have that access token transparently and automatically transmitted to a broker for authentication.
  • Allow brokers to flexibly validate provided access tokens when a client establishes a connection based on the declaration of a custom SASL Server CallbackHandler implementation.
  • Provide implementations of the above retrieval and validation features based on an unsecured JSON Web Token that function out-of-the-box with minimal configuration required (i.e. implementations of the two types of callback handlers mentioned above will be used by default with no need to explicitly declare them).  This unsecured functionality serves two purposes: first, it provides a way for SASL/OAUTHBEARER to be used in development scenarios out-of-the-box with no OAuth 2 infrastructure required; and second, it provides a way to test the SASL implementation itself.  See Rejected Alternatives: Motivation.
  • Allow clients (both brokers when SASL/OAUTHBEARER is the inter-broker protocol as well as non-broker clients) to transparently retrieve a new access token in the background before the existing access token expires in case the client has to open new connections.  Any existing connections will remain unaffected by this "token refresh" functionality as long as the connection remains intact, but new connections from the same client will always use the latest access token (either the initial one or the one that was most recently retrieved by the token refresh functionality, if any).  This is how Kerberos-authenticated connections work with respect to ticket expiration.  This KIP does not attempt to unify the refresh-related code for the OAUTHBEARER and GSSAPI mechanisms, but it does include code that suggests a potential path forward if this unification is desired in the future.

...

Finally, note that the implementation of flexible, substitution-aware configuration that was originally proposed in an early draft of this KIP was at first deemed more generally useful and was separated out into its own KIP-269: Substitution Within Configuration Values, but that KIP is likely to be rejected/moved to the inactive list and is not required for this KIP (see Rejected Alternatives: Substitution Within Configuration Values).

Public Interfaces

The public interface for this KIP consists of 3 Java classes and 1 Java interface along with various configuration possibilities.  The following sections define these public-facing parts of this KIP, including an overall UML diagram and important code details (with Javadoc) where appropriate.

OAuth Bearer Tokens and Token Retrieval

See Rejected Alternatives: Explicit Configuration of Token Refresh Class

See Rejected Alternatives: Callback Handlers and Callbacks

OAuth Bearer Token Retrieval

We define org.apache.kafka.common.security.oauthbearer.OAuthBearerToken to be the interface that all OAuth 2 bearer tokens must implement within the context of Kafka's SASL/OAUTHBEARER implementation.  Scenarios that leverage open source JWT/JWS/JWE implementations must wrap the library's implementation of a token to implement this interface.

...

Here are the parameters that can be provided as part of the JAAS configuration to determine the claims that appear in an unsecured OAuth Bearer Token generated by the default, out-of-the-box AuthenticateCallbackHandler implementation.

JAAS Module Option for Unsecured Token RetrievalDocumentation
unsecuredLoginStringClaim_<claimname>="value"

Creates a String claim with the given name and value. Any valid claim name
can be specified except 'iat' and 'exp' (these are automatically generated).

unsecuredLoginNumberClaim_<claimname>="value"

Creates a Number claim with the given name and value. Any valid claim name
can be specified except 'iat' and 'exp' (these are automatically generated).

unsecuredLoginListClaim_<claimname>="value"

Creates a String List claim with the given name and values parsed from the
given
value where the first character is taken as the delimiter. For example:
unsecuredLoginListClaim_fubar="|value1|value2".
Any valid claim name can be specified except '
iat' and 'exp' (these are
automatically generated).

unsecuredLoginPrincipalClaimNameSet to a custom claim name if you wish the name of the String claim holding
the principal name to be something other than '
sub'.
unsecuredLoginLifetimeSeconds

Set to an integer value if the token expiration is to be set to something other
than the default value of 3600 seconds (which is 1 hour). The 'exp' claim will
be set to reflect the expiration time.

unsecuredLoginScopeClaimNameSet to a custom claim name if you wish the name of the String or String List
claim holding any token scope to be something other than '
scope'.

Here is a typical, basic JAAS configuration for a client leveraging unsecured SASL/OAUTHBEARER authentication:

...

An implementation of the org.apache.kafka.common.security.auth.Login interface specific to the OAUTHBEARER mechanism is automatically applied; it periodically refreshes any token before it expires so that the client can continue to make connections to brokers.  The parameters that impact how the refresh algorithm operates are specified as part of the producer/consumer/broker configuration; they are as follows (the defaults are generally reasonable, so explicit configuration may not be necessary):

 


Producer/Consumer/Broker Configuration PropertyDocumentation
sasl.login.refresh.window.factorLogin refresh thread will sleep until the specified window factor relative to the credential's
lifetime has been reached, at which time it will try to refresh the credential. Legal values
are between 0.5 (50%) and 1.0 (100%) inclusive; a default value of 0.8 (80%) is used if
no value is specified. Currently applies only to OAUTHBEARER.
sasl.login.refresh.window.jitterThe maximum amount of random jitter relative to the credential's lifetime that is added to
the login refresh thread's sleep time. Legal values are between 0 and 0.25 (25%) inclusive;
a default value of 0.05 (5%) is used if no value is specified. Currently applies only to
OAUTHBEARER.
sasl.login.refresh.min.period.seconds

The desired minimum time for the login refresh thread to wait before refreshing a credential,
in seconds. Legal values are between 0 and 900 (15 minutes); a default value of 60
(1 minute) is used if no value is specified.
This value and
sasl.login.refresh.buffer.seconds are both ignored if their sum exceeds the
remaining lifetime of a credential. Currently applies only to OAUTHBEARER.

sasl.login.refresh.buffer.secondsThe amount of buffer time before credential expiration to maintain when refreshing a
credential, in seconds. If a refresh would otherwise occur closer to expiration than the
number of buffer seconds then the refresh will be moved up to maintain as much of the
buffer time as possible, overriding all other considerations. Legal values are between 0 and
3600 (1 hour); a default value of 300 (5 minutes) is used if no value is specified. This value
and sasl.login.refresh.min.period.seconds are both ignored if their sum exceeds
the remaining lifetime of a credential.
Currently applies only to OAUTHBEARER.


Code Block
languagejava
titleorg.apache.kafka.common.security.oauthbearer.OAuthBearerToken
collapsetrue
package org.apache.kafka.common.security.oauthbearer;

/**
 * The <code>b64token</code> value as defined in
 * <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750 Section
 * 2.1</a> along with the token's specific scope and lifetime and principal
 * name.
 * <p>
 * A network request would be required to re-hydrate an opaque token, and that
 * could result in (for example) an {@code IOException}, but retrievers for
 * various attributes ({@link #scope()}, {@link #lifetimeMs()}, etc.) declare no
 * exceptions. Therefore, if a network request is required for any of these
 * retriever methods, that request could be performed at construction time so
 * that the various attributes can be reliably provided thereafter. For example,
 * a constructor might declare {@code throws IOException} in such a case.
 * Alternatively, the retrievers could throw unchecked exceptions.
 *
 * @see <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC 6749
 *      Section 1.4</a> and
 *      <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750
 *      Section 2.1</a>
 */
public interface OAuthBearerToken {
    /**
     * The <code>b64token</code> value as defined in
     * <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750 Section
     * 2.1</a>
     *
     * @return <code>b64token</code> value as defined in
     *         <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750
     *         Section 2.1</a>
     */
    String value();

    /**
     * The token's scope of access, as per
     * <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC 6749 Section
     * 1.4</a>
     *
     * @return the token's (always non-null but potentially empty) scope of access,
     *         as per <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC
     *         6749 Section 1.4</a>. Note that all values in the returned set will
     *         be trimmed of preceding and trailing whitespace, and the result will
     *         never contain the empty string.
     */
    Set<String> scope();

    /**
     * The token's lifetime, expressed as the number of milliseconds since the
     * epoch, as per <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC
     * 6749 Section 1.4</a>
     *
     * @return the token'slifetime, expressed as the number of milliseconds since
     *         the epoch, as per
     *         <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC 6749
     *         Section 1.4</a>.
     */
    long lifetimeMs();

    /**
     * The name of the principal to which this credential applies
     *
     * @return the always non-null/non-empty principal name
     */
    String principalName();

    /**
     * When the credential became valid, in terms of the number of milliseconds
     * since the epoch, if known, otherwise null. An expiring credential may not
     * necessarily indicate when it was created -- just when it expires -- so we
     * need to support a null return value here.
     *
     * @return the time when the credential became valid, in terms of the number of
     *         milliseconds since the epoch, if known, otherwise null
     */
    Long startTimeMs();
}

...

Code Block
languagejava
titleorg.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback
collapsetrue
package org.apache.kafka.common.security.oauthbearer;

/**
 * A {@code Callback} for use by the {@code SaslClient} and {@code Login}
 * implementations when they require an OAuth 2 bearer token. Callback handlers
 * should use the {@link #error(String, String, String)} method to communicate
 * errors returned by the authorization server as per
 * <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
 * 2.0 Authorization Framework</a>. Callback handlers should communicate other
 * problems by raising an {@code IOException}.
 */
public class OAuthBearerTokenCallback implements Callback {
    private OAuthBearerToken token = null;
    private String errorCode = null;
    private String errorDescription = null;
    private String errorUri = null;

    /**
     * Return the (potentially null) token
     *
     * @return the (potentially null) token
     */
    public OAuthBearerToken token() {
        return token;
    }

    /**
     * Return the (always non-empty) error code as per
     * <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
     * 2.0 Authorization Framework</a>.
     *
     * @return the (always non-empty) error code
     */
    public String errorCode() {
        return errorCode;
    }

    /**
     * Return the (potentially null) error description as per
     * <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
     * 2.0 Authorization Framework</a>.
     *
     * @return the (potentially null) error description
     */
    public String errorDescription() {
        return errorDescription;
    }

    /**
     * Return the (potentially null) error URI as per
     * <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
     * 2.0 Authorization Framework</a>.
     *
     * @return the (potentially null) error URI
     */
    public String errorUri() {
        return errorUri;
    }

    /**
     * Set the token. All error-related values are cleared.
     *
     * @param token
     *            the mandatory token to set
     */
    public void token(OAuthBearerToken token) {
        this.token = Objects.requireNonNull(token);
        this.errorCode = null;
        this.errorDescription = null;
        this.errorUri = null;
    }

    /**
     * Set the error values as per
     * <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
     * 2.0 Authorization Framework</a>. Any token is cleared.
     *
     * @param errorCode
     *            the mandatory error code to set
     * @param errorDescription
     *            the optional error description to set
     * @param errorCode
     *            the optional error URI to set
     */
    public void error(String errorCode, String errorDescription, String errorUri) {
        if (Objects.requireNonNull(errorCode).isEmpty())
            throw new IllegalArgumentException("error code must not be empty");
        this.errorCode = errorCode;
        this.errorDescription = errorDescription;
        this.errorUri = errorUri;
        this.token = null;
    }
}

...


Token Validation

See Rejected Alternatives: Callback Handlers and Callbacks

We define the org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback class as the callback class for communicating that we want to validate a bearer token compact serialization provided by the connecting client.  When a broker accepts a SASL/OAUTHBEARER connection the instance of the builtin SaslServer implementation asks its configured AuthenticateCallbackHandler implementation to handle an instance of OAuthBearerValidatorCallback constructed with the OAuth 2 Bearer Token's compact serialization and return an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerToken if the compact serialization validates. A default, builtin AuthenticateCallbackHandler implementation validates an unsecured token as defined by these JAAS module options:

JAAS Module Option for Unsecured Token ValidationDocumentation
unsecuredValidatorPrincipalClaimName="value"

Set to a non-empty value if you wish a particular String claim holding
a principal name to be checked for existence; the default is to check for the
existence of the 'sub' claim.

unsecuredValidatorScopeClaimName="value"

Set to a custom claim name if you wish the name of the String or String
List claim holding any token scope to be something other than 'scope'

unsecuredValidatorRequiredScope="value"

Set to a space-delimited list of scope values if you wish the
String/String List claim holding the token scope to be checked to make sure
it contains certain values.

unsecuredValidatorAllowableClockSkewMs="value"Set to a positive integer value if you wish to allow up to some number of
positive milliseconds of clock skew (the default is 0).

Here is a typical, basic JAAS configuration for a broker leveraging unsecured SASL/OAUTHBEARER authentication: 

...

The validated token will be available as a negotiated property on the SaslServer instance with the key OAUTHBEARER.token so it can be used for authorization as per KIP-189: Improve principal builder interface and add support for SASL.  Note that the implementation of SaslServer is not part of the public interface – just the key where it makes the validated token available.

 


Code Block
languagejava
titleorg.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback
collapsetrue
package org.apache.kafka.common.security.oauthbearer;

/**
 * A {@code Callback} for use by the {@code SaslServer} implementation when it
 * needs to provide an OAuth 2 bearer token compact serialization for
 * validation. Callback handlers should use the
 * {@link #error(String, String, String)} method to communicate errors back to
 * the SASL Client as per
 * <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
 * 2.0 Authorization Framework</a> and the <a href=
 * "https://www.iana.org/assignments/oauth-parameters/oauth-parameters.xhtml#extensions-error">IANA
 * OAuth Extensions Error Registry</a>. Callback handlers should communicate
 * other problems by raising an {@code IOException}.
 */
public class OAuthBearerValidatorCallback implements Callback {
    private final String tokenValue;
    private OAuthBearerToken token = null;
    private String errorStatus = null;
    private String errorScope = null;
    private String errorOpenIDConfiguration = null;

    /**
     * Constructor
     *
     * @param tokenValue
     *            the mandatory/non-blank token value
     */
    public OAuthBearerValidatorCallback(String tokenValue) {
        if (Objects.requireNonNull(tokenValue).isEmpty())
            throw new IllegalArgumentException("token value must not be empty");
        this.tokenValue = tokenValue;
    }

    /**
     * Return the (always non-null) token value
     *
     * @return the (always non-null) token value
     */
    public String tokenValue() {
        return tokenValue;
    }

    /**
     * Return the (potentially null) token
     *
     * @return the (potentially null) token
     */
    public OAuthBearerToken token() {
        return token;
    }

    /**
     * Return the (potentially null) error status value as per
     * <a href="https://tools.ietf.org/html/rfc7628#section-3.2.2">RFC 7628: A Set
     * of Simple Authentication and Security Layer (SASL) Mechanisms for OAuth</a>
     * and the <a href=
     * "https://www.iana.org/assignments/oauth-parameters/oauth-parameters.xhtml#extensions-error">IANA
     * OAuth Extensions Error Registry</a>.
     *
     * @return the (potentially null) error status value
     */
    public String errorStatus() {
        return errorStatus;
    }

    /**
     * Return the (potentially null) error scope value as per
     * <a href="https://tools.ietf.org/html/rfc7628#section-3.2.2">RFC 7628: A Set
     * of Simple Authentication and Security Layer (SASL) Mechanisms for OAuth</a>.
     *
     * @return the (potentially null) error scope value
     */
    public String errorScope() {
        return errorScope;
    }

    /**
     * Return the (potentially null) error openid-configuration value as per
     * <a href="https://tools.ietf.org/html/rfc7628#section-3.2.2">RFC 7628: A Set
     * of Simple Authentication and Security Layer (SASL) Mechanisms for OAuth</a>.
     *
     * @return the (potentially null) error openid-configuration value
     */
    public String errorOpenIDConfiguration() {
        return errorOpenIDConfiguration;
    }

    /**
     * Set the token. The token value is unchanged and is expected to match the
     * provided token's value. All error values are cleared.
     *
     * @param token
     *            the mandatory token to set
     */
    public void token(OAuthBearerToken token) {
        this.token = Objects.requireNonNull(token);
        this.errorStatus = null;
        this.errorScope = null;
        this.errorOpenIDConfiguration = null;
    }

    /**
     * Set the error values as per
     * <a href="https://tools.ietf.org/html/rfc7628#section-3.2.2">RFC 7628: A Set
     * of Simple Authentication and Security Layer (SASL) Mechanisms for OAuth</a>.
     * Any token is cleared.
     *
     * @param errorStatus
     *            the mandatory error status value from the <a href=
     *            "https://www.iana.org/assignments/oauth-parameters/oauth-parameters.xhtml#extensions-error">IANA
     *            OAuth Extensions Error Registry</a> to set
     * @param errorScope
     *            the optional error scope value to set
     * @param errorStatus
     *            the optional error openid-configuration value to set
     */
    public void error(String errorStatus, String errorScope, String errorOpenIDConfiguration) {
        if (Objects.requireNonNull(errorStatus).isEmpty())
            throw new IllegalArgumentException("error status must not be empty");
        this.errorStatus = errorStatus;
        this.errorScope = errorScope;
        this.errorOpenIDConfiguration = errorOpenIDConfiguration;
        this.token = null;
    }
}

...

The following table summarizes the proposed configuration for Kafka's SASL/OAUTHBEARER implementation.

ConfigurationExample Value

Likelihood of Value Being Different

from the Example Value

Notes
JAAS Login Module

org.apache.kafka.common.security.oauthbearer.

OAuthBearerLoginModule

Low
 

Non-Broker:

 sasl.login.callback.handler.class

Broker:

 listener.name.sasl_ssl.oauthbearer.

   sasl.login.callback.handler.class

 


org.example.MyLoginCallbackHandler

 


High

 


Default implementation creates an unsecured OAuth Bearer Token

listener.name.sasl_ssl.oauthbearer.

  sasl.server.callback.handler.class

org.example.MyValidatorCallbackHandler

HighDefault implementation validates an unsecured OAuth Bearer Token
JAAS Module OptionsVariedHigh

Used to configure the above sasl.login and sasl.server callback handlers.

producer/consumer/broker configs: sasl.login.refresh.*Varied, though defaults are reasonableLow
 

Summary for Production Use

...

  • A login callback handler that can retrieve an OAuth 2 bearer token from the token endpoint and wrap that token as an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerToken. It must attempt to do this when it is asked to handle an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback.
  • A SASL Server callback handler that can validate an OAuth 2 bearer token compact serialization and convert that compact serialization to an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerToken.  It must attempt to do this when it is asked to handle an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback.  Note that this may entail secure retrieval (and perhaps caching) of a public key along with digital signature verification if the token is a JSON Web Signature (JWS); validation of various token claims such as the 'iat' (Issued At), 'nbf' (Not Before), 'exp' (Expiration Time), 'aud' (Audience), and 'iss' (Issuer) claims if the token is a JSON Web Token (JWT); and/or decryption if the token is encrypted as a JSON Web Encryption (JWE).

See Summary of Configuration 84803841 for details on how to declare these two classes to Kafka.

...

The following references may be helpful:

URLDescription
https://tools.ietf.org/html/rfc6749RFC 6749: The OAuth 2.0 Authorization Framework
https://tools.ietf.org/html/rfc6750RFC 6750: The OAuth 2.0 Authorization Framework: Bearer Token Usage
https://tools.ietf.org/html/rfc7519

RFC 7519: JSON Web Token (JWT)

https://tools.ietf.org/html/rfc7515RFC 7515: JSON Web Signature (JWS)
https://tools.ietf.org/html/rfc7516RFC 7516: JSON Web Encryption (JWE)
https://tools.ietf.org/html/rfc7628RFC 7628: A Set of Simple Authentication and Security Layer (SASL) Mechanisms for OAuth

Proposed Changes

Thanks to KIP-86: Configurable SASL callback handlers, no changes to existing public interfaces are required – all functionality represents additions rather than changes.  The only changes to the existing implementation are to define appropriate default callback handler/login classes for SASL/OAUTHBEARER.

...

We decided to leverage the standard JAAS Callback/CallbackHandler mechanism for communicating information between various components (specifically, between the SASL Client and the Login Module, between the Login Module and the Login/Token Retrieval mechanism, and between the SASL Server and the Token Validation mechanism).  We had originally documented the need for just the last two (token retrieval and token validation), and the original proposal was to declare the class names as part of the JAAS configuration.  The only real benefit to doing this was that the declaration of the classes and their configuration were co-located in the JAAS configuration.  We decided the benefit of consistency was at least as valuable as any cost associated with separating the declaration from the configuration – and probably more valuable given that this is how configuration is done for other SASL mechanisms and is therefore not unfamiliar.  We also now leverage callback handlers in all three places where it is supported, though one of them (SASL Client callback handler) has no need for explicit configuration in any known use case.  Finally, we adjusted the logic in SaslChannelBuilder and LoginManager to automatically apply default AuthenticateCallbackHandler and Login implementations for SASL/OAUTHBEARER in the absence of explicit declarations, which simplifies the out-of-the-box experience for unsecured (i.e. development and testing) use cases.