You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 8 Next »

Status

Current state: "Under Discussion"

Discussion thread:  here

JIRA: KAFKA-6562

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The ability to authenticate to Kafka with an OAuth 2 Access Token is desirable given the popularity of OAuth.  The introduction of KIP-86 (Configurable SASL callback handlers) makes it possible to non-intrusively add new SASL mechanisms, and OAUTHBEARER is the SASL mechanism for OAuth.  OAuth 2 is a flexible framework with multiple ways of doing the same thing (for example, getting a public key to confirm a digital signature), so pluggable implementations – and flexibility in their configuration – is a requirement.

This KIP proposes to add the following functionality related to SASL/OAUTHBEARER:

  • Allow clients (both brokers when SASL/OAUTHBEARER is the inter-broker protocol as well as non-broker clients) to flexibly retrieve an access token from an OAuth 2 authorization server according to JAAS-defined configuration (and potentially plugged-in implementation) and present the access token to a broker for authentication.
  • Allow brokers to flexibly validate provided access tokens when a client establishes a connection based on JAAS-defined configuration (and potentially plugged-in implementation).
  • Provide a toolkit of reusable functionality applicable to common token retrieval and validation scenarios so that implementations can either be reused or, when they are custom and plugged-in, be relatively small.
  • Allow clients to transparently retrieve a new access token in the background before the existing access token expires in case the client has to open new connections.  Any existing connections will remain unaffected by this "token refresh" functionality as long as the connection remains intact, but new connections from the same client will always use the latest access token (either the initial one or the one that was most recently retrieved by the token refresh functionality, if any) – this is how Kerberos-authenticated connections work with respect to ticket expiration.

Note that the access token can be made available to the broker for authorization decisions due to KIP-189 (by exposing the access token on the SaslServer implementation), but detailed discussion of this possibility is outside the scope of this proposal.  It is noted, however, that if access tokens are somehow used for authorization decisions, it is conceivable due to the long-lived nature of Kafka connections that authorization decisions will sometimes be made using expired access tokens.  For example, it is up to the broker to validate the token upon authentication, but the token will not be replaced for that particular connection as long as it remains intact; if the token expires in an hour then authorization decisions for that first hour will be made using the still-valid token, but after an hour the expired token would remain associated with the connection, and authorization decisions from that point forward for that particular connection would be made using the expired token.  This would have to be addressed via a separate KIP if it turns out to be problematic, but that seems unlikely (code signing certificates that have been timestamped remain valid after their expiration, for example, and access tokens are indeed timestamped).

Note also that the implementation of flexible, substitution-aware configuration that was originally proposed in an early draft of this KIP was deemed more generally useful and has been separated out into its own KIP-269 Substitution Within Configuration Values, which is now a prerequisite for this one.

Public Interfaces

Note that most of the implementation of this KIP will be public-facing.  The following sections define the various parts, and each includes an overall UML diagram as well as important code details (with Javadoc).

Exceptions

See Rejected Alternatives: Exceptions

We define a small exception hierarchy to cover the various cases related to the SASL/OAUTHBEARER code.

org.apache.kafka.common.security.oauthbearer.OAuthBearerException
package org.apache.kafka.common.security.oauthbearer;
 
/**
 * Base class for all exceptions thrown by the SASL/OAUTHBEARER code.
 */
public abstract class OAuthBearerException extends KafkaException {
    // etc...
}
org.apache.kafka.common.security.oauthbearer.OAuthBearerConfigException
package org.apache.kafka.common.security.oauthbearer;
 
/**
 * Exception thrown when there is a problem with the configuration (an invalid
 * option in a JAAS config, for example).
 */
public class OAuthBearerConfigException extends OAuthBearerException {
    // etc...
}
org.apache.kafka.common.security.oauthbearer.OAuthBearerIllegalTokenException
package org.apache.kafka.common.security.oauthbearer;
 
/**
 * Exception thrown when token validation fails due to a problem with the token
 * itself (as opposed to a missing remote resource or a configuration problem)
 */
public class OAuthBearerIllegalTokenException extends OAuthBearerException {
    /**
     * Constructor
     *
     * @param reason
     *            the mandatory reason for the validation failure; it must indicate
     *            failure
     */
    public OAuthBearerIllegalTokenException(OAuthBearerValidationResult reason) {
        super(Objects.requireNonNull(reason).failureDescription());
        if (reason.success())
            throw new IllegalArgumentException("The reason indicates success; it must instead indicate failure");
        this.reason = reason;
    }

    /**
     * Return the (always non-null) reason for the validation failure
     *
     * @return the reason for the validation failure
     */
    public OAuthBearerValidationResult reason() {
        return reason;
    }
}
org.apache.kafka.common.security.oauthbearer.OAuthBearerValidationResult
package org.apache.kafka.common.security.oauthbearer;

/**
 * The result of some kind of token validation
 */
public class OAuthBearerValidationResult implements Serializable {
 
    // etc...

    /**
     * Return an instance indicating success
     *
     * @return an instance indicating success
     */
    public static OAuthBearerValidationResult newSuccess() {
        return new OAuthBearerValidationResult(true, null, null, null);
    }

    /**
     * Return a new validation failure instance
     *
     * @param failureDescription
     *            optional description of the failure
     * @return a new validation failure instance
     */
    public static OAuthBearerValidationResult newFailure(String failureDescription) {
        return newFailure(failureDescription, null, null);
    }

    /**
     * Return a new validation failure instance
     *
     * @param failureDescription
     *            optional description of the failure
     * @param failureScope
     *            optional scope to be reported with the failure
     * @param failureOpenIdConfig
     *            optional OpenID Connect configuration to be reported with the
     *            failure
     * @return a new validation failure instance
     */
    public static OAuthBearerValidationResult newFailure(String failureDescription, String failureScope,
            String failureOpenIdConfig) {
        return new OAuthBearerValidationResult(false, failureDescription, failureScope, failureOpenIdConfig);
    }
 
    private OAuthBearerValidationResult(boolean success, String failureDescription, String failureScope,
            String failureOpenIdConfig) {
        // etc...
    }
    /**
     * Return true if this instance indicates success, otherwise false
     *
     * @return true if this instance indicates success, otherwise false
     */
    public boolean success() {
        return success;
    }

    /**
     * Return the (potentially null) descriptive message for the failure
     *
     * @return the (potentially null) descriptive message for the failure
     */
    public String failureDescription() {
        return failureDescription;
    }

    /**
     * Return the (potentially null) scope to be reported with the failure
     *
     * @return the (potentially null) scope to be reported with the failure
     */
    public String failureScope() {
        return failureScope;
    }

    /**
     * Return the (potentially null) OpenID Connect configuration to be reported
     * with the failure
     *
     * @return the (potentially null) OpenID Connect configuration to be reported
     *         with the failure
     */
    public String failureOpenIdConfig() {
        return failureOpenIdConfig;
    }
 
    /**
     * Raise an exception if this instance indicates failure, otherwise do nothing
     *
     * @throws OAuthBearerIllegalTokenException
     *             if this instance indicates failure
     */
    public void throwExceptionIfFailed() throws OAuthBearerIllegalTokenException {
        if (!success())
            throw new OAuthBearerIllegalTokenException(this);
    }
}

 

Token Refresh

See Rejected Alternatives: Token Refresh

Token Refresh

