Status

Current state: "Under Discussion"

Discussion thread:  here

JIRA: KAFKA-6562

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The ability to authenticate to Kafka with an OAuth 2 Access Token is desirable given the popularity of OAuth.  The introduction of KIP-86 (Configurable SASL callback handlers) makes it possible to non-intrusively add new SASL mechanisms, and OAUTHBEARER is the SASL mechanism for OAuth.  OAuth 2 is a flexible framework with multiple ways of doing the same thing (for example, getting a public key to confirm a digital signature), so pluggable implementations, and flexibility in their configuration, are a requirement.

This KIP proposes to add the following functionality related to SASL/OAUTHBEARER:

  • Allow clients (both brokers when SASL/OAUTHBEARER is the inter-broker protocol as well as non-broker clients) to flexibly retrieve an access token from an OAuth 2 authorization server according to JAAS-defined configuration (and potentially plugged-in implementation) and present the access token to a broker for authentication.
  • Allow brokers to flexibly validate provided access tokens when a client establishes a connection based on JAAS-defined configuration (and potentially plugged-in implementation).
  • Provide a toolkit of reusable functionality applicable to common token retrieval and validation scenarios so that implementations can either be reused or, when they are custom and plugged-in, be relatively small.
  • Allow clients to transparently retrieve a new access token in the background before the existing access token expires in case the client has to open new connections.  Any existing connections will remain unaffected by this "token refresh" functionality as long as the connection remains intact, but new connections from the same client will always use the latest access token (either the initial one or the one that was most recently retrieved by the token refresh functionality, if any) – this is how Kerberos-authenticated connections work with respect to ticket expiration.

Note that the access token can be made available to the broker for authorization decisions due to KIP-189 (by exposing the access token on the SaslServer implementation), but detailed discussion of this possibility is outside the scope of this proposal.  It is noted, however, that if access tokens are somehow used for authorization decisions, it is conceivable due to the long-lived nature of Kafka connections that authorization decisions will sometimes be made using expired access tokens.  For example, it is up to the broker to validate the token upon authentication, but the token will not be replaced for that particular connection as long as it remains intact; if the token expires in an hour then authorization decisions for that first hour will be made using the still-valid token, but after an hour the expired token would remain associated with the connection, and authorization decisions from that point forward for that particular connection would be made using the expired token.  This would have to be addressed via a separate KIP if it turns out to be problematic, but that seems unlikely (code signing certificates that have been timestamped remain valid after their expiration, for example, and access tokens are indeed timestamped).

 

Public Interfaces

Note that most of the implementation of this KIP will be public-facing.  The following sections define the various parts, and each includes an overall UML diagram as well as important code details (with Javadoc).

Exceptions

See Rejected Alternatives: Exceptions

We define a small exception hierarchy to cover the various cases related to the SASL/OAUTHBEARER code.

org.apache.kafka.common.security.oauthbearer.OAuthBearerException
package org.apache.kafka.common.security.oauthbearer;
 
/**
 * Base class for all exceptions thrown by the SASL/OAUTHBEARER code. Unlike the
 * unchecked {@link KafkaException} hierarchy, this hierarchy can be checked to
 * obtain compiler help because instances are never exposed to the core Kafka
 * code base and there is minimal risk of changing the set of thrown checked
 * exceptions and impacting contract/source code compatibility given the small
 * size of this code base.
 */
public abstract class OAuthBearerException extends Exception {
    // etc...
}
org.apache.kafka.common.security.oauthbearer.OAuthBearerConfigException
package org.apache.kafka.common.security.oauthbearer;
 
/**
 * Exception thrown when there is a problem with the configuration (an invalid
 * option in a JAAS config, for example).
 */
public class OAuthBearerConfigException extends OAuthBearerException {
    // etc...
}
org.apache.kafka.common.security.oauthbearer.OAuthBearerIllegalTokenException
package org.apache.kafka.common.security.oauthbearer;

import java.util.Objects;

/**
 * Exception thrown when token validation fails due to a problem with the token
 * itself (as opposed to a missing remote resource or a configuration problem)
 */
public class OAuthBearerIllegalTokenException extends OAuthBearerException {
    private final OAuthBearerValidationResult reason;

    /**
     * Constructor
     *
     * @param reason
     *            the mandatory reason for the validation failure; it must indicate
     *            failure
     */
    public OAuthBearerIllegalTokenException(OAuthBearerValidationResult reason) {
        super(Objects.requireNonNull(reason).failureDescription());
        if (reason.success())
            throw new IllegalArgumentException("The reason indicates success; it must instead indicate failure");
        this.reason = reason;
    }

    /**
     * Return the (always non-null) reason for the validation failure
     *
     * @return the reason for the validation failure
     */
    public OAuthBearerValidationResult reason() {
        return reason;
    }
}
org.apache.kafka.common.security.oauthbearer.OAuthBearerValidationResult
package org.apache.kafka.common.security.oauthbearer;

import java.io.Serializable;

/**
 * The result of some kind of token validation
 */
public class OAuthBearerValidationResult implements Serializable {
 
    // etc...

    /**
     * Return an instance indicating success
     *
     * @return an instance indicating success
     */
    public static OAuthBearerValidationResult newSuccess() {
        return new OAuthBearerValidationResult(true, null, null, null);
    }

    /**
     * Return a new validation failure instance
     *
     * @param failureDescription
     *            optional description of the failure
     * @return a new validation failure instance
     */
    public static OAuthBearerValidationResult newFailure(String failureDescription) {
        return newFailure(failureDescription, null, null);
    }

    /**
     * Return a new validation failure instance
     *
     * @param failureDescription
     *            optional description of the failure
     * @param failureScope
     *            optional scope to be reported with the failure
     * @param failureOpenIdConfig
     *            optional OpenID Connect configuration to be reported with the
     *            failure
     * @return a new validation failure instance
     */
    public static OAuthBearerValidationResult newFailure(String failureDescription, String failureScope,
            String failureOpenIdConfig) {
        return new OAuthBearerValidationResult(false, failureDescription, failureScope, failureOpenIdConfig);
    }
 
    private OAuthBearerValidationResult(boolean success, String failureDescription, String failureScope,
            String failureOpenIdConfig) {
        // etc...
    }

    /**
     * Return true if this instance indicates success, otherwise false
     *
     * @return true if this instance indicates success, otherwise false
     */
    public boolean success() {
        return success;
    }

    /**
     * Return the (potentially null) descriptive message for the failure
     *
     * @return the (potentially null) descriptive message for the failure
     */
    public String failureDescription() {
        return failureDescription;
    }

    /**
     * Return the (potentially null) scope to be reported with the failure
     *
     * @return the (potentially null) scope to be reported with the failure
     */
    public String failureScope() {
        return failureScope;
    }

    /**
     * Return the (potentially null) OpenID Connect configuration to be reported
     * with the failure
     *
     * @return the (potentially null) OpenID Connect configuration to be reported
     *         with the failure
     */
    public String failureOpenIdConfig() {
        return failureOpenIdConfig;
    }
 
    /**
     * Raise an exception if this instance indicates failure, otherwise do nothing
     *
     * @throws OAuthBearerIllegalTokenException
     *             if this instance indicates failure
     */
    public void throwExceptionIfFailed() throws OAuthBearerIllegalTokenException {
        if (!success())
            throw new OAuthBearerIllegalTokenException(this);
    }
}
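
As a rough usage sketch (not part of the proposal), a validator might build a result with the factory methods and leave it to the caller to turn a failure into an exception.  The classes below are deliberately simplified stand-ins so that the example compiles on its own; ValidationResultSketch, IllegalTokenSketchException, ScopeCheckSketch and requireScope are hypothetical names, and the scope check itself is only an assumed example of a validation rule.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Simplified stand-in for OAuthBearerValidationResult (description and scope only). */
final class ValidationResultSketch {
    private final boolean success;
    private final String failureDescription;
    private final String failureScope;

    private ValidationResultSketch(boolean success, String failureDescription, String failureScope) {
        this.success = success;
        this.failureDescription = failureDescription;
        this.failureScope = failureScope;
    }

    static ValidationResultSketch newSuccess() {
        return new ValidationResultSketch(true, null, null);
    }

    static ValidationResultSketch newFailure(String failureDescription, String failureScope) {
        return new ValidationResultSketch(false, failureDescription, failureScope);
    }

    boolean success() { return success; }
    String failureDescription() { return failureDescription; }
    String failureScope() { return failureScope; }

    /** Mirrors throwExceptionIfFailed() using the stand-in checked exception. */
    void throwExceptionIfFailed() throws IllegalTokenSketchException {
        if (!success)
            throw new IllegalTokenSketchException(this);
    }
}

/** Simplified stand-in for OAuthBearerIllegalTokenException. */
final class IllegalTokenSketchException extends Exception {
    private final ValidationResultSketch reason;

    IllegalTokenSketchException(ValidationResultSketch reason) {
        super(reason.failureDescription());
        this.reason = reason;
    }

    ValidationResultSketch reason() { return reason; }
}

final class ScopeCheckSketch {
    /** Hypothetical rule: succeed only if the token scope contains the required scope. */
    static ValidationResultSketch requireScope(Set<String> tokenScope, String requiredScope) {
        if (tokenScope.contains(requiredScope))
            return ValidationResultSketch.newSuccess();
        return ValidationResultSketch.newFailure("Missing required scope", requiredScope);
    }

    public static void main(String[] args) {
        Set<String> scope = new HashSet<>(Arrays.asList("read"));
        try {
            requireScope(scope, "write").throwExceptionIfFailed();
        } catch (IllegalTokenSketchException e) {
            // The failed result travels with the exception, so the caller can report the scope.
            System.out.println(e.getMessage() + ": " + e.reason().failureScope());
        }
    }
}
```

Carrying the whole result on the exception, rather than just a message, is what lets a caller report the failure scope back to the client.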

 

Token Refresh

See Rejected Alternatives: Token Refresh

We define the org.apache.kafka.common.security.oauthbearer.refresh.ExpiringCredential and org.apache.kafka.common.security.oauthbearer.refresh.ExpiringCredentialLoginModule interfaces and the org.apache.kafka.common.security.oauthbearer.refresh.ExpiringCredentialRefreshingLogin class, which can refresh expiring credentials.  We represent many of the parameters that affect how the refresh algorithm operates as a single map of enum/value pairs rather than as separate methods.  This helps ensure forward compatibility if we later decide to add more parameters: Java 7 does not support default methods on interfaces, so adding a new method to an interface would break compatibility with existing implementations.  The org.apache.kafka.common.security.oauthbearer.refresh.RefreshConfig class provides the accessor methods that we did not add to any interface, and the org.apache.kafka.common.security.oauthbearer.refresh.ExpiringCredentialRefreshingLogin class will parse the properties from the JAAS config options (applying appropriate defaults for any that are not explicitly specified).

 


Note that the token refresh functionality and the related classes and interfaces are not necessarily specific to OAuth Bearer Tokens, so an open question is whether this refresh-related code belongs in a sub-package of the main SASL/OAUTHBEARER one or if it should live somewhere else.  If this functionality ends up elsewhere then perhaps RefreshConfigProp.parseValue(Object) should not throw an OAuthBearerConfigException but rather a different exception type.


 

org.apache.kafka.common.security.oauthbearer.refresh.ExpiringCredential
package org.apache.kafka.common.security.oauthbearer.refresh;
 
/**
 * A credential that expires and that can potentially be refreshed
 *
 * @see ExpiringCredentialRefreshingLogin
 */
public interface ExpiringCredential {
    /**
     * The name of the principal to which this credential applies (used only for
     * logging)
     *
     * @return the always non-null/non-empty principal name
     */
    String principalName();

    /**
     * When the credential became valid, in terms of the number of milliseconds
     * since the epoch, if known, otherwise null. An expiring credential may not
     * necessarily indicate when it was created -- just when it expires -- so we
     * need to support a null return value here.
     *
     * @return the time when the credential became valid, in terms of the number of
     *         milliseconds since the epoch, if known, otherwise null
     */
    Long startTimeMillis();
 
    /**
     * When the credential expires, in terms of the number of milliseconds since the
     * epoch. All expiring credentials by definition must indicate their expiration
     * time -- thus, unlike other methods, we do not support a null return value
     * here.
     *
     * @return the time when the credential expires, in terms of the number of
     *         milliseconds since the epoch
     */
    long expireTimeMillis();
 
    /**
     * The point after which the credential can no longer be refreshed, in terms of
     * the number of milliseconds since the epoch, if any, otherwise null. Some
     * expiring credentials can be refreshed over and over again without limit, so
     * we support a null return value here.
     *
     * @return the point after which the credential can no longer be refreshed, in
     *         terms of the number of milliseconds since the epoch, if any,
     *         otherwise null
     */
    Long absoluteLastRefreshMillis();
}
org.apache.kafka.common.security.oauthbearer.refresh.RefreshConfigProp
package org.apache.kafka.common.security.oauthbearer.refresh;

/**
 * Individual refresh-related configuration properties defining how
 * {@link ExpiringCredentialRefreshingLogin} refreshes instances of
 * {@link ExpiringCredential}. Each value has a type, optional min/max/default
 * values, an ordered list of string keys (containing the enum name and the
 * camelCase version of the name, in that order) so it can be picked out of a
 * map keyed with Strings, and the ability to parse a string or non-string value
 * associated with one of its string keys in a map.
 *
 * @see RefreshConfig
 */
public enum RefreshConfigProp {
    /**
     * Background login refresh thread will sleep until the specified window factor
     * relative to the credential's total lifetime has been reached, at which time
     * it will try to refresh the credential. The default value is 0.8 (80%).
     */
    REFRESH_WINDOW_FACTOR(Double.class, 0.5, 1.0, 0.8),
    /**
     * Amount of random jitter added to the background login refresh thread's sleep
     * time. The default value is 0.05 (5%).
     */
    REFRESH_WINDOW_JITTER(Double.class, 0.0, 0.25, 0.05),
    /**
     * The minimum time between checks by the background login refresh thread,
     * regardless of other constraints, in milliseconds. The default value is 60,000
     * (1 minute).
     */
    REFRESH_MIN_PERIOD_MILLIS(Long.class, 0L, 1000L * 60 * 15, 1000L * 60 * 1),
    /**
     * If the {@code LoginModule} and {@code SaslClient} implementations support
     * multiple simultaneous login contexts on a single {@code Subject} at the same
     * time. If true, then upon refresh, logout will only be invoked on the original
     * {@code LoginContext} after a new one successfully logs in. This can be
     * helpful if the original credential still has some lifetime left when an
     * attempt to refresh the credential fails; the client will still be able to
     * create new connections as long as the original credential remains valid.
     * Otherwise, if logout is immediately invoked prior to relogin, a relogin
     * failure leaves the client without the ability to connect until relogin does
     * in fact succeed. The default value is false.
     */
    RELOGIN_ALLOWED_BEFORE_LOGOUT(Boolean.class, Boolean.FALSE, Boolean.TRUE, Boolean.FALSE);

    private RefreshConfigProp(Class<? extends Comparable<?>> valueType, Comparable<?> minValue, Comparable<?> maxValue,
            Comparable<?> defaultValue) {
        // etc...
    }
 
    /**
     * Return the always non-null value type
     *
     * @return the always non-null value type
     */
    public Class<? extends Comparable<?>> valueType() {
        return valueType;
    }

    /**
     * Return the minimum value, if any, as an instance of the value type, otherwise
     * null
     *
     * @return the minimum value, if any, as an instance of the value type,
     *         otherwise null
     */
    public Comparable<?> minValue() {
        return minValue;
    }

    /**
     * Return the maximum value, if any, as an instance of the value type, otherwise
     * null
     *
     * @return the maximum value, if any, as an instance of the value type,
     *         otherwise null
     */
    public Comparable<?> maxValue() {
        return maxValue;
    }

    /**
     * Return the default value, if any, as an instance of the value type, otherwise
     * null
     *
     * @return the default value, if any, as an instance of the value type,
     *         otherwise null
     */
    public Comparable<?> defaultValue() {
        return defaultValue;
    }
 
    /**
     * The ordered list of string keys (containing the enum name and the camelCase
     * version of the name, in that order) so the instance can be picked out of a
     * map keyed with Strings
     * 
     * @return the always non-null/non-empty list of string keys (containing the
     *         enum name and the camelCase version of the name, in that order) that
     *         denote this instance in a {@code Map}
     */
    public List<String> stringKeys() {
        return stringKeys;
    }
 
    /**
     * Return the (potentially null) value as the value type
     *
     * @param value
     *            a (potentially null) value of either the value type or a type such
     *            that the result of invoking {@code valueOf()} on the value type
     *            class against the value results in the value converted to the
     *            value type
     * @return the (potentially null) value as the value type
     * @throws OAuthBearerConfigException
     *             if a non-null value cannot be converted to the value type while
     *             remaining consistent with any min/max constraints
     */
    public Comparable<?> parseValue(Object value) throws OAuthBearerConfigException {
        // etc...
    }
 
    // etc...
}
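
To make the string-key and range-checking behaviour described above more concrete, here is a minimal sketch for a Double-valued property.  It is an illustration under stated assumptions rather than the proposed implementation: RefreshPropSketch and its methods are hypothetical names, and IllegalArgumentException stands in for OAuthBearerConfigException so that the example is self-contained.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

final class RefreshPropSketch {
    /** Convert an enum-style name such as REFRESH_WINDOW_FACTOR to refreshWindowFactor. */
    static String camelCase(String enumName) {
        StringBuilder sb = new StringBuilder();
        boolean upperNext = false;
        for (char c : enumName.toCharArray()) {
            if (c == '_') {
                upperNext = true;
            } else {
                sb.append(upperNext ? Character.toUpperCase(c) : Character.toLowerCase(c));
                upperNext = false;
            }
        }
        return sb.toString();
    }

    /** The ordered keys: the enum name first, then its camelCase form. */
    static List<String> stringKeys(String enumName) {
        return Arrays.asList(enumName, camelCase(enumName));
    }

    /**
     * Look the property up under either key, convert it to a double and enforce
     * the min/max constraints. The default is returned when neither key is present.
     */
    static double parseDouble(Map<String, ?> options, String enumName, double min, double max,
            double defaultValue) {
        for (String key : stringKeys(enumName)) {
            Object raw = options.get(key);
            if (raw == null)
                continue;
            double value;
            try {
                value = raw instanceof Number ? ((Number) raw).doubleValue()
                        : Double.parseDouble(raw.toString().trim());
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException(key + " is not a number: " + raw, e);
            }
            // Written as a negated range check so that NaN is rejected as well
            if (!(value >= min && value <= max))
                throw new IllegalArgumentException(key + " must be between " + min + " and " + max + ": " + value);
            return value;
        }
        return defaultValue;
    }
}
```

Under this sketch a JAAS login module option such as refreshWindowFactor="0.9" would be accepted, while "0.2" would be rejected because it falls below the 0.5 minimum.
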
org.apache.kafka.common.security.oauthbearer.refresh.RefreshConfig
package org.apache.kafka.common.security.oauthbearer.refresh;
 
/**
 * Immutable refresh-related configuration for instances of
 * {@link ExpiringCredentialRefreshingLogin}. Configuration that is independent
 * of the actual credential itself and that can be defined as login module
 * options in a JAAS config should be stored here.
 */
public class RefreshConfig {
    /**
     * Default constructor with individual refresh configuration properties
     * being set to their default values
     */
    public RefreshConfig() {
        this(Collections.<String, String>emptyMap(), "");
    }

    /**
     * Constructor based on a map with keys being the String keys associated with
     * {@link RefreshConfigProp} instances and values being either Strings or
     * non-Strings. Individual refresh configuration properties that are not
     * explicitly set to a valid value on the given map will be set to their default
     * value for this instance.
     * 
     * @param configMap
     *            the mandatory (but possibly empty) configuration map upon which to
     *            build this instance
     * @see RefreshConfigProp#stringKeys()
     * @see RefreshConfigProp#parseValue(Object)
     */
    public RefreshConfig(Map<String, String> configMap) {
        this(configMap, "");
    }

    /**
     * Constructor based on a map with keys being the String keys associated with
     * {@link RefreshConfigProp} instances and values being either Strings or
     * non-Strings. Individual refresh configuration properties that are not
     * explicitly set to a valid value on the given map will be set to their default
     * value for this instance.
     *
     * @param configMap
     *            the mandatory (but possibly empty) configuration map upon which to
     *            build this instance
     * @param keyPrefix
     *            the mandatory (but potentially blank) prefix to prepend to String
     *            keys
     * @see RefreshConfigProp#stringKeys()
     * @see RefreshConfigProp#parseValue(Object)
     */
    public RefreshConfig(Map<String, String> configMap, String keyPrefix) {
        // etc...
    }
 
    /**
     * Return the (always non-null and unmodifiable) {@code Map} that resulted from
     * the parsing of the input provided at construction time
     *
     * @return the (always non-null and unmodifiable) {@code Map} that resulted from
     *         the parsing of the input provided at construction time
     */
    public Map<RefreshConfigProp, Object> refreshConfigMap() {
        return refreshConfigMap;
    }

    /**
     * Background login refresh thread will sleep until the specified window factor
     * relative to the credential's total lifetime has been reached, at which time
     * it will try to refresh the credential. The default value is 0.8 (80%).
     *
     * @return the refresh window factor
     * @see RefreshConfigProp#REFRESH_WINDOW_FACTOR
     */
    public double refreshWindowFactor() {
        return refreshWindowFactor;
    }

    /**
     * Amount of random jitter added to the background login refresh thread's sleep
     * time. The default value is 0.05 (5%).
     *
     * @return the refresh window jitter
     * @see RefreshConfigProp#REFRESH_WINDOW_JITTER
     */
    public double refreshWindowJitter() {
        return refreshWindowJitter;
    }

    /**
     * The minimum time between checks by the background login refresh thread,
     * regardless of other constraints, in milliseconds. The default value is 60,000
     * (1 minute).
     *
     * @return the minimum refresh period, in milliseconds
     * @see RefreshConfigProp#REFRESH_MIN_PERIOD_MILLIS
     */
    public long refreshMinPeriodMillis() {
        return refreshMinPeriodMillis;
    }

    /**
     * If the LoginModule and SaslClient implementations support multiple
     * simultaneous login contexts on a single Subject at the same time. If true,
     * then upon refresh, logout will only be invoked on the original LoginContext
     * after a new one successfully logs in. This can be helpful if the original
     * credential still has some lifetime left when an attempt to refresh the
     * credential fails; the client will still be able to create new connections as
     * long as the original credential remains valid. Otherwise, if logout is
     * immediately invoked prior to relogin, a relogin failure leaves the client
     * without the ability to connect until relogin does in fact succeed. The
     * default value is false.
     *
     * @return true if relogin is allowed prior to discarding an existing
     *         (presumably unexpired) token, otherwise false
     * @see RefreshConfigProp#RELOGIN_ALLOWED_BEFORE_LOGOUT
     */
    public boolean reloginAllowedBeforeLogout() {
        return reloginAllowedBeforeLogout;
    }

    // etc...
}
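
The Javadoc above describes the window factor, the jitter and the minimum period, but not how they combine.  One plausible reading is sketched below, purely as an assumption and not as the algorithm this KIP commits to: the refresh time is the start time plus a fraction of the lifetime, pushed back to respect the minimum period and capped at the expiration time.  RefreshTimeSketch, refreshAtMillis and the jitterSample parameter are hypothetical, and the sketch assumes the credential's start time is known.

```java
final class RefreshTimeSketch {
    /**
     * @param jitterSample
     *            a value in [0, 1), normally drawn from a random number generator;
     *            it is passed in here so that the calculation is repeatable
     * @return the assumed time, in milliseconds since the epoch, at which to
     *         attempt the next refresh
     */
    static long refreshAtMillis(long startMillis, long expireMillis, long nowMillis, double windowFactor,
            double windowJitter, long minPeriodMillis, double jitterSample) {
        long lifetimeMillis = expireMillis - startMillis;
        // Fraction of the lifetime to wait: the window factor plus some random jitter
        double fraction = windowFactor + windowJitter * jitterSample;
        long candidate = startMillis + (long) (lifetimeMillis * fraction);
        // Never wake up sooner than the minimum period from now...
        long earliest = nowMillis + minPeriodMillis;
        // ...but never plan a refresh for after the credential has expired.
        return Math.min(Math.max(candidate, earliest), expireMillis);
    }
}
```

For a one-hour credential with a window factor of 0.5, a jitter of 0.25 and a jitter sample of 0.5, this reading schedules the refresh 62.5% of the way through the lifetime.
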
org.apache.kafka.common.security.oauthbearer.refresh.ExpiringCredentialLoginModule
package org.apache.kafka.common.security.oauthbearer.refresh;

import java.util.Set;

import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.spi.LoginModule;

/**
 * An extension of the {@code LoginModule} interface that must be implemented by
 * any login module that generates an instance of {@link ExpiringCredential} to
 * be managed/refreshed by {@link ExpiringCredentialRefreshingLogin}. The
 * existence of this interface is necessary to deal with the case when there are
 * multiple enabled SASL mechanisms, one or more of which generate an expiring
 * credential, and a SASL mechanism (as opposed to authentication via SSL) is
 * used for inter-broker communication.
 * <p>
 * The following broker-side JAAS configuration helps illustrate the need for
 * this interface:
 *
 * <pre>
 * KafkaServer {
 *      org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
 *      // etc...;
 *      some.othersaslmechanism.OtherSaslMechanismLoginModule Optional
 *      // etc...;
 * };
 * </pre>
 *
 * The {@code LoginContext} instance will initialize both login modules and ask
 * them both to login -- that's how JAAS works -- regardless of which SASL
 * mechanism is used for inter-broker communication. It is imperative that the
 * login succeeds for the login module associated with the mechanism configured
 * for inter-broker communication; it doesn't matter if any other mechanisms
 * fail because they aren't actually being used for client-side work (they are
 * only being used for the server side of the SASL handshake, and that is
 * performed by the {@code SaslServer} instance rather than the
 * {@code LoginModule} instance). This has 2 implications:
 * <p>
 * <ol>
 * <li>The {@code CallbackHandler} instance provided to the {@code LoginContext}
 * instance (which then passes it to all of the login modules) must be the
 * correct one for the login module that must succeed; it does not have to be
 * the correct one for any others.
 * <li>The login modules that don't have to succeed (because they aren't being
 * used for inter-broker communication) should be marked {@code Optional} in
 * case they fail.
 * </ol>
 * <p>
 * This raises the critical issue of how any instance of {@link Login} can know
 * which {@code CallbackHandler} instance to instantiate. The
 * {@link ScramLoginModule} and {@link PlainLoginModule} don't use the callback
 * handler, but {@code com.sun.security.auth.module.Krb5LoginModule} does, and
 * in fact {@link LoginCallbackHandler} serves the purpose of short-circuiting
 * any request for user interaction in that case. So this issue hasn't been
 * critical in the past; it is only now becoming more important.
 * <p>
 * All the {@code Login} instance knows is the SASL mechanism enabled for
 * inter-broker communication (from the broker config) and the JAAS
 * configuration (for example, the one shown above). It cannot know which
 * {@code CallbackHandler} instance to instantiate from just that information
 * because it cannot determine from that information alone which of the login
 * modules handles the declared inter-broker communication mechanism.
 * <p>
 * We thus arrive at the need for this interface. A {@code Login} instance can
 * look at all of the declared login module classes, determine which of them
 * implements this interface, and then for each of those it can instantiate an
 * instance using the default constructor and ask for its applicable mechanisms
 * via {@link #mechanisms()}. If it finds a login module with an applicable SASL
 * mechanism matching the one being used for inter-broker communication it can
 * then ask for a {@code CallbackHandler} instance via
 * {@link #newCallbackHandler()}. Otherwise, if it doesn't find a match, it just
 * instantiates an instance of {@link LoginCallbackHandler}, which is what
 * {@link AbstractLogin#login()} currently creates and is required for the
 * initial Kerberos login attempt for short-circuit behavior as mentioned above.
 */
public interface ExpiringCredentialLoginModule extends LoginModule {
    /**
     * Return the set of SASL mechanisms this login module applies to
     *
     * @return the set of SASL mechanisms this login module applies to
     */
    Set<String> mechanisms();

    /**
     * Return a new {@code CallbackHandler} instance appropriate for this login
     * module when one of its supported mechanisms as returned by
     * {@link #mechanisms()} is the SASL mechanism for inter-broker communication.
     *
     * @return a new {@code CallbackHandler} instance appropriate for this login
     *         module when one of its supported mechanisms as returned by
     *         {@link #mechanisms()} is the SASL mechanism for inter-broker
     *         communication.
     */
    CallbackHandler newCallbackHandler();
} 
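
The lookup that the Javadoc above describes can be sketched roughly as follows.  This is an illustration only: MechanismAwareLoginModule is a stand-in for ExpiringCredentialLoginModule so that the example compiles on its own, and DemoOAuthLoginModule and CallbackHandlerSelector are hypothetical.  In a real Login implementation the class names would presumably come from AppConfigurationEntry.getLoginModuleName() for each entry in the JAAS configuration, and the fallback would be a LoginCallbackHandler.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.security.auth.Subject;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.spi.LoginModule;

/** Stand-in for ExpiringCredentialLoginModule. */
interface MechanismAwareLoginModule extends LoginModule {
    Set<String> mechanisms();
    CallbackHandler newCallbackHandler();
}

/** Hypothetical login module that serves the OAUTHBEARER mechanism. */
class DemoOAuthLoginModule implements MechanismAwareLoginModule {
    public Set<String> mechanisms() { return Collections.singleton("OAUTHBEARER"); }
    public CallbackHandler newCallbackHandler() { return callbacks -> { }; }
    public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String, ?> sharedState,
            Map<String, ?> options) { }
    public boolean login() { return true; }
    public boolean commit() { return true; }
    public boolean abort() { return true; }
    public boolean logout() { return true; }
}

final class CallbackHandlerSelector {
    /**
     * Return a handler from the first declared login module that implements the
     * interface and lists the inter-broker mechanism, otherwise the fallback.
     */
    static CallbackHandler select(List<String> loginModuleClassNames, String interBrokerMechanism,
            CallbackHandler fallback) {
        for (String className : loginModuleClassNames) {
            try {
                Class<?> moduleClass = Class.forName(className);
                if (!MechanismAwareLoginModule.class.isAssignableFrom(moduleClass))
                    continue; // a login module that does not implement the interface
                // Instantiate via the default constructor, as the Javadoc describes
                MechanismAwareLoginModule module = (MechanismAwareLoginModule) moduleClass
                        .getDeclaredConstructor().newInstance();
                if (module.mechanisms().contains(interBrokerMechanism))
                    return module.newCallbackHandler();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot inspect login module " + className, e);
            }
        }
        return fallback;
    }
}
```
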
org.apache.kafka.common.security.oauthbearer.refresh.ExpiringCredentialRefreshingLogin
package org.apache.kafka.common.security.oauthbearer.refresh;

import org.apache.kafka.common.security.auth.Login;

/**
 * This class is responsible for refreshing logins for both Kafka client and
 * server when the login is a type that has a limited lifetime and will expire. The
 * credentials for the login must implement {@link ExpiringCredential}, and the
 * {@code LoginModule} must implement {@link ExpiringCredentialLoginModule}.
 */
public class ExpiringCredentialRefreshingLogin implements Login {
    // etc...
 
    public RefreshConfig refreshConfig() {
        return refreshConfig;
    }
 
    // etc...
} 

 

OAuth Bearer Tokens

Every OAuth Bearer Token is a string representing an authorization issued to the client.  There are at least two types of OAuth Bearer Tokens: an opaque token and a JSON Web Token (JWT).  Nothing can be said about the contents of an opaque token other than that the contents are only available via some third party (e.g. an authorization server).  Every JWT contains both a JSON Object Signing and Encryption (JOSE) Header and a JWT Claims Set.  There are two distinct types of JWTs: a JSON Web Signature (JWS) and a JSON Web Encryption (JWE).  There is no intent to model a JWE with the initial implementation.
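
To illustrate the JWT structure just described, the following minimal sketch splits a compact serialization into its dot-separated sections (3 for a JWS, 5 for a JWE) and Base64 URL-decodes the JOSE Header.  JwtSplitSketch and its method names are hypothetical, the sketch performs no signature validation or decryption, and it leaves JSON parsing of the header to the caller.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;

final class JwtSplitSketch {
    /** Split a compact serialization into its 3 (JWS) or 5 (JWE) sections. */
    static List<String> sections(String compactSerialization) {
        // A limit of -1 keeps trailing empty sections, such as the empty
        // signature of an unsecured JWT ("header.payload.")
        String[] parts = compactSerialization.split("\\.", -1);
        if (parts.length != 3 && parts.length != 5)
            throw new IllegalArgumentException(
                    "Expected 3 (JWS) or 5 (JWE) sections but found " + parts.length);
        return Arrays.asList(parts);
    }

    /** Return the JOSE Header (the first section) as a JSON string. */
    static String joseHeaderJson(String compactSerialization) {
        String encodedHeader = sections(compactSerialization).get(0);
        // The URL-safe decoder accepts unpadded input and throws
        // IllegalArgumentException if the value is not valid Base64
        byte[] decoded = Base64.getUrlDecoder().decode(encodedHeader);
        return new String(decoded, StandardCharsets.UTF_8);
    }
}
```
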

 

org.apache.kafka.common.security.oauthbearer.OAuthBearerToken
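package org.apache.kafka.common.security.oauthbearer;

import java.io.IOException;
import java.util.Set;

import org.apache.kafka.common.security.oauthbearer.refresh.ExpiringCredential;

/**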
 * The <code>b64token</code> value as defined in
 * <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750 Section
 * 2.1</a> along with the token's specific scope and lifetime.
 *
 * @see <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC 6749
 *      Section 1.4</a> and
 *      <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750
 *      Section 2.1</a>
 */
public interface OAuthBearerToken extends ExpiringCredential {
    /**
     * The <code>b64token</code> value as defined in
     * <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750 Section
     * 2.1</a>
     *
     * @return <code>b64token</code> value as defined in
     *         <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750
     *         Section 2.1</a>
     */
    String value();


    /**
     * The token's scope of access, as per
     * <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC 6749 Section
     * 1.4</a>
     *
     * @return the token's (always non-null but potentially empty) scope of access,
     *         as per <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC
     *         6749 Section 1.4</a>. Note that all values in the returned set will
     *         be trimmed of preceding and trailing whitespace, and the result will
     *         never contain the empty string.
     * @throws IOException
     *             if one or more required networked resources (e.g. to re-hydrate
     *             an opaque token) is unavailable.
     * @throws OAuthBearerIllegalTokenException
     *             if there is something fundamentally wrong with the token (if it
     *             is malformed, for example)
     */
    Set<String> scope() throws IOException, OAuthBearerIllegalTokenException;

    /**
     * The token's lifetime, expressed as the number of milliseconds since the
     * epoch, as per <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC
     * 6749 Section 1.4</a>
     *
     * @return the token's lifetime, expressed as the number of milliseconds since
     *         the epoch, as per
     *         <a href="https://tools.ietf.org/html/rfc6749#section-1.4">RFC 6749
     *         Section 1.4</a>.
     * @throws IOException
     *             if one or more required networked resources (e.g. to re-hydrate
     *             an opaque token) is unavailable.
     * @throws OAuthBearerIllegalTokenException
     *             if there is something fundamentally wrong with the token (if it
     *             is malformed, for example)
     */
    long lifetime() throws IOException, OAuthBearerIllegalTokenException;
}
org.apache.kafka.common.security.oauthbearer.OAuthBearerJwt
package org.apache.kafka.common.security.oauthbearer;

/**
 * A JSON Web Token, which may either be a JWS or a JWE. Neither signature
 * validation nor decryption are performed here.
 *
 * @see <a href="https://tools.ietf.org/html/rfc7519">RFC 7519</a>
 */
public abstract class OAuthBearerJwt implements OAuthBearerToken {
    /**
     * Constructor with the principal taken from the '{@code sub}' claim (if any)
     * and no scope claim name
     *
     * @param compactSerialization
     *            the compact serialization to parse as a JWT
     * @throws OAuthBearerIllegalTokenException
     *             if compactSerialization cannot be either a JWS or JWE (due to
     *             there not being either 3 or 5 dot-separated values; or the JWT
     *             header either not being a valid Base64 URL-encoded value or not
     *             valid JSON after decoding)
     */
    public OAuthBearerJwt(String compactSerialization) throws OAuthBearerIllegalTokenException {
        this(compactSerialization, JwtClaim.SUBJECT.claimName(), null);
    }

    /**
     * Constructor with the given principal and scope claim names
     *
     * @param compactSerialization
     *            the compact serialization to parse as a JWT
     * @param principalClaimName
     *            the optional principal claim name
     * @param scopeClaimName
     *            the optional scope claim name
     * @throws OAuthBearerIllegalTokenException
     *             if compactSerialization cannot be either a JWS or JWE (due to
     *             there not being either 3 or 5 dot-separated values; or the JWT
     *             header either not being a valid Base64 URL-encoded value or not
     *             valid JSON after decoding)
     */
    public OAuthBearerJwt(String compactSerialization, String principalClaimName, String scopeClaimName)
            throws OAuthBearerIllegalTokenException {
        // etc...
    }
 
    /**
     * Return the 3 or 5 dot-separated sections of the JWT compact serialization
     * 
     * @return the 3 or 5 dot-separated sections of the JWT compact serialization
     */
    public List<String> splits() {
        return splits;
    }

    /**
     * Return the JOSE Header as a {@code Map}
     *
     * @return the JOSE header
     */
    public Map<String, Object> header() {
        return header;
    }

    /**
     * Return the JWT Claim Set as a {@code Map}
     *
     * @return the (always non-null but possibly empty) claims
     */
    public abstract Map<String, Object> claims();

    /**
     * Return the scope claim name, if any, otherwise null
     *
     * @return the scope claim name, if any, otherwise null
     */
    public String scopeClaimName() {
        return scopeClaimName;
    }

    /**
     * Indicate if the claim exists and is the given type
     *
     * @param claimName
     *            the mandatory JWT claim name
     * @param type
     *            the mandatory type, which should either be String.class,
     *            Number.class, or List.class
     * @return true if the claim exists and is the given type, otherwise false
     */
    public boolean isClaimType(String claimName, Class<?> type) {
        // etc...
    }

    /**
     * Extract a claim of the given type
     *
     * @param claimName
     *            the mandatory JWT claim name
     * @param type
     *            the mandatory type, which must either be String.class,
     *            Number.class, or List.class
     * @return the claim if it exists, otherwise null
     * @throws OAuthBearerIllegalTokenException
     *             if the claim exists but is not the given type
     */
    public <T> T getClaim(String claimName, Class<T> type) throws OAuthBearerIllegalTokenException {
        // etc...
    }

    /**
     * Extract a claim that could be either a String or a String List as a String
     * List (if it was a String it will be returned as a list of size 1).
     *
     * @param claimName
     *            the mandatory JWT claim name
     * @return the claim in the form of a String List, if it exists, otherwise null
     * @throws OAuthBearerIllegalTokenException
     *             if the claim exists but is neither a String nor a List
     */
    public List<String> getListClaimFromStringOrList(String claimName) throws OAuthBearerIllegalTokenException {
        // etc...
    }
 
    /**
     * Extract a claim in its raw form
     *
     * @param claimName
     *            the mandatory JWT claim name
     * @return the raw claim value, if it exists, otherwise null
     */
    public Object getRawClaim(String claimName) {
        return claims().get(Objects.requireNonNull(claimName));
    }

    /**
     * Return the
     * <a href="https://tools.ietf.org/html/rfc7519#section-4.1.3">Audience</a>
     * claim as a String List
     *
     * @return the <a href=
     *         "https://tools.ietf.org/html/rfc7519#section-4.1.3">Audience</a>
     *         claim as a String List if available, otherwise null. An Audience
     *         claim that is a String will be returned as a List of size 1.
     * @throws OAuthBearerIllegalTokenException
     *             if the claim value is the incorrect type
     */
    public List<String> audience() throws OAuthBearerIllegalTokenException {
        return getListClaimFromStringOrList(JwtClaim.AUDIENCE.claimName());
    }

    /**
     * Return the
     * <a href="https://tools.ietf.org/html/rfc7519#section-4.1.4">Expiration
     * Time</a> claim
     *
     * @return the <a href=
     *         "https://tools.ietf.org/html/rfc7519#section-4.1.4">Expiration
     *         Time</a> claim if available, otherwise null
     * @throws OAuthBearerIllegalTokenException
     *             if the claim value is the incorrect type
     */
    public Number expirationTime() throws OAuthBearerIllegalTokenException {
        return getClaim(JwtClaim.EXPIRATION_TIME.claimName(), Number.class);
    }

    /**
     * Return the <a href="https://tools.ietf.org/html/rfc7519#section-4.1.6">Issued
     * At</a> claim
     *
     * @return the
     *         <a href= "https://tools.ietf.org/html/rfc7519#section-4.1.6">Issued
     *         At</a> claim if available, otherwise null
     * @throws OAuthBearerIllegalTokenException
     *             if the claim value is the incorrect type
     */
    public Number issuedAt() throws OAuthBearerIllegalTokenException {
        return getClaim(JwtClaim.ISSUED_AT.claimName(), Number.class);
    }
 
    /**
     * Return the
     * <a href="https://tools.ietf.org/html/rfc7519#section-4.1.1">Issuer</a> claim
     *
     * @return the <a href=
     *         "https://tools.ietf.org/html/rfc7519#section-4.1.1">Issuer</a> claim
     *         if available, otherwise null
     * @throws OAuthBearerIllegalTokenException
     *             if the claim value is the incorrect type
     */
    public String issuer() throws OAuthBearerIllegalTokenException {
        return getClaim(JwtClaim.ISSUER.claimName(), String.class);
    }

    /**
     * Return the <a href="https://tools.ietf.org/html/rfc7519#section-4.1.7">JWT
     * ID</a> claim
     *
     * @return the <a href= "https://tools.ietf.org/html/rfc7519#section-4.1.7">JWT
     *         ID</a> claim if available, otherwise null
     * @throws OAuthBearerIllegalTokenException
     *             if the claim value is the incorrect type
     */
    public String jwtId() throws OAuthBearerIllegalTokenException {
        return getClaim(JwtClaim.JWT_ID.claimName(), String.class);
    }

    /**
     * Return the <a href="https://tools.ietf.org/html/rfc7519#section-4.1.5">Not
     * Before</a> claim
     *
     * @return the <a href= "https://tools.ietf.org/html/rfc7519#section-4.1.5">Not
     *         Before</a> claim if available, otherwise null
     * @throws OAuthBearerIllegalTokenException
     *             if the claim value is the incorrect type
     */
    public Number notBefore() throws OAuthBearerIllegalTokenException {
        return getClaim(JwtClaim.NOT_BEFORE.claimName(), Number.class);
    }

    /**
     * Return the
     * <a href="https://tools.ietf.org/html/rfc7519#section-4.1.2">Subject</a> claim
     *
     * @return the <a href=
     *         "https://tools.ietf.org/html/rfc7519#section-4.1.2">Subject</a> claim
     *         if available, otherwise null
     * @throws OAuthBearerIllegalTokenException
     *             if the claim value is the incorrect type
     */
    public String subject() throws OAuthBearerIllegalTokenException {
        return getClaim(JwtClaim.SUBJECT.claimName(), String.class);
    }
 
    /**
     * Decode the given Base64URL-encoded value, parse the resulting JSON as a JSON
     * object, and return the map of member names to their values (each value being
     * represented as either a String, a Number, or a List of Strings).
     *
     * @param split
     *            the value to decode and parse
     * @return the map of JSON member names to their String, Number, or String List
     *         value
     * @throws OAuthBearerIllegalTokenException
     *             if the given Base64URL-encoded value cannot be decoded or parsed
     */
    public static Map<String, Object> toMap(String split) throws OAuthBearerIllegalTokenException {
        // etc...
    }
}
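The first parsing steps described in the constructor Javadoc above (splitting the compact serialization into 3 or 5 dot-separated sections and Base64URL-decoding the header) can be sketched as follows. This is an illustration under stated assumptions rather than the proposed implementation: JwtCompactSerializationSketch is a hypothetical class, IllegalArgumentException stands in for OAuthBearerIllegalTokenException, and parsing the decoded JSON text into a Map is left out because it would require a JSON library.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;

/** Hypothetical sketch (not the proposed implementation) of the first JWT parsing steps. */
final class JwtCompactSerializationSketch {
    private JwtCompactSerializationSketch() {
    }

    /** Split into sections, keeping trailing empty sections (e.g. an unsigned JWS). */
    static List<String> splits(String compactSerialization) {
        // A limit of -1 keeps a trailing empty section, such as a missing signature
        List<String> sections = Arrays.asList(compactSerialization.split("\\.", -1));
        if (sections.size() != 3 && sections.size() != 5)
            throw new IllegalArgumentException(
                    "Expected 3 (JWS) or 5 (JWE) dot-separated sections but found " + sections.size());
        return sections;
    }

    /** Base64URL-decode the first section and return the JOSE header as JSON text. */
    static String headerJson(String compactSerialization) {
        String encodedHeader = splits(compactSerialization).get(0);
        // The URL-safe decoder throws IllegalArgumentException on invalid input
        byte[] decoded = Base64.getUrlDecoder().decode(encodedHeader);
        return new String(decoded, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Base64.Encoder encoder = Base64.getUrlEncoder().withoutPadding();
        String header = encoder.encodeToString("{\"alg\":\"none\"}".getBytes(StandardCharsets.UTF_8));
        String claims = encoder.encodeToString("{\"sub\":\"alice\"}".getBytes(StandardCharsets.UTF_8));
        String unsecuredJws = header + "." + claims + "."; // empty signature section
        System.out.println(splits(unsecuredJws).size());
        System.out.println(headerJson(unsecuredJws));
    }
}
```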
org.apache.kafka.common.security.oauthbearer.OAuthBearerJws
package org.apache.kafka.common.security.oauthbearer;
 
/**
 * A JSON Web Signature. Note that digital signature validation is not
 * performed.
 *
 * @see <a href="https://tools.ietf.org/html/rfc7515">RFC 7515</a>
 */
public class OAuthBearerJws extends OAuthBearerJwt {
    /**
     * Constructor with no scope claim name
     *
     * @param compactSerialization
     *            the compact serialization to parse as a JWT
     * @throws OAuthBearerIllegalTokenException
     *             if the compact serialization is not a valid JWS (meaning it did
     *             not have 3 dot-separated Base64URL sections; or the header or
     *             claims either are not valid Base 64 URL encoded values or are not
     *             JSON after decoding; or the existence or absence of a digital
     *             signature is inconsistent with the mandatory 'alg' header value)
     */
    public OAuthBearerJws(String compactSerialization) throws OAuthBearerIllegalTokenException {
        this(compactSerialization, JwtClaim.SUBJECT.claimName(), null);
    }

    /**
     * Constructor with the given principal and scope claim names
     *
     * @param compactSerialization
     *            the compact serialization to parse as a JWS
     * @param principalClaimName
     *            the optional principal claim name
     * @param scopeClaimName
     *            the optional scope claim name
     * @throws OAuthBearerIllegalTokenException
     *             if the compact serialization is not a valid JWS (meaning it did
     *             not have 3 dot-separated Base64URL sections; or the header or
     *             claims either are not valid Base 64 URL encoded values or are not
     *             JSON after decoding; or the existence or absence of a digital
     *             signature is inconsistent with the mandatory 'alg' header value)
     */
    public OAuthBearerJws(String compactSerialization, String principalClaimName, String scopeClaimName)
            throws OAuthBearerIllegalTokenException {
        // etc...
    }
}
org.apache.kafka.common.security.oauthbearer.JwtHeaderParameter
package org.apache.kafka.common.security.oauthbearer;

/**
 * JSON Web Signature and Encryption Header Parameters
 *
 * @see <a href="https://www.iana.org/assignments/jose/jose.xhtml">JSON Web
 *      Signature and Encryption Header Parameters</a>
 */
public enum JwtHeaderParameter {
    ALGORITHM("alg"),
    JWK_SET_URL("jku"),
    JSON_WEB_KEY("jwk"),
    KEY_ID("kid"),
    X_509_URL("x5u"),
    X_509_CERTIFICATE_CHAIN("x5c"),
    X_509_CERTIFICATE_SHA1_THUMBPRINT("x5t"),
    X_509_CERTIFICATE_SHA256_THUMBPRINT("x5t#S256"),
    TYPE("typ"),
    CONTENT_TYPE("cty"),
    CRITICAL("crit"),
    ENCRYPTION_ALGORITHM("enc"),
    COMPRESSION_ALGORITHM("zip"),
    ISSUER("iss"),
    SUBJECT("sub"),
    AUDIENCE("aud");

    private String headerParameterName;

    private JwtHeaderParameter(String headerParameterName) {
        this.headerParameterName = headerParameterName;
    }

    String headerParameterName() {
        return headerParameterName;
    }
}
org.apache.kafka.common.security.oauthbearer.JwtClaim
package org.apache.kafka.common.security.oauthbearer;

/**
 * JSON Web Token Claims
 *
 * @see <a href="https://www.iana.org/assignments/jwt/jwt.xhtml">JSON Web Token
 *      Claims</a>
 */
public enum JwtClaim {
    ISSUER("iss"),
    SUBJECT("sub"),
    AUDIENCE("aud"),
    EXPIRATION_TIME("exp"),
    NOT_BEFORE("nbf"),
    ISSUED_AT("iat"),
    JWT_ID("jti");

    private String claimName;

    private JwtClaim(String claimName) {
        this.claimName = claimName;
    }

    public String claimName() {
        return claimName;
    }
}

Substitutable Module Options

See Rejected Alternatives: Substitutable Module Options

Substitutable Module Options

The mechanics of token retrieval and token validation (both described later) will differ across OAuth deployments.  For example, for token retrieval, each deployment will inject credentials to the token endpoint differently, and the parameters sent to the token endpoint may also differ.  Token validation will also differ because OAuth supports multiple methods of validation.  Configuration of the retrieval and validation mechanisms, which is done via JAAS configuration, must therefore be flexible.  In particular, while we collaboratively use instances implementing javax.security.auth.callback.Callback and javax.security.auth.callback.CallbackHandler to retrieve information, we can't know in advance what information will be required in order to retrieve or validate a token; a username and password might – or might not – be required, for example, and retrieval and validation will likely require much more as well.  We also don't know where this information will come from: a file?  An environment variable?  A database?  Somewhere else?  We need implementations of the Callback and CallbackHandler interfaces that are just as flexible as we need the JAAS configuration to be.

JAAS config options (each of which is an individual element of the space-separated ModuleOptions list), in combination with appropriate Callback and CallbackHandler implementations, will support arbitrarily complex retrieval and substitution inside the option value.  The JAAS Configuration spec already supports system property substitution via the ${system.property} syntax; we will implement support for arbitrarily complex substitutions.  For example, the following would support substitution of the contents of a file (which is a common way to store secrets, especially within containers) into the option value:

thePassword="$[file|redact|notBlank|defaultOption=fileDefault|=/path/to/secrets/the_secret]"

There are several features here that deserve comment:

  • The "$[" and "]" delimiters are the signal to perform a substitution (we can't use "${" and "}" because that is already defined by the JAAS Configuration spec to mean system property substitution).  Note that we will not allow substitution within a substitution; it is not needed, as described below.  We will also support "$[[" and "]]" as delimiters (and so on, up to 5 brackets) to allow "$[" and "]" to appear in text without causing substitution.
  • Immediately inside the opening delimiter is the type of substitution followed by any optional modifiers we wish to apply.  In the above, we identify this as a file substitution and we indicate three modifiers: the resulting value should never be logged (i.e. it is stored such that its value will be redacted when logged); the contents of the file must not be blank (meaning it must not be empty or contain only whitespace); and if the file does not exist or its contents are blank, the value of the "fileDefault" option (which could itself have substitutions) is used instead.  It is an error if any constraints implied by the modifiers are violated.  Any punctuation character except the equal sign (=) can be used to delimit the modifiers.
  • Immediately after the type of substitution and any optional modifiers is an equal sign ("=") followed by the value (which in the case of the "file" type is interpreted as the filename to read); then ultimately the closing delimiter appears.

This scheme is flexible and powerful; it handles most cases, but it remains relatively easy to create and read.  Importantly, the types of replacements can be expanded in the future without breaking compatibility.  See the Javadoc for the org.apache.kafka.common.security.oauthbearer.smo.SubstitutableModuleOptionsCallbackHandler class for more details.
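The type, modifier, and value rules described above can be sketched as a small parser for the text between the "$[" and "]" delimiters. This is a minimal illustration and not the proposed SubstitutableModuleOptionsCallbackHandler: the class SubstitutionSyntaxSketch is hypothetical, "punctuation" is assumed to mean any character that is neither a letter, a digit, nor whitespace, modifier names and arguments are assumed to be trimmed, and delimiter selection and the special empty substitution are not handled.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical parser for the text between "$[" and "]"; a sketch, not the proposed code. */
final class SubstitutionSyntaxSketch {
    final String type;
    final Map<String, String> modifiers; // flags map to the empty string
    final String value; // never trimmed

    private SubstitutionSyntaxSketch(String type, Map<String, String> modifiers, String value) {
        this.type = type;
        this.modifiers = modifiers;
        this.value = value;
    }

    // Assumption: "punctuation" means neither a letter, a digit, nor whitespace
    private static boolean isPunctuation(char c) {
        return !Character.isLetterOrDigit(c) && !Character.isWhitespace(c);
    }

    static SubstitutionSyntaxSketch parse(String expression) {
        int typeEnd = 0;
        while (typeEnd < expression.length() && !isPunctuation(expression.charAt(typeEnd)))
            typeEnd++;
        if (typeEnd == 0 || typeEnd == expression.length())
            throw new IllegalArgumentException("Not of the form TYPE[MODIFIERS]=[VALUE]: " + expression);
        String type = expression.substring(0, typeEnd);
        char delimiter = expression.charAt(typeEnd);
        Map<String, String> modifiers = new LinkedHashMap<>();
        if (delimiter == '=') // no modifier section: the value follows the equal sign
            return new SubstitutionSyntaxSketch(type, modifiers, expression.substring(typeEnd + 1));
        // The modifier section ends at the first delimiter immediately followed by '='
        int sectionEnd = expression.indexOf(delimiter + "=", typeEnd);
        if (sectionEnd < 0)
            throw new IllegalArgumentException("Unterminated modifier section: " + expression);
        String section = sectionEnd > typeEnd ? expression.substring(typeEnd + 1, sectionEnd) : "";
        for (String modifier : section.split(Pattern.quote(String.valueOf(delimiter)))) {
            int equalSign = modifier.indexOf('=');
            if (equalSign >= 0) // name=value argument
                modifiers.put(modifier.substring(0, equalSign).trim(), modifier.substring(equalSign + 1).trim());
            else if (!modifier.trim().isEmpty()) // flag
                modifiers.put(modifier.trim(), "");
        }
        return new SubstitutionSyntaxSketch(type, modifiers, expression.substring(sectionEnd + 2));
    }

    public static void main(String[] args) {
        SubstitutionSyntaxSketch parsed = parse(
                "file|redact|notBlank|defaultOption=fileDefault|=/path/to/secrets/the_secret");
        System.out.println(parsed.type + " " + parsed.modifiers.keySet() + " " + parsed.value);
    }
}
```

Because the modifier delimiter is whichever punctuation character follows the type, the same parser accepts both the "|" form used in the example above and a "/" form, as long as the delimiter is used consistently within one substitution.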

How would we retrieve the above value?  We define the following representation of the module options and their substitution state along with associated Callback and CallbackHandler implementations and the org.apache.kafka.common.security.oauthbearer.smo.RedactableObject class for holding the results.  We also define several built-in substitutions and a mechanism for adding new ones.

org.apache.kafka.common.security.oauthbearer.smo.SubstitutableModuleOptions
package org.apache.kafka.common.security.oauthbearer.smo;

/**
 * Holds state regarding which {@code LoginModule} <a href=
 * "https://docs.oracle.com/javase/9/docs/api/javax/security/auth/login/Configuration.html">module
 * options</a> have been substituted -- both completely (via
 * {@link #substitutionResults}) as well as in-progress (via
 * {@link #withOptionEvaluationCausingCircularReference(String)}). Instances of
 * this class are thread-safe.
 *
 * @see SubstitutableModuleOptionsCallbackHandler
 * @see SubstitutableModuleOptionsCallback
 * @see RedactableObject
 */
public class SubstitutableModuleOptions {
    private final Map<String, String> moduleOptionsMap;
    private final ConcurrentHashMap<String, RedactableObject> substitutionResults;
    private final SubstitutableModuleOptions delegate;
    private final String optionCausingCircularReference;
 
    /**
     * Constructor
     *
     * @param moduleOptionsMap
     *            the mandatory map representation of the <a href=
     *            "https://docs.oracle.com/javase/9/docs/api/javax/security/auth/login/Configuration.html">module
     *            options</a>
     */
    public SubstitutableModuleOptions(Map<String, String> moduleOptionsMap) {
        this.moduleOptionsMap = Collections.unmodifiableMap(Objects.requireNonNull(moduleOptionsMap));
        this.substitutionResults = new ConcurrentHashMap<>();
        this.delegate = null;
        this.optionCausingCircularReference = null;
    }
 
    /**
     * Constructor to create a new instance based on the given instance except that
     * the given option is marked as causing a circular reference if an attempt is
     * made to evaluate it
     *
     * @param delegate
     *            the mandatory instance from which the view will be created
     * @param optionCausingCircularReference
     *            the mandatory option that causes a circular reference if it is
     *            evaluated
     * @see #withOptionEvaluationCausingCircularReference(String)
     * @see #optionEvaluationCausesCircularReference(String)
     */
    public SubstitutableModuleOptions(SubstitutableModuleOptions delegate, String optionCausingCircularReference) {
        this.moduleOptionsMap = delegate.moduleOptionsMap;
        this.substitutionResults = delegate.substitutionResults;
        this.delegate = delegate;
        this.optionCausingCircularReference = optionCausingCircularReference;
    }
 
    /**
     * Return (an unmodifiable view of) the original module options map provided
     * during construction
     *
     * @return (an unmodifiable view of) the original module options map provided
     *         during construction
     */
    public Map<String, String> moduleOptionsMap() {
        return moduleOptionsMap;
    }
 
    /**
     * Convenience method to indicate if a particular option name exists, returning
     * true if it does, otherwise false
     *
     * @param optionName
     *            the mandatory option name
     * @return true if the indicated option exists, otherwise false
     */
    public boolean optionExists(String optionName) {
        return moduleOptionsMap.containsKey(Objects.requireNonNull(optionName));
    }
 
    /**
     * Create and return a new view of this instance where in that context the given
     * option is in the process of being evaluated such that an attempt to evaluate
     * its substitution value again within the same context would cause a circular
     * reference
     *
     * @param optionName
     *            the mandatory option name
     * @return a new view of this instance where in that context the given option is
     *         in the process of being evaluated such that an attempt to evaluate
     *         its substitution value again within the same context would cause a
     *         circular reference
     * @see #optionEvaluationCausesCircularReference(String)
     */
    public SubstitutableModuleOptions withOptionEvaluationCausingCircularReference(String optionName) {
        return new SubstitutableModuleOptions(this, Objects.requireNonNull(optionName));
    }
 
    /**
     * Indicate if, in the context of this instance's view, the given option is in
     * the process of being evaluated; return true if an attempt to evaluate the
     * option's substitution value would cause a circular reference, otherwise false
     *
     * @param optionName
     *            the mandatory option name
     * @return true if an attempt to evaluate the option's substitution value would
     *         cause a circular reference, otherwise false
     * @see #withOptionEvaluationCausingCircularReference(String)
     */
    public boolean optionEvaluationCausesCircularReference(String optionName) {
        return Objects.requireNonNull(optionName).equals(optionCausingCircularReference)
                || (delegate != null && delegate.optionEvaluationCausesCircularReference(optionName));
    }
 
    /**
     * Return an unmodifiable map identifying which module options have been
     * processed for substitution and the result (if any). A module option is
     * guaranteed to have been processed for substitution and its name will appear
     * as a key in the returned map only after
     * {@link #setSubstitutionResult(String, RedactableObject)} is called.
     *
     * @return an unmodifiable map identifying which module options have been
     *         processed for substitution and the result (if any)
     */
    public Map<String, RedactableObject> substitutionResults() {
        return Collections.unmodifiableMap(substitutionResults);
    }
 
    /**
     * Identify that the option with the given name has had substitution performed
     * for it yielding the given result. This method is idempotent; invoking it with
     * a substitution result equal to the current substitution result (as defined by
     * {@code Object.equals()}) has no effect. The substitution result for an option
     * cannot be changed (again, as defined by {@code Object.equals()}) once it has
     * been set; an attempt to do so will raise an exception.
     *
     * @param optionName
     *            the mandatory option name, which must exist in the map returned by
     *            {@link #moduleOptionsMap()}
     * @param substitutionResult
     *            the mandatory substitution result to set
     */
    public void setSubstitutionResult(String optionName, RedactableObject substitutionResult) {
        if (!moduleOptionsMap.containsKey(Objects.requireNonNull(optionName))) {
            throw new IllegalArgumentException(String.format("Unknown module option name: %s", optionName));
        }
        RedactableObject priorSubstitutionResult = substitutionResults.putIfAbsent(optionName,
                Objects.requireNonNull(substitutionResult));
        if (priorSubstitutionResult != null && !priorSubstitutionResult.equals(substitutionResult))
            throw new IllegalArgumentException(String
                    .format("Cannot change the substitution result for the module option with name '%s'", optionName));
    }
}
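The way a chain of views can detect a circular reference while one option is evaluated in terms of another can be sketched as follows. This is a simplified stand-in and not the class above: CircularReferenceSketch and its method names are hypothetical, only values of the exact form "$[option=name]" are treated as references, and IllegalStateException is an assumed choice of exception.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical, simplified stand-in for the view chain; not the proposed class. */
final class CircularReferenceSketch {
    private static final String PREFIX = "$[option=";
    private final Map<String, String> options;
    private final CircularReferenceSketch delegate;
    private final String optionInProgress;

    CircularReferenceSketch(Map<String, String> options) {
        this(options, null, null);
    }

    private CircularReferenceSketch(Map<String, String> options, CircularReferenceSketch delegate,
            String optionInProgress) {
        this.options = Objects.requireNonNull(options);
        this.delegate = delegate;
        this.optionInProgress = optionInProgress;
    }

    /** Create a view in which the given option is marked as being evaluated. */
    CircularReferenceSketch withOptionInProgress(String optionName) {
        return new CircularReferenceSketch(options, this, Objects.requireNonNull(optionName));
    }

    /** Walk the chain of views looking for the given option. */
    boolean causesCircularReference(String optionName) {
        return optionName.equals(optionInProgress)
                || (delegate != null && delegate.causesCircularReference(optionName));
    }

    /** Assumption: a value of the exact form "$[option=name]" refers to another option. */
    String resolve(String optionName) {
        if (causesCircularReference(optionName))
            throw new IllegalStateException("Circular reference while evaluating option: " + optionName);
        String raw = options.get(optionName);
        if (raw == null)
            throw new IllegalArgumentException("Unknown option: " + optionName);
        if (raw.startsWith(PREFIX) && raw.endsWith("]")) {
            String referenced = raw.substring(PREFIX.length(), raw.length() - 1);
            // Evaluate the referenced option in a view that remembers this option is in progress
            return withOptionInProgress(optionName).resolve(referenced);
        }
        return raw;
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("a", "$[option=b]");
        options.put("b", "$[option=a]");
        try {
            new CircularReferenceSketch(options).resolve("a");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Each view is immutable and only adds one in-progress option on top of its delegate, so separate evaluation paths do not interfere with each other.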
org.apache.kafka.common.security.oauthbearer.smo.SubstitutableModuleOptionsCallback
package org.apache.kafka.common.security.oauthbearer.smo;

/**
 * A {@code Callback} related to introspection requests against a JAAS
 * configuration
 *
 * @see SubstitutableModuleOptionsCallbackHandler
 * @see SubstitutableModuleOptions
 */
public class SubstitutableModuleOptionsCallback implements Callback {
    private final SubstitutableModuleOptions substitutableModuleOptions;
    private final String optionName;
    private final boolean optionRequiredToExist;

    /**
     * Constructor
     *
     * @param substitutableModuleOptions
     *            the mandatory substitutable module options
     * @param optionName
     *            the requested option
     * @param optionRequiredToExist
     *            if true then the requested option is required to exist
     */
    public SubstitutableModuleOptionsCallback(SubstitutableModuleOptions substitutableModuleOptions, String optionName,
            boolean optionRequiredToExist) {
        this.substitutableModuleOptions = Objects.requireNonNull(substitutableModuleOptions);
        this.optionName = Objects.requireNonNull(optionName);
        this.optionRequiredToExist = optionRequiredToExist;
    }
 
    /**
     * Return the substitutable module options provided at construction time
     *
     * @return the substitutable module options provided at construction time
     */
    public SubstitutableModuleOptions substitutableModuleOptions() {
        return substitutableModuleOptions;
    }

    /**
     * Return the option name provided at construction time
     *
     * @return the option name provided at construction time
     */
    public String optionName() {
        return optionName;
    }
 
    /**
     * Return true if the requested option is required to exist, otherwise false
     *
     * @return true if the requested option is required to exist, otherwise
     *         false
     */
    public boolean optionRequiredToExist() {
        return optionRequiredToExist;
    }
 
    /**
     * Identify that the option identified by {@link #optionName()}, on the instance
     * returned by {@link #substitutableModuleOptions()}, has had substitution
     * performed for it yielding the given result. This method is idempotent;
     * invoking it with a substitution result equal to the option's current
     * substitution result (as defined by {@code Object.equals()}) has no effect.
     * The substitution result for an option cannot be changed (again, as defined by
     * {@code Object.equals()}) once it has been set; an attempt to do so will raise
     * an exception.
     *
     * @param substitutionResult
     *            the mandatory substitution result to set
     */
    public void setSubstitutionResult(RedactableObject substitutionResult) {
        substitutableModuleOptions.setSubstitutionResult(optionName, Objects.requireNonNull(substitutionResult));
    }

    /**
     * Return the substitution result, if any has been set, otherwise null. Note
     * that the result may not have been set via a call to
     * {@link #setSubstitutionResult(RedactableObject)}; it is possible that the
     * result was already set prior to construction of this instance, in which case
     * that result will be returned here.
     *
     * @return the substitution result, if any has been set, otherwise null
     */
    public RedactableObject substitutionResult() {
        return substitutableModuleOptions.substitutionResults().get(optionName);
    } 
}
org.apache.kafka.common.security.oauthbearer.smo.SubstitutableModuleOptionsCallbackHandler
package org.apache.kafka.common.security.oauthbearer.smo;

/**
 * A {@code CallbackHandler} that handles introspection requests against a JAAS
 * configuration and supports substitution into option values via delimited text
 * of the following form:
 *
 * <pre>
 * &lt;OPENING_DELIMITER&gt;&lt;TYPE&gt;&lt;OPTIONAL_MODIFIERS&gt;=&lt;OPTIONAL_VALUE&gt;&lt;CLOSING_DELIMITER&gt;
 * </pre>
 *
 * Where the above elements are defined as follows:
 *
 * <pre>
 * OPENING_DELIMITER: $[, $[[, $[[[, $[[[[, or $[[[[[
 * CLOSING_DELIMITER: ],  ]],  ]]],  ]]]], or  ]]]]] (number of brackets must match)
 *
 * TYPE: everything up to (but not including) the first punctuation character
 *
 * OPTIONAL_MODIFIERS: the optional section immediately after the TYPE, starting with any
 *                     punctuation character except for the equal sign (=), and ending
 *                     with the same punctuation character followed immediately by an
 *                     equal sign. The same punctuation character delimits individual
 *                     modifiers, which come in two flavors: flags, which do not contain
 *                     an equal sign, and name=value arguments, which do.
 *
 * OPTIONAL_VALUE: the optional section immediately after the modifier section-delimiting
 *                 punctuation character (if any) and the equal sign (=). It is not trimmed.
 * </pre>
 *
 * For example:
 *
 * <pre>
 * $[envvar=THE_ENV_VAR]
 * $[envvar/notBlank/redact/=THE_ENV_VAR]
 * $[envvar/defaultValue = theDefaultValue/=THE_ENV_VAR]
 * $[envvar/defaultOption = theOptionName/=THE_ENV_VAR]
 * $[file|redact|notBlank|=/the/path/to/the/file]
 * </pre>
 *
 * Working left to right, once the delimiter is defined for a module option (for
 * example, {@code $[} and {@code ]}), only that delimiter is recognized for the
 * rest of that option (and it is always recognized as meaning substitution for
 * the rest of that module option). A special "empty" substitution does nothing
 * except, when it appears to the left of every other occurrence of matching
 * delimiters, it serves to force the delimiter for that module option to the
 * one indicated. For example, to force the delimiter to {@code $[[} and
 * {@code ]]} (and prevent {@code $[} and {@code ]} from causing substitution)
 * for a module option:
 *
 * <pre>
 * optionName = "$[[]]These $[ and ] delimiters do not cause substitution"
 * </pre>
 *
 * The following substitutions are supported, though it is straightforward to
 * add others (see below):
 *
 * <ul>
 * <li>{@code envvar}: substitute the value of an environment variable</li>
 * <li>{@code sysprop}: substitute the value of a system property</li>
 * <li>{@code file}: substitute the contents of a file</li>
 * <li>{@code option}: substitute the contents of another option</li>
 * </ul>
 *
 * The above substitutions support the following modifiers:
 *
 * <ul>
 * <li>{@code redact}: prevent values from being logged</li>
 * <li>{@code notEmpty}: the value must not be empty</li>
 * <li>{@code notBlank}: the value must not be blank (i.e. consisting only of
 * whitespace); implies {@code notEmpty}.</li>
 * <li>{@code defaultValue=<value>}: substitute the given literal value if the
 * substitution cannot otherwise be made (either because the value does not
 * exist or the determined value was disallowed because it was empty or blank).
 * The substituted value must satisfy any modifiers that act as
 * constraints.</li>
 * <li>{@code defaultOption=<optionName>}: substitute the value of the indicated
 * option if the substitution cannot otherwise be made (either because the value
 * does not exist or the determined value was disallowed because it was empty or
 * blank). The substituted value must satisfy any modifiers that act as
 * constraints.</li>
 * <li>{@code fromOption}: provides a level of indirection so that the value,
 * instead of always being literally specified (i.e. read this file, or this
 * environment variable), can be determined via some other option. This allows,
 * for example, the filename, system property name, etc. to be generated
 * potentially from multiple substitutions concatenated together.</li>
 * </ul>
 *
 * To add new substitutions simply define a module option of the following form:
 *
 * <pre>
 * typeModuleOptionSubstituter = "fully.qualified.class.name"
 * </pre>
 *
 * For example:
 *
 * <pre>
 * fooModuleOptionSubstituter = "org.example.FooSubstituter"
 * </pre>
 *
 * The indicated class must implement {@link ModuleOptionSubstituter}, and you
 * can invoke the substitution like this:
 *
 * <pre>
 * $[foo/optional/modifiers/=optionalValue]
 * </pre>
 *
 * @see SubstitutableModuleOptionsCallback
 * @see SubstitutableModuleOptions
 * @see ModuleOptionSubstituter
 */
public class SubstitutableModuleOptionsCallbackHandler implements AuthenticateCallbackHandler {
    // etc...
 
    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        for (Callback callback : callbacks) {
            if (callback instanceof SubstitutableModuleOptionsCallback) {
                SubstitutableModuleOptionsCallback substitutableModuleOptionsCallback = (SubstitutableModuleOptionsCallback) callback;
                RedactableObject substitutionResult = getSubstitutionResult(
                        substitutableModuleOptionsCallback.substitutableModuleOptions(),
                        substitutableModuleOptionsCallback.optionName(),
                        substitutableModuleOptionsCallback.optionRequiredToExist(), forceDebugLogForTesting);
                if (substitutionResult != null)
                    // depend on idempotence here because the substitution result has already been
                    // set on the underlying SubstitutableModuleOptions instance
                    substitutableModuleOptionsCallback.setSubstitutionResult(substitutionResult);
            } else
                throw new UnsupportedCallbackException(callback,
                        String.format("Unrecognized Callback type: %s", callback.getClass().getName()));
        }
    }
 
    /**
     * Perform substitution without using callbacks and a callback handler.
     *
     * @param substitutableModuleOptions
     *            the mandatory substitutable module options to query
     * @param optionName
     *            the mandatory requested option name
     * @param optionRequiredToExist
     *            if true then the requested option is required to exist
     * @return the given option's substitution result, after any required
     *         substitution is applied, or null if the option does not exist and it
     *         was not required to exist
     * @throws IOException
     *             if a required substitution cannot be performed, including if the
     *             given (or any other) required option does not exist
     */
    public static RedactableObject getSubstitutionResult(SubstitutableModuleOptions substitutableModuleOptions,
            String optionName, boolean optionRequiredToExist) throws IOException {
        // etc...
    }

    /**
     * Return the server configuration provided during
     * {@link #configure(Map, String, List)}, if any, otherwise null
     *
     * @return the server configuration provided during
     *         {@link #configure(Map, String, List)}, if any, otherwise null
     */
    public Map<String, ?> serverConfig() {
        return serverConfig;
    }

    /**
     * Return the SASL mechanism provided during
     * {@link #configure(Map, String, List)}, if any, otherwise null
     *
     * @return the SASL mechanism provided during
     *         {@link #configure(Map, String, List)}, if any, otherwise null
     */
    public String mechanism() {
        return mechanism;
    }

    /**
     * Return the JAAS login module configurations provided during
     * {@link #configure(Map, String, List)}, if any, otherwise null
     *
     * @return the JAAS login module configurations provided during
     *         {@link #configure(Map, String, List)}, if any, otherwise null
     */
    public List<AppConfigurationEntry> jaasConfigEntries() {
        return jaasConfigEntries;
    }
 
    // etc...
}
org.apache.kafka.common.security.oauthbearer.smo.RedactableObject
package org.apache.kafka.common.security.oauthbearer.smo;

import java.util.Objects;

/**
 * An object whose text value can be redacted
 */
public class RedactableObject {
    static final String REDACTED = "[redacted]";
    private final Object object;
    private final boolean redacted;

    /**
     * Constructor
     *
     * @param object
     *            the mandatory object
     * @param redact
     *            when true the object's value will be redacted in
     *            {@link #redactedText()}
     */
    public RedactableObject(Object object, boolean redact) {
        this.object = Objects.requireNonNull(object);
        this.redacted = redact;
    }


    /**
     * Return the (always non-null) underlying object provided during instance
     * construction
     *
     * @return the (always non-null) underlying object provided during instance
     *         construction
     */
    public Object object() {
        return object;
    }
 
    /**
     * Return true if this instance contains information that will be redacted when
     * {@link #redactedText()} is invoked, otherwise false
     *
     * @return true if this instance contains information that will be redacted when
     *         {@link #redactedText()} is invoked, otherwise false
     */
    public boolean isRedacted() {
        return redacted;
    }

    /**
     * Return the redacted text for this instance, if redaction is required,
     * otherwise return the {@link #value()}
     *
     * @return the redacted text for this instance, if redaction is required,
     *         otherwise return the {@link #value()}
     */
    public String redactedText() {
        return redacted ? REDACTED : value();
    }

    /**
     * Return the {@code String} value of this instance, including information that
     * would otherwise be redacted
     *
     * @return the {@code String} value of this instance, including information that
     *         would otherwise be redacted
     */
    public String value() {
        if (object instanceof String)
            return (String) object;
        throw new IllegalStateException(
                String.format("Unknown substitution result object type: %s", object.getClass().getName()));
    }
 
    /**
     * Return true if this result is considered to be empty, otherwise false
     *
     * @return true if this result is considered to be empty, otherwise false
     */
    public boolean isEmpty() {
        return value().isEmpty();
    }

    /**
     * Return true if this result is considered to be blank (containing at most just
     * whitespace), otherwise false
     *
     * @return true if this result is considered to be blank (containing at most
     *         just whitespace), otherwise false
     */
    public boolean isBlank() {
        return value().trim().isEmpty();
    }

    /**
     * Return this instance if it is redacted according to {@link #isRedacted()},
     * otherwise return a new, redacted instance with the same underlying object
     *
     * @return this instance if it is redacted according to {@link #isRedacted()},
     *         otherwise return a new, redacted instance with the same underlying
     *         object
     */
    public RedactableObject redactedVersion() {
        return redacted ? this : new RedactableObject(object, true);
    }
 
    @Override
    public String toString() {
        // be sure to redact information as required
        return redactedText();
    }


    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof RedactableObject))
            return false;
        RedactableObject other = (RedactableObject) obj;
        /*
         * Note that differences in redaction cause inequality
         */
        return redacted == other.redacted && object.equals(other.object);
    }


    @Override
    public int hashCode() {
        return object.hashCode();
    }
} 
org.apache.kafka.common.security.oauthbearer.smo.ModuleOptionSubstituter
package org.apache.kafka.common.security.oauthbearer.smo;

import java.io.IOException;
import java.util.List;

/**
 * The interface that pluggable module option substituter classes must
 * implement.
 *
 * @see SubstitutableModuleOptionsCallbackHandler
 */
public interface ModuleOptionSubstituter {
    /**
     * Perform the substitution of the given type on the given options using the
     * given modifiers and value
     *
     * @param type
     *            the (always non-null) type of substitution to perform
     * @param modifiers
     *            the (always non-null but potentially empty) modifiers to apply, if
     *            any. They are presented exactly as they appear in the
     *            configuration, with no whitespace trimming applied.
     * @param value
     *            the always non-null (but potentially empty) value
     * @param substitutableModuleOptions
     *            the module options and their current substitution state
     * @return the (always non-null) result of performing the substitution
     * @throws IOException
     *             if the substitution cannot be performed
     */
    RedactableObject doSubstitution(String type, List<String> modifiers, String value,
            SubstitutableModuleOptions substitutableModuleOptions) throws IOException;
}
org.apache.kafka.common.security.oauthbearer.smo.SubstituterHelper
package org.apache.kafka.common.security.oauthbearer.smo;

/**
 * A template {@code ModuleOptionSubstituter} that handles the following
 * modifiers:
 * <ul>
 * <li>{@code redact} -- when enabled, values are stored such that they are
 * prevented from being logged</li>
 * <li>{@code notBlank} -- when enabled, blank (only whitespace) or non-existent
 * values are replaced by default values. Implies {@code notEmpty}.</li>
 * <li>{@code notEmpty} -- when enabled, either explicitly or via
 * {@code notBlank}, empty {@code ""} or non-existent values are replaced by
 * default values.</li>
 * <li>{@code fromOption} -- when enabled, provides a level of indirection such
 * that the option's value is taken as an option name from which the actual
 * value will be retrieved</li>
 * <li>{@code defaultValue=<value>} -- when enabled, the provided value is used
 * as a default value in case the requested value either does not exist or is
 * disallowed via {@code notBlank} or {@code notEmpty}</li>
 * <li>{@code defaultOption=<optionName>} -- when enabled, the indicated option is
 * evaluated as a default value in case the requested value either does not
 * exist or is disallowed via {@code notBlank} or {@code notEmpty}</li>
 * </ul>
 *
 * Flags (modifiers without an equal sign) are trimmed, so "{@code redact}" and
 * "{@code  redact }" are recognized as being the same. Arguments (modifiers
 * with an equal sign) have their name trimmed but not their value, so
 * "{@code name=value}" and "{@code  name = value }" are both recognized as
 * setting the {@code name} argument (though their values do not match due to
 * whitespace differences).
 * <p>
 * It is an error to set the same named argument multiple times (even if the
 * values are the same). Redundantly specifying the same flag is acceptable.
 * <p>
 * Flags and arguments are presented to the substitution's implementation via
 * the
 * {@link #retrieveResult(String, String, boolean, SubstitutableModuleOptions, Set, Map)}
 * method.
 */
public abstract class SubstituterHelper implements ModuleOptionSubstituter {
    /**
     * Retrieve the substitution result associated with the given key, if any,
     * otherwise null
     *
     * @param type
     *            the (always non-null) type of substitution to perform
     * @param key
     *            the required key
     * @param redact
     *            if the result must be redacted regardless of any information to
     *            the contrary
     * @param substitutableModuleOptions
     *            the module options and their current substitution state
     * @param flags
     *            the flags specified, if any, beyond the standard {@code redact},
     *            {@code notBlank}, {@code notEmpty}, and {@code fromOption} flags
     * @param args
     *            the arguments specified, if any, beyond the standard
     *            {@code defaultValue} and {@code defaultOption} arguments
     * @return the substitution result associated with the given key, if any,
     *         otherwise null
     * @throws IOException
     *             if the request cannot be performed such that the use of a default
     *             value would be inappropriate
     */
    public abstract RedactableObject retrieveResult(String type, String key, boolean redact,
            SubstitutableModuleOptions substitutableModuleOptions, Set<String> flags, Map<String, String> args)
            throws IOException;


    @Override
    public RedactableObject doSubstitution(String type, List<String> modifiers, String value,
            SubstitutableModuleOptions substitutableModuleOptions) throws IOException {
        // etc...
    }
 
    // etc...
}
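
The flag and argument rules described in the SubstituterHelper Javadoc can be illustrated with a minimal, self-contained sketch. The class name ModifierParsingSketch, the use of IllegalArgumentException, and the decision to ignore empty modifiers are all assumptions made for illustration; the actual helper may report errors differently (for example via IOException).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical illustration only; not part of the proposed API. */
final class ModifierParsingSketch {
    final Set<String> flags = new HashSet<>();
    final Map<String, String> args = new HashMap<>();

    ModifierParsingSketch(List<String> modifiers) {
        for (String modifier : modifiers) {
            int equalSign = modifier.indexOf('=');
            if (equalSign < 0) {
                // A flag: trimmed, and redundant repeats are acceptable.
                String flag = modifier.trim();
                if (!flag.isEmpty()) // assumption: empty modifiers are ignored
                    flags.add(flag);
            } else {
                // An argument: the name is trimmed, the value is kept verbatim.
                String name = modifier.substring(0, equalSign).trim();
                String value = modifier.substring(equalSign + 1);
                if (args.containsKey(name)) // setting the same argument twice is an error
                    throw new IllegalArgumentException("Argument set more than once: " + name);
                args.put(name, value);
            }
        }
    }
}
```

Under these assumptions, the modifiers " redact " and "redact" yield a single redact flag, while " name = value " sets the name argument to " value " with its surrounding whitespace preserved.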
org.apache.kafka.common.security.oauthbearer.smo.OptionSubstituter
package org.apache.kafka.common.security.oauthbearer.smo;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * A {@code ModuleOptionSubstituter} that handles module option substitution.
 * <p>
 * Note that the {@code fromOption} modifier, if present, still adds an
 * additional level of indirection: the value indicates the option whose value,
 * in turn, indicates the option text to retrieve.
 */
public class OptionSubstituter implements ModuleOptionSubstituter {
    private final SubstituterHelper substituterHelper;
    public OptionSubstituter() {
        substituterHelper = new SubstituterHelper() {
            @Override
            public RedactableObject retrieveResult(String type, String key, boolean redact,
                    SubstitutableModuleOptions substitutableModuleOptions, Set<String> flags, Map<String, String> args)
                    throws IOException {
                if (!flags.isEmpty() || !args.isEmpty())
                    throw new IOException(String.format("Unknown extra flags/args: %s; %s", flags, args));
                RedactableObject substitutionResult = SubstitutableModuleOptionsCallbackHandler
                        .getSubstitutionResult(substitutableModuleOptions, key, false);
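                if (substitutionResult == null)
                    return null; // the option does not exist, so let any default apply
                return redact ? substitutionResult.redactedVersion() : substitutionResult;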
            }
        };
    }
    @Override
    public RedactableObject doSubstitution(String type, List<String> modifiers, String value,
            SubstitutableModuleOptions substitutableModuleOptions) throws IOException {
        return substituterHelper.doSubstitution(type, modifiers, value, substitutableModuleOptions);
    }
}
org.apache.kafka.common.security.oauthbearer.smo.FileContentSubstituter
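package org.apache.kafka.common.security.oauthbearer.smo;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributeView;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.List;
import java.util.Map;
import java.util.Set;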
 
/**
 * A {@code ModuleOptionSubstituter} that handles file content substitution.
 */
public class FileContentSubstituter implements ModuleOptionSubstituter {
    private final SubstituterHelper substituterHelper;
 
    public FileContentSubstituter() {
        substituterHelper = new SubstituterHelper() {
            @Override
            public RedactableObject retrieveResult(String type, String key, boolean redact,
                    SubstitutableModuleOptions substitutableModuleOptions, Set<String> flags, Map<String, String> args)
                    throws IOException {
                if (!flags.isEmpty() || !args.isEmpty())
                    throw new IOException(String.format("Unknown extra flags/args: %s; %s", flags, args));
                Path path = Paths.get(key);
                if (!Files.exists(path))
                    return null;
                BasicFileAttributes fileAttributes = Files.getFileAttributeView(path, BasicFileAttributeView.class)
                        .readAttributes();
                if (!fileAttributes.isRegularFile())
                    return null;
                long fileSize = fileAttributes.size();
                int maxSize = 1024 * 1024;
                if (fileSize > maxSize) {
                    throw new IOException(String.format("Type=%s: key=%s: file size exceeds max of %d: %d", type, key,
                            maxSize, fileSize));
                }
                String retval = new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
                return new RedactableObject(retval, redact);
            }
        };
    }

    @Override
    public RedactableObject doSubstitution(String type, List<String> modifiers, String value,
            SubstitutableModuleOptions substitutableModuleOptions) throws IOException {
        return substituterHelper.doSubstitution(type, modifiers, value, substitutableModuleOptions);
    }
}
org.apache.kafka.common.security.oauthbearer.smo.SystemPropertySubstituter
package org.apache.kafka.common.security.oauthbearer.smo;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
 
/**
 * A {@code ModuleOptionSubstituter} that handles system property substitution.
 * The JAAS config file already supports system property substitution via the
 * '<code>${</code>' and '<code>}</code>' delimiters, but we support it here for syntactic
 * consistency as well as to add the functionality associated with modifiers.
 */
public class SystemPropertySubstituter implements ModuleOptionSubstituter {
    private final SubstituterHelper substituterHelper;

    public SystemPropertySubstituter() {
        substituterHelper = new SubstituterHelper() {
            @Override
            public RedactableObject retrieveResult(String type, String key, boolean redact,
                    SubstitutableModuleOptions substitutableModuleOptions, Set<String> flags, Map<String, String> args)
                    throws IOException {
                if (!flags.isEmpty() || !args.isEmpty())
                    throw new IOException(String.format("Unknown extra flags/args: %s; %s", flags, args));
                String retval = System.getProperty(key);
                return retval != null ? new RedactableObject(retval, redact) : null;
            }
        };
    }

    @Override
    public RedactableObject doSubstitution(String type, List<String> modifiers, String value,
            SubstitutableModuleOptions substitutableModuleOptions) throws IOException {
        return substituterHelper.doSubstitution(type, modifiers, value, substitutableModuleOptions);
    }
}
org.apache.kafka.common.security.oauthbearer.smo.EnvironmentVariableSubstituter
package org.apache.kafka.common.security.oauthbearer.smo;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
 
/**
 * A {@code ModuleOptionSubstituter} that handles environment variable
 * substitution
 */
public class EnvironmentVariableSubstituter implements ModuleOptionSubstituter {
    private final SubstituterHelper substituterHelper;


    public EnvironmentVariableSubstituter() {
        substituterHelper = new SubstituterHelper() {
            @Override
            public RedactableObject retrieveResult(String type, String key, boolean redact,
                    SubstitutableModuleOptions substitutableModuleOptions, Set<String> flags, Map<String, String> args)
                    throws IOException {
                if (!flags.isEmpty() || !args.isEmpty())
                    throw new IOException(String.format("Unknown extra flags/args: %s; %s", flags, args));
                String retval = System.getenv(key);
                return retval != null ? new RedactableObject(retval, redact) : null;
            }
        };
    }

    @Override
    public RedactableObject doSubstitution(String type, List<String> modifiers, String value,
            SubstitutableModuleOptions substitutableModuleOptions) throws IOException {
        return substituterHelper.doSubstitution(type, modifiers, value, substitutableModuleOptions);
    }
}

Given the above code, and assuming the existence of the following mapping in the module options map:

    thePassword="$[file|redact|notBlank|=/path/to/secrets/the_secret]"

we can retrieve the contents of the file as follows:

Retrieving Values via a Callback
SubstitutableModuleOptions options = new SubstitutableModuleOptions(moduleOptionsMap);
boolean requiredToExist = true;
SubstitutableModuleOptionsCallback callback = new SubstitutableModuleOptionsCallback(options, "thePassword", requiredToExist);
CallbackHandler callbackHandler = new SubstitutableModuleOptionsCallbackHandler();
callbackHandler.handle(new Callback[] {callback});
String thePassword = callback.substitutionResult().value();

Alternatively, there is a convenience method that allows us to avoid using a callback if we wish:

Retrieving Values without a Callback
SubstitutableModuleOptions options = new SubstitutableModuleOptions(moduleOptionsMap);
boolean requiredToExist = true;
String thePassword = SubstitutableModuleOptionsCallbackHandler.getSubstitutionResult(options,
        "thePassword", requiredToExist).value();

The initial set of supported substitution types and their specifiable constraints are as follows:

Type: file
Description: File content substitution
Specifiable Constraints: notBlank, notEmpty, redact, fromOption, defaultValue=<value>, defaultOption=<optionName>
Notes: It is an error if the file does not exist or is not readable unless defaultValue or defaultOption is specified. If defaultValue is specified, the literal default value is used when no value can be determined or the determined value violates a notBlank or notEmpty constraint, and the default is itself checked against those constraints. If defaultOption is specified, the value of the indicated option is used and checked in the same way. If the default option's value depends on substitutions that were marked redact then redact is implied. The fromOption modifier indicates that the value is not the file to read but the name of an option whose value is taken as the filename. This makes it possible to build the filename from multiple substitutions instead of specifying it literally.

Type: envvar
Description: Environment variable substitution
Specifiable Constraints: same as for file
Notes: It is an error if the environment variable does not exist unless defaultValue or defaultOption is specified. If defaultValue is specified, the literal default value is used when no value can be determined or the determined value violates a notBlank or notEmpty constraint, and the default is itself checked against those constraints. If defaultOption is specified, the value of the indicated option is used and checked in the same way. If the default option's value depends on substitutions that were marked redact then redact is implied. The fromOption modifier indicates that the value is not the environment variable to read but the name of an option whose value is taken as the environment variable name. This makes it possible to build the environment variable name from multiple substitutions instead of specifying it literally.

Type: option
Description: Another module option value substitution
Specifiable Constraints: same as for file
Notes: It is an error if the option does not exist unless defaultValue or defaultOption is specified. If defaultValue is specified, the literal default value is used when no value can be determined or the determined value violates a notBlank or notEmpty constraint, and the default is itself checked against those constraints. If defaultOption is specified, the value of the indicated option is used and checked in the same way. If the default option's value depends on substitutions that were marked redact then redact is implied. The fromOption modifier indicates that the value is not the option to read but the name of an option whose value is taken as the option name. This makes it possible to build the option name from multiple substitutions instead of specifying it literally.

Type: sysprop
Description: System property substitution
Specifiable Constraints: same as for file
Notes: It is an error if the system property does not exist unless defaultValue or defaultOption is specified. If defaultValue is specified, the literal default value is used when no value can be determined or the determined value violates a notBlank or notEmpty constraint, and the default is itself checked against those constraints. If defaultOption is specified, the value of the indicated option is used and checked in the same way. If the default option's value depends on substitutions that were marked redact then redact is implied. The fromOption modifier indicates that the value is not the system property to read but the name of an option whose value is taken as the system property name. This makes it possible to build the system property name from multiple substitutions instead of specifying it literally.
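
The interaction between the notEmpty and notBlank constraints and a literal defaultValue can be sketched as follows. This is a simplified illustration, not the proposed implementation: the class and method names are hypothetical, defaultOption and redaction are omitted, and an unchecked exception stands in for whatever error the real code raises.

```java
/** Hypothetical illustration of constraint and default handling. */
final class ConstraintSketch {
    /** Returns true if the value exists and satisfies the requested constraints. */
    static boolean isAllowed(String value, boolean notEmpty, boolean notBlank) {
        if (value == null)
            return false; // the value does not exist
        if (notBlank && value.trim().isEmpty())
            return false; // notBlank also rejects empty values (it implies notEmpty)
        if (notEmpty && value.isEmpty())
            return false;
        return true;
    }

    /** Resolves the value, falling back to the literal default when needed. */
    static String resolve(String value, boolean notEmpty, boolean notBlank, String defaultValue) {
        if (isAllowed(value, notEmpty, notBlank))
            return value;
        // The default is used only as a fallback and must satisfy the same constraints.
        if (isAllowed(defaultValue, notEmpty, notBlank))
            return defaultValue;
        // Assumption: an unchecked exception is used here purely to keep the sketch short.
        throw new IllegalArgumentException("No acceptable value or default");
    }
}
```

For instance, resolve("   ", false, true, "fallback") returns "fallback" because the blank value violates notBlank, whereas resolve("   ", true, false, null) returns the blank value unchanged because notEmpty alone permits whitespace.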

To add new substitutions simply define a module option of the following form:

typeModuleOptionSubstituter = "fully.qualified.class.name"

For example:

fooModuleOptionSubstituter = "org.example.FooSubstituter"

The indicated class must implement the org.apache.kafka.common.security.oauthbearer.smo.ModuleOptionSubstituter interface.

Invoke the substitution with text in an option like this:

$[foo/optional/modifiers/=optionalValue]

 

Note that the above substitution functionality and the related classes and interfaces are not necessarily specific to OAuth Bearer Tokens, so an open question is whether this substitution-related code belongs in a sub-package of the main SASL/OAUTHBEARER one or if it should live somewhere else.


 

Token Retrieval

[Diagram: Token Retrieval]

Token retrieval occurs on the client side of the SASL negotiation (in the producer/consumer, or on the broker when SASL/OAUTHBEARER is the inter-broker protocol), and the org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule class is the LoginModule implementation that creates and invokes an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenRetriever to perform the retrieval.  We provide the org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtRetriever class as a sample implementation that also provides value in development and testing situations.

org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
package org.apache.kafka.common.security.oauthbearer;

import org.apache.kafka.common.security.oauthbearer.internal.OAuthBearerSaslClientProvider;
import org.apache.kafka.common.security.oauthbearer.internal.OAuthBearerSaslServerProvider;
/**
 * A {@code LoginModule} for the SASL/OAUTHBEARER mechanism.
 * <p>
 * Example use on a non-broker client:
 *
 * <pre>
 * KafkaClient {
 *      org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
 *      reloginAllowedBeforeLogout="true"
 *      tokenRetriever="org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtRetriever"
 *          stringClaim_sub="theClientPrincipalName"
 *          listClaim_scope="|scopeValue1|scopeValue2";
 * };
 * </pre>
 *
 * Example use on a broker:
 *
 * <pre>
 * KafkaServer {
 *      org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
 *      reloginAllowedBeforeLogout="true"
 *      tokenRetriever="org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtRetriever"
 *          stringClaim_sub="theBrokerPrincipalName"
 *          listClaim_scope="|scopeValue1|scopeValue2"
 *      tokenValidator="org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtValidator"
 *          principalClaimName="sub";
 * };
 * </pre>
 *
 * @see RefreshConfigProp#RELOGIN_ALLOWED_BEFORE_LOGOUT
 * @see OAuthBearerTokenRetriever
 * @see OAuthBearerTokenValidator
 */
public class OAuthBearerLoginModule implements ExpiringCredentialLoginModule {
    public static final String TOKEN_RETRIEVER_CLASS_NAME_OPTION = "tokenRetriever";
    public static final String TOKEN_VALIDATOR_CLASS_NAME_OPTION = "tokenValidator";
    private static final Set<String> MECHANISMS = Collections
            .unmodifiableSet(new HashSet<>(Arrays.asList("OAUTHBEARER")));
 
    static {
        OAuthBearerSaslClientProvider.initialize(); // not part of public API
        OAuthBearerSaslServerProvider.initialize(); // not part of public API
    }
 
    @Override
    public Set<String> mechanisms() {
        return MECHANISMS;
    }
    @Override
    public CallbackHandler newCallbackHandler() {
        return new SubstitutableModuleOptionsCallbackHandler();
    }
 
    // etc...

}
org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenRetriever
package org.apache.kafka.common.security.oauthbearer;
/**
 * An implementation of this interface must be configured for broker and
 * non-broker clients via JAAS when using the {@link OAuthBearerLoginModule}.
 * The configuration is done via the
 * {@value OAuthBearerLoginModule#TOKEN_RETRIEVER_CLASS_NAME_OPTION} option.
 */
public interface OAuthBearerTokenRetriever {
    /**
     * Retrieve a token using the given callback handler and JAAS login module
     * options.
     *
     * @param callbackHandler
     *            the mandatory callback handler. It will typically be capable of
     *            handling instances of {@link SubstitutableModuleOptionsCallback},
     *            though different implementations of this interface are free to
     *            differ in their requirements.
     * @param moduleOptionsMap
     *            the mandatory map representation of the <a href=
     *            "https://docs.oracle.com/javase/9/docs/api/javax/security/auth/login/Configuration.html">module
     *            options</a>
     * @return the retrieved token
     * @throws IOException
     *             if one or more networked resources required to perform the
     *             retrieval (e.g. a web service) is unavailable.
     * @throws UnsupportedCallbackException
     *             if the provided {@code CallbackHandler} cannot handle
     *             {@link SubstitutableModuleOptionsCallback}.
     * @throws OAuthBearerConfigException
     *             if there is a configuration problem that prevents this instance
     *             from functioning (a missing mandatory parameter, for example)
     * @throws LoginException
     *             if the retrieval fails for any other reason (if the token
     *             endpoint rejects the provided credentials, for example)
     */
    OAuthBearerToken retrieve(CallbackHandler callbackHandler, Map<String, String> moduleOptionsMap)
            throws IOException, UnsupportedCallbackException, OAuthBearerConfigException, LoginException;
}
org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtRetriever
package org.apache.kafka.common.security.oauthbearer;

/**
 * An implementation of {@link OAuthBearerTokenRetriever} that generates an
 * unsecured JWT. Claims and their values can be specified using
 * stringClaim_&lt;claimname&gt;, numberClaim_&lt;claimname&gt;, and
 * listClaim_&lt;claimname&gt; options. The first character of the value is
 * taken as the delimiter for list claims. You may define any claim name and
 * value except '{@code iat}' and '{@code exp}', both of which are calculated
 * automatically.
 * <p>
 * This implementation also accepts the following options:
 * <ul>
 * <li><code>{@value #PRINCIPAL_CLAIM_NAME_OPTION}</code> set to a custom claim
 * name if you wish the name of the String claim holding the principal name to
 * be something other than <code>"sub"</code>.</li>
 * <li><code>{@value #LIFETIME_SECONDS_OPTION}</code> set to an integer value if
 * the token expiration is to be set to something other than the default value
 * of 3600 seconds (which is 1 hour). The 'exp' claim reflects the expiration
 * time.</li>
 * <li><code>{@value #SCOPE_CLAIM_NAME_OPTION}</code> set to a custom claim name
 * if you wish the name of the String or String List claim holding any token
 * scope to be something other than "<code>scope</code>"</li>
 * </ul>
 * For example:
 *
 * <pre>
 * KafkaClient {
 *      org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
 *      tokenRetriever="org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtRetriever"
 *      reloginAllowedBeforeLogout="true"
 *         stringClaim_sub="thePrincipalName"
 *         listClaim_scope="|scopeValue1|scopeValue2"
 *         lifetimeSeconds="60";
 * };
 *
 * </pre>
 */
public class OAuthBearerUnsecuredJwtRetriever implements OAuthBearerTokenRetriever {
    public static final String PRINCIPAL_CLAIM_NAME_OPTION = "principalClaimName";
    public static final String LIFETIME_SECONDS_OPTION = "lifetimeSeconds";
    public static final String SCOPE_CLAIM_NAME_OPTION = "scopeClaimName";
 
    // etc...
 
}
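
To make the options above concrete, here is a minimal sketch of how a list-claim option value could be split on its leading delimiter and how an unsecured JWT (header {"alg":"none"}, empty signature section) could be assembled in compact form. The class and method names (UnsecuredJwtSketch, parseListClaim, compactUnsecuredJwt) are hypothetical and are not part of the proposed API; the sketch also assumes claim values need no JSON escaping.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical illustration only; not the proposed OAuthBearerUnsecuredJwtRetriever.
final class UnsecuredJwtSketch {
    /** Split a listClaim_* option value; its first character is the delimiter. */
    static List<String> parseListClaim(String optionValue) {
        if (optionValue == null || optionValue.length() < 2)
            return new ArrayList<>();
        String delimiter = optionValue.substring(0, 1);
        // Quote the delimiter so characters such as '|' are treated literally
        return Arrays.asList(optionValue.substring(1).split(Pattern.quote(delimiter)));
    }

    /** Build header.payload. (note the trailing dot: the signature section is empty). */
    static String compactUnsecuredJwt(String principal, List<String> scope, long nowSeconds,
            long lifetimeSeconds) {
        StringBuilder scopeJson = new StringBuilder();
        for (String item : scope) {
            if (scopeJson.length() > 0)
                scopeJson.append(',');
            // Assumption: values contain no characters that require JSON escaping
            scopeJson.append('"').append(item).append('"');
        }
        String header = "{\"alg\":\"none\"}";
        // 'iat' and 'exp' are calculated rather than configured
        String claims = "{\"sub\":\"" + principal + "\",\"scope\":[" + scopeJson + "],\"iat\":"
                + nowSeconds + ",\"exp\":" + (nowSeconds + lifetimeSeconds) + "}";
        Base64.Encoder encoder = Base64.getUrlEncoder().withoutPadding();
        return encoder.encodeToString(header.getBytes(StandardCharsets.UTF_8)) + "."
                + encoder.encodeToString(claims.getBytes(StandardCharsets.UTF_8)) + ".";
    }

    public static void main(String[] args) {
        List<String> scope = parseListClaim("|scopeValue1|scopeValue2");
        long nowSeconds = System.currentTimeMillis() / 1000;
        System.out.println(compactUnsecuredJwt("thePrincipalName", scope, nowSeconds, 60));
    }
}
```

The values used in main mirror the JAAS example above (stringClaim_sub, listClaim_scope, and lifetimeSeconds="60").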

 

Token Validation


Token validation occurs on the broker side of the SASL negotiation, and the SaslServer implementation registered by org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule creates and invokes an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenValidator to perform the validation.  We provide the org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtValidator class as a sample implementation that also provides value in development and testing situations.  There are additional utility classes as shown in the above diagram that are also reusable with other implementations.

org.apache.kafka.common.security.oauthbearer.OAuthBearerSaslServer
package org.apache.kafka.common.security.oauthbearer;

import javax.security.sasl.SaslServer;

/**
 * The public interface of SaslServer implementations of SASL/OAUTHBEARER for
 * Kafka. The validated access token will be available via the {@link #token()}
 * method. The existence of this interface keeps the actual implementation out
 * of the public API.
 */
public interface OAuthBearerSaslServer extends SaslServer {
    /**
     * Return the validated access token. Exposing the token here is necessary if we
     * wish to make it available for authorization decisions.
     *
     * @return the validated access token
     * @see <a href=
     *      "https://cwiki.apache.org/confluence/display/KAFKA/KIP-189%3A+Improve+principal+builder+interface+and+add+support+for+SASL">KIP-189:
     *      Improve principal builder interface and add support for SASL</a>
     */
    public OAuthBearerToken token();
}
org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenValidator
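package org.apache.kafka.common.security.oauthbearer;

import java.io.IOException;
import java.util.Map;

import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.UnsupportedCallbackException;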

/**
 * An implementation of this interface must be configured for brokers via JAAS
 * when using the {@link OAuthBearerLoginModule}. The configuration is done via
 * the {@value OAuthBearerLoginModule#TOKEN_VALIDATOR_CLASS_NAME_OPTION} option.
 */
public interface OAuthBearerTokenValidator {
    /**
     * Validate a token using the given callback handler and JAAS login module
     * options.
     *
     * @param tokenValue
     *            the <code>b64token</code> value as defined in
     *            <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750
     *            Section 2.1</a> to validate
     * @param callbackHandler
     *            the mandatory callback handler. It will typically be capable of
     *            handling instances of {@link SubstitutableModuleOptionsCallback},
     *            though different implementations of this interface are free to
     *            differ in their requirements. Set the callback handler via the
     *            {@code sasl.server.callback.handler.class.map} option in the
     *            broker properties file.
     * @param moduleOptionsMap
     *            the mandatory map representation of the <a href=
     *            "https://docs.oracle.com/javase/9/docs/api/javax/security/auth/login/Configuration.html">module
     *            options</a>
     * @return the successfully validated token
     * @throws IOException
     *             if one or more networked resources required to perform the
     *             validation (e.g. a web service) is unavailable.
     * @throws UnsupportedCallbackException
     *             if the provided {@code CallbackHandler} cannot handle
     *             {@link SubstitutableModuleOptionsCallback}.
     * @throws OAuthBearerConfigException
     *             if there is a configuration problem that prevents this instance
     *             from functioning (a missing mandatory parameter, for example)
     * @throws OAuthBearerIllegalTokenException
     *             if there is a problem with the token itself (it cannot be parsed
     *             or it otherwise fails validation)
     * @see BrokerSecurityConfigs#SASL_SERVER_CALLBACK_HANDLER_CLASS_MAP_DOC
     */
    OAuthBearerToken validate(String tokenValue, CallbackHandler callbackHandler, Map<String, String> moduleOptionsMap)
            throws IOException, UnsupportedCallbackException, OAuthBearerConfigException,
            OAuthBearerIllegalTokenException;
}
org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtValidator
package org.apache.kafka.common.security.oauthbearer;

/**
 * An implementation of {@link OAuthBearerTokenValidator} that validates a JWS
 * without regard to its algorithm and signature. It requires there to be an
 * <code>"exp" (Expiration Time)</code> claim of type Number. If
 * <code>"iat" (Issued At)</code> or <code>"nbf" (Not Before)</code> claims are
 * present each must be a number that precedes the Expiration Time claim, and if
 * both are present the Not Before claim must not precede the Issued At claim.
 * It also accepts the following options, none of which are required:
 * <ul>
 * <li><code>{@value #PRINCIPAL_CLAIM_NAME_OPTION}</code> set to a non-empty
 * value if you wish a String claim holding a principal name to be checked for
 * existence; the default is to perform no such check</li>
 * <li><code>{@value #SCOPE_CLAIM_NAME_OPTION}</code> set to a custom claim name
 * if you wish the name of the String/String List claim holding any token scope
 * to be something other than <code>"scope"</code></li>
 * <li><code>{@value #REQUIRED_SCOPE_OPTION}</code> set to a space-delimited
 * list of scope values if you wish the String/String List claim holding the
 * token scope to be checked to make sure it contains certain values</li>
 * <li><code>{@value #ALLOWABLE_CLOCK_SKEW_MILLIS_OPTION}</code> set to a
 * positive integer value if you wish to allow up to some number of positive
 * milliseconds of clock skew (the default is 0)</li>
 * </ul>
 * For example:
 *
 * <pre>
 * KafkaServer {
 *      org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
 *      reloginAllowedBeforeLogout="true"
 *      tokenRetriever="org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtRetriever"
 *         stringClaim_sub="thePrincipalName"
 *         listClaim_scope=",KAFKA_BROKER,LOGIN_TO_KAFKA"
 *      tokenValidator="org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtValidator"
 *          principalClaimName="sub"
 *          requiredScope="LOGIN_TO_KAFKA"
 *          allowableClockSkewMillis="3000";
 * };
 * </pre>
 */
public class OAuthBearerUnsecuredJwtValidator implements OAuthBearerTokenValidator {
    public static final String PRINCIPAL_CLAIM_NAME_OPTION = "principalClaimName";
    public static final String SCOPE_CLAIM_NAME_OPTION = "scopeClaimName";
    public static final String REQUIRED_SCOPE_OPTION = "requiredScope";
    public static final String ALLOWABLE_CLOCK_SKEW_MILLIS_OPTION = "allowableClockSkewMillis";
 
    // etc...
}
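
As a rough illustration of what validating "without regard to its algorithm and signature" can look like, the sketch below splits a compact serialization into its three sections and decodes only the header and payload; the signature section is never inspected. The class and method names are hypothetical, and parsing the decoded JSON into claims is left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical illustration only; not the proposed OAuthBearerUnsecuredJwtValidator.
final class UnsecuredJwsDecodeSketch {
    /**
     * Return the decoded header JSON (index 0) and payload JSON (index 1) of a
     * compact serialization. The signature section is deliberately ignored.
     */
    static String[] decodeHeaderAndPayload(String compactSerialization) {
        // A limit of -1 keeps a trailing empty section, as in "header.payload."
        String[] sections = compactSerialization.split("\\.", -1);
        if (sections.length != 3)
            throw new IllegalArgumentException("Expected 3 sections but found " + sections.length);
        // The decoder accepts unpadded Base64URL and throws
        // IllegalArgumentException on malformed input
        Base64.Decoder decoder = Base64.getUrlDecoder();
        return new String[] {
            new String(decoder.decode(sections[0]), StandardCharsets.UTF_8),
            new String(decoder.decode(sections[1]), StandardCharsets.UTF_8)
        };
    }

    public static void main(String[] args) {
        Base64.Encoder encoder = Base64.getUrlEncoder().withoutPadding();
        String compact = encoder.encodeToString("{\"alg\":\"none\"}".getBytes(StandardCharsets.UTF_8))
                + "." + encoder.encodeToString("{\"exp\":1060}".getBytes(StandardCharsets.UTF_8)) + ".";
        String[] decoded = decodeHeaderAndPayload(compact);
        System.out.println(decoded[0] + " " + decoded[1]);
    }
}
```

A real validator would go on to parse the payload and apply the claim checks described in the Javadoc above.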
org.apache.kafka.common.security.oauthbearer.OAuthBearerValidationUtils
package org.apache.kafka.common.security.oauthbearer;

import java.io.IOException;
import java.util.List;

public class OAuthBearerValidationUtils {
    /**
     * Validate the given claim for existence and type. It can be required to exist
     * in the given claims, and if it exists it must be one of the types indicated
     *
     * @param jwt
     *            the mandatory JWT to which the validation will be applied
     * @param required
     *            true if the claim is required to exist
     * @param claimName
     *            the required claim name identifying the claim to be checked
     * @param allowedTypes
     *            one or more of {@code String.class}, {@code Number.class}, and
     *            {@code List.class} identifying the type(s) that the claim value is
     *            allowed to be if it exists
     * @return the result of the validation
     */
    public static OAuthBearerValidationResult validateClaimForExistenceAndType(OAuthBearerJwt jwt, boolean required,
            String claimName, Class<?>... allowedTypes) {
        // etc...
    }
 
    /**
     * Validate the 'iat' (Issued At) claim. It can be required to exist in the
     * given claims, and if it exists it must be a (potentially fractional) number
     * of seconds since the epoch defining when the JWT was issued; it is a
     * validation error if the Issued At time is after the time at which the check
     * is being done (plus any allowable clock skew).
     *
     * @param jwt
     *            the mandatory JWT to which the validation will be applied
     * @param required
     *            true if the claim is required to exist
     * @param whenCheckTimeMillis
     *            the time relative to which the validation is to occur
     * @param allowableClockSkewMillis
     *            non-negative number to take into account some potential clock skew
     * @return the result of the validation
     * @throws OAuthBearerConfigException
     *             if the given allowable clock skew is negative
     */
    public static OAuthBearerValidationResult validateIssuedAt(OAuthBearerJwt jwt, boolean required,
            long whenCheckTimeMillis, int allowableClockSkewMillis) throws OAuthBearerConfigException {
        // etc...
    }
 
    /**
     * Validate the 'nbf' (Not Before) claim. It can be required to exist in the
     * given claims, and if it exists it must be a (potentially fractional) number
     * of seconds since the epoch defining the point at which the JWT becomes valid.
     * It is a validation error if the time at which the check is being done (plus
     * any allowable clock skew) is before the Not Before time.
     *
     * @param jwt
     *            the mandatory JWT to which the validation will be applied
     * @param required
     *            true if the claim is required to exist
     * @param whenCheckTimeMillis
     *            the time relative to which the validation is to occur
     * @param allowableClockSkewMillis
     *            non-negative number to take into account some potential clock skew
     * @return the result of the validation
     * @throws OAuthBearerConfigException
     *             if the given allowable clock skew is negative
     */
    public static OAuthBearerValidationResult validateNotBefore(OAuthBearerJwt jwt, boolean required,
            long whenCheckTimeMillis, int allowableClockSkewMillis) throws OAuthBearerConfigException {
        // etc...
    }
 
    /**
     * Validate the 'exp' (Expiration Time) claim. It can be required to exist in
     * the given claims, and if it exists it must be a (potentially fractional)
     * number of seconds since the epoch defining the point at which the JWT
     * expires. It is a validation error if the time at which the check is being
     * done (minus any allowable clock skew) is on or after the Expiration Time.
     *
     * @param jwt
     *            the mandatory JWT to which the validation will be applied
     * @param required
     *            true if the claim is required to exist
     * @param whenCheckTimeMillis
     *            the time relative to which the validation is to occur
     * @param allowableClockSkewMillis
     *            non-negative number to take into account some potential clock skew
     * @return the result of the validation
     * @throws OAuthBearerConfigException
     *             if the given allowable clock skew is negative
     */
    public static OAuthBearerValidationResult validateExpirationTime(OAuthBearerJwt jwt, boolean required,
            long whenCheckTimeMillis, int allowableClockSkewMillis) throws OAuthBearerConfigException {
        // etc...
    }
 
    /**
     * Validate the 'iat' (Issued At), 'nbf' (Not Before), and 'exp' (Expiration
     * Time) claims for internal consistency. For claim pairs that exist, the
     * following must be true:
     * <ul>
     * <li>nbf &gt;= iat</li>
     * <li>exp &gt; iat</li>
     * <li>exp &gt; nbf</li>
     * </ul>
     *
     * @param jwt
     *            the mandatory JWT to which the validation will be applied
     * @return the result of the validation
     */
    public static OAuthBearerValidationResult validateTimeConsistency(OAuthBearerJwt jwt) {
        // etc...
    }
 
    /**
     * Validate the scope of the given token against the required scope. Every
     * required scope element (if any) must exist in the token's scope for the
     * validation to succeed.
     *
     * @param token
     *            the token whose (optional) scope is to be validated
     * @param requiredScope
     *            the optional required scope against which the given scope will be
     *            validated
     * @return the result of the validation
     * @throws IOException
     *             if one or more required networked resources (e.g. to re-hydrate
     *             an opaque token) is unavailable.
     * @throws OAuthBearerIllegalTokenException
     *             if there is something fundamentally wrong with the token (if it
     *             is malformed, for example)
     */
    public static OAuthBearerValidationResult validateScope(OAuthBearerToken token, List<String> requiredScope)
            throws IOException, OAuthBearerIllegalTokenException {
        // etc...
    }
 
    // etc...
 
    private OAuthBearerValidationUtils() {
        // empty
    }
}
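
The time-related rules documented above can be sketched as plain comparisons. The sketch below is an assumption-laden illustration: the class and method names are hypothetical, claim values are passed directly as seconds rather than read from an OAuthBearerJwt, results are booleans rather than OAuthBearerValidationResult instances, and IllegalArgumentException stands in for OAuthBearerConfigException.

```java
// Hypothetical illustration only; not the proposed OAuthBearerValidationUtils.
final class TimeClaimChecksSketch {
    private static void requireNonNegative(int allowableClockSkewMillis) {
        // Stand-in for the OAuthBearerConfigException described in the Javadoc
        if (allowableClockSkewMillis < 0)
            throw new IllegalArgumentException("allowable clock skew must not be negative");
    }

    /** 'iat' fails if it is after the check time plus any allowable clock skew. */
    static boolean issuedAtOk(double iatSeconds, long whenCheckTimeMillis, int allowableClockSkewMillis) {
        requireNonNegative(allowableClockSkewMillis);
        return iatSeconds * 1000 <= whenCheckTimeMillis + allowableClockSkewMillis;
    }

    /** 'nbf' fails if the check time plus any allowable clock skew is before it. */
    static boolean notBeforeOk(double nbfSeconds, long whenCheckTimeMillis, int allowableClockSkewMillis) {
        requireNonNegative(allowableClockSkewMillis);
        return whenCheckTimeMillis + allowableClockSkewMillis >= nbfSeconds * 1000;
    }

    /** 'exp' fails if the check time minus any allowable clock skew is on or after it. */
    static boolean expirationOk(double expSeconds, long whenCheckTimeMillis, int allowableClockSkewMillis) {
        requireNonNegative(allowableClockSkewMillis);
        return whenCheckTimeMillis - allowableClockSkewMillis < expSeconds * 1000;
    }

    /** For claim pairs that exist (non-null): nbf >= iat, exp > iat, exp > nbf. */
    static boolean timeConsistent(Double iat, Double nbf, Double exp) {
        if (iat != null && nbf != null && nbf < iat)
            return false;
        if (iat != null && exp != null && exp <= iat)
            return false;
        if (nbf != null && exp != null && exp <= nbf)
            return false;
        return true;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        double exp = now / 1000.0 + 60; // expires one minute from now
        System.out.println("exp ok: " + expirationOk(exp, now, 3000));
        System.out.println("consistent: " + timeConsistent(now / 1000.0, null, exp));
    }
}
```

Note how the skew is added to the check time for 'iat' and 'nbf' but subtracted for 'exp', so that in each case a small clock difference between issuer and broker is tolerated.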
org.apache.kafka.common.security.oauthbearer.OAuthBearerScopeUtils
package org.apache.kafka.common.security.oauthbearer;

import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;

/**
 * Utility class to help deal with
 * <a href="https://tools.ietf.org/html/rfc6749#section-3.3">Access Token
 * Scopes</a>
 */
public class OAuthBearerScopeUtils {
    private static final Pattern INDIVIDUAL_SCOPE_ITEM_PATTERN = Pattern.compile("[\\x23-\\x5B\\x5D-\\x7E\\x21]+");

    /**
     * Return true if the given value meets the definition of a valid scope item as
     * per <a href="https://tools.ietf.org/html/rfc6749#section-3.3">RFC 6749
     * Section 3.3</a>, otherwise false
     *
     * @param scopeItem
     *            the mandatory scope item to check for validity
     * @return true if the given value meets the definition of a valid scope item,
     *         otherwise false
     */
    public static boolean isValidScopeItem(String scopeItem) {
        return INDIVIDUAL_SCOPE_ITEM_PATTERN.matcher(Objects.requireNonNull(scopeItem)).matches();
    }

    /**
     * Convert a space-delimited list of scope values (for example,
     * <code>"scope1 scope2"</code>) to a List containing the individual elements
     * (<code>"scope1"</code> and <code>"scope2"</code>)
     *
     * @param spaceDelimitedScope
     *            the mandatory (but possibly empty) space-delimited scope values,
     *            each of which must be valid according to
     *            {@link #isValidScopeItem(String)}
     * @return the list of the given (possibly empty) space-delimited values
     * @throws OAuthBearerConfigException
     *             if any of the individual scope values are malformed/illegal
     */
    public static List<String> parseScope(String spaceDelimitedScope) throws OAuthBearerConfigException {
        // etc...
    }


    private OAuthBearerScopeUtils() {
        // empty
    }
}
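
One possible shape for the elided parseScope body, together with the "every required scope element must exist" check, is sketched below. The class name and the containsRequiredScope helper are hypothetical, IllegalArgumentException stands in for OAuthBearerConfigException, and skipping empty items produced by repeated spaces is an assumption rather than something the proposal specifies.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.regex.Pattern;

// Hypothetical illustration only; not the proposed OAuthBearerScopeUtils.
final class ScopeSketch {
    // Same character classes as the scope item pattern shown above (RFC 6749 Section 3.3)
    private static final Pattern SCOPE_ITEM = Pattern.compile("[\\x23-\\x5B\\x5D-\\x7E\\x21]+");

    /** Convert "scope1 scope2" into ["scope1", "scope2"]; an empty string yields an empty list. */
    static List<String> parseScope(String spaceDelimitedScope) {
        List<String> result = new ArrayList<>();
        for (String item : Objects.requireNonNull(spaceDelimitedScope).split(" ")) {
            if (item.isEmpty())
                continue; // assumption: tolerate empty input and repeated spaces
            if (!SCOPE_ITEM.matcher(item).matches())
                throw new IllegalArgumentException("Invalid scope item: " + item);
            result.add(item);
        }
        return Collections.unmodifiableList(result);
    }

    /** True if every required scope element (if any) exists in the token's scope. */
    static boolean containsRequiredScope(Collection<String> tokenScope, List<String> requiredScope) {
        return tokenScope.containsAll(requiredScope);
    }

    public static void main(String[] args) {
        List<String> tokenScope = parseScope("KAFKA_BROKER LOGIN_TO_KAFKA");
        System.out.println(containsRequiredScope(tokenScope, parseScope("LOGIN_TO_KAFKA")));
    }
}
```

The values in main correspond to the listClaim_scope and requiredScope settings in the KafkaServer JAAS example above.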

 

Proposed Changes

Thanks to KIP-86, no changes to the existing code base are required – all functionality represents additions (rather than changes) to the existing code base.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?

None

  • If we are changing behavior how will we phase out the older behavior?

There is no change to existing behavior

Rejected Alternatives

Exceptions

<<<NOTE: Please consider this to be open for debate at this point>>>

Unchecked exceptions can be added or deleted without breaking binary compatibility, so it would seem that OAuthBearerException should be unchecked and we should forgo the compiler help provided by the use of checked exceptions.  However, the set of exception types and the size of the overall code base are both small, and the risk that we would want to change the type(s) of exception(s) thrown is minimal, so we keep the exceptions checked.

Token Refresh

<<<NOTE: Please consider this to be open for debate at this point>>>

We implement org.apache.kafka.common.security.oauthbearer.ExpiringCredential as an interface rather than as a class because an interface creates less of a constraint – implementers of other types of expiring credentials remain free to extend any class they want.  The risk of having to add a new method while Java 7 is still a supported target (which would break forward compatibility) is low.

We considered associating refresh-related properties (such as the minimum refresh period in milliseconds) with the ExpiringCredential rather than the ExpiringCredentialRefreshingLogin instance because the ExpiringCredentialRefreshingLogin instance couldn't know which of the potentially multiple login modules actually applies (i.e. which is the one associated with the inter-broker protocol); it wouldn't always know how to find the JAAS config options, so it wouldn't always know how to get the refresh configuration.  There was a problem with this approach, though: we can't invoke login() on the LoginContext and get an ExpiringCredential instance without a CallbackHandler, so we needed to know the type of CallbackHandler to instantiate – and there is no way to know that.  It simply made sense to give the ExpiringCredentialRefreshingLogin instance the ability to discover the correct login module in all cases and be able to ask it for the CallbackHandler instance; hence we created the ExpiringCredentialLoginModule interface.

Substitutable Module Options

We do not support nested substitution because the combination of defaultValue, defaultOption, and fromOption modifiers provides the same capability without the need for nesting.  Nesting can be difficult to read and difficult to implement correctly.

 
