Comment: Adopted in v2.0


Status

Current state: Adopted (in 2.0)

Discussion thread:  here

JIRA: KAFKA-6562

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

The ability to authenticate to Kafka with an OAuth 2 Access Token is desirable given the popularity of OAuth.  OAuth 2 is a flexible framework with multiple ways of doing the same thing (for example, getting a public key to confirm a digital signature), so pluggable implementations – and flexibility in their configuration – are a requirement; the introduction of KIP-86: Configurable SASL callback handlers provides the necessary flexibility.  "OAUTHBEARER" is the SASL mechanism for OAuth 2.

This KIP proposes to add the following functionality related to SASL/OAUTHBEARER:

  • Allow clients (both brokers when SASL/OAUTHBEARER is the inter-broker protocol as well as non-broker clients) to flexibly retrieve an access token from an OAuth 2 authorization server based on the declaration of a custom login CallbackHandler implementation and have that access token transparently and automatically transmitted to a broker for authentication.
  • Allow brokers to flexibly validate provided access tokens when a client establishes a connection based on the declaration of a custom SASL Server CallbackHandler implementation.
  • Provide implementations of the above retrieval and validation features based on an unsecured JSON Web Token that function out-of-the-box with minimal configuration required (i.e. implementations of the two types of callback handlers mentioned above will be used by default with no need to explicitly declare them).  This unsecured functionality serves two purposes: first, it provides a way for SASL/OAUTHBEARER to be used in development scenarios out-of-the-box with no OAuth 2 infrastructure required; and second, it provides a way to test the SASL implementation itself.  See Rejected Alternatives: Motivation.
  • Allow clients (both brokers when SASL/OAUTHBEARER is the inter-broker protocol as well as non-broker clients) to transparently retrieve a new access token in the background before the existing access token expires in case the client has to open new connections.  Any existing connections will remain unaffected by this "token refresh" functionality as long as the connection remains intact, but new connections from the same client will always use the latest access token (either the initial one or the one that was most recently retrieved by the token refresh functionality, if any).  This is how Kerberos-authenticated connections work with respect to ticket expiration.  This KIP does not attempt to unify the refresh-related code for the OAUTHBEARER and GSSAPI mechanisms, but it does include code that suggests a potential path forward if this unification is desired in the future.

Note that the access token can be made available to the broker for authorization decisions due to KIP-189: Improve principal builder interface and add support for SASL (by exposing the access token via a negotiated property on the SaslServer implementation), but detailed discussion of this possibility is outside the scope of this proposal.  It is noted, however, that if access tokens are somehow used for authorization decisions, it is conceivable due to the long-lived nature of Kafka connections that authorization decisions will sometimes be made using expired access tokens.  For example, it is up to the broker to validate the token upon authentication, but the token will not be replaced for that particular connection as long as it remains intact; if the token expires in an hour then authorization decisions for that first hour will be made using the still-valid token, but after an hour the expired token would remain associated with the connection, and authorization decisions from that point forward for that particular connection would be made using the expired token.  This would have to be addressed via a separate KIP if it turns out to be problematic, but that seems unlikely (code signing certificates that have been timestamped remain valid after their expiration, for example, and access tokens are indeed timestamped).  Another related issue that would need to be addressed is how to revoke authorizations.  Connections are long-lived, and bearer tokens are immutable, so a mechanism to evolve or revoke permissions over time would have to exist.  Again, this is outside the scope of this KIP.

Note also that the implementation of flexible, substitution-aware configuration that was originally proposed in an early draft of this KIP was at first deemed more generally useful and was separated out into its own KIP-269: Substitution Within Configuration Values, but that KIP is likely to be rejected/moved to the inactive list and is not required for this KIP (see Rejected Alternatives: Substitution Within Configuration Values).

Public Interfaces

The public interface for this KIP consists of 3 Java classes and 1 Java interface along with various configuration possibilities.  The following sections define these public-facing parts of this KIP, including an overall UML diagram and important code details (with Javadoc) where appropriate.

...

OAuth Bearer Tokens and Token Retrieval

See Rejected Alternatives: Explicit Configuration of Token Refresh Class

See Rejected Alternatives: Exceptions


We define a small exception hierarchy to cover the various cases related to the SASL/OAUTHBEARER code.

...

Callback Handlers and Callbacks

Image: OAuth Bearer Token Retrieval

...

We define org.apache.kafka.common.security.oauthbearer.OAuthBearerToken to be the interface that all OAuth 2 bearer tokens must implement within the context of Kafka's SASL/OAUTHBEARER implementation.  Scenarios that leverage open source JWT/JWS/JWE implementations must wrap the library's implementation of a token to implement this interface.

package org.apache.kafka.common.security.oauthbearer;

/**
 * Base class for all exceptions thrown by the SASL/OAUTHBEARER code.
 */
public abstract class OAuthBearerException extends KafkaException {
    // etc...
}

...

package org.apache.kafka.common.security.oauthbearer;

/**
 * Exception thrown when there is a problem with the configuration (an invalid
 * option in a JAAS config, for example).
 */
public class OAuthBearerConfigException extends OAuthBearerException {
    // etc...
}

...

package org.apache.kafka.common.security.oauthbearer;

import java.util.Objects;

/**
 * Exception thrown when token validation fails due to a problem with the token
 * itself (as opposed to a missing remote resource or a configuration problem)
 */
public class OAuthBearerIllegalTokenException extends OAuthBearerException {
    // The validation result describing the failure; never null
    private final OAuthBearerValidationResult reason;

    /**
     * Constructor
     *
     * @param reason
     *            the mandatory reason for the validation failure; it must indicate
     *            failure
     */
    public OAuthBearerIllegalTokenException(OAuthBearerValidationResult reason) {
        super(Objects.requireNonNull(reason).failureDescription());
        if (reason.success())
            throw new IllegalArgumentException("The reason indicates success; it must instead indicate failure");
        this.reason = reason;
    }

    /**
     * Return the (always non-null) reason for the validation failure
     *
     * @return the reason for the validation failure
     */
    public OAuthBearerValidationResult reason() {
        return reason;
    }
}

The org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule class is the JAAS login module that is declared in the JAAS configuration.  When a client (whether a non-broker client or a broker when SASL/OAUTHBEARER is the inter-broker protocol) connects to Kafka, the OAuthBearerLoginModule instance asks its configured AuthenticateCallbackHandler implementation to handle an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback and retrieve/return an instance of OAuthBearerToken.  A default, built-in AuthenticateCallbackHandler implementation creates an unsecured token as defined by the JAAS module options – see below for configuration details – when another implementation is not explicitly specified.  Production use cases will require writing an implementation of AuthenticateCallbackHandler that can handle an instance of OAuthBearerTokenCallback and declaring it via either the sasl.login.callback.handler.class configuration option for a non-broker client or via the listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class configuration option for brokers (when SASL/OAUTHBEARER is the inter-broker protocol).
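As a rough illustration of declaring a custom login callback handler for a non-broker client, the sketch below assembles the relevant client properties. The property names are the ones named in this KIP plus the standard security.protocol and sasl.mechanism client settings; the class ClientConfigSketch and the handler class name com.example.MyLoginCallbackHandler are hypothetical, and a JAAS configuration declaring OAuthBearerLoginModule is still assumed to be supplied separately.

```java
import java.util.Properties;

/** Hypothetical helper for illustration only; not part of the Kafka API. */
final class ClientConfigSketch {
    private ClientConfigSketch() {}

    /** Builds client properties that select OAUTHBEARER and name a custom login handler. */
    static Properties oauthBearerClientProperties(String loginCallbackHandlerClassName) {
        Properties props = new Properties();
        // Assumption: the listener uses SASL over TLS.
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "OAUTHBEARER");
        // The handler must be able to handle an OAuthBearerTokenCallback.
        props.setProperty("sasl.login.callback.handler.class", loginCallbackHandlerClassName);
        return props;
    }

    public static void main(String[] args) {
        // com.example.MyLoginCallbackHandler is a made-up class name.
        Properties props = oauthBearerClientProperties("com.example.MyLoginCallbackHandler");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

When no handler is declared, the default unsecured implementation described in this section applies instead.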

Here are the parameters that can be provided as part of the JAAS configuration to determine the claims that appear in an unsecured OAuth Bearer Token generated by the default, out-of-the-box AuthenticateCallbackHandler implementation.

JAAS Module Option for Unsecured Token Retrieval / Documentation

unsecuredLoginStringClaim_<claimname>="value"

Creates a String claim with the given name and value. Any valid claim name
can be specified except 'iat' and 'exp' (these are automatically generated).

unsecuredLoginNumberClaim_<claimname>="value"

Creates a Number claim with the given name and value. Any valid claim name
can be specified except 'iat' and 'exp' (these are automatically generated).

unsecuredLoginListClaim_<claimname>="value"

Creates a String List claim with the given name and values parsed from the
given value where the first character is taken as the delimiter. For example:
unsecuredLoginListClaim_fubar="|value1|value2". Any valid claim name can be
specified except 'iat' and 'exp' (these are automatically generated).

unsecuredLoginPrincipalClaimName

Set to a custom claim name if you wish the name of the String claim holding
the principal name to be something other than 'sub'.

unsecuredLoginLifetimeSeconds

Set to an integer value if the token expiration is to be set to something other
than the default value of 3600 seconds (which is 1 hour). The 'exp' claim will
be set to reflect the expiration time.

unsecuredLoginScopeClaimName

Set to a custom claim name if you wish the name of the String or String List
claim holding any token scope to be something other than 'scope'.
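The list-claim option above takes its delimiter from the first character of the value. A minimal sketch of that parsing rule follows; ListClaimParser is a hypothetical helper rather than the actual Kafka implementation, and its handling of edge cases (a null or one-character value yields an empty list) is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration only; not part of the Kafka API. */
final class ListClaimParser {
    private ListClaimParser() {}

    /**
     * Parses a value such as "|value1|value2": the first character is taken as
     * the delimiter and the remainder is split on that character.
     */
    static List<String> parse(String optionValue) {
        List<String> values = new ArrayList<>();
        if (optionValue == null || optionValue.length() < 2) {
            return values; // assumption: nothing after the delimiter means no values
        }
        char delimiter = optionValue.charAt(0);
        int start = 1;
        for (int i = 1; i <= optionValue.length(); i++) {
            // Close the current element at each delimiter and at the end of the string.
            if (i == optionValue.length() || optionValue.charAt(i) == delimiter) {
                values.add(optionValue.substring(start, i));
                start = i + 1;
            }
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(parse("|value1|value2")); // prints [value1, value2]
        System.out.println(parse(",a,b,c"));         // prints [a, b, c]
    }
}
```

Letting the value choose its own delimiter means list elements may contain commas or spaces without any escaping rules.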

Here is a typical, basic JAAS configuration for a client leveraging unsecured SASL/OAUTHBEARER authentication:

KafkaClient {
    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
    unsecuredLoginStringClaim_sub="thePrincipalName";
};
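To make "unsecured token" concrete: an unsecured JWT uses the header {"alg":"none"} and carries an empty signature part. The sketch below builds roughly the kind of compact serialization that the configuration above would imply (a 'sub' claim plus generated 'iat' and 'exp' claims). UnsecuredJwtSketch is hypothetical, it does not escape JSON special characters in the principal name, and the exact claims and formatting produced by the built-in handler may differ.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper for illustration only; not part of the Kafka API. */
final class UnsecuredJwtSketch {
    private UnsecuredJwtSketch() {}

    /**
     * Builds header.payload. (note the trailing dot: the signature part is empty).
     * Assumption: principalName contains no characters that need JSON escaping.
     */
    static String compactSerialization(String principalName, long issuedAtSeconds, long lifetimeSeconds) {
        String header = "{\"alg\":\"none\"}";
        String claims = "{\"sub\":\"" + principalName + "\""
                + ",\"iat\":" + issuedAtSeconds
                + ",\"exp\":" + (issuedAtSeconds + lifetimeSeconds) + "}";
        Base64.Encoder encoder = Base64.getUrlEncoder().withoutPadding();
        return encoder.encodeToString(header.getBytes(StandardCharsets.UTF_8))
                + "." + encoder.encodeToString(claims.getBytes(StandardCharsets.UTF_8))
                + ".";
    }

    public static void main(String[] args) {
        long nowSeconds = System.currentTimeMillis() / 1000L;
        // 3600 seconds is the default lifetime documented for unsecuredLoginLifetimeSeconds.
        System.out.println(compactSerialization("thePrincipalName", nowSeconds, 3600L));
    }
}
```

Because nothing is signed, any party can mint such a token, which is why this mode suits development and testing rather than production.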

An implementation of the org.apache.kafka.common.security.auth.Login interface specific to the OAUTHBEARER mechanism is automatically applied; it periodically refreshes any token before it expires so that the client can continue to make connections to brokers.  The parameters that impact how the refresh algorithm operates are specified as part of the producer/consumer/broker configuration; they are as follows (the defaults are generally reasonable, so explicit configuration may not be necessary):


Producer/Consumer/Broker Configuration Property / Documentation

sasl.login.refresh.window.factor

Login refresh thread will sleep until the specified window factor relative to the credential's
lifetime has been reached, at which time it will try to refresh the credential. Legal values
are between 0.5 (50%) and 1.0 (100%) inclusive; a default value of 0.8 (80%) is used if
no value is specified. Currently applies only to OAUTHBEARER.

sasl.login.refresh.window.jitter

The maximum amount of random jitter relative to the credential's lifetime that is added to
the login refresh thread's sleep time. Legal values are between 0 and 0.25 (25%) inclusive;
a default value of 0.05 (5%) is used if no value is specified. Currently applies only to
OAUTHBEARER.

sasl.login.refresh.min.period.seconds

The desired minimum time for the login refresh thread to wait before refreshing a credential,
in seconds. Legal values are between 0 and 900 (15 minutes); a default value of 60
(1 minute) is used if no value is specified. This value and
sasl.login.refresh.buffer.seconds are both ignored if their sum exceeds the
remaining lifetime of a credential. Currently applies only to OAUTHBEARER.

sasl.login.refresh.buffer.seconds

The amount of buffer time before credential expiration to maintain when refreshing a
credential, in seconds. If a refresh would otherwise occur closer to expiration than the
number of buffer seconds then the refresh will be moved up to maintain as much of the
buffer time as possible, overriding all other considerations. Legal values are between 0 and
3600 (1 hour); a default value of 300 (5 minutes) is used if no value is specified. This value
and sasl.login.refresh.min.period.seconds are both ignored if their sum exceeds
the remaining lifetime of a credential. Currently applies only to OAUTHBEARER.
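One way to read how these four properties interact is sketched below. This is a simplified model written from the descriptions above, not the actual Kafka refresh algorithm: RefreshTimeSketch and its parameter names are hypothetical, the lifetime is assumed to be measured from the time of login, and the random jitter draw is passed in so that the result is deterministic.

```java
/** Hypothetical helper for illustration only; not part of the Kafka API. */
final class RefreshTimeSketch {
    private RefreshTimeSketch() {}

    /**
     * @param jitterDraw a random value already drawn between 0 and the
     *                   configured sasl.login.refresh.window.jitter
     * @return the time (ms since the epoch) at which a refresh would be attempted
     */
    static long refreshAtMs(long loginMs, long expireMs, double windowFactor, double jitterDraw,
                            long minPeriodSeconds, long bufferSeconds) {
        long lifetimeMs = expireMs - loginMs;
        // Sleep until the window factor (plus jitter) of the lifetime has elapsed.
        long refreshAtMs = loginMs + Math.round(lifetimeMs * (windowFactor + jitterDraw));
        long minPeriodMs = minPeriodSeconds * 1000L;
        long bufferMs = bufferSeconds * 1000L;
        if (minPeriodMs + bufferMs > lifetimeMs) {
            // Both values are ignored when their sum exceeds the lifetime.
            return Math.min(refreshAtMs, expireMs);
        }
        // Do not refresh sooner than the minimum period...
        refreshAtMs = Math.max(refreshAtMs, loginMs + minPeriodMs);
        // ...but keep the buffer before expiration, which overrides everything else.
        return Math.min(refreshAtMs, expireMs - bufferMs);
    }

    public static void main(String[] args) {
        // One-hour token with the documented defaults and no jitter.
        System.out.println(refreshAtMs(0L, 3_600_000L, 0.8, 0.0, 60L, 300L)); // prints 2880000
    }
}
```

With the defaults, a one-hour token is refreshed after 48 minutes; raising the window factor toward 1.0 is capped by the 5-minute buffer, so the refresh would happen no later than 55 minutes in.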


Code Block (Java): org.apache.kafka.common.security.oauthbearer.OAuthBearerToken

package org.apache.kafka.common.security.oauthbearer;

import java.util.Set;

/**
 * The <code>b64token</code> value as defined in
 * <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750 Section
 * 2.1</a> along with the token's specific scope and lifetime and principal
 * name.
 * <p>
 * A network request would be required to re-hydrate an opaque token, and that
 * could result in (for example) an {@code IOException}, but retrievers for
 * various attributes ({@link #scope()}, {@link #lifetimeMs()}, etc.) declare no
 * exceptions. Therefore, if a network request is required for any of these
 * retriever methods, that request could be performed at construction time so
 * that the various attributes can be reliably provided thereafter. For example,
 * a constructor might declare {@code throws IOException} in such a case.
 * Alternatively, the retrievers could throw unchecked exceptions.
 *
 * @see <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC 6749
 *      Section 1.4</a> and
 *      <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750
 *      Section 2.1</a>
 */
public interface OAuthBearerToken {
    /**
     * The <code>b64token</code> value as defined in
     * <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750 Section
     * 2.1</a>
     *
     * @return <code>b64token</code> value as defined in
     *         <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750
     *         Section 2.1</a>
     */
    String value();

    /**
     * The token's scope of access, as per
     * <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC 6749 Section
     * 1.4</a>
     *
     * @return the token's (always non-null but potentially empty) scope of access,
     *         as per <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC
     *         6749 Section 1.4</a>. Note that all values in the returned set will
     *         be trimmed of preceding and trailing whitespace, and the result will
     *         never contain the empty string.
     */
    Set<String> scope();

    /**
     * The token's lifetime, expressed as the number of milliseconds since the
     * epoch, as per <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC
     * 6749 Section 1.4</a>
     *
     * @return the token's lifetime, expressed as the number of milliseconds since
     *         the epoch, as per
     *         <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC 6749
     *         Section 1.4</a>.
     */
    long lifetimeMs();

    /**
     * The name of the principal to which this credential applies
     *
     * @return the always non-null/non-empty principal name
     */
    String principalName();

    /**
     * When the credential became valid, in terms of the number of milliseconds
     * since the epoch, if known, otherwise null. An expiring credential may not
     * necessarily indicate when it was created -- just when it expires -- so we
     * need to support a null return value here.
     *
     * @return the time when the credential became valid, in terms of the number of
     *         milliseconds since the epoch, if known, otherwise null
     */
    Long startTimeMs();
}

package org.apache.kafka.common.security.oauthbearer;

import java.io.Serializable;

/**
 * The result of some kind of token validation
 */
public class OAuthBearerValidationResult implements Serializable {

    // etc...

    /**
     * Return an instance indicating success
     *
     * @return an instance indicating success
     */
    public static OAuthBearerValidationResult newSuccess() {
        return new OAuthBearerValidationResult(true, null, null, null);
    }

    /**
     * Return a new validation failure instance
     *
     * @param failureDescription
     *            optional description of the failure
     * @return a new validation failure instance
     */
    public static OAuthBearerValidationResult newFailure(String failureDescription) {
        return newFailure(failureDescription, null, null);
    }

    /**
     * Return a new validation failure instance
     *
     * @param failureDescription
     *            optional description of the failure
     * @param failureScope
     *            optional scope to be reported with the failure
     * @param failureOpenIdConfig
     *            optional OpenID Connect configuration to be reported with the
     *            failure
     * @return a new validation failure instance
     */
    public static OAuthBearerValidationResult newFailure(String failureDescription, String failureScope,
            String failureOpenIdConfig) {
        return new OAuthBearerValidationResult(false, failureDescription, failureScope, failureOpenIdConfig);
    }

    private OAuthBearerValidationResult(boolean success, String failureDescription, String failureScope,
            String failureOpenIdConfig) {
        // etc...
    }

    /**
     * Return true if this instance indicates success, otherwise false
     *
     * @return true if this instance indicates success, otherwise false
     */
    public boolean success() {
        return success;
    }

    /**
     * Return the (potentially null) descriptive message for the failure
     *
     * @return the (potentially null) descriptive message for the failure
     */
    public String failureDescription() {
        return failureDescription;
    }

    /**
     * Return the (potentially null) scope to be reported with the failure
     *
     * @return the (potentially null) scope to be reported with the failure
     */
    public String failureScope() {
        return failureScope;
    }

    /**
     * Return the (potentially null) OpenID Connect configuration to be reported
     * with the failure
     *
     * @return the (potentially null) OpenID Connect configuration to be reported
     *         with the failure
     */
    public String failureOpenIdConfig() {
        return failureOpenIdConfig;
    }

    /**
     * Raise an exception if this instance indicates failure, otherwise do nothing
     *
     * @throws OAuthBearerIllegalTokenException
     *             if this instance indicates failure
     */
    public void throwExceptionIfFailed() throws OAuthBearerIllegalTokenException {
        if (!success())
            throw new OAuthBearerIllegalTokenException(this);
    }
}

Token Refresh

See Rejected Alternatives: Token Refresh

We define the org.apache.kafka.common.security.oauthbearer.refresh.ExpiringCredential and org.apache.kafka.common.security.oauthbearer.refresh.ExpiringCredentialLoginModule interfaces and the org.apache.kafka.common.security.oauthbearer.refresh.ExpiringCredentialRefreshingLogin class that can refresh expiring credentials.  We represent many of the parameters that impact how the refresh algorithm operates as a single map of enum/value pairs rather than as separate methods – this helps ensure forward compatibility if/when we decide we want to add more potential parameters (having the flexibility to add more parameters without adding methods to an interface is especially important since Java 7 does not support default methods on interfaces – adding a new method breaks forward compatibility).  We provide the org.apache.kafka.common.security.oauthbearer.refresh.RefreshConfig class to provide the methods that we did not add to any interface, and the org.apache.kafka.common.security.oauthbearer.refresh.ExpiringCredentialRefreshingLogin class will parse the properties from the JAAS config options (with appropriate defaults in case they are not explicitly specified).

Note that the token refresh functionality and the related classes and interfaces are not necessarily specific to OAuth Bearer Tokens, so an open question is whether this refresh-related code belongs in a sub-package of the main SASL/OAUTHBEARER one or if it should live somewhere else.  If this functionality ends up elsewhere then perhaps RefreshConfigProp.parseValue(Object) should not throw an OAuthBearerConfigException but rather a different exception type.


refresh.RefreshConfigProp
Code Block
languagejava
titleorg.apache.kafka.common.security.oauthbearer.refresh.ExpiringCredentialOAuthBearerLoginModule
collapsetrue
package org.apache.kafka.common.security.oauthbearer.refresh;
 
/**
 * AThe credential{@code thatLoginModule} expiresfor andthe that can potentially be refreshedSASL/OAUTHBEARER mechanism. When a client
 *
 *(whether @see ExpiringCredentialRefreshingLogin
 */
public interface ExpiringCredential {
    /**
     * The name of the principal to which this credential applies (used only for
     * logging)
     *
     * @return the always non-null/non-empty principal name
     */
    String principalName();
    /**
     * When the credential became valid, in terms of the number of milliseconds
     * since the epoch, if known, otherwise null. An expiring credential may not
     * necessarily indicate when it was created -- just when it expires -- so we
     * need to support a null return value here.
     *
     * @return the time when the credential became valid, in terms of the number of
     *         milliseconds since the epoch, if known, otherwise null
     */
    Long startTimeMillis();
 
    /**
     * When the credential expires, in terms of the number of milliseconds since the
     * epoch. All expiring credentials by definition must indicate their expiration
     * time -- thus, unlike other methods, we do not support a null return value
     * here.
     *
     * @return the time when the credential expires, in terms of the number of
     *         milliseconds since the epoch
     */
    long expireTimeMillis();
 
    /**
     * The point after which the credential can no longer be refreshed, in terms of
     * the number of milliseconds since the epoch, if any, otherwise null. Some
     * expiring credentials can be refreshed over and over again without limit, so
     * we support a null return value here.
     *
     * @return the point after which the credential can no longer be refreshed, in
     *         terms of the number of milliseconds since the epoch, if any,
     *         otherwise null
     */
    Long absoluteLastRefreshMillis();
}
Code Block
languagejava
title
a non-broker client or a broker when SASL/OAUTHBEARER is the
 * inter-broker protocol) connects to Kafka the {@code OAuthBearerLoginModule}
 * instance asks its configured {@link AuthenticateCallbackHandler}
 * implementation to handle an instance of {@link OAuthBearerTokenCallback} and
 * return an instance of {@link OAuthBearerToken}. A default, builtin
 * {@link AuthenticateCallbackHandler} implementation creates an unsecured token
 * as defined by these JAAS module options:
 * <table>
 * <tr>
 * <th>JAAS Module Option for Unsecured Token Retrieval</th>
 * <th>Documentation</th>
 * </tr>
 * <tr>
 * <td>{@code unsecuredLoginStringClaim_<claimname>="value"}</td>
 * <td>Creates a {@code String} claim with the given name and value. Any valid
 * claim name can be specified except '{@code iat}' and '{@code exp}' (these are
 * automatically generated).</td>
 * </tr>
 * <tr>
 * <td>{@code unsecuredLoginNumberClaim_<claimname>="value"}</td>
 * <td>Creates a {@code Number} claim with the given name and value. Any valid
 * claim name can be specified except '{@code iat}' and '{@code exp}' (these are
 * automatically generated).</td>
 * </tr>
 * <tr>
 * <td>{@code unsecuredLoginListClaim_<claimname>="value"}</td>
 * <td>Creates a {@code String List} claim with the given name and values parsed
 * from the given value where the first character is taken as the delimiter. For
 * example: {@code unsecuredLoginListClaim_fubar="|value1|value2"}. Any valid
 * claim name can be specified except '{@code iat}' and '{@code exp}' (these are
 * automatically generated).</td>
 * </tr>
 * <tr>
 * <td>{@code unsecuredLoginPrincipalClaimName}</td>
 * <td>Set to a custom claim name if you wish the name of the String claim
 * holding the principal name to be something other than '{@code sub}'.</td>
 * </tr>
 * <tr>
 * <td>{@code unsecuredLoginLifetimeSeconds}</td>
 * <td>Set to an integer value if the token expiration is to be set to something
 * other than the default value of 3600 seconds (which is 1 hour). The
 * '{@code exp}' claim will be set to reflect the expiration time.</td>
 * </tr>
 * <tr>
 * <td>{@code unsecuredLoginScopeClaimName}</td>
 * <td>Set to a custom claim name if you wish the name of the String or String
 * List claim holding any token scope to be something other than
 * '{@code scope}'.</td>
 * </tr>
 * </table>
 * Production use cases will require writing an implementation of
 * {@link AuthenticateCallbackHandler} that can handle an instance of
 * {@link OAuthBearerTokenCallback} and declaring it via either the
 * {@code sasl.login.callback.handler.class} configuration option for a
 * non-broker client or via the
 * {@code listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class}
 * configuration option for brokers (when SASL/OAUTHBEARER is the inter-broker
 * protocol).
 * <p>
 * Here is a typical, basic JAAS configuration for a client leveraging unsecured
 * SASL/OAUTHBEARER authentication:
 *
 * <pre>
 * KafkaClient {
 *      org.apache.kafka.common.security.oauthbearer.
collapsetrue
package org.apache.kafka.common.security.oauthbearer.refresh;

/**OAuthBearerLoginModule Required
 * Individual refresh-related configuration properties defining how unsecuredLoginStringClaim_sub="thePrincipalName";
 * {@link ExpiringCredentialRefreshingLogin};
 refreshes instances of* </pre>
 *
 * {@link ExpiringCredential}. Each value has a type, optional min/max/defaultAn implementation of the {@link Login} interface specific to the
 * values, an ordered list of string keys (containing the enum name and the
 * camelCase version of the name, in that order) so it can be picked out of a
 * map keyed with Strings, and the ability to parse a string or non-string value
 * associated with one of its string keys in a map.
 *
 * @see RefreshConfig
 */
public enum RefreshConfigProp {
    /**
     * Background login refresh thread will sleep until the specified window factor
     * relative to the credential's total lifetime has been reached, at which time
     * it will try to refresh the credential. The default value is 0.8 (80%).
     */
    REFRESH_WINDOW_FACTOR(Double.class, 0.5, 1.0, 0.8),
    /**
     * Amount of random jitter added to the background login refresh thread's sleep
     * time. The default value is 0.05 (5%).
     */
    REFRESH_WINDOW_JITTER(Double.class, 0.0, 0.25, 0.05),
    /**
     * The minimum time between checks by the background login refresh thread,
     * regardless of other constraints, in milliseconds. The default value is 60,000
     * (1 minute).
     */
    REFRESH_MIN_PERIOD_MILLIS(Long.class, 0L, 1000L * 60 * 15, 1000L * 60 * 1),
    /**
     * If the {@code LoginModule} and {@code SaslClient} implementations support
     * multiple simultaneous login contexts on a single {@code Subject} at the same
     * time. If true, then upon refresh, logout will only be invoked on the original
     * {@code LoginContext} after a new one successfully logs in. This can be
     * helpful if the original credential still has some lifetime left when an
     * attempt to refresh the credential fails; the client will still be able to
     * create new connections as long as the original credential remains valid.
     * Otherwise, if logout is immediately invoked prior to relogin, a relogin
     * failure leaves the client without the ability to connect until relogin does
     * in fact succeed. The default value is false.
     */
    RELOGIN_ALLOWED_BEFORE_LOGOUT(Boolean.class, Boolean.FALSE, Boolean.TRUE, Boolean.FALSE);
    private RefreshConfigProp(Class<? extends Comparable<?>> valueType, Comparable<?> minValue, Comparable<?> maxValue,
            Comparable<?> defaultValue) {
        // etc...
    }
 
    /**
     * Return the always non-null value type
     *
     * @return the always non-null value type
     */
    public Class<? extends Comparable<?>> valueType(){@code OAUTHBEARER} mechanism is automatically applied; it periodically
 * refreshes any token before it expires so that the client can continue to make
 * connections to brokers. The parameters that impact how the refresh algorithm
 * operates are specified as part of the producer/consumer/broker configuration
 * are as follows. See the documentation for these properties elsewhere for
 * details.
 * <table>
 * <tr>
 * <th>Producer/Consumer/Broker Configuration Property</th>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.window.factor}</td>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.window.jitter}</td>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.min.period.seconds}</td>
 * </tr>
 * <tr>
 * <td>{@code sasl.login.refresh.min.buffer.seconds}</td>
 * </tr>
 * </table>
 * When a broker accepts a SASL/OAUTHBEARER connection the instance of the
 * builtin {@code SaslServer} implementation asks its configured
 * {@link AuthenticateCallbackHandler} implementation to handle an instance of
 * {@link OAuthBearerValidatorCallback} constructed with the OAuth 2 Bearer
 * Token's compact serialization and return an instance of
 * {@link OAuthBearerToken} if the value validates. A default, builtin
 * {@link AuthenticateCallbackHandler} implementation validates an unsecured
 * token as defined by these JAAS module options:
 * <table>
 * <tr>
 * <th>JAAS Module Option for Unsecured Token Validation</th>
 * <th>Documentation</th>
 * </tr>
 * <tr>
 * <td>{@code unsecuredValidatorPrincipalClaimName="value"}</td>
 * <td>Set to a non-empty value if you wish a particular String claim holding a
 * principal name to be checked for existence; the default is to check for the
 * existence of the '{@code sub}' claim.</td>
 * </tr>
 * <tr>
 * <td>{@code unsecuredValidatorScopeClaimName="value"}</td>
 * <td>Set to a custom claim name if you wish the name of the String or String
 * List claim holding any token scope to be something other than
 * '{@code scope}'.</td>
 * </tr>
 * <tr>
 * <td>{@code unsecuredValidatorRequiredScope="value"}</td>
 * <td>Set to a space-delimited list of scope values if you wish the
 * String/String List claim holding the token scope to be checked to make sure
 * it contains certain values.</td>
 * </tr>
 * <tr>
 * <td>{@code unsecuredValidatorAllowableClockSkewMs="value"}</td>
 * <td>Set to a positive integer value if you wish to allow up to some number of
 * positive milliseconds of clock skew (the default is 0).</td>
 * </tr>
 * </table>
 * Here is a typical, basic JAAS configuration for a broker leveraging unsecured
 * SASL/OAUTHBEARER validation:
 *
 * <pre>
 * KafkaServer {
 *      org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
 *      unsecuredLoginStringClaim_sub="thePrincipalName";
 * };
 * </pre>
 *
 * Production use cases will require writing an implementation of
 * {@link AuthenticateCallbackHandler} that can handle an instance of
 * {@link OAuthBearerValidatorCallback} and declaring it via the
 * {@code listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class}
 * configuration option.
 *
 * @see SaslConfigs#SASL_LOGIN_REFRESH_WINDOW_FACTOR_DOC
 * @see SaslConfigs#SASL_LOGIN_REFRESH_WINDOW_JITTER_DOC
 * @see SaslConfigs#SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS_DOC
 * @see SaslConfigs#SASL_LOGIN_REFRESH_MIN_BUFFER_SECONDS_DOC
 */
public class OAuthBearerLoginModule implements LoginModule {
    static {
        OAuthBearerSaslClientProvider.initialize(); // not part of public API
        OAuthBearerSaslServerProvider.initialize(); // not part of public API
    }

    // etc...
}


Code Block (Java): org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback
package org.apache.kafka.common.security.oauthbearer;

/**
 * A {@code Callback} for use by the {@code SaslClient} and {@code Login}
 * implementations when they require an OAuth 2 bearer token. Callback handlers
 * should use the {@link #error(String, String, String)} method to communicate
 * errors returned by the authorization server as per
 * <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
 * 2.0 Authorization Framework</a>. Callback handlers should communicate other
 * problems by raising an {@code IOException}.
 */
public class OAuthBearerTokenCallback implements Callback {
    private OAuthBearerToken token = null;
    private String errorCode = null;
    private String errorDescription = null;
    private String errorUri = null;

    /**
     * Return the (potentially null) token
     *
     * @return the (potentially null) token
     */
    public OAuthBearerToken token() {
        return token;
    }

    /**
     * Return the (always non-empty) error code as per
     * <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
     * 2.0 Authorization Framework</a>.
     *
     * @return the (always non-empty) error code
     */
    public String errorCode() {
        return errorCode;
    }

    /**
     * Return the (potentially null) error description as per
     * <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
     * 2.0 Authorization Framework</a>.
     *
     * @return the (potentially null) error description
     */
    public String errorDescription() {
        return errorDescription;
    }

    /**
     * Return the (potentially null) error URI as per
     * <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
     * 2.0 Authorization Framework</a>.
     *
     * @return the (potentially null) error URI
     */
    public String errorUri() {
        return errorUri;
    }

    /**
     * Set the token. All error-related values are cleared.
     *
     * @param token
     *            the mandatory token to set
     */
    public void token(OAuthBearerToken token) {
        this.token = Objects.requireNonNull(token);
        this.errorCode = null;
        this.errorDescription = null;
        this.errorUri = null;
    }

    /**
     * Set the error values as per
     * <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
     * 2.0 Authorization Framework</a>. Any token is cleared.
     *
     * @param errorCode
     *            the mandatory error code to set
     * @param errorDescription
     *            the optional error description to set
     * @param errorUri
     *            the optional error URI to set
     */
    public void error(String errorCode, String errorDescription, String errorUri) {
        if (Objects.requireNonNull(errorCode).isEmpty())
            throw new IllegalArgumentException("error code must not be empty");
        this.errorCode = errorCode;
        this.errorDescription = errorDescription;
        this.errorUri = errorUri;
        this.token = null;
    }
}


Token Validation

See Rejected Alternatives: Callback Handlers and Callbacks

We define the org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback class as the callback class for communicating that we want to validate a bearer token compact serialization provided by the connecting client.  When a broker accepts a SASL/OAUTHBEARER connection the instance of the builtin SaslServer implementation asks its configured AuthenticateCallbackHandler implementation to handle an instance of OAuthBearerValidatorCallback constructed with the OAuth 2 Bearer Token's compact serialization and return an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerToken if the compact serialization validates. A default, builtin AuthenticateCallbackHandler implementation validates an unsecured token as defined by these JAAS module options:

| JAAS Module Option for Unsecured Token Validation | Documentation |
| --- | --- |
| unsecuredValidatorPrincipalClaimName="value" | Set to a non-empty value if you wish a particular String claim holding a principal name to be checked for existence; the default is to check for the existence of the 'sub' claim. |
| unsecuredValidatorScopeClaimName="value" | Set to a custom claim name if you wish the name of the String or String List claim holding any token scope to be something other than 'scope'. |
| unsecuredValidatorRequiredScope="value" | Set to a space-delimited list of scope values if you wish the String/String List claim holding the token scope to be checked to make sure it contains certain values. |
| unsecuredValidatorAllowableClockSkewMs="value" | Set to a positive integer value if you wish to allow up to some number of positive milliseconds of clock skew (the default is 0). |
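
To make the effect of these options more concrete, here is a minimal sketch of the kind of checks they describe, applied to an already-decoded claims map. It is not the built-in handler: the class and method names (`UnsecuredClaimsCheck`, `validate`) are hypothetical, and the `exp` check (seconds since the epoch, as in RFC 7519) is an assumption about how clock skew would be applied. A String scope claim is treated as a single scope value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of unsecured claim checks; not the built-in validator. */
public class UnsecuredClaimsCheck {

    /**
     * Returns null when the claims pass every check, otherwise a short reason.
     */
    public static String validate(Map<String, Object> claims, String principalClaimName,
            String scopeClaimName, String requiredScope, long allowableClockSkewMs, long nowMs) {
        // The principal claim must exist and be a non-empty String.
        Object principal = claims.get(principalClaimName);
        if (!(principal instanceof String) || ((String) principal).isEmpty())
            return "missing principal claim '" + principalClaimName + "'";

        // Assumption: an 'exp' claim, if present, is in seconds since the epoch
        // and may be exceeded by at most the allowable clock skew.
        Object exp = claims.get("exp");
        if (exp instanceof Number) {
            long expiresAtMs = Math.round(((Number) exp).doubleValue() * 1000.0);
            if (nowMs > expiresAtMs + allowableClockSkewMs)
                return "token expired";
        }

        // Every value in the space-delimited required scope must be present.
        if (requiredScope != null && !requiredScope.trim().isEmpty()) {
            List<String> tokenScope = scopeValues(claims.get(scopeClaimName));
            for (String required : requiredScope.trim().split("\\s+")) {
                if (!tokenScope.contains(required))
                    return "missing required scope '" + required + "'";
            }
        }
        return null;
    }

    // The scope claim may be a String (one value) or a List of Strings.
    private static List<String> scopeValues(Object claim) {
        List<String> values = new ArrayList<>();
        if (claim instanceof String) {
            values.add(((String) claim).trim());
        } else if (claim instanceof List) {
            for (Object item : (List<?>) claim)
                values.add(String.valueOf(item).trim());
        }
        return values;
    }
}
```

Returning a reason string rather than throwing keeps the sketch easy to map onto a callback's error-reporting method, but that mapping is left out here.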

Here is a typical, basic JAAS configuration for a broker leveraging unsecured SASL/OAUTHBEARER authentication: 

KafkaServer {
    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
    unsecuredLoginStringClaim_sub="thePrincipalName";
};
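
For readers less familiar with JAAS, the same entry can be pictured as a programmatic `javax.security.auth.login.Configuration`. This is only an illustration of the structure of the configuration above; the class name `BrokerJaasSketch` is hypothetical, and nothing here implies that a broker must or can be configured this way.

```java
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import javax.security.auth.login.Configuration;

/** Hypothetical sketch: the KafkaServer JAAS entry expressed in code. */
public class BrokerJaasSketch {
    public static final String LOGIN_MODULE =
            "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule";

    public static Configuration brokerConfiguration() {
        // Module options are plain key/value pairs, as in the JAAS file.
        Map<String, String> options = new HashMap<>();
        options.put("unsecuredLoginStringClaim_sub", "thePrincipalName");
        final AppConfigurationEntry entry =
                new AppConfigurationEntry(LOGIN_MODULE, LoginModuleControlFlag.REQUIRED, options);
        return new Configuration() {
            @Override
            public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
                // Only the "KafkaServer" context is defined in this sketch.
                return "KafkaServer".equals(name) ? new AppConfigurationEntry[] {entry} : null;
            }
        };
    }
}
```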

Production use cases will require writing an implementation of AuthenticateCallbackHandler that can handle an instance of OAuthBearerValidatorCallback and declaring it via the listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class configuration option.

The validated token will be available as a negotiated property on the SaslServer instance with the key OAUTHBEARER.token so it can be used for authorization as per KIP-189: Improve principal builder interface and add support for SASL.  Note that the implementation of SaslServer is not part of the public interface – just the key where it makes the validated token available.
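
As a rough sketch of how code holding a `SaslServer` might read that negotiated property, the example below uses only the standard `javax.security.sasl.SaslServer` interface. The class `NegotiatedTokenLookup` and its stub server are hypothetical and exist only so the example is self-contained; the real token would be an `OAuthBearerToken`, represented here as a plain `Object`.

```java
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;

/** Hypothetical sketch of reading the validated token from a SaslServer. */
public class NegotiatedTokenLookup {
    public static final String TOKEN_KEY = "OAUTHBEARER.token";

    /** Returns the validated token, or null if authentication is not complete. */
    public static Object validatedToken(SaslServer saslServer) {
        // Negotiated properties are only defined once authentication completes.
        if (!saslServer.isComplete())
            return null;
        return saslServer.getNegotiatedProperty(TOKEN_KEY);
    }

    /** Minimal stand-in server used only to exercise the lookup above. */
    public static SaslServer stubServer(final Object token, final boolean complete) {
        return new SaslServer() {
            public String getMechanismName() { return "OAUTHBEARER"; }
            public byte[] evaluateResponse(byte[] response) { return new byte[0]; }
            public boolean isComplete() { return complete; }
            public String getAuthorizationID() { return null; }
            public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException {
                throw new SaslException("not supported in this sketch");
            }
            public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException {
                throw new SaslException("not supported in this sketch");
            }
            public Object getNegotiatedProperty(String propName) {
                return TOKEN_KEY.equals(propName) ? token : null;
            }
            public void dispose() { }
        };
    }
}
```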


Code Block (Java): org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback
package org.apache.kafka.common.security.oauthbearer;

/**
 * A {@code Callback} for use by the {@code SaslServer} implementation when it
 * needs to provide an OAuth 2 bearer token compact serialization for
 * validation. Callback handlers should use the
 * {@link #error(String, String, String)} method to communicate errors back to
 * the SASL Client as per
 * <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
 * 2.0 Authorization Framework</a> and the <a href=
 * "https://www.iana.org/assignments/oauth-parameters/oauth-parameters.xhtml#extensions-error">IANA
 * OAuth Extensions Error Registry</a>. Callback handlers should communicate
 * other problems by raising an {@code IOException}.
 */
public class OAuthBearerValidatorCallback implements Callback {
    private final String tokenValue;
    private OAuthBearerToken token = null;
    private String errorStatus = null;
    private String errorScope = null;
    private String errorOpenIDConfiguration = null;

    /**
     * Constructor
     *
     * @param tokenValue
     *            the mandatory/non-blank token value
     */
    public OAuthBearerValidatorCallback(String tokenValue) {
        if (Objects.requireNonNull(tokenValue).isEmpty())
            throw new IllegalArgumentException("token value must not be empty");
        this.tokenValue = tokenValue;
    }

    /**
     * Return the (always non-null) token value
     *
     * @return the (always non-null) token value
     */
    public String tokenValue() {
        return tokenValue;
    }

    /**
     * Return the (potentially null) token
     *
     * @return the (potentially null) token
     */
    public OAuthBearerToken token() {
        return token;
    }

    /**
     * Return the (potentially null) error status value as per
     * <a href="https://tools.ietf.org/html/rfc7628#section-3.2.2">RFC 7628: A Set
     * of Simple Authentication and Security Layer (SASL) Mechanisms for OAuth</a>
     * and the <a href=
     * "https://www.iana.org/assignments/oauth-parameters/oauth-parameters.xhtml#extensions-error">IANA
     * OAuth Extensions Error Registry</a>.
     *
     * @return the (potentially null) error status value
     */
    public String errorStatus() {
        return errorStatus;
    }

    /**
     * Return the (potentially null) error scope value as per
     * <a href="https://tools.ietf.org/html/rfc7628#section-3.2.2">RFC 7628: A Set
     * of Simple Authentication and Security Layer (SASL) Mechanisms for OAuth</a>.
     *
     * @return the (potentially null) error scope value
     */
    public String errorScope() {
        return errorScope;
    }

    /**
     * Return the (potentially null) error openid-configuration value as per
     * <a href="https://tools.ietf.org/html/rfc7628#section-3.2.2">RFC 7628: A Set
     * of Simple Authentication and Security Layer (SASL) Mechanisms for OAuth</a>.
     *
     * @return the (potentially null) error openid-configuration value
     */
    public String errorOpenIDConfiguration() {
        return errorOpenIDConfiguration;
    }

    /**
     * Set the token. The token value is unchanged and is expected to match the
     * provided token's value. All error values are cleared.
     *
     * @param token
     *            the mandatory token to set
     */
    public void token(OAuthBearerToken token) {
        this.token = Objects.requireNonNull(token);
        this.errorStatus = null;
        this.errorScope = null;
        this.errorOpenIDConfiguration = null;
    }

    /**
     * Set the error values as per
     * <a href="https://tools.ietf.org/html/rfc7628#section-3.2.2">RFC 7628: A Set
     * of Simple Authentication and Security Layer (SASL) Mechanisms for OAuth</a>.
     * Any token is cleared.
     *
     * @param errorStatus
     */
    // etc...
}
Code Block (Java): org.apache.kafka.common.security.oauthbearer.refresh.ExpiringCredentialLoginModule
package org.apache.kafka.common.security.oauthbearer.refresh;

import javax.security.auth.spi.LoginModule;

/**
 * An extension of the {@code LoginModule} interface that must be implemented by
 * any login module that generates an instance of {@link ExpiringCredential} to
 * be managed/refreshed by {@link ExpiringCredentialRefreshingLogin}. The
 * existence of this interface is necessary to deal with the case when there are
 * multiple enabled SASL mechanisms, one or more of which generate an expiring
 * credential, and a SASL mechanism (as opposed to authentication via SSL) is
 * used for inter-broker communication.
 * <p>
 * The following broker-side JAAS configuration helps illustrate the need for
 * this interface:
 *
 * <pre>
 * KafkaServer {
 *      org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
 *      // etc...;
 *      some.othersaslmechanism.OtherSaslMechanismLoginModule Optional
 *      // etc...;
 * };
 * </pre>
 *
 * The {@code LoginContext} instance will initialize both login modules and ask
 * them both to login -- that's how JAAS works -- regardless of which SASL
 * mechanism is used for inter-broker communication. It is imperative that the
 * login succeeds for the login module associated with the mechanism configured
 * for inter-broker communication; it doesn't matter if any other mechanisms
 * fail because they aren't actually being used for client-side work (they are
 * only being used for the server side of the SASL handshake, and that is
 * performed by the {@code SaslServer} instance rather than the
 * {@code LoginModule} instance). This has 2 implications:
 * <p>
 * <ol>
 * <li>The {@code CallbackHandler} instance provided to the {@code LoginContext}
 * instance (which then passes it to all of the login modules) must be the
 * correct one for the login module that must succeed; it does not have to be
 * the correct one for any others.
 * <li>The login modules that don't have to succeed (because they aren't being
 * used for inter-broker communication) should be marked {@code Optional} in
 * case they fail.
 * </ol>
 * <p>
 * This raises the critical issue of how any instance of {@link Login} can know
 * which {@code CallbackHandler} instance to instantiate. The
 * {@link ScramLoginModule} and {@link PlainLoginModule} don't use the callback
 * handler, but {@code com.sun.security.auth.module.Krb5LoginModule} does, and
 * in fact {@link LoginCallbackHandler} serves the purpose of short-circuiting
 * any request for user interaction in that case. So this issue hasn't been
 * critical in the past; it is only now becoming more important.
 * <p>
 * All the {@code Login} instance knows is the SASL mechanism enabled for
 * inter-broker communication (from the broker config) and the JAAS
 * configuration (for example, the one shown above). It cannot know which
 * {@code CallbackHandler} instance to instantiate from just that information
 * because it cannot determine from that information alone which of the login
 * modules handles the declared inter-broker communication mechanism.
 * <p>
 * We thus arrive at the need for this interface. A {@code Login} instance can
 * look at all of the declared login module classes, determine which of them
 * implements this interface, and then for each of those it can instantiate an
 * instance using the default constructor and ask for its applicable mechanisms
 * via {@link #mechanisms()}. If it finds a login module with an applicable SASL
 * mechanism matching the one being used for inter-broker communication it can
 * then ask for a {@code CallbackHandler} instance via
 * {@link #newCallbackHandler()}. Otherwise, if it doesn't find a match, it just
 * instantiates an instance of {@link LoginCallbackHandler}, which is what
 * {@link AbstractLogin#login()} currently creates and is required for the
 * initial Kerberos login attempt for short-circuit behavior as mentioned above.
 */
public interface ExpiringCredentialLoginModule extends LoginModule {
    /**
     * Return the set of SASL mechanisms this login module applies to
     *
     * @return the set of SASL mechanisms this login module applies to
     */
    Set<String> mechanisms();

    /**
     * Return a new {@code CallbackHandler} instance appropriate for this login
     * module when one of its supported mechanisms as returned by
     * {@link #mechanisms()} is the SASL mechanism for inter-broker communication.
     *
     * @return a new {@code CallbackHandler} instance appropriate for this login
     *         module when one of its supported mechanisms as returned by
     *         {@link #mechanisms()} is the SASL mechanism for inter-broker
     *         communication.
     */
    CallbackHandler newCallbackHandler();
} 
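
The lookup described in the Javadoc above (find a login module whose mechanisms include the inter-broker mechanism, otherwise fall back to a default handler) can be sketched as follows. This is an assumption-laden illustration: `MechanismAware` is a hypothetical stand-in for the two methods of `ExpiringCredentialLoginModule`, and the sketch takes already-instantiated modules rather than instantiating declared classes via their default constructors.

```java
import java.util.List;
import java.util.Set;
import javax.security.auth.callback.CallbackHandler;

/** Hypothetical sketch of choosing a CallbackHandler by SASL mechanism. */
public class CallbackHandlerSelectionSketch {

    /** Stand-in for the two methods declared by ExpiringCredentialLoginModule. */
    public interface MechanismAware {
        Set<String> mechanisms();

        CallbackHandler newCallbackHandler();
    }

    /**
     * Returns a handler from the first module that supports the inter-broker
     * mechanism, otherwise the supplied fallback handler.
     */
    public static CallbackHandler select(List<?> loginModules, String interBrokerMechanism,
            CallbackHandler fallback) {
        for (Object module : loginModules) {
            // Modules that do not expose their mechanisms are skipped.
            if (module instanceof MechanismAware) {
                MechanismAware candidate = (MechanismAware) module;
                if (candidate.mechanisms().contains(interBrokerMechanism))
                    return candidate.newCallbackHandler();
            }
        }
        return fallback;
    }
}
```

In the terms used above, the fallback argument would play the role of `LoginCallbackHandler`.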
Code Block (Java): org.apache.kafka.common.security.oauthbearer.refresh.ExpiringCredentialRefreshingLogin
package org.apache.kafka.common.security.oauthbearer.refresh;

import org.apache.kafka.common.security.auth.Login;

/**
 * This class is responsible for refreshing logins for both Kafka client and
 * server when the login is a type that has a limited lifetime and will expire. The
 * credentials for the login must implement {@link ExpiringCredential}, and the
 * {@code LoginModule} must implement {@link ExpiringCredentialLoginModule}.
 */
public class ExpiringCredentialRefreshingLogin implements Login {
    // etc...
 
    public RefreshConfig refreshConfig() {
        return refreshConfig;
    }
 
    // etc...
} 
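
To illustrate how a refresh window factor and jitter could translate into a refresh time, here is a small sketch. It is not the actual refresh algorithm: the class and method names are hypothetical, applying jitter as an extra fraction of the credential lifetime is an assumption, and the minimum period and buffer settings are deliberately not modeled.

```java
import java.util.Random;

/** Hypothetical sketch of computing when to refresh an expiring credential. */
public class RefreshTimeSketch {

    /**
     * Returns the proposed refresh time in milliseconds since the epoch.
     *
     * @param startMs      when the credential's lifetime began
     * @param expireMs     when the credential expires
     * @param windowFactor fraction of the lifetime to wait before refreshing
     * @param windowJitter maximum extra fraction of the lifetime added at random
     */
    public static long refreshMs(long startMs, long expireMs, double windowFactor,
            double windowJitter, Random random) {
        long lifetimeMs = expireMs - startMs;
        // Random jitter in [0, windowJitter) spreads refreshes out over time.
        double jitter = random.nextDouble() * windowJitter;
        long proposedMs = startMs + (long) (lifetimeMs * (windowFactor + jitter));
        // Never propose a refresh after the credential has already expired.
        return Math.min(proposedMs, expireMs);
    }
}
```

For a credential valid from 0 to 1000 ms with a factor of 0.8 and no jitter, this proposes a refresh at 800 ms.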

 

OAuth Bearer Tokens

Every OAuth Bearer Token is a string representing an authorization issued to the client.  There are at least two types of OAuth Bearer Tokens: an opaque token and a JSON Web Token (JWT).  Nothing can be said about the contents of an opaque token other than that the contents are only available via some third party (e.g. an authorization server).  Every JWT contains both a JSON Object Signing and Encryption (JOSE) Header and a JWT Claims Set.  There are two distinct types of JWTs: a JSON Web Signature (JWS) and a JSON Web Encryption (JWE).  There is no intent to model a JWE with the initial implementation.
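
As a small illustration of the JWS/JWE distinction, the sketch below classifies a JWT compact serialization by its number of dot-separated sections (3 for a JWS, 5 for a JWE) and decodes the JOSE Header from the first section. The class name `CompactSerializationSketch` is hypothetical, and no signature validation or decryption is attempted.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical sketch of inspecting a JWT compact serialization. */
public class CompactSerializationSketch {
    public enum Kind { JWS, JWE }

    /** Classifies the serialization by its number of dot-separated sections. */
    public static Kind kind(String compactSerialization) {
        // A limit of -1 keeps trailing empty sections, such as the empty
        // signature section of an unsecured JWS.
        String[] sections = compactSerialization.split("\\.", -1);
        if (sections.length == 3)
            return Kind.JWS;
        if (sections.length == 5)
            return Kind.JWE;
        throw new IllegalArgumentException(
                "expected 3 or 5 dot-separated sections but found " + sections.length);
    }

    /** Returns the JOSE Header as its raw JSON text. */
    public static String headerJson(String compactSerialization) {
        String firstSection = compactSerialization.split("\\.", -1)[0];
        // The header is Base64 URL-encoded; the decoder accepts missing padding.
        byte[] decoded = Base64.getUrlDecoder().decode(firstSection);
        return new String(decoded, StandardCharsets.UTF_8);
    }
}
```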

 

Code Block (Java): org.apache.kafka.common.security.oauthbearer.OAuthBearerToken
package org.apache.kafka.common.security.oauthbearer;
 /**
 * The <code>b64token</code> value as defined in
 * <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750 Section
 * 2.1</a> along with the token's specific scope and lifetime.
 *
 * @see <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC 6749
 *      Section 1.4</a> and
 *      <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750
 *      Section 2.1</a>
 */
public interface OAuthBearerToken extends ExpiringCredential {
    /**
     * The <code>b64token</code> value as defined in
     * <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750 Section
     * 2.1</a>
     *
     * @return <code>b64token</code> value as defined in
     *         <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750
     *         Section 2.1</a>
     */
    String value();


    /**
     * The token's scope of access, as per
     * <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC 6749 Section
     * 1.4</a>
     *
     * @return the token's (always non-null but potentially empty) scope of access,
     *         as per <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC
     *         6749 Section 1.4</a>. Note that all values in the returned set will
     *         be trimmed of preceding and trailing whitespace, and the result will
     *         never contain the empty string.
     * @throws IOException
     *             if one or more required networked resources (e.g. to re-hydrate
     *             an opaque token) is unavailable.
     * @throws OAuthBearerIllegalTokenException
     *             if there is something fundamentally wrong with the token (if it
     *             is malformed, for example)
     */
    Set<String> scope() throws IOException, OAuthBearerIllegalTokenException;

    /**
     * The token's lifetime, expressed as the number of milliseconds since the
     * epoch, as per <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC
     * 6749 Section 1.4</a>
     *
     * @return the token's lifetime, expressed as the number of milliseconds since
     *         the epoch, as per
     *         <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC 6749
     *         Section 1.4</a>.
     * @throws IOException
     *             if one or more required networked resources (e.g. to re-hydrate
     *             an opaque token) is unavailable.
     * @throws OAuthBearerIllegalTokenException
     *             if there is something fundamentally wrong with the token (if it
     *             is malformed, for example)
     */
    long lifetime() throws IOException, OAuthBearerIllegalTokenException;
}
Code Block (Java): org.apache.kafka.common.security.oauthbearer.OAuthBearerJwt
package org.apache.kafka.common.security.oauthbearer;

/**
 * A JSON Web Token, which may be either a JWS or a JWE. Neither signature
 * validation nor decryption is performed here.
 *
 * @see <a href="https://tools.ietf.org/html/rfc7519">RFC 7519</a>
 */
public abstract class OAuthBearerJwt implements OAuthBearerToken {
    /**
     * Constructor with the principal taken from the '{@code sub}' claim (if any)
     * and no scope claim name
     *
     * @param compactSerialization
     *            the compact serialization to parse as a JWT
     * @throws OAuthBearerIllegalTokenException
     *             if compactSerialization cannot be either a JWS or JWE (due to
     *             there not being either 3 or 5 dot-separated values; or the JWT
     *             header either not being a valid Base64 URL-encoded value or not
     *             valid JSON after decoding)
     */
    public OAuthBearerJwt(String compactSerialization) throws OAuthBearerIllegalTokenException {
        this(compactSerialization, null, null);
    }

    /**
     * Constructor with the given principal and scope claim names
     *
     * @param compactSerialization
     *            the compact serialization to parse as a JWT
     * @param principalClaimName
     *            the optional principal claim name
     * @param scopeClaimName
     *            the optional scope claim name
     * @throws OAuthBearerIllegalTokenException
     *             if compactSerialization cannot be either a JWS or JWE (due to
     *             there not being either 3 or 5 dot-separated values; or the JWT
     *             header either not being a valid Base64 URL-encoded value or not
     *             valid JSON after decoding)
     */
    public OAuthBearerJwt(String compactSerialization, String principalClaimName, String scopeClaimName)
            throws OAuthBearerIllegalTokenException {
        // etc...
    }
 
    /**
     * Return the 3 or 5 dot-separated sections of the JWT compact serialization
     * 
     * @return the 3 or 5 dot-separated sections of the JWT compact serialization
     */
    public List<String> splits() {
        return splits;
    }

    /**
     * Return the JOSE Header as a {@code Map}
     *
     * @return the JOSE header
     */
    public Map<String, Object> header() {
        return header;
    }

    /**
     * Return the JWT Claim Set as a {@code Map}
     *
     * @return the (always non-null but possibly empty) claims
     */
    public abstract Map<String, Object> claims();

    /**
     * Return the scope claim name, if any, otherwise null
     *
     * @return the scope claim name, if any, otherwise null
     */
    public String scopeClaimName() {
        return scopeClaimName;
    }

    /**
     * Indicate if the claim exists and is the given type
     *
     * @param claimName
     *            the mandatory JWT claim name
     * @param type
     *            the mandatory type, which should either be String.class,
     *            Number.class, or List.class
     * @return true if the claim exists and is the given type, otherwise false
     */
    public boolean isClaimType(String claimName, Class<?> type) {
        // etc...
    }

    /**
     * Extract a claim of the given type
     *
     * @param claimName
     *            the mandatory JWT claim name
     * @param type
     *            the mandatory type, which must either be String.class,
     *            Number.class, or List.class
     * @return the claim if it exists, otherwise null
     * @throws OAuthBearerIllegalTokenException
     *             if the claim exists but is not the given type
     */
    public <T> T getClaim(String claimName, Class<T> type) throws OAuthBearerIllegalTokenException {
        // etc...
    }

    /**
     * Extract a claim that could be either a String or a String List as a String
     * List (if it was a String it will be returned as a list of size 1).
     *
     * @param claimName
     *            the mandatory JWT claim name
     * @return the claim in the form of a String List, if it exists, otherwise null
     * @throws OAuthBearerIllegalTokenException
     *             if the claim exists but is neither a String nor a List
     */
    public List<String> getListClaimFromStringOrList(String claimName) throws OAuthBearerIllegalTokenException {
        // etc...
    }
 
    /**
     * Extract a claim in its raw form
     *
     * @param claimName
     *            the mandatory JWT claim name
     * @return the raw claim value, if it exists, otherwise null
     */
    public Object getRawClaim(String claimName) {
        return claims().get(Objects.requireNonNull(claimName));
    }

    /**
     * Return the
     * <a href="https://tools.ietf.org/html/rfc7519#section-4.1.3">Audience</a>
     * claim as a String List
     *
     * @return the <a href=
     *         "https://tools.ietf.org/html/rfc7519#section-4.1.3">Audience</a>
     *         claim as a String List if available, otherwise null. An Audience
     *         claim that is a String will be returned as a List of size 1.
     * @throws OAuthBearerIllegalTokenException
     *             if the claim value is the incorrect type
     */
    public List<String> audience() throws OAuthBearerIllegalTokenException {
        return getListClaimFromStringOrList(JwtClaim.AUDIENCE.claimName());
    }

    /**
     * Return the
     * <a href="https://tools.ietf.org/html/rfc7519#section-4.1.4">Expiration
     * Time</a> claim
     *
     * @return the <a href=
     *         "https://tools.ietf.org/html/rfc7519#section-4.1.4">Expiration
     *         Time</a> claim if available, otherwise null
     * @throws OAuthBearerIllegalTokenException
     *             if the claim value is the incorrect type
     */
    public Number expirationTime() throws OAuthBearerIllegalTokenException {
        return getClaim(JwtClaim.EXPIRATION_TIME.claimName(), Number.class);
    }

    /**
     * Return the <a href="https://tools.ietf.org/html/rfc7519#section-4.1.6">Issued
     * At</a> claim
     *
     * @return the
     *         <a href= "https://tools.ietf.org/html/rfc7519#section-4.1.6">Issued
     *         At</a> claim if available, otherwise null
     * @throws OAuthBearerIllegalTokenException
     *             if the claim value is the incorrect type
     */
    public Number issuedAt() throws OAuthBearerIllegalTokenException {
        return getClaim(JwtClaim.ISSUED_AT.claimName(), Number.class);
    }
 
    /**
     * Return the
     * <a href="https://tools.ietf.org/html/rfc7519#section-4.1.1">Issuer</a> claim
     *
     * @return the <a href=
     *         "https://tools.ietf.org/html/rfc7519#section-4.1.1">Issuer</a> claim
     *         if available, otherwise null
     * @throws OAuthBearerIllegalTokenException
     *             if the claim value is the incorrect type
     */
    public String issuer() throws OAuthBearerIllegalTokenException {
        return getClaim(JwtClaim.ISSUER.claimName(), String.class);
    }

    /**
     * Return the <a href="https://tools.ietf.org/html/rfc7519#section-4.1.7">JWT
     * ID</a> claim
     *
     * @return the <a href= "https://tools.ietf.org/html/rfc7519#section-4.1.7">JWT
     *         ID</a> claim if available, otherwise null
     * @throws OAuthBearerIllegalTokenException
     *             if the claim value is the incorrect type
     */
    public String jwtId() throws OAuthBearerIllegalTokenException {
        return getClaim(JwtClaim.JWT_ID.claimName(), String.class);
    }

    /**
     * Return the <a href="https://tools.ietf.org/html/rfc7519#section-4.1.5">Not
     * Before</a> claim
     *
     * @return the <a href= "https://tools.ietf.org/html/rfc7519#section-4.1.5">Not
     *         Before</a> claim if available, otherwise null
     * @throws OAuthBearerIllegalTokenException
     *             if the claim value is the incorrect type
     */
    public Number notBefore() throws OAuthBearerIllegalTokenException {
        return getClaim(JwtClaim.NOT_BEFORE.claimName(), Number.class);
    }

    /**
     * Return the
     * <a href="https://tools.ietf.org/html/rfc7519#section-4.1.2">Subject</a> claim
     *
     * @return the <a href=
     *         "https://tools.ietf.org/html/rfc7519#section-4.1.2">Subject</a> claim
     *         if available, otherwise null
     * @throws OAuthBearerIllegalTokenException
     *             if the claim value is the incorrect type
     */
    public String subject() throws OAuthBearerIllegalTokenException {
        return getClaim(JwtClaim.SUBJECT.claimName(), String.class);
    }
 
    /**
     * Decode the given Base64URL-encoded value, parse the resulting JSON as a JSON
     * object, and return the map of member names to their values (each value being
     * represented as either a String, a Number, or a List of Strings).
     *
     * @param split
     *            the value to decode and parse
     * @return the map of JSON member names to their String, Number, or String List
     *         value
     * @throws OAuthBearerIllegalTokenException
     *             if the given Base64URL-encoded value cannot be decoded or parsed
     */
    public static Map<String, Object> toMap(String split) throws OAuthBearerIllegalTokenException {
        // etc...
    }
}
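The section handling described for toMap above can be illustrated with the JDK alone. The following is a minimal sketch, not the KIP's implementation: the class name JwtSectionDecodeSketch and both method names are hypothetical, and the sketch stops at the decoded JSON text rather than parsing it into a map.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical illustration only; not part of the proposed public API.
final class JwtSectionDecodeSketch {
    /** Split a compact serialization into its dot-separated sections. */
    static String[] splitSections(String compactSerialization) {
        // A limit of -1 keeps trailing empty strings, so the empty signature
        // section of an unsecured JWS ("header.claims.") is preserved.
        return compactSerialization.split("\\.", -1);
    }

    /** Decode one Base64URL-encoded section to its JSON text. */
    static String decodeSection(String base64UrlSection) {
        // Throws IllegalArgumentException if the value is not valid Base64URL.
        byte[] decoded = Base64.getUrlDecoder().decode(base64UrlSection);
        return new String(decoded, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Base64.Encoder encoder = Base64.getUrlEncoder().withoutPadding();
        String header = encoder.encodeToString("{\"alg\":\"none\"}".getBytes(StandardCharsets.UTF_8));
        String claims = encoder.encodeToString("{\"sub\":\"thePrincipalName\"}".getBytes(StandardCharsets.UTF_8));
        String[] sections = splitSections(header + "." + claims + ".");
        System.out.println(sections.length + " sections");
        System.out.println(decodeSection(sections[0]));
        System.out.println(decodeSection(sections[1]));
    }
}
```

A real implementation would additionally parse the decoded JSON and report failures as OAuthBearerIllegalTokenException.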
Code Block: org.apache.kafka.common.security.oauthbearer.OAuthBearerJws (Java)
package org.apache.kafka.common.security.oauthbearer;
 
/**
 * @see <a href="https://tools.ietf.org/html/rfc7515">RFC 7515</a>. Note that
 *      digital signature validation is not performed.
 */
public class OAuthBearerJws extends OAuthBearerJwt {
    /**
     * Constructor with the default principal claim name ('sub') and no scope
     * claim name
     *
     * @param compactSerialization
     *            the compact serialization to parse as a JWT
     * @throws OAuthBearerIllegalTokenException
     *             if the compact serialization is not a valid JWS (meaning it did
     *             not have 3 dot-separated Base64URL sections; or the header or
     *             claims either are not valid Base 64 URL encoded values or are not
     *             JSON after decoding; or the existence or absence of a digital
     *             signature is inconsistent with the mandatory 'alg' header value)
     */
    public OAuthBearerJws(String compactSerialization) throws OAuthBearerIllegalTokenException {
        this(compactSerialization, JwtClaim.SUBJECT.claimName(), null);
    }

    /**
     * Constructor with the given principal and scope claim names
     *
     * @param compactSerialization
     *            the compact serialization to parse as a JWS
     * @param principalClaimName
     *            the optional principal claim name
     * @param scopeClaimName
     *            the optional scope claim name
     * @throws OAuthBearerIllegalTokenException
     *             if the compact serialization is not a valid JWS (meaning it did
     *             not have 3 dot-separated Base64URL sections; or the header or
     *             claims either are not valid Base 64 URL encoded values or are not
     *             JSON after decoding; or the existence or absence of a digital
     *             signature is inconsistent with the mandatory 'alg' header value)
     */
    public OAuthBearerJws(String compactSerialization, String principalClaimName, String scopeClaimName)
            throws OAuthBearerIllegalTokenException {
        // etc...
    }
}
Code Block: org.apache.kafka.common.security.oauthbearer.JwtHeaderParameter (Java)
package org.apache.kafka.common.security.oauthbearer;

/**
 * JSON Web Signature and Encryption Header Parameters
 *
 * @see <a href="https://www.iana.org/assignments/jose/jose.xhtml">JSON Web
 *      Signature and Encryption Header Parameters</a>
 */
public enum JwtHeaderParameter {
    ALGORITHM("alg"),
    JWK_SET_URL("jku"),
    JSON_WEB_KEY("jwk"),
    KEY_ID("kid"),
    X_509_URL("x5u"),
    X_509_CERTIFICATE_CHAIN("x5c"),
    X_509_CERTIFICATE_SHA1_THUMBPRINT("x5t"),
    X_509_CERTIFICATE_SHA256_THUMBPRINT("x5t#S256"),
    TYPE("typ"),
    CONTENT_TYPE("cty"),
    CRITICAL("crit"),
    ENCRYPTION_ALGORITHM("enc"),
    COMPRESSION_ALGORITHM("zip"),
    ISSUER("iss"),
    SUBJECT("sub"),
    AUDIENCE("aud");

    private String headerParameterName;

    private JwtHeaderParameter(String headerParameterName) {
        this.headerParameterName = headerParameterName;
    }

    public String headerParameterName() {
        return headerParameterName;
    }
}
Code Block: org.apache.kafka.common.security.oauthbearer.JwtClaim (Java)
package org.apache.kafka.common.security.oauthbearer;

/**
 * JSON Web Token Claims
 *
 * @see <a href="https://www.iana.org/assignments/jwt/jwt.xhtml">JSON Web Token
 *      Claims</a>
 */
public enum JwtClaim {
    ISSUER("iss"),
    SUBJECT("sub"),
    AUDIENCE("aud"),
    EXPIRATION_TIME("exp"),
    NOT_BEFORE("nbf"),
    ISSUED_AT("iat"),
    JWT_ID("jti");

    private String claimName;

    private JwtClaim(String claimName) {
        this.claimName = claimName;
    }

    public String claimName() {
        return claimName;
    }
}

Token Retrieval

[Diagram: Token Retrieval (image removed)]

Token retrieval occurs on the client side of the SASL negotiation (in the producer/consumer, or on the broker when SASL/OAUTHBEARER is the inter-broker protocol), and the org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule class is the LoginModule implementation that creates and invokes an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenRetriever to perform the retrieval.  We provide the org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtRetriever class as a sample implementation that also provides value in development and testing situations.

Code Block: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule (Java)
package org.apache.kafka.common.security.oauthbearer;

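import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import javax.security.auth.callback.CallbackHandler;

import org.apache.kafka.common.security.oauthbearer.internal.OAuthBearerSaslClientProvider;
import org.apache.kafka.common.security.oauthbearer.internal.OAuthBearerSaslServerProvider;

/**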
 * A {@code LoginModule} for the SASL/OAUTHBEARER mechanism.
 * <p>
 * Example use on a non-broker client:
 *
 * <pre>
 * KafkaClient {
 *      org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
 *      reloginAllowedBeforeLogout="true"
 *      tokenRetriever="org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtRetriever"
 *          stringClaim_sub="theClientPrincipalName"
 *          listClaim_scope="|scopeValue1|scopeValue2";
 * };
 * </pre>
 *
 * Example use on a broker:
 *
 * <pre>
 * KafkaServer {
 *      org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
 *      reloginAllowedBeforeLogout="true"
 *      tokenRetriever="org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtRetriever"
 *          stringClaim_sub="theBrokerPrincipalName"
 *          listClaim_scope="|scopeValue1|scopeValue2"
 *      tokenValidator="org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtValidator"
 *          principalClaimName="sub";
 * };
 * </pre>
 *
 * @see RefreshConfigProp#RELOGIN_ALLOWED_BEFORE_LOGOUT
 * @see OAuthBearerTokenRetriever
 * @see OAuthBearerTokenValidator
 */
public class OAuthBearerLoginModule implements ExpiringCredentialLoginModule {
    public static final String TOKEN_RETRIEVER_CLASS_NAME_OPTION = "tokenRetriever";
    public static final String TOKEN_VALIDATOR_CLASS_NAME_OPTION = "tokenValidator";
    private static final Set<String> MECHANISMS = Collections
            .unmodifiableSet(new HashSet<>(Arrays.asList("OAUTHBEARER")));
 
    static {
        OAuthBearerSaslClientProvider.initialize(); // not part of public API
        OAuthBearerSaslServerProvider.initialize(); // not part of public API
    }
 
    @Override
    public Set<String> mechanisms() {
        return MECHANISMS;
    }

    @Override
    public CallbackHandler newCallbackHandler() {
        return new SubstitutableModuleOptionsCallbackHandler();
    }
 
    // etc...

}
Code Block: org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenRetriever (Java)
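package org.apache.kafka.common.security.oauthbearer;

import java.io.IOException;
import java.util.Map;

import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.LoginException;

/**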
 * An implementation of this interface must be configured for broker and
 * non-broker clients via JAAS when using the {@link OAuthBearerLoginModule}.
 * The configuration is done via the
 * {@value OAuthBearerLoginModule#TOKEN_RETRIEVER_CLASS_NAME_OPTION} option.
 */
public interface OAuthBearerTokenRetriever {
    /**
     * Retrieve a token using the given callback handler and JAAS login module
     * options.
     *
     * @param callbackHandler
     *            the mandatory callback handler. It will typically be capable of
     *            handling instances of {@link SubstitutableModuleOptionsCallback},
     *            though different implementations of this interface are free to
     *            differ in their requirements.
     * @param moduleOptionsMap
     *            the mandatory map representation of the <a href=
     *            "https://docs.oracle.com/javase/9/docs/api/javax/security/auth/login/Configuration.html">module
     *            options</a>
     * @return the retrieved token
     * @throws IOException
     *             if one or more networked resources required to perform the
     *             retrieval (e.g. a web service) is unavailable.
     * @throws UnsupportedCallbackException
     *             if the provided {@code CallbackHandler} cannot handle
     *             {@link SubstitutableModuleOptionsCallback}.
     * @throws OAuthBearerConfigException
     *             if there is a configuration problem that prevents this instance
     *             from functioning (a missing mandatory parameter, for example)
     * @throws LoginException
     *             if the retrieval fails for any other reason (if the token
     *             endpoint rejects the provided credentials, for example)
     */
    OAuthBearerToken retrieve(CallbackHandler callbackHandler, Map<String, String> moduleOptionsMap)
            throws IOException, UnsupportedCallbackException, OAuthBearerConfigException, LoginException;
}
Code Block: org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtRetriever (Java)
package org.apache.kafka.common.security.oauthbearer;

/**
 * An implementation of {@link OAuthBearerTokenRetriever} that generates an
 * unsecured JWT. Claims and their values can be specified using
 * stringClaim_&lt;claimname&gt;, numberClaim_&lt;claimname&gt;, and
 * listClaim_&lt;claimname&gt; options. The first character of the value is
 * taken as the delimiter for list claims. You may define any claim name and
 * value except '{@code iat}' and '{@code exp}', both of which are calculated
 * automatically.
 * <p>
 * This implementation also accepts the following options:
 * <ul>
 * <li><code>{@value #PRINCIPAL_CLAIM_NAME_OPTION}</code> set to a custom claim
 * name if you wish the name of the String claim holding the principal name to
 * be something other than <code>"sub"</code>.</li>
 * <li><code>{@value #LIFETIME_SECONDS_OPTION}</code> set to an integer value if
 * the token expiration is to be set to something other than the default value
 * of 3600 seconds (which is 1 hour). The 'exp' claim reflects the expiration
 * time.</li>
 * <li><code>{@value #SCOPE_CLAIM_NAME_OPTION}</code> set to a custom claim name
 * if you wish the name of the String or String List claim holding any token
 * scope to be something other than "<code>scope</code>"</li>
 * </ul>
 * For example:
 *
 * <pre>
 * KafkaClient {
 *      org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
 *      tokenRetriever="org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtRetriever"
 *      reloginAllowedBeforeLogout="true"
 *         stringClaim_sub="thePrincipalName"
 *         listClaim_scope="|scopeValue1|scopeValue2"
 *         lifetimeSeconds="60";
 * };
 * </pre>
 */
public class OAuthBearerUnsecuredJwtRetriever implements OAuthBearerTokenRetriever {
    public static final String PRINCIPAL_CLAIM_NAME_OPTION = "principalClaimName";
    public static final String LIFETIME_SECONDS_OPTION = "lifetimeSeconds";
    public static final String SCOPE_CLAIM_NAME_OPTION = "scopeClaimName";
 
    // etc...
 
}
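To make the option conventions above concrete, the following minimal sketch builds an unsecured JWT from a subject, a delimiter-prefixed list claim value, and a lifetime. It is an illustration under stated assumptions rather than the KIP's implementation: the class and method names are hypothetical, claim values are assumed to need no JSON escaping, and the claim names "sub" and "scope" are the defaults described above.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical illustration only; not part of the proposed public API.
final class UnsecuredJwtSketch {
    /** Split a list claim option value, using its first character as the delimiter. */
    static List<String> parseListClaim(String optionValue) {
        List<String> items = new ArrayList<>();
        if (optionValue.length() < 2)
            return items; // nothing after the delimiter
        String delimiter = Pattern.quote(optionValue.substring(0, 1));
        for (String item : optionValue.substring(1).split(delimiter, -1))
            items.add(item);
        return items;
    }

    static String base64Url(String json) {
        return Base64.getUrlEncoder().withoutPadding()
                .encodeToString(json.getBytes(StandardCharsets.UTF_8));
    }

    /** Build "header.claims." with alg=none and an empty signature section. */
    static String unsecuredJwt(String subject, List<String> scope, long nowSeconds, long lifetimeSeconds) {
        StringBuilder scopeJson = new StringBuilder("[");
        for (int i = 0; i < scope.size(); i++) {
            if (i > 0)
                scopeJson.append(',');
            scopeJson.append('"').append(scope.get(i)).append('"');
        }
        scopeJson.append(']');
        String header = "{\"alg\":\"none\"}";
        // 'iat' and 'exp' are calculated rather than configured.
        String claims = "{\"sub\":\"" + subject + "\",\"scope\":" + scopeJson
                + ",\"iat\":" + nowSeconds + ",\"exp\":" + (nowSeconds + lifetimeSeconds) + "}";
        return base64Url(header) + "." + base64Url(claims) + ".";
    }

    public static void main(String[] args) {
        List<String> scope = parseListClaim("|scopeValue1|scopeValue2");
        long nowSeconds = System.currentTimeMillis() / 1000;
        System.out.println(unsecuredJwt("thePrincipalName", scope, nowSeconds, 60));
    }
}
```

Because the token carries no signature, anyone can mint one; that is why this style of token is suitable only for development and testing.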

 

Token Validation

[Diagram: Token Validation (image removed)]

Token validation occurs on the broker side of the SASL negotiation, and the SaslServer implementation registered by org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule creates and invokes an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenValidator to perform the validation.  We provide the org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtValidator class as a sample implementation that also provides value in development and testing situations.  There are additional utility classes as shown in the above diagram that are also reusable with other implementations.

Code Block: org.apache.kafka.common.security.oauthbearer.OAuthBearerSaslServer (Java)
package org.apache.kafka.common.security.oauthbearer;

import javax.security.sasl.SaslServer;

/**
 * The public interface of SaslServer implementations of SASL/OAUTHBEARER for
 * Kafka. The validated access token will be available via the {@link #token()}
 * method. The existence of this interface keeps the actual implementation out
 * of the public API.
 */
public interface OAuthBearerSaslServer extends SaslServer {
    /**
     * Return the validated access token. Exposing the token here is necessary if we
     * wish to make it available for authorization decisions.
     *
     * @return the validated access token
     * @see <a href=
     *      "https://cwiki.apache.org/confluence/display/KAFKA/KIP-189%3A+Improve+principal+builder+interface+and+add+support+for+SASL">KIP-189:
     *      Improve principal builder interface and add support for SASL</a>
     */
    public OAuthBearerToken token();
}
Code Block: org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenValidator (Java)
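package org.apache.kafka.common.security.oauthbearer;

import java.io.IOException;
import java.util.Map;

import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.UnsupportedCallbackException;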

/**
 * An implementation of this interface must be configured for brokers via JAAS
 * when using the {@link OAuthBearerLoginModule}. The configuration is done via
 * the {@value OAuthBearerLoginModule#TOKEN_VALIDATOR_CLASS_NAME_OPTION} option.
 */
public interface OAuthBearerTokenValidator {
    /**
     * Validate a token using the given callback handler and JAAS login module
     * options.
     *
     * @param tokenValue
     *            the <code>b64token</code> value as defined in
     *            <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750
     *            Section 2.1</a> to validate
     * @param callbackHandler
     *            the mandatory callback handler. It will typically be capable of
     *            handling instances of {@link SubstitutableModuleOptionsCallback},
     *            though different implementations of this interface are free to
     *            differ in their requirements. Set the callback handler via the
     *            {@code sasl.server.callback.handler.class.map} option in the
     *            broker properties file.
     * @param moduleOptionsMap
     *            the mandatory map representation of the <a href=
     *            "https://docs.oracle.com/javase/9/docs/api/javax/security/auth/login/Configuration.html">module
     *            options</a>
     * @return the successfully validated token
     * @throws IOException
     *             if one or more networked resources required to perform the
     *             validation (e.g. a web service) is unavailable.
     * @throws UnsupportedCallbackException
     *             if the provided {@code CallbackHandler} cannot handle
     *             {@link SubstitutableModuleOptionsCallback}.
     * @throws OAuthBearerConfigException
     *             if there is a configuration problem that prevents this instance
     *             from functioning (a missing mandatory parameter, for example)
     * @throws OAuthBearerIllegalTokenException
     *             if there is a problem with the token itself (it cannot be parsed
     *             or it otherwise fails validation)
     * @see BrokerSecurityConfigs#SASL_SERVER_CALLBACK_HANDLER_CLASS_MAP_DOC
     */
    OAuthBearerToken validate(String tokenValue, CallbackHandler callbackHandler, Map<String, String> moduleOptionsMap)
            throws IOException, UnsupportedCallbackException, OAuthBearerConfigException,
            OAuthBearerIllegalTokenException;
}
Code Block: org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtValidator (Java)
package org.apache.kafka.common.security.oauthbearer;

/**
 * An implementation of {@link OAuthBearerTokenValidator} that validates a JWS
 * without regard to its algorithm and signature. It requires there to be an
 * <code>"exp" (Expiration Time)</code> claim of type Number. If
 * <code>"iat" (Issued At)</code> or <code>"nbf" (Not Before)</code> claims are
 * present each must be a number that precedes the Expiration Time claim, and if
 * both are present the Not Before claim must not precede the Issued At claim.
 * It also accepts the following options, none of which are required:
 * <ul>
 * <li><code>{@value #PRINCIPAL_CLAIM_NAME_OPTION}</code> set to a non-empty
 * value if you wish a String claim holding a principal name to be checked for
 * existence; the default is to perform no such check</li>
 * <li><code>{@value #SCOPE_CLAIM_NAME_OPTION}</code> set to a custom claim name
 * if you wish the name of the String/String List claim holding any token scope
 * to be something other than <code>"scope"</code></li>
 * <li><code>{@value #REQUIRED_SCOPE_OPTION}</code> set to a space-delimited
 * list of scope values if you wish the String/String List claim holding the
 * token scope to be checked to make sure it contains certain values</li>
 * <li><code>{@value #ALLOWABLE_CLOCK_SKEW_MILLIS_OPTION}</code> set to a
 * positive integer value if you wish to allow up to some number of positive
 * milliseconds of clock skew (the default is 0)</li>
 * </ul>
 * For example:
 *
 * <pre>
 * KafkaServer {
 *      org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
 *      reloginAllowedBeforeLogout="true"
 *      tokenRetriever="org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtRetriever"
 *         stringClaim_sub="thePrincipalName"
 *         listClaim_scope=",KAFKA_BROKER,LOGIN_TO_KAFKA"
 *      tokenValidator="org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtValidator"
 *          principalClaimName="sub"
 *          requiredScope="LOGIN_TO_KAFKA"
 *          allowableClockSkewMillis="3000";
 * };
 * </pre>
 */
public class OAuthBearerUnsecuredJwtValidator implements OAuthBearerTokenValidator {
    public static final String PRINCIPAL_CLAIM_NAME_OPTION = "principalClaimName";
    public static final String SCOPE_CLAIM_NAME_OPTION = "scopeClaimName";
    public static final String REQUIRED_SCOPE_OPTION = "requiredScope";
    public static final String ALLOWABLE_CLOCK_SKEW_MILLIS_OPTION = "allowableClockSkewMillis";
 
    // etc...
}
Code Block: org.apache.kafka.common.security.oauthbearer.OAuthBearerValidationUtils (Java)
package org.apache.kafka.common.security.oauthbearer;

public class OAuthBearerValidationUtils {
    /**
     * Validate the given claim for existence and type. It can be required to exist
     * in the given claims, and if it exists it must be one of the types indicated
     *
     * @param jwt
     *            the mandatory JWT to which the validation will be applied
     * @param required
     *            true if the claim is required to exist
     * @param claimName
     *            the required claim name identifying the claim to be checked
     * @param allowedTypes
     *            one or more of {@code String.class}, {@code Number.class}, and
     *            {@code List.class} identifying the type(s) that the claim value is
     *            allowed to be if it exists
     * @return the result of the validation
     */
    public static OAuthBearerValidationResult validateClaimForExistenceAndType(OAuthBearerJwt jwt, boolean required,
            String claimName, Class<?>... allowedTypes) {
        // etc...
    }
 
    /**
     * Validate the 'iat' (Issued At) claim. It can be required to exist in the
     * given claims, and if it exists it must be a (potentially fractional) number
     * of seconds since the epoch defining when the JWT was issued; it is a
     * validation error if the Issued At time is after the time at which the check
     * is being done (plus any allowable clock skew).
     *
     * @param jwt
     *            the mandatory JWT to which the validation will be applied
     * @param required
     *            true if the claim is required to exist
     * @param whenCheckTimeMillis
     *            the time relative to which the validation is to occur
     * @param allowableClockSkewMillis
     *            non-negative number to take into account some potential clock skew
     * @return the result of the validation
     * @throws OAuthBearerConfigException
     *             if the given allowable clock skew is negative
     */
    public static OAuthBearerValidationResult validateIssuedAt(OAuthBearerJwt jwt, boolean required,
            long whenCheckTimeMillis, int allowableClockSkewMillis) throws OAuthBearerConfigException {
        // etc...
    }
 
    /**
     * Validate the 'nbf' (Not Before) claim. It can be required to exist in the
     * given claims, and if it exists it must be a (potentially fractional) number
     * of seconds since the epoch defining the point at which the JWT becomes valid.
     * It is a validation error if the time at which the check is being done (plus
     * any allowable clock skew) is before the Not Before time.
     *
     * @param jwt
     *            the mandatory JWT to which the validation will be applied
     * @param required
     *            true if the claim is required to exist
     * @param whenCheckTimeMillis
     *            the time relative to which the validation is to occur
     * @param allowableClockSkewMillis
     *            non-negative number to take into account some potential clock skew
     * @return the result of the validation
     * @throws OAuthBearerConfigException
     *             if the given allowable clock skew is negative
     */
    public static OAuthBearerValidationResult validateNotBefore(OAuthBearerJwt jwt, boolean required,
            long whenCheckTimeMillis, int allowableClockSkewMillis) throws OAuthBearerConfigException {
        // etc...
    }
 
    /**
     * Validate the 'exp' (Expiration Time) claim. It can be required to exist in
     * the given claims, and if it exists it must be a (potentially fractional)
     * number of seconds since the epoch defining the point at which the JWT
     * expires. It is a validation error if the time at which the check is being
     * done (minus any allowable clock skew) is on or after the Expiration Time.
     *
     * @param jwt
     *            the mandatory JWT to which the validation will be applied
     * @param required
     *            true if the claim is required to exist
     * @param whenCheckTimeMillis
     *            the time relative to which the validation is to occur
     * @param allowableClockSkewMillis
     *            non-negative number to take into account some potential clock skew
     * @return the result of the validation
     * @throws OAuthBearerConfigException
     *             if the given allowable clock skew is negative
     */
    public static OAuthBearerValidationResult validateExpirationTime(OAuthBearerJwt jwt, boolean required,
            long whenCheckTimeMillis, int allowableClockSkewMillis) throws OAuthBearerConfigException {
        // etc...
    }
 
    /**
     * Validate the 'iat' (Issued At), 'nbf' (Not Before), and 'exp' (Expiration
     * Time) claims for internal consistency. For claim pairs that exist, the
     * following must be true:
     * <ul>
     * <li>nbf &gt;= iat</li>
     * <li>exp &gt; iat</li>
     * <li>exp &gt; nbf</li>
     * </ul>
     *
     * @param jwt
     *            the mandatory JWT to which the validation will be applied
     * @return the result of the validation
     */
    public static OAuthBearerValidationResult validateTimeConsistency(OAuthBearerJwt jwt) {
        // etc...
    }
 
    /**
     * Validate the scope of the given token against the required scope. Every
     * required scope element (if any) must exist in the token's scope for the
     * validation to succeed.
     *
     * @param token
     *            the mandatory token whose scope is to be validated
     * @param requiredScope
     *            the optional required scope against which the given scope will be
     *            validated
     * @return the result of the validation
     * @throws IOException
     *             if one or more required networked resources (e.g. to re-hydrate
     *             an opaque token) is unavailable.
     * @throws OAuthBearerIllegalTokenException
     *             if there is something fundamentally wrong with the token (if it
     *             is malformed, for example)
     */
    public static OAuthBearerValidationResult validateScope(OAuthBearerToken token, List<String> requiredScope)
            throws IOException, OAuthBearerIllegalTokenException {
        // etc...
    }
 
    // etc...
 
    private OAuthBearerValidationUtils() {
        // empty
    }
}
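The time-based rules documented above reduce to a few comparisons. The sketch below restates them with plain booleans, as an illustration only: the class and method names are hypothetical, a null claim stands for "absent and not required", boolean results stand in for OAuthBearerValidationResult, and IllegalArgumentException stands in for OAuthBearerConfigException.

```java
// Hypothetical illustration only; not part of the proposed public API.
final class JwtTimeValidationSketch {
    /** Convert a (potentially fractional) seconds-since-epoch claim to milliseconds. */
    static long toMillis(Number secondsSinceEpoch) {
        return Math.round(secondsSinceEpoch.doubleValue() * 1000.0);
    }

    private static void requireNonNegative(int allowableClockSkewMillis) {
        if (allowableClockSkewMillis < 0)
            throw new IllegalArgumentException("allowable clock skew must not be negative");
    }

    /** 'iat' fails if it is after the check time plus the allowable skew. */
    static boolean issuedAtOk(Number iat, long whenCheckTimeMillis, int allowableClockSkewMillis) {
        requireNonNegative(allowableClockSkewMillis);
        return iat == null || toMillis(iat) <= whenCheckTimeMillis + allowableClockSkewMillis;
    }

    /** 'nbf' fails if the check time plus the allowable skew is before it. */
    static boolean notBeforeOk(Number nbf, long whenCheckTimeMillis, int allowableClockSkewMillis) {
        requireNonNegative(allowableClockSkewMillis);
        return nbf == null || whenCheckTimeMillis + allowableClockSkewMillis >= toMillis(nbf);
    }

    /** 'exp' fails if the check time minus the allowable skew is on or after it. */
    static boolean expirationOk(Number exp, long whenCheckTimeMillis, int allowableClockSkewMillis) {
        requireNonNegative(allowableClockSkewMillis);
        return exp == null || whenCheckTimeMillis - allowableClockSkewMillis < toMillis(exp);
    }

    /** For claim pairs that exist: nbf >= iat, exp > iat, exp > nbf. */
    static boolean timeConsistent(Number iat, Number nbf, Number exp) {
        if (iat != null && nbf != null && nbf.doubleValue() < iat.doubleValue())
            return false;
        if (iat != null && exp != null && exp.doubleValue() <= iat.doubleValue())
            return false;
        if (nbf != null && exp != null && exp.doubleValue() <= nbf.doubleValue())
            return false;
        return true;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        Number exp = now / 1000 + 60; // expires one minute from now
        System.out.println("exp ok: " + expirationOk(exp, now, 3000));
        System.out.println("consistent: " + timeConsistent(now / 1000, now / 1000, exp));
    }
}
```

Note the asymmetry: clock skew is added to the check time for 'iat' and 'nbf' but subtracted for 'exp', so in every case the skew works in the token's favor.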
Code Block: org.apache.kafka.common.security.oauthbearer.OAuthBearerScopeUtils (Java)
package org.apache.kafka.common.security.oauthbearer;

/**
 * Utility class to help deal with
 * <a href="https://tools.ietf.org/html/rfc6749#section-3.3">Access Token
 * Scopes</a>
 */
public class OAuthBearerScopeUtils {
    private static final Pattern INDIVIDUAL_SCOPE_ITEM_PATTERN = Pattern.compile("[\\x23-\\x5B\\x5D-\\x7E\\x21]+");

    /**
     * Return true if the given value meets the definition of a valid scope item as
     * per <a href="https://tools.ietf.org/html/rfc6749#section-3.3">RFC 6749
     * Section 3.3</a>, otherwise false
     *
     * @param scopeItem
     *            the mandatory scope item to check for validity
     * @return true if the given value meets the definition of a valid scope item,
     *         otherwise false
     */
    public static boolean isValidScopeItem(String scopeItem) {
        return INDIVIDUAL_SCOPE_ITEM_PATTERN.matcher(Objects.requireNonNull(scopeItem)).matches();
    }

    /**
     * Convert a space-delimited list of scope values (for example,
     * <code>"scope1 scope2"</code>) to a List containing the individual elements
     * (<code>"scope1"</code> and <code>"scope2"</code>)
     *
     * @param spaceDelimitedScope
     *            the mandatory (but possibly empty) space-delimited scope values,
     *            each of which must be valid according to
     *            {@link #isValidScopeItem(String)}
     * @return the list of the given (possibly empty) space-delimited values
     * @throws OAuthBearerConfigException
     *             if any of the individual scope values are malformed/illegal
     */
    public static List<String> parseScope(String spaceDelimitedScope) throws OAuthBearerConfigException {
        // etc...
    }


    private OAuthBearerScopeUtils() {
        // empty
    }
}
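The elided parseScope body, together with the "every required scope element must exist" rule used during validation, might look like the following. This is a sketch under assumptions, not the KIP's code: the class name and hasRequiredScope are hypothetical, runs of multiple spaces are tolerated here (the original does not say whether they are), and IllegalArgumentException stands in for OAuthBearerConfigException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical illustration only; not part of the proposed public API.
final class ScopeSketch {
    // scope-token = 1*( %x21 / %x23-5B / %x5D-7E ) per RFC 6749 Section 3.3
    private static final Pattern SCOPE_ITEM = Pattern.compile("[\\x21\\x23-\\x5B\\x5D-\\x7E]+");

    /** Convert "scope1 scope2" to [scope1, scope2], rejecting malformed items. */
    static List<String> parseScope(String spaceDelimitedScope) {
        List<String> result = new ArrayList<>();
        String trimmed = spaceDelimitedScope.trim();
        if (trimmed.isEmpty())
            return result; // an empty scope is allowed
        for (String item : trimmed.split(" +")) {
            if (!SCOPE_ITEM.matcher(item).matches())
                throw new IllegalArgumentException("Invalid scope item: " + item);
            result.add(item);
        }
        return result;
    }

    /** True if every required scope element is present in the given scope. */
    static boolean hasRequiredScope(List<String> scope, List<String> requiredScope) {
        return scope.containsAll(requiredScope);
    }

    public static void main(String[] args) {
        List<String> scope = parseScope("KAFKA_BROKER LOGIN_TO_KAFKA");
        System.out.println(hasRequiredScope(scope, parseScope("LOGIN_TO_KAFKA")));
    }
}
```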

 

Proposed Changes

Thanks to KIP-86, no changes to the existing code base are required: all functionality is purely additive.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?

None

  • If we are changing behavior how will we phase out the older behavior?

There is no change to existing behavior

Rejected Alternatives

...

Unchecked exceptions can be added or removed without breaking binary compatibility, and Kafka uses unchecked exceptions exclusively, so OAuthBearerException is unchecked.  We considered making OAuthBearerException a checked exception to get the compiler help that checked exceptions provide: the set of exception types and the overall code base are both small, so the risk that we would later want to change the type(s) of exception(s) thrown is minimal.  Nonetheless, we agreed that the benefits of consistency and of eliminating the risk of breaking binary compatibility outweigh the benefit of that compiler help.

...

 *            the mandatory error status value from the <a href=
     *            "https://www.iana.org/assignments/oauth-parameters/oauth-parameters.xhtml#extensions-error">IANA
     *            OAuth Extensions Error Registry</a> to set
     * @param errorScope
     *            the optional error scope value to set
     * @param errorOpenIDConfiguration
     *            the optional error openid-configuration value to set
     */
    public void error(String errorStatus, String errorScope, String errorOpenIDConfiguration) {
        if (Objects.requireNonNull(errorStatus).isEmpty())
            throw new IllegalArgumentException("error status must not be empty");
        this.errorStatus = errorStatus;
        this.errorScope = errorScope;
        this.errorOpenIDConfiguration = errorOpenIDConfiguration;
        this.token = null;
    }
}

Summary of Configuration

The following table summarizes the proposed configuration for Kafka's SASL/OAUTHBEARER implementation.

| Configuration | Example Value | Likelihood of Value Being Different from the Example Value | Notes |
| --- | --- | --- | --- |
| JAAS Login Module | org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule | Low | |
| Non-Broker: sasl.login.callback.handler.class; Broker: listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class | org.example.MyLoginCallbackHandler | High | Default implementation creates an unsecured OAuth Bearer Token |
| listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class | org.example.MyValidatorCallbackHandler | High | Default implementation validates an unsecured OAuth Bearer Token |
| JAAS Module Options | Varied | High | Used to configure the above sasl.login and sasl.server callback handlers. |
| producer/consumer/broker configs: sasl.login.refresh.* | Varied, though defaults are reasonable | Low | |
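As a rough illustration of how the settings in the table fit together for a non-broker client, the sketch below assembles them with plain java.util.Properties.  The class name OAuthBearerClientConfigSketch is hypothetical, the handler class name is the example value from the table, and the SASL_SSL security protocol and the option-free JAAS entry are assumptions; a real installation would likely add JAAS module options for its callback handler.

```java
import java.util.Properties;

final class OAuthBearerClientConfigSketch {
    // Builds client-side settings for SASL/OAUTHBEARER.
    // handlerClass is installation-specific (hypothetical in this sketch).
    static Properties clientProperties(String handlerClass) {
        Properties props = new Properties();
        // Assumed listener protocol; SASL_PLAINTEXT would be the non-TLS alternative.
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "OAUTHBEARER");
        // JAAS login module from the table; no module options are declared here.
        props.setProperty("sasl.jaas.config",
            "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");
        // Login callback handler that retrieves the token (replaces the unsecured default).
        props.setProperty("sasl.login.callback.handler.class", handlerClass);
        return props;
    }
}
```

The resulting Properties could be merged with the usual producer or consumer settings; brokers would instead use the listener-prefixed keys shown in the table.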

Summary for Production Use

To use SASL/OAUTHBEARER in a production scenario it is necessary to write two separate callback handlers implementing org.apache.kafka.common.security.auth.AuthenticateCallbackHandler:

  • A login callback handler that can retrieve an OAuth 2 bearer token from the token endpoint and wrap that token as an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerToken. It must attempt to do this when it is asked to handle an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback.
  • A SASL Server callback handler that can validate an OAuth 2 bearer token compact serialization and convert that compact serialization to an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerToken.  It must attempt to do this when it is asked to handle an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback.  Note that this may entail secure retrieval (and perhaps caching) of a public key along with digital signature verification if the token is a JSON Web Signature (JWS); validation of various token claims such as the 'iat' (Issued At), 'nbf' (Not Before), 'exp' (Expiration Time), 'aud' (Audience), and 'iss' (Issuer) claims if the token is a JSON Web Token (JWT); and/or decryption if the token is encrypted as a JSON Web Encryption (JWE).
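The claim checks mentioned for the SASL Server callback handler could look roughly like the following.  This is a minimal sketch using only the JDK: it assumes the token has already been parsed and its signature verified by a JOSE library, and that the claims are available as a Map.  The class name JwtClaimChecksSketch, the method signature, and the rejection messages are all hypothetical, and no clock-skew allowance is applied.

```java
import java.util.Collection;
import java.util.Map;

final class JwtClaimChecksSketch {
    /** Returns null when the claims pass, otherwise a short reason for rejection. */
    static String validate(Map<String, Object> claims, long nowSeconds,
                           String expectedIssuer, String expectedAudience) {
        // 'exp' is treated as mandatory in this sketch and must lie in the future.
        Object exp = claims.get("exp");
        if (!(exp instanceof Number) || ((Number) exp).longValue() <= nowSeconds)
            return "token expired or 'exp' missing";
        // 'nbf' and 'iat' are optional; when present they must not lie in the future.
        Object nbf = claims.get("nbf");
        if (nbf instanceof Number && ((Number) nbf).longValue() > nowSeconds)
            return "token not yet valid";
        Object iat = claims.get("iat");
        if (iat instanceof Number && ((Number) iat).longValue() > nowSeconds)
            return "token issued in the future";
        if (!expectedIssuer.equals(claims.get("iss")))
            return "unexpected issuer";
        // 'aud' may be a single string or an array of strings (RFC 7519).
        Object aud = claims.get("aud");
        boolean audienceMatches = aud instanceof Collection
            ? ((Collection<?>) aud).contains(expectedAudience)
            : expectedAudience.equals(aud);
        return audienceMatches ? null : "unexpected audience";
    }
}
```

A handler built along these lines would report a rejection through OAuthBearerValidatorCallback rather than returning a string; the string return value here only keeps the sketch free of Kafka dependencies.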

See Summary of Configuration for details on how to declare these two classes to Kafka.

It is likely that the implementations of the above two callback handlers will leverage an open source JOSE (Javascript Object Signing and Encryption) library.  See https://jwt.io/ for a list of many of the available libraries.

OAuth 2 is a flexible framework that allows different installations to do things differently, so the principal name in Kafka could come from any claim in a JWT.  Most of the time it would come from the 'sub' claim, but it could certainly come from another claim, or it could be only indirectly based on a claim value (maybe certain text would be trimmed or prefixed/suffixed).  Because the OAuth 2 framework is flexible, we need to accommodate that flexibility, and the ability to plug in arbitrary implementations of the above two callback handler classes gives us what is required.  As an example, the SASL Server callback handler implementation could leverage an open source JOSE library to parse the compact serialization, retrieve the public key if it has not yet been retrieved, verify the digital signature, validate various token claims, and map the 'sub' claim to the OAuthBearerToken's principal name (which becomes the SASL authorization ID, which becomes the Kafka principal name).  By writing the callback handler code we have complete flexibility to meet the requirements of any particular OAuth 2 installation.
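The mapping from a claim to a principal name described above might be sketched as follows, using only the JDK.  The class PrincipalMappingSketch and both methods are hypothetical; the sketch assumes the claims have already been parsed into a Map by whichever JSON or JOSE library the installation prefers, and it performs no signature verification.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;

final class PrincipalMappingSketch {
    /** Decodes the payload (second segment) of a JWS compact serialization to its JSON text. */
    static String payloadJson(String compactSerialization) {
        // Limit -1 keeps a trailing empty signature segment, as found in unsecured JWS values.
        String[] parts = compactSerialization.split("\\.", -1);
        if (parts.length != 3)
            throw new IllegalArgumentException("expected header.payload.signature");
        return new String(Base64.getUrlDecoder().decode(parts[1]), StandardCharsets.UTF_8);
    }

    /** Maps a claim (commonly 'sub') to a principal name, optionally trimming a prefix. */
    static String principalName(Map<String, Object> claims, String claimName, String prefixToTrim) {
        Object value = claims.get(claimName);
        if (!(value instanceof String) || ((String) value).isEmpty())
            throw new IllegalArgumentException("missing claim: " + claimName);
        String name = (String) value;
        return prefixToTrim != null && name.startsWith(prefixToTrim)
            ? name.substring(prefixToTrim.length())
            : name;
    }
}
```

Keeping the claim name and the trimming rule as parameters reflects the point made above: which claim supplies the principal, and how it is transformed, is an installation-level decision.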

The following references may be helpful:

| URL | Description |
| --- | --- |
| https://tools.ietf.org/html/rfc6749 | RFC 6749: The OAuth 2.0 Authorization Framework |
| https://tools.ietf.org/html/rfc6750 | RFC 6750: The OAuth 2.0 Authorization Framework: Bearer Token Usage |
| https://tools.ietf.org/html/rfc7519 | RFC 7519: JSON Web Token (JWT) |
| https://tools.ietf.org/html/rfc7515 | RFC 7515: JSON Web Signature (JWS) |
| https://tools.ietf.org/html/rfc7516 | RFC 7516: JSON Web Encryption (JWE) |
| https://tools.ietf.org/html/rfc7628 | RFC 7628: A Set of Simple Authentication and Security Layer (SASL) Mechanisms for OAuth |

Proposed Changes

Thanks to KIP-86: Configurable SASL callback handlers, no changes to existing public interfaces are required – all functionality represents additions rather than changes.  The only changes to the existing implementation are to define appropriate default callback handler/login classes for SASL/OAUTHBEARER.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?

None

  • If we are changing behavior how will we phase out the older behavior?

There is no change to existing behavior

Rejected Alternatives

Motivation

The idea of providing a toolkit of reusable JWT/JWS/JWE retrieval and validation functionality was discarded in favor of a single, simple unsecured JWS implementation.  Anything beyond simple unsecured use cases requires significant functionality that is available via multiple open source libraries, and there is no need to duplicate that functionality here.  It is also not desirable for the Kafka project to define a specific open source JWT/JWS/JWE library dependency; better to allow installations to use the library that they feel most comfortable with.  This decision also allows the public-facing interface of this KIP to be considerably smaller than it would otherwise be.

Substitution Within Configuration Values

Flexible, substitution-aware configuration was originally proposed in an early draft of this KIP but was separated out into its own KIP-269 Substitution Within Configuration Values because it was thought that such functionality might be more broadly useful.  That KIP did not gain traction, in part because it was pointed out that most secret-related configuration could be done dynamically via the functionality provided by KIP-226 Dynamic Broker Configuration, which can help remove secrets from the configuration files themselves and is based on an event notification paradigm as opposed to the passive, pull-based one used by substitution.  It was also pointed out that production implementations of SASL/OAUTHBEARER for Kafka as defined here would have to provide their own AuthenticateCallbackHandler implementations (as per KIP-86: Configurable SASL callback handlers), and those implementations can be as flexible as desired/required with respect to how they deal with secrets.  In the end, this KIP has no dependency on substitution, and that KIP can live/die/resurrect, as the community sees fit, independently of this one.

Explicit Configuration of Token Refresh Class

We adjusted the logic in SaslChannelBuilder to automatically configure the org.apache.kafka.common.security.oauthbearer.internal.OAuthBearerRefreshingLogin class as the Login implementation by default for SASL/OAUTHBEARER use cases.  We also decided not to unify the refresh logic required for both SASL/OAUTHBEARER and SASL/GSSAPI mechanisms as part of this KIP, though this unification could occur at some point in the future.  The above-mentioned OAuthBearerRefreshingLogin class (which is NOT part of the public API) delegates to an underlying implementation in the same internal package, and this delegation-based approach suggests a potential way forward with regard to unification, but unification is explicitly out of scope here.  The chosen delegation design along with automatic configuration allows several classes to be moved to the internal package, which helps to minimize the public-facing API for this KIP and creates a better out-of-the-box experience.

Callback Handlers and Callbacks

We decided to leverage the standard JAAS Callback/CallbackHandler mechanism for communicating information between various components (specifically, between the SASL Client and the Login Module, between the Login Module and the Login/Token Retrieval mechanism, and between the SASL Server and the Token Validation mechanism).  We had originally documented the need for just the last two (token retrieval and token validation), and the original proposal was to declare the class names as part of the JAAS configuration.  The only real benefit to doing this was that the declaration of the classes and their configuration were co-located in the JAAS configuration.  We decided the benefit of consistency was at least as valuable as any cost associated with separating the declaration from the configuration, and probably more valuable given that this is how configuration is done for other SASL mechanisms and is therefore not unfamiliar.  We also now leverage callback handlers in all three places where they are supported, though one of them (the SASL Client callback handler) has no need for explicit configuration in any known use case.  Finally, we adjusted the logic in SaslChannelBuilder and LoginManager to automatically apply default AuthenticateCallbackHandler and Login implementations for SASL/OAUTHBEARER in the absence of explicit declarations, which simplifies the out-of-the-box experience for unsecured (i.e. development and testing) use cases.

...

<<<NOTE: Please consider this to be open for debate at this point>>>

We implement org.apache.kafka.common.security.oauthbearer.ExpiringCredential as an interface rather than as a class because an interface creates less of a constraint – implementers of other types of expiring credentials remain free to extend any class they want.  The risk of having to add a new method while Java 7 is still a supported target (which would break forward compatibility) is low.

We considered associating refresh-related properties (such as the minimum refresh period in milliseconds) with the ExpiringCredential rather than the ExpiringCredentialRefreshingLogin instance because the ExpiringCredentialRefreshingLogin instance couldn't know which of the potentially multiple login modules actually applies (i.e. which is the one associated with the inter-broker protocol); it wouldn't always know how to find the JAAS config options, so it wouldn't always know how to get the refresh configuration.  There was a problem with this approach, though: we can't invoke login() on the LoginContext and get an ExpiringCredential instance without a CallbackHandler, so we needed to know the type of CallbackHandler to instantiate, and there is no way to know that.  It simply made sense to give the ExpiringCredentialRefreshingLogin instance the ability to discover the correct login module in all cases and be able to ask it for the CallbackHandler instance; hence we created the ExpiringCredentialLoginModule interface.

...