We define the org.apache.kafka.common.security.oauthbearer.refresh.ExpiringCredential and org.apache.kafka.common.security.oauthbearer.refresh.ExpiringCredentialLoginModule interfaces and the org.apache.kafka.common.security.oauthbearer.refresh.ExpiringCredentialRefreshingLogin class that can refresh expiring credentials.  We represent many of the parameters that impact how the refresh algorithm operates as a single map of enum/value pairs rather than as separate methods – this helps ensure forward compatibility if/when we decide we want to add more potential parameters (having the flexibility to add more parameters without adding methods to an interface is especially important since Java 7 does not support default methods on interfaces – adding a new method breaks forward compatibnility).  We provide the org.apache.kafka.common.security.oauthbearer.refresh.RefreshConfig class to provide the methods that we did not add to any interface, and the org.apache.kafka.common.security.oauthbearer.refresh.ExpiringCredentialRefreshingLogin class will parse the properties from the JAAS config options (with appropriate defaults in case they are not explicitly specified).

 


Note that the token refresh functionality and the related classes and interfaces are not necessarily specific to OAuth Bearer Tokens, so an open question is whether this refresh-related code belongs in a sub-package of the main SASL/OAUTHBEARER one or if it should live somewhere else.  If this functionality ends up in elsewhere then perhaps RefreshConfigProp.parseValue(Object) should not throw an OAuthBearerConfigException but rather a different exception type.


 

org.apache.kafka.common.security.oauthbearer.refresh.ExpiringCredential
package org.apache.kafka.common.security.oauthbearer.refresh;
 
/**
 * A credential that expires and that can potentially be refreshed
 *
 * @see ExpiringCredentialRefreshingLogin
 */
public interface ExpiringCredential {
    /**
     * The name of the principal to which this credential applies (used only for
     * logging)
     *
     * @return the always non-null/non-empty principal name
     */
    String principalName();
    /**
     * When the credential became valid, in terms of the number of milliseconds
     * since the epoch, if known, otherwise null. An expiring credential may not
     * necessarily indicate when it was created -- just when it expires -- so we
     * need to support a null return value here.
     *
     * @return the time when the credential became valid, in terms of the number of
     *         milliseconds since the epoch, if known, otherwise null
     */
    Long startTimeMillis();
 
    /**
     * When the credential expires, in terms of the number of milliseconds since the
     * epoch. All expiring credentials by definition must indicate their expiration
     * time -- thus, unlike other methods, we do not support a null return value
     * here.
     *
     * @return the time when the credential expires, in terms of the number of
     *         milliseconds since the epoch
     */
    long expireTimeMillis();
 
    /**
     * The point after which the credential can no longer be refreshed, in terms of
     * the number of milliseconds since the epoch, if any, otherwise null. Some
     * expiring credentials can be refreshed over and over again without limit, so
     * we support a null return value here.
     *
     * @return the point after which the credential can no longer be refreshed, in
     *         terms of the number of milliseconds since the epoch, if any,
     *         otherwise null
     */
    Long absoluteLastRefreshMillis();
}
org.apache.kafka.common.security.oauthbearer.refresh.RefreshConfigProp
package org.apache.kafka.common.security.oauthbearer.refresh;

/**
 * Individual refresh-related configuration properties defining how
 * {@link ExpiringCredentialRefreshingLogin} refreshes instances of
 * {@link ExpiringCredential}. Each value has a type, optional min/max/default
 * values, an ordered list of string keys (containing the enum name and the
 * camelCase version of the name, in that order) so it can be picked out of a
 * map keyed with Strings, and the ability to parse a string or non-string value
 * associated with one of its string keys in a map.
 *
 * @see RefreshConfig
 */
public enum RefreshConfigProp {
    /**
     * Background login refresh thread will sleep until the specified window factor
     * relative to the credential's total lifetime has been reached, at which time
     * it will try to refresh the credential. The default value is 0.8 (80%).
     */
    REFRESH_WINDOW_FACTOR(Double.class, 0.5, 1.0, 0.8),
    /**
     * Amount of random jitter added to the background login refresh thread's sleep
     * time. The default value is 0.05 (5%).
     */
    REFRESH_WINDOW_JITTER(Double.class, 0.0, 0.25, 0.05),
    /**
     * The minimum time between checks by the background login refresh thread,
     * regardless of other constraints, in milliseconds. The default value is 60,000
     * (1 minute).
     */
    REFRESH_MIN_PERIOD_MILLIS(Long.class, 0L, 1000L * 60 * 15, 1000L * 60 * 1),
    /**
     * If the {@code LoginModule} and {@code SaslClient} implementations support
     * multiple simultaneous login contexts on a single {@code Subject} at the same
     * time. If true, then upon refresh, logout will only be invoked on the original
     * {@code LoginContext} after a new one successfully logs in. This can be
     * helpful if the original credential still has some lifetime left when an
     * attempt to refresh the credential fails; the client will still be able to
     * create new connections as long as the original credential remains valid.
     * Otherwise, if logout is immediately invoked prior to relogin, a relogin
     * failure leaves the client without the ability to connect until relogin does
     * in fact succeed. The default value is false.
     */
    RELOGIN_ALLOWED_BEFORE_LOGOUT(Boolean.class, Boolean.FALSE, Boolean.TRUE, Boolean.FALSE);
    private RefreshConfigProp(Class<? extends Comparable<?>> valueType, Comparable<?> minValue, Comparable<?> maxValue,
            Comparable<?> defaultValue) {
        // etc...
    }
 
    /**
     * Return the always non-null value type
     *
     * @return the always non-null value type
     */
    public Class<? extends Comparable<?>> valueType() {
        return valueType;
    }
    /**
     * Return the minimum value, if any, as an instance of the value type, otherwise
     * null
     *
     * @return the minimum value, if any, as an instance of the value type,
     *         otherwise null
     */
    public Comparable<?> minValue() {
        return minValue;
    }

    /**
     * Return the maximum value, if any, as an instance of the value type, otherwise
     * null
     *
     * @return the maximum value, if any, as an instance of the value type,
     *         otherwise null
     */
    public Comparable<?> maxValue() {
        return maxValue;
    }

    /**
     * Return the default value, if any, as an instance of the value type, otherwise
     * null
     *
     * @return the default value, if any, as an instance of the value type,
     *         otherwise null
     */
    public Comparable<?> defaultValue() {
        return defaultValue;
    }
 
    /**
     * The ordered list of string keys (containing the enum name and the camelCase
     * version of the name, in that order) so the instance can be picked out of a
     * map keyed with Strings
     * 
     * @return the always non-null/non-empty list of string keys (containing the
     *         enum name and the camelCase version of the name, in that order) that
     *         denote this instance in a {@code Map}
     */
    public List<String> stringKeys() {
        return stringKeys;
    }
 
    /**
     * Return the (potentially null) value as the value type
     *
     * @param value
     *            a (potentially null) value of either the value type or a type such
     *            that the result of invoking {@code valueOf()} on the value type
     *            class against the value results in the value converted to the
     *            value type
     * @return the (potentially null) value as the value type
     * @throws OAuthBearerConfigException
     *             if a non-null value cannot be converted to the value type while
     *             remaining consistent with any min/max constraints
     */
    public Comparable<?> parseValue(Object value) throws OAuthBearerConfigException {
        // etc...
    }
 
    // etc...
}
org.apache.kafka.common.security.oauthbearer.refresh.RefreshConfig
package org.apache.kafka.common.security.oauthbearer.refresh;
 
/**
 * Immutable refresh-related configuration for instances of
 * {@link ExpiringCredentialRefreshingLogin}. Configuration that is independent
 * of the actual credential itself and that can be defined as login module
 * options in a JAAS config should be stored here.
 */
public class RefreshConfig {
    /**
     * Default constructor with individual refresh configuration properties
     * being set to their default values
     */
    public RefreshConfig() {
        this(Collections.<String, String>emptyMap(), "");
    }

    /**
     * Constructor based on a map with keys being the String keys associated with
     * {@link RefreshConfigProp} instances and values being either Strings or
     * non-Strings. Individual refresh configuration properties that are not
     * explicitly set to a valid value on the given map will be set to their default
     * value for this instance.
     * 
     * @param configMap
     *            the mandatory (but possibly empty) configuration map upon which to
     *            build this instance
     * @see RefreshConfigProp#stringKeys()
     * @see RefreshConfigProp#parseValue(Object)
     */
    public RefreshConfig(Map<String, String> configMap) {
        this(configMap, "");
    }

    /**
     * Constructor based on a map with keys being the String keys associated with
     * {@link RefreshConfigProp} instances and values being either Strings or
     * non-Strings. Individual refresh configuration properties that are not
     * explicitly set to a valid value on the given map will be set to their default
     * value for this instance.
     *
     * @param configMap
     *            the mandatory (but possibly empty) configuration map upon which to
     *            build this instance
     * @param keyPrefix
     *            the mandatory (but potentially blank) prefix to prepend to String
     *            keys
     * @see RefreshConfigProp#stringKeys()
     * @see RefreshConfigProp#parseValue(Object)
     */
    public RefreshConfig(Map<String, String> configMap, String keyPrefix) {
        // etc...
    }
 
    /**
     * Return the (always non-null and unmodifiable) {@code Map} that resulted from
     * the parsing of the input provided at construction time
     *
     * @return the (always non-null and unmodifiable) {@code Map} that resulted from
     *         the parsing of the input provided at construction time
     */
    public Map<RefreshConfigProp, Object> refreshConfigMap() {
        return refreshConfigMap;
    }

    /**
     * Background login refresh thread will sleep until the specified window factor
     * relative to the credential's total lifetime has been reached, at which time
     * it will try to refresh the credential. The default value is 0.8 (80%).
     *
     * @return the refresh window factor
     * @see RefreshConfigProp#REFRESH_WINDOW_FACTOR
     */
    public double refreshWindowFactor() {
        return refreshWindowFactor;
    }

    /**
     * Amount of random jitter added to the background login refresh thread's sleep
     * time. The default value is 0.05 (5%).
     *
     * @return the refresh window jitter
     * @see RefreshConfigProp#REFRESH_WINDOW_JITTER
     */
    public double refreshWindowJitter() {
        return refreshWindowJitter;
    }

    /**
     * The minimum time between checks by the background login refresh thread,
     * regardless of other constraints, in milliseconds. The default value is 60,000
     * (1 minute).
     *
     * @return the minimum refresh period, in milliseconds
     * @see RefreshConfigProp#REFRESH_MIN_PERIOD_MILLIS
     */
    public long refreshMinPeriodMillis() {
        return refreshMinPeriodMillis;
    }

    /**
     * If the LoginModule and SaslClient implementations support multiple
     * simultaneous login contexts on a single Subject at the same time. If true,
     * then upon refresh, logout will only be invoked on the original LoginContext
     * after a new one successfully logs in. This can be helpful if the original
     * credential still has some lifetime left when an attempt to refresh the
     * credential fails; the client will still be able to create new connections as
     * long as the original credential remains valid. Otherwise, if logout is
     * immediately invoked prior to relogin, a relogin failure leaves the client
     * without the ability to connect until relogin does in fact succeed. The
     * default value is false.
     *
     * @return true if relogin is allowed prior to discarding an existing
     *         (presumably unexpired) token, otherwise false
     * @see RefreshConfigProp#RELOGIN_ALLOWED_BEFORE_LOGOUT
     */
    public boolean reloginAllowedBeforeLogout() {
        return reloginAllowedBeforeLogout;
    }

    // etc...
}
org.apache.kafka.common.security.oauthbearer.refresh.ExpiringCredentialLoginModule
package org.apache.kafka.common.security.oauthbearer.refresh;

import javax.security.auth.spi.LoginModule;

/**
 * An extension of the {@code LoginModule} interface that must be implemented by
 * any login module that generates an instance of {@link ExpiringCredential} to
 * be managed/refreshed by {@link ExpiringCredentialRefreshingLogin}. The
 * existence of this interface is necessary to deal with the case when there are
 * multiple enabled SASL mechanisms, one or more of which generate an expiring
 * credential, and a SASL mechanism (as opposed to authentication via SSL) is
 * used for inter-broker communication.
 * <p>
 * The following broker-side JAAS configuration helps illustrate the need for
 * this interface:
 *
 * <pre>
 * KafkaServer {
 *      org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
 *      // etc...;
 *      some.othersaslmechanism.OtherSaslMechanismLoginModule Optional
 *      // etc...;
 * };
 * </pre>
 *
 * The {@code LoginContext} instance will initialize both login modules and ask
 * them both to login -- that's how JAAS works -- regardless of which SASL
 * mechanism is used for inter-broker communication. It is imperative that the
 * login succeeds for the login module associated with the mechanism configured
 * for inter-broker communication; it doesn't matter if any other mechanisms
 * fail because they aren't actually being used for client-side work (they are
 * only being used for the server side of the SASL handshake, and that is
 * performed by the {@code SaslServer} instance rather than the
 * {@code LoginModule} instance). This has 2 implications:
 * <p>
 * <ol>
 * <li>The {@code CallbackHandler} instance provided to the {@code LoginContext}
 * instance (which then passes it to all of the login modules) must be the
 * correct one for the login module that must succeed; it does not have to be
 * the correct one for any others.
 * <li>The login modules that don't have to succeed (because they aren't being
 * used for inter-broker communication) should be marked {@code Optional} in
 * case they fail.
 * </ol>
 * <p>
 * This raises the critical issue of how any instance of {@link Login} can know
 * which {@code CallbackHandler} instance to instantiate. The
 * {@link ScramLoginModule} and {@link PlainLoginModule} don't use the callback
 * handler, but {@code com.sun.security.auth.module.Krb5LoginModule} does, and
 * in fact {@link LoginCallbackHandler} serves the purpose of short-circuiting
 * any request for user interaction in that case. So this issue hasn't been
 * critical in the past; it is only now becoming more important.
 * <p>
 * All the {@code Login} instance knows is the SASL mechanism enabled for
 * inter-broker communication (from the broker config) and the JAAS
 * configuration (for example, the one shown above). It cannot know which
 * {@code CallbackHandler} instance to instantiate from just that information
 * because it cannot determine from that information alone which of the login
 * modules handles the declared inter-broker communication mechanism.
 * <p>
 * We thus arrive at the need for this interface. A {@code Login} instance can
 * look at all of the declared login module classes, determine which of them
 * implements this interface, and then for each of those it can instantiate an
 * instance using the default constructor and ask for its applicable mechanisms
 * via {@link #mechanisms()}. If it finds a login module with an applicable SASL
 * mechanism matching the one being used for inter-broker communication it can
 * then ask for a {@code CallbackHandler} instance via
 * {@link #newCallbackHandler()}. Otherwise, if it doesn't find a match, it just
 * instantiates an instance of {@link LoginCallbackHandler}, which is what
 * {@link AbstractLogin#login()} currently creates and is required for the
 * initial Kerberos login attempt for short-circuit behavior as mentioned above.
 */
public interface ExpiringCredentialLoginModule extends LoginModule {
    /**
     * Return the set of SASL mechanisms this login module applies to
     *
     * @return the set of SASL mechanisms this login module applies to
     */
    Set<String> mechanisms();

    /**
     * Return a new {@code CallbackHandler} instance appropriate for this login
     * module when one of its supported mechanisms as returned by
     * {@link #mechanisms()} is the SASL mechanism for inter-broker communication.
     *
     * @return a new {@code CallbackHandler} instance appropriate for this login
     *         module when one of its supported mechanisms as returned by
     *         {@link #mechanisms()} is the SASL mechanism for inter-broker
     *         communication.
     */
    CallbackHandler newCallbackHandler();
} 
org.apache.kafka.common.security.oauthbearer.refresh.ExpiringCredentialRefreshingLogin
package org.apache.kafka.common.security.oauthbearer.refresh;

import org.apache.kafka.common.security.auth.Login;

/**
 * This class is responsible for refreshing logins for both Kafka client and
 * server when the login is a type that has a limited lifetime and will expire. The
 * credentials for the login must implement {@link ExpiringCredential}, and the
 * {@code LoginModule} must implement {@link ExpiringCredentialLoginModule}.
 */
public class ExpiringCredentialRefreshingLogin implements Login {
    // etc...
 
    public RefreshConfig refreshConfig() {
        return refreshConfig;
    }
 
    // etc...
} 

 

OAuth Bearer Tokens

Every OAuth Bearer Token is a string representing an authorization issued to the client.  There are at least two types of OAuth Bearer Tokens: an opaque token and a JSON Web Token (JWT).  Nothing can be said about the contents of an opaque token other than that the contents are only available via some third party (e.g. an authorization server).  Every JWT contains both a JSON Object Signing and Encryption (JOSE) Header and a JWT Claims Set.  There are two distinct types of JWTs: a JSON Web Signature (JWS) and a JSON Web Encryption (JWE).  There is no intent to model a JWE with the initial implementation.

 

OAuth Bearer Tokens

org.apache.kafka.common.security.oauthbearer.OAuthBearerToken
package org.apache.kafka.common.security.oauthbearer;
 /**
 * The <code>b64token</code> value as defined in
 * <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750 Section
 * 2.1</a> along with the token's specific scope and lifetime.
 *
 * @see <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC 6749
 *      Section 1.4</a> and
 *      <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750
 *      Section 2.1</a>
 */
public interface OAuthBearerToken extends ExpiringCredential {
    /**
     * The <code>b64token</code> value as defined in
     * <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750 Section
     * 2.1</a>
     *
     * @return <code>b64token</code> value as defined in
     *         <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750
     *         Section 2.1</a>
     */
    String value();


    /**
     * The token's scope of access, as per
     * <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC 6749 Section
     * 1.4</a>
     *
     * @return the token's (always non-null but potentially empty) scope of access,
     *         as per <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC
     *         6749 Section 1.4</a>. Note that all values in the returned set will
     *         be trimmed of preceding and trailing whitespace, and the result will
     *         never contain the empty string.
     * @throws IOException
     *             if one or more required networked resources (e.g. to re-hydrate
     *             an opaque token) is unavailable.
     * @throws OAuthBearerIllegalTokenException
     *             if there is something fundamentally wrong with the token (if it
     *             is malformed, for example)
     */
    Set<String> scope() throws IOException, OAuthBearerIllegalTokenException;

    /**
     * The token's lifetime, expressed as the number of milliseconds since the
     * epoch, as per <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC
     * 6749 Section 1.4</a>
     *
     * @return the token'slifetime, expressed as the number of milliseconds since
     *         the epoch, as per
     *         <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC 6749
     *         Section 1.4</a>.
     * @throws IOException
     *             if one or more required networked resources (e.g. to re-hydrate
     *             an opaque token) is unavailable.
     * @throws OAuthBearerIllegalTokenException
     *             if there is something fundamentally wrong with the token (if it
     *             is malformed, for example)
     */
    long lifetime() throws IOException, OAuthBearerIllegalTokenException;
}
org.apache.kafka.common.security.oauthbearer.OAuthBearerJwt
package org.apache.kafka.common.security.oauthbearer;

/**
 * A JSON Web Token, which may either be a JWS or a JWE. Neither signature
 * validation nor decryption are performed here.
 *
 * @see <a href="https://tools.ietf.org/html/rfc7519">RFC 7519</a>
 */
public abstract class OAuthBearerJwt implements OAuthBearerToken {
    /**
     * Constructor with the principal taken from the '{@code sub}' claim (if any)
     * and no scope claim name
     *
     * @param compactSerialization
     *            the compact serialization to parse as a JWT
     * @throws OAuthBearerIllegalTokenException
     *             if compactSerialization cannot be either a JWS or JWE (due to
     *             there not being either 3 or 5 dot-separated values; or the JWT
     *             header either not being a valid Base64 URL-encoded value or not
     *             valid JSON after decoding)
     */
    public OAuthBearerJwt(String compactSerialization) throws OAuthBearerIllegalTokenException {
        this(compactSerialization, null);
    }

    /**
     * Constructor with the given principal and scope claim names
     *
     * @param compactSerialization
     *            the compact serialization to parse as a JWT
     * @param principalClaimName
     *            the optional principal claim name
     * @param scopeClaimName
     *            the optional scope claim name
     * @throws OAuthBearerIllegalTokenException
     *             if compactSerialization cannot be either a JWS or JWE (due to
     *             there not being either 3 or 5 dot-separated values; or the JWT
     *             header either not being a valid Base64 URL-encoded value or not
     *             valid JSON after decoding)
     */
    public OAuthBearerJwt(String compactSerialization, String principalClaimName, String scopeClaimName)
            throws OAuthBearerIllegalTokenException {
        // etc...
    }
 
    /**
     * Return the 3 or 5 dot-separated sections of the JWT compact serialization
     * 
     * @return the 3 or 5 dot-separated sections of the JWT compact serialization
     */
    public List<String> splits() {
        return splits;
    }

    /**
     * Return the JOSE Header as a {@code Map}
     *
     * @return the JOSE header
     */
    public Map<String, Object> header() {
        return header;
    }

    /**
     * Return the JWT Claim Set as a {@code Map}
     *
     * @return the (always non-null but possibly empty) claims
     */
    public abstract Map<String, Object> claims();

    /**
     * Return the scope claim name, if any, otherwise null
     *
     * @return the scope claim name, if any, otherwise null
     */
    public String scopeClaimName() {
        return scopeClaimName;
    }

    /**
     * Indicate if the claim exists and is the given type
     *
     * @param claimName
     *            the mandatory JWT claim name
     * @param type
     *            the mandatory type, which should either be String.class,
     *            Number.class, or List.class
     * @return true if the claim exists and is the given type, otherwise false
     */
    public boolean isClaimType(String claimName, Class<?> type) {
        // etc...
    }

    /**
     * Extract a claim of the given type
     *
     * @param claimName
     *            the mandatory JWT claim name
     * @param type
     *            the mandatory type, which must either be String.class,
     *            Number.class, or List.class
     * @return the claim if it exists, otherwise null
     * @throws OAuthBearerIllegalTokenException
     *             if the claim exists but is not the given type
     */
    public <T> T getClaim(String claimName, Class<T> type) throws OAuthBearerIllegalTokenException {
        // etc...
    }

    /**
     * Extract a claim that could be either a String or a String List as a String
     * List (if it was a String it will be returned as a list of size 1).
     *
     * @param claimName
     *            the mandatory JWT claim name
     * @return the claim in the form of a String List, if it exists, otherwise null
     * @throws OAuthBearerIllegalTokenException
     *             if the claim exists but is neither a String nor a List
     */
    public List<String> getListClaimFromStringOrList(String claimName) throws OAuthBearerIllegalTokenException {
        // etc...
    }
 
    /**
     * Extract a claim in its raw form
     *
     * @param claimName
     *            the mandatory JWT claim name
     * @return the raw claim value, if it exists, otherwise null
     */
    public Object getRawClaim(String claimName) {
        return claims().get(Objects.requireNonNull(claimName));
    }

    /**
     * Return the
     * <a href="https://tools.ietf.org/html/rfc7519#section-4.1.3">Audience</a>
     * claim as a String List
     *
     * @return the <a href=
     *         "https://tools.ietf.org/html/rfc7519#section-4.1.3">Audience</a>
     *         claim as a String List if available, otherwise null. An Audience
     *         claim that is a String will be returned as a List of size 1.
     * @throws OAuthBearerIllegalTokenException
     *             if the claim value is the incorrect type
     */
    public List<String> audience() throws OAuthBearerIllegalTokenException {
        return getListClaimFromStringOrList(JwtClaim.AUDIENCE.claimName());
    }

    /**
     * Return the
     * <a href="https://tools.ietf.org/html/rfc7519#section-4.1.4">Expiration
     * Time</a> claim
     *
     * @return the <a href=
     *         "https://tools.ietf.org/html/rfc7519#section-4.1.4">Expiration
     *         Time</a> claim if available, otherwise null
     * @throws OAuthBearerIllegalTokenException
     *             if the claim value is the incorrect type
     */
    public Number expirationTime() throws OAuthBearerIllegalTokenException {
        return getClaim(JwtClaim.EXPIRATION_TIME.claimName(), Number.class);
    }

    /**
     * Return the <a href="https://tools.ietf.org/html/rfc7519#section-4.1.6">Issued
     * At</a> claim
     *
     * @return the
     *         <a href= "https://tools.ietf.org/html/rfc7519#section-4.1.6">Issued
     *         At</a> claim if available, otherwise null
     * @throws OAuthBearerIllegalTokenException
     *             if the claim value is the incorrect type
     */
    public Number issuedAt() throws OAuthBearerIllegalTokenException {
        return getClaim(JwtClaim.ISSUED_AT.claimName(), Number.class);
    }
 
    /**
     * Return the
     * <a href="https://tools.ietf.org/html/rfc7519#section-4.1.1">Issuer</a> claim
     *
     * @return the <a href=
     *         "https://tools.ietf.org/html/rfc7519#section-4.1.1">Issuer</a> claim
     *         if available, otherwise null
     * @throws OAuthBearerIllegalTokenException
     *             if the claim value is the incorrect type
     */
    public String issuer() throws OAuthBearerIllegalTokenException {
        return getClaim(JwtClaim.ISSUER.claimName(), String.class);
    }

    /**
     * Return the <a href="https://tools.ietf.org/html/rfc7519#section-4.1.7">JWT
     * ID</a> claim
     *
     * @return the <a href= "https://tools.ietf.org/html/rfc7519#section-4.1.7">JWT
     *         ID</a> claim if available, otherwise null
     * @throws OAuthBearerIllegalTokenException
     *             if the claim value is the incorrect type
     */
    public String jwtId() throws OAuthBearerIllegalTokenException {
        return getClaim(JwtClaim.JWT_ID.claimName(), String.class);
    }

    /**
     * Return the <a href="https://tools.ietf.org/html/rfc7519#section-4.1.5">Not
     * Before</a> claim
     *
     * @return the <a href= "https://tools.ietf.org/html/rfc7519#section-4.1.5">Not
     *         Before</a> claim if available, otherwise null
     * @throws OAuthBearerIllegalTokenException
     *             if the claim value is the incorrect type
     */
    public Number notBefore() throws OAuthBearerIllegalTokenException {
        return getClaim(JwtClaim.NOT_BEFORE.claimName(), Number.class);
    }

    /**
     * Return the
     * <a href="https://tools.ietf.org/html/rfc7519#section-4.1.2">Subject</a> claim
     *
     * @return the <a href=
     *         "https://tools.ietf.org/html/rfc7519#section-4.1.2">Subject</a> claim
     *         if available, otherwise null
     * @throws OAuthBearerIllegalTokenException
     *             if the claim value is the incorrect type
     */
    public String subject() throws OAuthBearerIllegalTokenException {
        return getClaim(JwtClaim.SUBJECT.claimName(), String.class);
    }
 
    /**
     * Decode the given Base64URL-encoded value, parse the resulting JSON as a JSON
     * object, and return the map of member names to their values (each value being
     * represented as either a String, a Number, or a List of Strings).
     *
     * @param split
     *            the value to decode and parse
     * @return the map of JSON member names to their String, Number, or String List
     *         value
     * @throws OAuthBearerIllegalTokenException
     *             if the given Base64URL-encoded value cannot be decoded or parsed
     */
    public static Map<String, Object> toMap(String split) throws OAuthBearerIllegalTokenException {
        // etc...
    }
}
org.apache.kafka.common.security.oauthbearer.OAuthBearerJws
package org.apache.kafka.common.security.oauthbearer;
 
/**
 * @see <a href="https://tools.ietf.org/html/rfc7515">RFC 7515</a>. Note that
 *      digital signature validation is not performed.
 */
public class OAuthBearerJws extends OAuthBearerJwt {
    /**
     * Constructor with no scope claim name
     *
     * @param compactSerialization
     *            the compact serialization to parse as a JWT
     * @throws OAuthBearerIllegalTokenException
     *             if the compact serialization is not a valid JWS (meaning it did
     *             not have 3 dot-separated Base64URL sections; or the header or
     *             claims either are not valid Base 64 URL encoded values or are not
     *             JSON after decoding; or the existence or absence of a digital
     *             signature is inconsistent with the mandatory 'alg' header value)
     */
    public OAuthBearerJws(String compactSerialization) throws OAuthBearerIllegalTokenException {
        this(compactSerialization, JwtClaim.SUBJECT.claimName(), null);
    }

    /**
     * Constructor with the given principal and scope claim names
     *
     * @param compactSerialization
     *            the compact serialization to parse as a JWS
     * @param principalClaimName
     *            the optional principal claim name
     * @param scopeClaimName
     *            the optional scope claim name
     * @throws OAuthBearerIllegalTokenException
     *             if compactSerialization cannot be either a JWS or JWE (due to
     *             there not being either 3 or 5 dot-separated values; or the JWT
     *             header either not being a valid Base64 URL-encoded value or not
     *             valid JSON after decoding)
     */
    public OAuthBearerJws(String compactSerialization, String principalClaimName, String scopeClaimName)
            throws OAuthBearerIllegalTokenException {
        // etc...
    }
}
org.apache.kafka.common.security.oauthbearer.JwtHeaderParameter
package org.apache.kafka.common.security.oauthbearer;

/**
 * JSON Web Signature and Encryption Header Parameters
 *
 * @see <a href="https://www.iana.org/assignments/jose/jose.xhtml">JSON Web
 *      Signature and Encryption Header Parameters</a>
 */
public enum JwtHeaderParameter {
    ALGORITHM("alg"),
    JWK_SET_URL("jku"),
    JSON_WEB_KEY("jwk"),
    KEY_ID("kid"),
    X_509_URL("x5u"),
    X_509_CERTIFICATE_CHAIN("x5c"),
    X_509_CERTIFICATE_SHA1_THUMBPRINT("x5t"),
    X_509_CERTIFICATE_SHA256_THUMBPRINT("x5t#S256"),
    TYPE("typ"),
    CONTENT_TYPE("cty"),
    CRITICAL("crit"),
    ENCRYPTION_ALGORITHM("enc"),
    COMPRESSION_ALGORITHM("zip"),
    ISSUER("iss"),
    SUBJECT("sub"),
    AUDIENCE("aud");

    private String headerParameterName;

    private JwtHeaderParameter(String headerParameterName) {
        this.headerParameterName = headerParameterName;
    }

    String headerParameterName() {
        return headerParameterName;
    }
}
org.apache.kafka.common.security.oauthbearer.JwtClaim
package org.apache.kafka.common.security.oauthbearer;

/**
 * JSON Web Token Claims
 *
 * @see <a href="https://www.iana.org/assignments/jwt/jwt.xhtml">JSON Web Token
 *      Claims</a>
 */
public enum JwtClaim {
    ISSUER("iss"),
    SUBJECT("sub"),
    AUDIENCE("aud"),
    EXPIRATION_TIME("exp"),
    NOT_BEFORE("nbf"),
    ISSUED_AT("iat"),
    JWT_ID("jti");

    private String claimName;

    private JwtClaim(String claimName) {
        this.claimName = claimName;
    }

    public String claimName() {
        return claimName;
    }
}

Token Retrieval

Token Retrieval

Token retrieval occurs on the client side of the SASL negotiation (in the producer/consumer, or on the broker when SASL/OAUTHBEARER is the inter-broker protocol), and the org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule class is the LoginModule implementation that creates and invokes an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenRetriever to perform the retrieval.  We provide the org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtRetriever class as a sample implementation that also provides value in development and testing situations.

org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
package org.apache.kafka.common.security.oauthbearer;

import org.apache.kafka.common.security.oauthbearer.internal.OAuthBearerSaslClientProvider;
import org.apache.kafka.common.security.oauthbearer.internal.OAuthBearerSaslServerProvider;
/**
 * A {@code LoginModule} for the SASL/OAUTHBEARER mechanism.
 * <p>
 * Example use on a non-broker client:
 *
 * <pre>
 * KafkaClient {
 *      org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
 *      reloginAllowedBeforeLogout="true"
 *      tokenRetriever="org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtRetriever"
 *          stringClaim_sub="theClientPrincipalName"
 *          listClaim_scope="|scopeValue1|scopeValue2";
 * };
 * </pre>
 *
 * Example use on a broker:
 *
 * <pre>
 * KafkaServer {
 *      org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
 *      reloginAllowedBeforeLogout="true"
 *      tokenRetriever="org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtRetriever"
 *          stringClaim_sub="theBrokerPrincipalName"
 *          listClaim_scope="|scopeValue1|scopeValue2"
 *      tokenValidator="org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtValidator"
 *          principalClaimName="sub";
 * };
 * </pre>
 *
 * @see RefreshConfigProp#RELOGIN_ALLOWED_BEFORE_LOGOUT
 * @see OAuthBearerTokenRetriever
 * @see OAuthBearerTokenValidator
 */
public class OAuthBearerLoginModule implements ExpiringCredentialLoginModule {
    public static final String TOKEN_RETRIEVER_CLASS_NAME_OPTION = "tokenRetriever";
    public static final String TOKEN_VALIDATOR_CLASS_NAME_OPTION = "tokenValidator";
    private static final Set<String> MECHANISMS = Collections
            .unmodifiableSet(new HashSet<>(Arrays.asList("OAUTHBEARER")));
 
    static {
        OAuthBearerSaslClientProvider.initialize(); // not part of public API
        OAuthBearerSaslServerProvider.initialize(); // not part of public API
    }
 
    @Override
    public Set<String> mechanisms() {
        return MECHANISMS;
    }
    @Override
    public CallbackHandler newCallbackHandler() {
        return new SubstitutableModuleOptionsCallbackHandler();
    }
 
    // etc...

}
org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenRetriever
package org.apache.kafka.common.security.oauthbearer;
/**
 * An implementation of this interface must be configured for broker and
 * non-broker clients via JAAS when using the {@link OAuthBearerLoginModule}.
 * The configuration is done via the
 * {@value OAuthBearerLoginModule#TOKEN_RETRIEVER_CLASS_NAME_OPTION} option.
 */
public interface OAuthBearerTokenRetriever {
    /**
     * Retrieve a token using the given callback handler and JAAS login module
     * options.
     *
     * @param callbackHandler
     *            the mandatory callback handler. It will typically be capable of
     *            handling instances of {@link SubstitutableModuleOptionsCallback},
     *            though different implementations of this interface are free to
     *            differ in their requirements.
     * @param moduleOptionsMap
     *            the mandatory map representation of the <a href=
     *            "https://docs.oracle.com/javase/9/docs/api/javax/security/auth/login/Configuration.html">module
     *            options</a>
     * @return the retrieved token
     * @throws IOException
     *             if one or more networked resources required to perform the
     *             retrieval (e.g. a web service) is unavailable.
     * @throws UnsupportedCallbackException
     *             if the provided {@code CallbackHandler} cannot handle
     *             {@link SubstitutableModuleOptionsCallback}.
     * @throws OAuthBearerConfigException
     *             if there is a configuration problem that prevents this instance
     *             from functioning (a missing mandatory parameter, for example)
     * @throws LoginException
     *             if the retrieval fails for any other reason (if the token
     *             endpoint rejects the provided credentials, for example)
     */
    OAuthBearerToken retrieve(CallbackHandler callbackHandler, Map<String, String> moduleOptionsMap)
            throws IOException, UnsupportedCallbackException, OAuthBearerConfigException, LoginException;
}
org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtRetriever
package org.apache.kafka.common.security.oauthbearer;

/**
 * An implementation of {@link OAuthBearerTokenRetriever} that generates an
 * unsecured JWT. Claims and their values can be specified using
 * stringClaim_&lt;claimname&gt;, numberClaim_&lt;claimname&gt;, and
 * listClaim_&lt;claimname&gt; options. The first character of the value is
 * taken as the delimiter for list claims. You may define any claim name and
 * value except '{@code iat}' and '{@code exp}', both of which are calculated
 * automatically.
 * <p>
 * This implementation also accepts the following options:
 * <ul>
 * <li><code>{@value #PRINCIPAL_CLAIM_NAME_OPTION}</code> set to a custom claim
 * name if you wish the name of the String claim holding the principal name to
 * be something other than <code>"sub"</code>.</li>
 * <li><code>{@value #LIFETIME_SECONDS_OPTION}</code> set to an integer value if
 * the token expiration is to be set to something other than the default value
 * of 3600 seconds (which is 1 hour). The 'exp' claim reflects the expiration
 * time.</li>
 * <li><code>{@value #SCOPE_CLAIM_NAME_OPTION}</code> set to a custom claim name
 * if you wish the name of the String or String List claim holding any token
 * scope to be something other than "<code>scope</code>"</li>
 * </ul>
 * For example:
 *
 * <pre>
 * KafkaClient {
 *      org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
 *      tokenRetriever="org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtRetriever"
 *      reloginAllowedBeforeLogout="true"
 *         stringClaim_sub="thePrincipalName"
 *         listClaim_scope="|scopeValue1|scopeValue2"
 *         lifetimeSeconds="60";
 * };
 *
 * </pre>
 */
public class OAuthBearerUnsecuredJwtRetriever implements OAuthBearerTokenRetriever {
    public static final String PRINCIPAL_CLAIM_NAME_OPTION = "principalClaimName";
    public static final String LIFETIME_SECONDS_OPTION = "lifetimeSeconds";
    public static final String SCOPE_CLAIM_NAME_OPTION = "scopeClaimName";
 
    // etc...
 
}

 

Token Validation

Token Validation

Token validation occurs on the broker side of the SASL negotiation, and the SaslServer implementation registered by org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule creates and invokes an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenValidator to perform the validation.  We provide the org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtValidator class as a sample implementation that also provides value in development and testing situations.  There are additional utility classes as shown in the above diagram that are also reusable with other implementations.

org.apache.kafka.common.security.oauthbearer.OAuthBearerSaslServer
package org.apache.kafka.common.security.oauthbearer;

import javax.security.sasl.SaslServer;

/**
 * The public interface of SaslServer implementations of SASL/OAUTHBEARER for
 * Kafka. The validated access token will be available via the {@link #token()}
 * method. The existence of this interface keeps the actual implementation out
 * of the public API.
 */
public interface OAuthBearerSaslServer extends SaslServer {
    /**
     * Return the validated access token. Exposing the token here is necessary if we
     * wish to make it available for authorization decisions.
     *
     * @return the validated access token
     * @see <a href=
     *      "https://cwiki.apache.org/confluence/display/KAFKA/KIP-189%3A+Improve+principal+builder+interface+and+add+support+for+SASL">KIP-189:
     *      Improve principal builder interface and add support for SASL</a>
     */
    public OAuthBearerToken token();
}
org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenValidator
package org.apache.kafka.common.security.oauthbearer;

/**
 * An implementation of this interface must be configured for brokers via JAAS
 * when using the {@link OAuthBearerLoginModule}. The configuration is done via
 * the {@value OAuthBearerLoginModule#TOKEN_VALIDATOR_CLASS_NAME_OPTION} option.
 */
public interface OAuthBearerTokenValidator {
    /**
     * Validate a token using the given callback handler and JAAS login module
     * options.
     *
     * @param tokenValue
     *            the <code>b64token</code> value as defined in
     *            <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750
     *            Section 2.1</a> to validate
     * @param callbackHandler
     *            the mandatory callback handler. It will typically be capable of
     *            handling instances of {@link SubstitutableModuleOptionsCallback},
     *            though different implementations of this interface are free to
     *            differ in their requirements. Set the callback handler via the
     *            {@code sasl.server.callback.handler.class.map} option in the
     *            broker properties file.
     * @param moduleOptionsMap
     *            the mandatory map representation of the <a href=
     *            "https://docs.oracle.com/javase/9/docs/api/javax/security/auth/login/Configuration.html">module
     *            options</a>
     * @return the successfully validated token
     * @throws IOException
     *             if one or more networked resources required to perform the
     *             validation (e.g. a web service) is unavailable.
     * @throws UnsupportedCallbackException
     *             if the provided {@code CallbackHandler} cannot handle
     *             {@link SubstitutableModuleOptionsCallback}.
     * @throws OAuthBearerConfigException
     *             if there is a configuration problem that prevents this instance
     *             from functioning (a missing mandatory parameter, for example)
     * @throws OAuthBearerIllegalTokenException
     *             if there is a problem with the token itself (it cannot be parsed
     *             or it otherwise fails validation)
     * @see BrokerSecurityConfigs#SASL_SERVER_CALLBACK_HANDLER_CLASS_MAP_DOC
     */
    OAuthBearerToken validate(String tokenValue, CallbackHandler callbackHandler, Map<String, String> moduleOptionsMap)
            throws IOException, UnsupportedCallbackException, OAuthBearerConfigException,
            OAuthBearerIllegalTokenException;
}
org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtValidator
package org.apache.kafka.common.security.oauthbearer;

/**
 * An implementation of {@link OAuthBearerTokenValidator} that validates a JWS
 * without regard to its algorithm and signature. It requires there to be an
 * <code>"exp" (Expiration Time)</code> claim of type Number. If
 * <code>"iat" (Issued At)</code> or <code>"nbf" (Not Before)</code> claims are
 * present each must be a number that precedes the Expiration Time claim, and if
 * both are present the Not Before claim must not precede the Issued At claim.
 * It also accepts the following options, none of which are required:
 * <ul>
 * <li><code>{@value #PRINCIPAL_CLAIM_NAME_OPTION}</code> set to a non-empty
 * value if you wish a String claim holding a principal name to be checked for
 * existence; the default is to perform no such check</li>
 * <li><code>{@value #SCOPE_CLAIM_NAME_OPTION}</code> set to a custom claim name
 * if you wish the name of the String/String List claim holding any token scope
 * to be something other than <code>"scope"</code></li>
 * <li><code>{@value #REQUIRED_SCOPE_OPTION}</code> set to a space-delimited
 * list of scope values if you wish the String/String List claim holding the
 * token scope to be checked to make sure it contains certain values</li>
 * <li><code>{@value #ALLOWABLE_CLOCK_SKEW_MILLIS_OPTION}</code> set to a
 * positive integer value if you wish to allow up to some number of positive
 * milliseconds of clock skew (the default is 0)</li>
 * </ul>
 * For example:
 *
 * <pre>
 * KafkaServer {
 *      org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
 *      reloginAllowedBeforeLogout="true"
 *      tokenRetriever="org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtRetriever"
 *         stringClaim_sub="thePrincipalName"
 *         listClaim_scope=",KAFKA_BROKER,LOGIN_TO_KAFKA"
 *      tokenValidator="org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtValidator"
 *          principalClaimName="sub"
 *          requiredScope="LOGIN_TO_KAFKA"
 *          allowableClockSkewMillis="3000";
 * };
 * </pre>
 */
public class OAuthBearerUnsecuredJwtValidator implements OAuthBearerTokenValidator {
    public static final String PRINCIPAL_CLAIM_NAME_OPTION = "principalClaimName";
    public static final String SCOPE_CLAIM_NAME_OPTION = "scopeClaimName";
    public static final String REQUIRED_SCOPE_OPTION = "requiredScope";
    public static final String ALLOWABLE_CLOCK_SKEW_MILLIS_OPTION = "allowableClockSkewMillis";
 
    // etc...
}
org.apache.kafka.common.security.oauthbearer.OAuthBearerValidationUtils
package org.apache.kafka.common.security.oauthbearer;

public class OAuthBearerValidationUtils {
    /**
     * Validate the given claim for existence and type. It can be required to exist
     * in the given claims, and if it exists it must be one of the types indicated
     *
     * @param jwt
     *            the mandatory JWT to which the validation will be applied
     * @param required
     *            true if the claim is required to exist
     * @param claimName
     *            the required claim name identifying the clam to be checked
     * @param allowedTypes
     *            one or more of {@code String.class}, {@code Number.class}, and
     *            {@code List.class} identifying the type(s) that the claim value is
     *            allowed to be if it exists
     * @return the result of the validation
     */
    public static OAuthBearerValidationResult validateClaimForExistenceAndType(OAuthBearerJwt jwt, boolean required,
            String claimName, Class<?>... allowedTypes) {
        // etc...
    }
 
    /**
     * Validate the 'iat' (Issued At) claim. It can be required to exist in the
     * given claims, and if it exists it must be a (potentially fractional) number
     * of seconds since the epoch defining when the JWT was issued; it is a
     * validation error if the Issued At time is after the time at which the check
     * is being done (plus any allowable clock skew).
     *
     * @param jwt
     *            the mandatory JWT to which the validation will be applied
     * @param required
     *            true if the claim is required to exist
     * @param whenCheckTimeMillis
     *            the time relative to which the validation is to occur
     * @param allowableClockSkewMillis
     *            non-negative number to take into account some potential clock skew
     * @return the result of the validation
     * @throws OAuthBearerConfigException
     *             if the given allowable clock skew is negative
     */
    public static OAuthBearerValidationResult validateIssuedAt(OAuthBearerJwt jwt, boolean required,
            long whenCheckTimeMillis, int allowableClockSkewMillis) throws OAuthBearerConfigException {
        // etc...
    }
 
    /**
     * Validate the 'nbf' (Not Before) claim. It can be required to exist in the
     * given claims, and if it exists it must be a (potentially fractional) number
     * of seconds since the epoch defining the point at which the JWT becomes valid.
     * It is a validation error if the time at which the check is being done (plus
     * any allowable clock skew) is before the Not Before time.
     *
     * @param jwt
     *            the mandatory JWT to which the validation will be applied
     * @param required
     *            true if the claim is required to exist
     * @param whenCheckTimeMillis
     *            the time relative to which the validation is to occur
     * @param allowableClockSkewMillis
     *            non-negative number to take into account some potential clock skew
     * @return the result of the validation
     * @throws OAuthBearerConfigException
     *             if the given allowable clock skew is negative
     */
    public static OAuthBearerValidationResult validateNotBefore(OAuthBearerJwt jwt, boolean required,
            long whenCheckTimeMillis, int allowableClockSkewMillis) throws OAuthBearerConfigException {
        // etc...
    }
 
    /**
     * Validate the 'exp' (Expiration Time) claim. It can be required to exist in
     * the given claims, and if it exists it must be a (potentially fractional)
     * number of seconds defining the point at which the JWT expires. It is a
     * validation error if the time at which the check is being done (minus any
     * allowable clock skew) is on or after the Expiration Time time.
     *
     * @param jwt
     *            the mandatory JWT to which the validation will be applied
     * @param required
     *            true if the claim is required to exist
     * @param whenCheckTimeMillis
     *            the time relative to which the validation is to occur
     * @param allowableClockSkewMillis
     *            non-negative number to take into account some potential clock skew
     * @return the result of the validation
     * @throws OAuthBearerConfigException
     *             if the given allowable clock skew is negative
     */
    public static OAuthBearerValidationResult validateExpirationTime(OAuthBearerJwt jwt, boolean required,
            long whenCheckTimeMillis, int allowableClockSkewMillis) throws OAuthBearerConfigException {
        // etc...
    }
 
    /**
     * Validate the 'iat' (Issued At), 'nbf' (Not Before), and 'exp' (Expiration
     * Time) claims for internal consistency. For claim pairs that exist, the
     * following must be true:
     * <ul>
     * <li>nbf &gt;= iat</li>
     * <li>exp &gt; iat</li>
     * <li>exp &gt; nbf</li>
     * </ul>
     *
     * @param jwt
     *            the mandatory JWT to which the validation will be applied
     * @return the result of the validation
     */
    public static OAuthBearerValidationResult validateTimeConsistency(OAuthBearerJwt jwt) {
        // etc...
    }
 
    /**
     * Validate the given scope against the required scope. Every required scope
     * element (if any) must exist in the provided scope for the validation to
     * succeed.
     *
     * @param scope
     *            the optional scope to validate
     * @param requiredScope
     *            the optional required scope against which the given scope will be
     *            validated
     * @return the result of the validation
     * @throws IOException
     *             if one or more required networked resources (e.g. to re-hydrate
     *             an opaque token) is unavailable.
     * @throws OAuthBearerIllegalTokenException
     *             if there is something fundamentally wrong with the token (if it
     *             is malformed, for example)
     */
    public static OAuthBearerValidationResult validateScope(OAuthBearerToken token, List<String> requiredScope)
            throws IOException, OAuthBearerIllegalTokenException {
        // etc...
    }
 
    // etc...
 
    private OAuthBearerValidationUtils() {
        // empty
    }
}
org.apache.kafka.common.security.oauthbearer.OAuthBearerScopeUtils
package org.apache.kafka.common.security.oauthbearer;

/**
 * Utility class for help dealing with
 * <a href="https://tools.ietf.org/html/rfc6749#section-3.3">Access Token
 * Scopes</a>
 */
public class OAuthBearerScopeUtils {
    private static final Pattern INDIVIDUAL_SCOPE_ITEM_PATTERN = Pattern.compile("[\\x23-\\x5B\\x5D-\\x7E\\x21]+");

    /**
     * Return true if the given value meets the definition of a valid scope item as
     * per <a href="https://tools.ietf.org/html/rfc6749#section-3.3">RFC 6749
     * Section 3.3</a>, otherwise false
     *
     * @param scopeItem
     *            the mandatory scope item to check for validity
     * @return true if the given value meets the definition of a valid scope item,
     *         otherwise false
     */
    public static boolean isValidScopeItem(String scopeItem) {
        return INDIVIDUAL_SCOPE_ITEM_PATTERN.matcher(Objects.requireNonNull(scopeItem)).matches();
    }

    /**
     * Convert a space-delimited list of scope values (for example,
     * <code>"scope1 scope2"</code>) to a List containing the individual elements
     * (<code>"scope1"</code> and <code>"scope2"</code>)
     *
     * @param spaceDelimitedScope
     *            the mandatory (but possibly empty) space-delimited scope values,
     *            each of which must be valid according to
     *            {@link #isValidScopeItem(String)}
     * @return the list of the given (possibly empty) space-delimited values
     * @throws OAuthBearerConfigException
     *             if any of the individual scope values are malformed/illegal
     */
    public static List<String> parseScope(String spaceDelimitedScope) throws OAuthBearerConfigException {
        // etc...
    }


    private OAuthBearerScopeUtils() {
        // empty
    }
}

 

Proposed Changes

Thanks to KIP-86, no changes to the existing code base are required – all functionality represents additions (rather than changes) to the existing code base.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?

None

  • If we are changing behavior how will we phase out the older behavior?

There is no change to existing behavior

Rejected Alternatives

Exceptions

Unchecked exceptions can be added or deleted without breaking binary compatibility, and Kafka uses unchecked exceptions exclusively, so OAuthBearerException is unchecked.  We considered making OAuthBearerException a checked exception to get the compiler help provided by the use of checked exceptions because the set of exception types and the size of the overall code base are both small such that the risk that we would want to change the type(s) of exception(s) thrown is minimal.  Nonetheless, we agreed the benefit of consistency and eliminating the risk of breaking binary compatibility outweighed the benefit of the compiler help provided by checked exceptions.

Token Refresh

<<<NOTE: Please consider this to be open for debate at this point>>>

We implement org.apache.kafka.common.security.oauthbearer.ExpiringCredential as an interface rather than as a class because an interface creates less of a constraint – implementers of other types of expiring credentials remain free to extend any class they want.  The risk of having to add a new method while Java 7 is still a supported target (which would break forward compatibility) is low.

We considered associating refresh-related properties (such as the minimum refresh period in milliseconds) with the ExpiringCredential rather than the ExpiringCredentialRefreshingLogin instance because the ExpiringCredentialRefreshingLogin instance couldn't know which of the potentially multiple login modules actually applies (i.e. which is the one associated with the inter-broker protocol); it wouldn't always know how to find the JAAS config options, so it wouldn't always know how to get the refresh configuration.  There was problem with this aproach, though: we can't invoke login() on the LoginContext and get an ExpiringCredential instance without a CallbackHandler, so we needed to know the type of CallbackHandler to instantiate – and there is no way to know that.  It simply made sense to give the ExpiringCredentialRefreshingLogin instance the ability to discover the correct login module in all cases and be able to ask it for the CallbackHandler instance; hence we created the ExpiringCredentialLoginModule interface.

 

  • No labels