Comment: Checked (rather than unchecked) exceptions; referred to KIP-269 Substitution Within Configuration Values

...

Note that KIP-189 makes it possible to expose the access token to the broker for authorization decisions (by exposing the access token on the SaslServer implementation), but detailed discussion of this possibility is outside the scope of this proposal.  If access tokens are used for authorization decisions, however, the long-lived nature of Kafka connections means that some decisions may be made using expired access tokens.  The broker validates the token upon authentication, but the token is not replaced for that connection as long as the connection remains intact.  If the token expires in an hour, authorization decisions during that first hour are made using the still-valid token; after that, the expired token remains associated with the connection, and authorization decisions for that connection are made using the expired token.  This would have to be addressed via a separate KIP if it turns out to be problematic, but that seems unlikely (code signing certificates that have been timestamped remain valid after their expiration, for example, and access tokens are indeed timestamped).

Note also that the implementation of flexible, substitution-aware configuration that was originally proposed in an early draft of this KIP was deemed more generally useful and has been separated out into its own KIP-269 Substitution Within Configuration Values, which is now a prerequisite for this one.

Public Interfaces

Note that most of the implementation of this KIP will be public-facing.  The following sections define the various parts, and each includes an overall UML diagram as well as important code details (with Javadoc).

...

See Rejected Alternatives: Exceptions

[UML diagram: Exceptions]

We define a small exception hierarchy to cover the various cases related to the SASL/OAUTHBEARER code.

Code Block
languagejava
titleorg.apache.kafka.common.security.oauthbearer.OAuthBearerException
collapsetrue
package org.apache.kafka.common.security.oauthbearer;
 
/**
 * Base class for all exceptions thrown by the SASL/OAUTHBEARER code. Unlike the
 * unchecked {@link KafkaException} hierarchy, this hierarchy can be checked to
 * obtain compiler help because instances are never exposed to the core Kafka
 * code base and there is minimal risk of changing the set of thrown checked
 * exceptions and impacting contract/source code compatibility given the small
 * size of this code base.
 */
public abstract class OAuthBearerException extends Exception {
    // etc...
}
Code Block
languagejava
titleorg.apache.kafka.common.security.oauthbearer.OAuthBearerConfigException
collapsetrue
package org.apache.kafka.common.security.oauthbearer;
 
/**
 * Exception thrown when there is a problem with the configuration (an invalid
 * option in a JAAS config, for example).
 */
public class OAuthBearerConfigException extends OAuthBearerException {
    // etc...
}
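
As a rough illustration of why a checked hierarchy gives "compiler help", the sketch below uses condensed stand-ins for the two classes above. The constructors and the `requireOption`/`describe` helpers are assumptions made for the example and are not part of this proposal.

```java
import java.util.Map;

// Condensed stand-ins for the two classes shown above; the constructors are assumed.
abstract class OAuthBearerException extends Exception {
    OAuthBearerException(String message) {
        super(message);
    }
}

class OAuthBearerConfigException extends OAuthBearerException {
    OAuthBearerConfigException(String message) {
        super(message);
    }
}

final class CheckedExceptionSketch {
    // Hypothetical helper: because the exception is checked, the compiler forces
    // every caller to either catch it or declare it.
    static String requireOption(Map<String, String> options, String name) throws OAuthBearerConfigException {
        String value = options.get(name);
        if (value == null)
            throw new OAuthBearerConfigException("Missing required JAAS option: " + name);
        return value;
    }

    // Hypothetical caller: catching the base class covers the whole hierarchy.
    static String describe(Map<String, String> options, String name) {
        try {
            return "ok: " + requireOption(options, name);
        } catch (OAuthBearerException e) {
            return "config problem: " + e.getMessage();
        }
    }
}
```

Removing the `try`/`catch` from `describe` without adding a `throws` clause would fail to compile, which is the kind of help an unchecked hierarchy cannot provide.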

...

Code Block
languagejava
titleorg.apache.kafka.common.security.oauthbearer.JwtClaim
collapsetrue
package org.apache.kafka.common.security.oauthbearer;

/**
 * JSON Web Token Claims
 *
 * @see <a href="https://www.iana.org/assignments/jwt/jwt.xhtml">JSON Web Token
 *      Claims</a>
 */
public enum JwtClaim {
    ISSUER("iss"),
    SUBJECT("sub"),
    AUDIENCE("aud"),
    EXPIRATION_TIME("exp"),
    NOT_BEFORE("nbf"),
    ISSUED_AT("iat"),
    JWT_ID("jti");

    private final String claimName;

    private JwtClaim(String claimName) {
        this.claimName = claimName;
    }

    public String claimName() {
        return claimName;
    }
}
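
A minimal sketch of how the enum might be used against already-decoded claims follows. The enum is repeated in condensed form so the example compiles on its own; the `numericClaim` and `isExpired` helpers and the map-based claims representation are assumptions for illustration only.

```java
import java.util.Map;

// Condensed copy of the JwtClaim enum above so that the sketch is self-contained.
enum JwtClaim {
    ISSUER("iss"), SUBJECT("sub"), AUDIENCE("aud"), EXPIRATION_TIME("exp"),
    NOT_BEFORE("nbf"), ISSUED_AT("iat"), JWT_ID("jti");

    private final String claimName;

    JwtClaim(String claimName) {
        this.claimName = claimName;
    }

    public String claimName() {
        return claimName;
    }
}

final class JwtClaimSketch {
    // Hypothetical helper: read a numeric claim (seconds since the epoch) from a
    // map of decoded claims, returning null if it is absent or not a number.
    static Long numericClaim(Map<String, Object> claims, JwtClaim claim) {
        Object value = claims.get(claim.claimName());
        return value instanceof Number ? ((Number) value).longValue() : null;
    }

    // Hypothetical helper: a token with an "exp" claim is treated as expired once
    // the current time reaches that value.
    static boolean isExpired(Map<String, Object> claims, long nowEpochSeconds) {
        Long expiration = numericClaim(claims, JwtClaim.EXPIRATION_TIME);
        return expiration != null && nowEpochSeconds >= expiration;
    }
}
```

Looking claims up through `claimName()` keeps the registered short names ("exp", "sub", and so on) in one place rather than scattering string literals through the code.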

Substitutable Module Options

See Rejected Alternatives: Substitutable Module Options

[UML diagram: Substitutable Module Options]

The mechanics of token retrieval and token validation (both described later) will differ across OAuth deployments.  For token retrieval, each deployment will inject credentials to the token endpoint differently, and the parameters sent to the token endpoint may also differ.  Token validation will differ as well because OAuth supports multiple methods of validation.  Configuration of the retrieval and validation mechanisms, which is done via JAAS configuration, must therefore be flexible.  In particular, while we use instances implementing javax.security.auth.Callback and javax.security.auth.CallbackHandler together to retrieve information, we can't know in advance what information will be required to retrieve or validate a token; a username and password might or might not be required, for example, and retrieval and validation will likely require much more as well.  We also don't know where this information will come from: a file, an environment variable, a database, or somewhere else.  We need implementations of the Callback and CallbackHandler interfaces that are just as flexible as we need the JAAS configuration to be.

JAAS config options (each of which is an individual element of the space-separated ModuleOptions list), in combination with appropriate Callback and CallbackHandler implementations, will support arbitrarily complex retrieval and substitution inside the option value.  The JAAS Configuration spec already supports system property substitution via the ${system.property} syntax; we will implement support for arbitrarily complex substitutions.  For example, the following would support substitution of the contents of a file (which is a common way to store secrets, especially within containers) into the option value:

thePassword="$[file|redact|notBlank|defaultOption=fileDefault|=/path/to/secrets/the_secret]"

There are several features here that deserve comment:

  • The "$[" and "]" delimiters are the signal to perform a substitution (we can't use "${" and "}" because the JAAS Configuration spec already defines them to mean system property substitution).  Note that we will not allow substitution within a substitution; it is not needed, as described below.  We will also support "$[[" and "]]" as delimiters (up to five brackets, in fact) to allow "$[" and "]" to appear in text without causing substitution.
  • Immediately inside the opening delimiter is the type of substitution followed by any optional modifiers we wish to apply.  In the above, we identify this as a file substitution and we indicate three modifiers: the resulting value should never be logged (i.e. store it such that its value will be redacted when logged); the contents of the file must not be blank (meaning it must not be empty or contain only whitespace); and if the file does not exist or its contents are blank then the value of the "fileDefault" option (which could itself have substitutions) is used instead.  It is an error if any constraints implied by the modifiers are violated.  Any punctuation character except the equal sign (=) can be used to delimit the modifiers.
  • Immediately after the type of substitution and any optional modifiers is an equal sign ("=") followed by the value (which in the case of the "file" type is interpreted as the filename to read); then ultimately the closing delimiter appears.
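
The three parts described above can be pulled apart with a few string operations. The following is only a sketch under simplifying assumptions: it handles a single substitution that spans the whole text, recognizes only the "$[" and "]" delimiters, and treats any character that is not a letter or digit as punctuation. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

final class SubstitutionSyntaxSketch {
    /** Holds the three parts of one parsed substitution. */
    static final class Parsed {
        final String type;
        final List<String> modifiers;
        final String value;

        Parsed(String type, List<String> modifiers, String value) {
            this.type = type;
            this.modifiers = modifiers;
            this.value = value;
        }
    }

    static Parsed parse(String text) {
        if (!text.startsWith("$[") || !text.endsWith("]"))
            throw new IllegalArgumentException("Not a $[...] substitution: " + text);
        String body = text.substring(2, text.length() - 1);
        // The type runs up to the first punctuation character (assumed here to be
        // anything that is not a letter or digit).
        int i = 0;
        while (i < body.length() && Character.isLetterOrDigit(body.charAt(i)))
            i++;
        if (i == body.length())
            throw new IllegalArgumentException("Missing '=' in substitution: " + text);
        String type = body.substring(0, i);
        char delimiter = body.charAt(i);
        if (delimiter == '=') // no modifier section; the value follows directly
            return new Parsed(type, new ArrayList<>(), body.substring(i + 1));
        // The modifier section ends at the delimiter that is immediately followed by '='.
        int end = body.indexOf(delimiter + "=", i);
        if (end < 0)
            throw new IllegalArgumentException("Unterminated modifier section: " + text);
        List<String> modifiers = new ArrayList<>();
        if (end > i)
            modifiers.addAll(Arrays.asList(
                    body.substring(i + 1, end).split(Pattern.quote(String.valueOf(delimiter)))));
        return new Parsed(type, modifiers, body.substring(end + 2));
    }
}
```

Applied to the `thePassword` example above, this yields the type `file`, the modifiers `redact`, `notBlank` and `defaultOption=fileDefault`, and the value `/path/to/secrets/the_secret`. Modifiers are returned untrimmed, and a modifier value that itself contains the delimiter followed by an equal sign is not handled.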

This scheme is flexible and powerful; it handles most cases, but it remains relatively easy to create and read.  Importantly, the types of replacements can be expanded in the future without breaking compatibility.  See the Javadoc for the org.apache.kafka.common.security.oauthbearer.smo.SubstitutableModuleOptionsCallbackHandler class for more details.

How would we retrieve the above value?  We define the following representation of the module options and their substitution state, along with associated Callback and CallbackHandler implementations and the org.apache.kafka.common.security.oauthbearer.smo.RedactableObject class for holding the results.  We also define several built-in substitutions and a mechanism for adding new ones.

Code Block
languagejava
titleorg.apache.kafka.common.security.oauthbearer.smo.SubstitutableModuleOptions
collapsetrue
package org.apache.kafka.common.security.oauthbearer.smo;

/**
 * Holds state regarding which {@code LoginModule} <a href=
 * "https://docs.oracle.com/javase/9/docs/api/javax/security/auth/login/Configuration.html">module
 * options</a> have been substituted -- both completely (via
 * {@link #substitutionResults}) as well as in-progress (via
 * {@link #withOptionEvaluationCausingCircularReference(String)}). Instances of
 * this class are thread-safe.
 *
 * @see SubstitutableModuleOptionsCallbackHandler
 * @see SubstitutableModuleOptionsCallback
 * @see RedactableObject
 */
public class SubstitutableModuleOptions {
    private final Map<String, String> moduleOptionsMap;
    private final ConcurrentHashMap<String, RedactableObject> substitutionResults;
    private final SubstitutableModuleOptions delegate;
    private final String optionCausingCircularReference;
 
    /**
     * Constructor
     *
     * @param moduleOptionsMap
     *            the mandatory map representation of the <a href=
     *            "https://docs.oracle.com/javase/9/docs/api/javax/security/auth/login/Configuration.html">module
     *            options</a>
     */
    public SubstitutableModuleOptions(Map<String, String> moduleOptionsMap) {
        this.moduleOptionsMap = Collections.unmodifiableMap(Objects.requireNonNull(moduleOptionsMap));
        this.substitutionResults = new ConcurrentHashMap<>();
        this.delegate = null;
        this.optionCausingCircularReference = null;
    }
 
    /**
     * Constructor to create a new instance based on the given instance except that
     * the given option is marked as causing a circular reference if an attempt is
     * made to evaluate it
     *
     * @param delegate
     *            the mandatory instance from which the view will be created
     * @param optionCausingCircularReference
     *            the mandatory option that causes a circular reference if it is
     *            evaluated
     * @see #withOptionEvaluationCausingCircularReference(String)
     * @see #optionEvaluationCausesCircularReference(String)
     */
    public SubstitutableModuleOptions(SubstitutableModuleOptions delegate, String optionCausingCircularReference) {
        this.moduleOptionsMap = delegate.moduleOptionsMap;
        this.substitutionResults = delegate.substitutionResults;
        this.delegate = delegate;
        this.optionCausingCircularReference = optionCausingCircularReference;
    }
 
    /**
     * Return (an unmodifiable view of) the original module options map provided
     * during construction
     *
     * @return (an unmodifiable view of) the original module options map provided
     *         during construction
     */
    public Map<String, String> moduleOptionsMap() {
        return moduleOptionsMap;
    }
 
    /**
     * Convenience method to indicate if a particular option name exists, returning
     * true if it does, otherwise false
     *
     * @param optionName
     *            the mandatory option name
     * @return true if the indicated option exists, otherwise false
     */
    public boolean optionExists(String optionName) {
        return moduleOptionsMap.containsKey(Objects.requireNonNull(optionName));
    }
 
    /**
     * Create and return a new view of this instance where in that context the given
     * option is in the process of being evaluated such that an attempt to evaluate
     * its substitution value again within the same context would cause a circular
     * reference
     *
     * @param optionName
     *            the mandatory option name
     * @return a new view of this instance where in that context the given option is
     *         in the process of being evaluated such that an attempt to evaluate
     *         its substitution value again within the same context would cause a
     *         circular reference
     * @see #optionEvaluationCausesCircularReference(String)
     */
    public SubstitutableModuleOptions withOptionEvaluationCausingCircularReference(String optionName) {
        return new SubstitutableModuleOptions(this, Objects.requireNonNull(optionName));
    }
 
    /**
     * Indicate if, in the context of this instance's view, the given option is in
     * the process of being evaluated; return true if an attempt to evaluate the
     * option's substitution value would cause a circular reference, otherwise false
     *
     * @param optionName
     *            the mandatory option name
     * @return true if an attempt to evaluate the option's substitution value would
     *         cause a circular reference, otherwise false
     * @see #withOptionEvaluationCausingCircularReference(String)
     */
    public boolean optionEvaluationCausesCircularReference(String optionName) {
        return Objects.requireNonNull(optionName).equals(optionCausingCircularReference)
                || (delegate != null && delegate.optionEvaluationCausesCircularReference(optionName));
    }
 
    /**
     * Return an unmodifiable map identifying which module options have been
     * processed for substitution and the result (if any). A module option is
     * guaranteed to have been processed for substitution and its name will appear
     * as a key in the returned map only after
     * {@link #setSubstitutionResult(String, RedactableObject)} is called.
     *
     * @return an unmodifiable map identifying which module options have been
     *         processed for substitution and the result (if any)
     */
    public Map<String, RedactableObject> substitutionResults() {
        return Collections.unmodifiableMap(substitutionResults);
    }
 
    /**
     * Identify that the option with the given name has had substitution performed
     * for it yielding the given result. This method is idempotent; invoking it with
     * a substitution result equal to the current substitution result (as defined by
     * {@code Object.equals()}) has no effect. The substitution result for an option
     * cannot be changed (again, as defined by {@code Object.equals()}) once it has
     * been set; an attempt to do so will raise an exception.
     *
     * @param optionName
     *            the mandatory option name, which must exist in the map returned by
     *            {@link #moduleOptionsMap()}
     * @param substitutionResult
     *            the mandatory substitution result to set
     */
    public void setSubstitutionResult(String optionName, RedactableObject substitutionResult) {
        if (!moduleOptionsMap.containsKey(Objects.requireNonNull(optionName))) {
            throw new IllegalArgumentException(String.format("Unknown module option name: %s", optionName));
        }
        RedactableObject priorSubstitutionResult = substitutionResults.putIfAbsent(optionName,
                Objects.requireNonNull(substitutionResult));
        if (priorSubstitutionResult != null && !priorSubstitutionResult.equals(substitutionResult))
            throw new IllegalArgumentException(String
                    .format("Cannot change the substitution result for the module option with name '%s'", optionName));
    }
}
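
The "view" mechanism used above for detecting circular references can be illustrated in isolation. The sketch below is a simplified stand-in, not the class above: `View`, `ROOT`, and `resolve` are hypothetical names, and only whole-value `$[option=name]` references are followed.

```java
import java.util.Map;

final class CircularReferenceSketch {
    /** Immutable view that remembers one in-progress option and delegates to its parent. */
    static final class View {
        private final View delegate;
        private final String optionInProgress;

        View(View delegate, String optionInProgress) {
            this.delegate = delegate;
            this.optionInProgress = optionInProgress;
        }

        View withOptionInProgress(String optionName) {
            return new View(this, optionName);
        }

        boolean causesCircularReference(String optionName) {
            return optionName.equals(optionInProgress)
                    || (delegate != null && delegate.causesCircularReference(optionName));
        }
    }

    /** Starting view in which no option is being evaluated. */
    static final View ROOT = new View(null, null);

    // Hypothetical resolver: follows "$[option=name]" references, passing a new
    // view down each level so that revisiting an in-progress option is detected.
    static String resolve(Map<String, String> options, String optionName, View view) {
        if (view.causesCircularReference(optionName))
            throw new IllegalStateException("Circular reference involving option: " + optionName);
        String raw = options.get(optionName);
        String prefix = "$[option=";
        if (raw != null && raw.startsWith(prefix) && raw.endsWith("]")) {
            String referenced = raw.substring(prefix.length(), raw.length() - 1);
            return resolve(options, referenced, view.withOptionInProgress(optionName));
        }
        return raw;
    }
}
```

Because each view is immutable and only adds one name on top of its delegate, concurrent evaluations starting from the same root do not interfere with one another.
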
Code Block
languagejava
titleorg.apache.kafka.common.security.oauthbearer.smo.SubstitutableModuleOptionsCallback
collapsetrue
package org.apache.kafka.common.security.oauthbearer.smo;

/**
 * A {@code Callback} related to introspection requests against a JAAS
 * configuration
 *
 * @see SubstitutableModuleOptionsCallbackHandler
 * @see SubstitutableModuleOptions
 */
public class SubstitutableModuleOptionsCallback implements Callback {
    private final SubstitutableModuleOptions substitutableModuleOptions;
    private final String optionName;
    private final boolean optionRequiredToExist;

    /**
     * Constructor
     *
     * @param substitutableModuleOptions
     *            the mandatory substitutable module options
     * @param optionName
     *            the requested option
     * @param optionRequiredToExist
     *            if true then the requested option is required to exist
     */
    public SubstitutableModuleOptionsCallback(SubstitutableModuleOptions substitutableModuleOptions, String optionName,
            boolean optionRequiredToExist) {
        this.substitutableModuleOptions = Objects.requireNonNull(substitutableModuleOptions);
        this.optionName = Objects.requireNonNull(optionName);
        this.optionRequiredToExist = optionRequiredToExist;
    }
 
    /**
     * Return the substitutable module options provided at construction time
     *
     * @return the substitutable module options provided at construction time
     */
    public SubstitutableModuleOptions substitutableModuleOptions() {
        return substitutableModuleOptions;
    }

    /**
     * Return the option name provided at construction time
     *
     * @return the option name provided at construction time
     */
    public String optionName() {
        return optionName;
    }
 
    /**
     * Return true if the requested option is required to exist, otherwise false
     *
     * @return true if the requested option is required to exist, otherwise
     *         false
     */
    public boolean optionRequiredToExist() {
        return optionRequiredToExist;
    }
 
    /**
     * Identify that the option identified by {@link #optionName()}, on the instance
     * returned by {@link #substitutableModuleOptions()}, has had substitution
     * performed for it yielding the given result. This method is idempotent;
     * invoking it with a substitution result equal to the option's current
     * substitution result (as defined by {@code Object.equals()}) has no effect.
     * The substitution result for an option cannot be changed (again, as defined by
     * {@code Object.equals()}) once it has been set; an attempt to do so will raise
     * an exception.
     *
     * @param substitutionResult
     *            the mandatory substitution result to set
     */
    public void setSubstitutionResult(RedactableObject substitutionResult) {
        substitutableModuleOptions.setSubstitutionResult(optionName, Objects.requireNonNull(substitutionResult));
    }

    /**
     * Return the substitution result, if any has been set, otherwise null. Note
     * that the result may not have been set via a call to
     * {@link #setSubstitutionResult(RedactableObject)}; it is possible that the
     * result was already set prior to construction of this instance, in which case
     * that result will be returned here.
     *
     * @return the substitution result, if any has been set, otherwise null
     */
    public RedactableObject substitutionResult() {
        return substitutableModuleOptions.substitutionResults().get(optionName);
    } 
}
Code Block
languagejava
titleorg.apache.kafka.common.security.oauthbearer.smo.SubstitutableModuleOptionsCallbackHandler
collapsetrue
package org.apache.kafka.common.security.oauthbearer.smo;

/**
 * A {@code CallbackHandler} that handles introspection requests against a JAAS
 * configuration and supports substitution into option values via delimited text
 * of the following form:
 *
 * <pre>
 * &lt;OPENING_DELIMITER&gt;&lt;TYPE&gt;&lt;OPTIONAL_MODIFIERS&gt;=&lt;OPTIONAL_VALUE&gt;&lt;CLOSING_DELIMITER&gt;
 * </pre>
 *
 * Where the above elements are defined as follows:
 *
 * <pre>
 * OPENING_DELIMITER: $[, $[[, $[[[, $[[[[, or $[[[[[
 * CLOSING_DELIMITER: ], ]], ]]], ]]]], or ]]]]] (number of brackets must match)
 *
 * TYPE: everything up to (but not including) the first punctuation character
 *
 * OPTIONAL_MODIFIERS: the optional section immediately after the TYPE, starting with any
 *                     punctuation character except for the equal sign (=), and ending
 *                     with the same punctuation character followed immediately by an
 *                     equal sign. The same punctuation character delimits individual
 *                     modifiers, which come in two flavors: flags, which do not contain
 *                     an equal sign, and name=value arguments, which do.
 *
 * OPTIONAL_VALUE: the optional section immediately after the modifier section-delimiting
 *                 punctuation character (if any) and the equal sign (=). It is not trimmed.
 * </pre>
 *
 * For example:
 *
 * <pre>
 * $[envvar=THE_ENV_VAR]
 * $[envvar/notBlank/redact/=THE_ENV_VAR]
 * $[envvar/defaultValue = theDefaultValue/=THE_ENV_VAR]
 * $[envvar/defaultOption = theOptionName/=THE_ENV_VAR]
 * $[file|redact|notBlank|=/the/path/to/the/file]
 * </pre>
 *
 * Working left to right, once the delimiter is defined for a module option (for
 * example, {@code $[} and {@code ]}), only that delimiter is recognized for the
 * rest of that option (and it is always recognized as meaning substitution for
 * the rest of that module option). A special "empty" substitution does nothing
 * except, when it appears to the left of every other occurrence of matching
 * delimiters, it serves to force the delimiter for that module option to the
 * one indicated. For example, to force the delimiter to {@code $[[} and
 * {@code ]]} (and prevent {@code $[} and {@code ]} from causing substitution)
 * for a module option:
 *
 * <pre>
 * optionName = "$[[]]These $[ and ] delimiters do not cause substitution"
 * </pre>
 *
 * The following substitutions are supported, though it is straightforward to
 * add others (see below):
 *
 * <ul>
 * <li>{@code envvar}: substitute the value of an environment variable</li>
 * <li>{@code sysprop}: substitute the value of a system property</li>
 * <li>{@code file}: substitute the contents of a file</li>
 * <li>{@code option}: substitute the contents of another option</li>
 * </ul>
 *
 * The above substitutions support the following modifiers:
 *
 * <ul>
 * <li>{@code redact}: prevent values from being logged</li>
 * <li>{@code notEmpty}: the value must not be empty</li>
 * <li>{@code notBlank}: the value must not be blank (i.e. consisting only of
 * whitespace); implies {@code notEmpty}.</li>
 * <li>{@code defaultValue=<value>}: substitute the given literal value if the
 * substitution cannot otherwise be made (either because the value does not
 * exist or the determined value was disallowed because it was empty or blank).
 * The substituted value must satisfy any modifiers that act as
 * constraints.</li>
 * <li>{@code defaultOption=<optionName>}: substitute the value of the indicated
 * option if the substitution cannot otherwise be made (either because the value
 * does not exist or the determined value was disallowed because it was empty or
 * blank). The substituted value must satisfy any modifiers that act as
 * constraints.</li>
 * <li>{@code fromOption}: provides a level of indirection so that the value,
 * instead of always being literally specified (i.e. read this file, or this
 * environment variable), can be determined via some other option. This allows,
 * for example, the filename, system property name, etc. to be generated
 * potentially from multiple substitutions concatenated together.</li>
 * </ul>
 *
 * To add new substitutions simply define a module option of the following form:
 *
 * <pre>
 * typeModuleOptionSubstituter = "fully.qualified.class.name"
 * </pre>
 *
 * For example:
 *
 * <pre>
 * fooModuleOptionSubstituter = "org.example.FooSubstituter"
 * </pre>
 *
 * The indicated class must implement {@link ModuleOptionSubstituter}, and you
 * can invoke the substitution like this:
 *
 * <pre>
 * $[foo/optional/modifiers/=optionalValue]
 * </pre>
 *
 * @see SubstitutableModuleOptionsCallback
 * @see SubstitutableModuleOptions
 * @see ModuleOptionSubstituter
 */
public class SubstitutableModuleOptionsCallbackHandler implements AuthenticateCallbackHandler {
    // etc...
 
    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        for (Callback callback : callbacks) {
            if (callback instanceof SubstitutableModuleOptionsCallback) {
                SubstitutableModuleOptionsCallback substitutableModuleOptionsCallback = (SubstitutableModuleOptionsCallback) callback;
                RedactableObject substitutionResult = getSubstitutionResult(
                        substitutableModuleOptionsCallback.substitutableModuleOptions(),
                        substitutableModuleOptionsCallback.optionName(),
                        substitutableModuleOptionsCallback.optionRequiredToExist(), forceDebugLogForTesting);
                if (substitutionResult != null)
                    // depend on idempotence here because the substitution result has already been
                    // set on the underlying SubstitutableModuleOptions instance
                    substitutableModuleOptionsCallback.setSubstitutionResult(substitutionResult);
            } else
                throw new UnsupportedCallbackException(callback,
                        String.format("Unrecognized Callback type: %s", callback.getClass().getName()));
        }
    }
 
    /**
     * Perform substitution without using callbacks and a callback handler.
     *
     * @param substitutableModuleOptions
     *            the mandatory substitutable module options to query
     * @param optionName
     *            the mandatory requested option name
     * @param optionRequiredToExist
     *            if true then the requested option is required to exist
     * @return the given option's substitution result, after any required
     *         substitution is applied, or null if the option does not exist and it
     *         was not required to exist
     * @throws IOException
     *             if a required substitution cannot be performed, including if the
     *             given (or any other) required option does not exist
     */
    public static RedactableObject getSubstitutionResult(SubstitutableModuleOptions substitutableModuleOptions,
            String optionName, boolean optionRequiredToExist) throws IOException {
        // etc...
    }

    /**
     * Return the server configuration provided during
     * {@link #configure(Map, String, List)}, if any, otherwise null
     *
     * @return the server configuration provided during
     *         {@link #configure(Map, String, List)}, if any, otherwise null
     */
    public Map<String, ?> serverConfig() {
        return serverConfig;
    }

    /**
     * Return the SASL mechanism provided during
     * {@link #configure(Map, String, List)}, if any, otherwise null
     *
     * @return the SASL mechanism provided during
     *         {@link #configure(Map, String, List)}, if any, otherwise null
     */
    public String mechanism() {
        return mechanism;
    }

    /**
     * Return the JAAS login module configurations provided during
     * {@link #configure(Map, String, List)}, if any, otherwise null
     *
     * @return the JAAS login module configurations provided during
     *         {@link #configure(Map, String, List)}, if any, otherwise null
     */
    public List<AppConfigurationEntry> jaasConfigEntries() {
        return jaasConfigEntries;
    }
 
    // etc...
}
Code Block
languagejava
titleorg.apache.kafka.common.security.oauthbearer.smo.RedactableObject
collapsetrue
package org.apache.kafka.common.security.oauthbearer.smo;

/**
 * An object whose text value can be redacted
 */
public class RedactableObject {
    static final String REDACTED = "[redacted]";
    private final Object object;
    private final boolean redacted;

    /**
     * Constructor
     *
     * @param object
     *            the mandatory object
     * @param redact
     *            when true the object's value will be redacted in
     *            {@link #redactedText()}
     */
    public RedactableObject(Object object, boolean redact) {
        this.object = Objects.requireNonNull(object);
        this.redacted = redact;
    }


    /**
     * Return the (always non-null) underlying object provided during instance
     * construction
     *
     * @return the (always non-null) underlying object provided during instance
     *         construction
     */
    public Object object() {
        return object;
    }
 
    /**
     * Return true if this instance contains information that will be redacted when
     * {@link #redactedText()} is invoked, otherwise false
     *
     * @return true if this instance contains information that will be redacted when
     *         {@link #redactedText()} is invoked, otherwise false
     */
    public boolean isRedacted() {
        return redacted;
    }

    /**
     * Return the redacted text for this instance, if redaction is required,
     * otherwise return the {@link #value()}
     *
     * @return the redacted text for this instance, if redaction is required,
     *         otherwise return the {@link #value()}
     */
    public String redactedText() {
        return redacted ? REDACTED : value();
    }

    /**
     * Return the {@code String} value of this instance, including information that
     * would otherwise be redacted
     *
     * @return the {@code String} value of this instance, including information that
     *         would otherwise be redacted
     */
    public String value() {
        if (object instanceof String)
            return (String) object;
        throw new IllegalStateException(
                String.format("Unknown substitution result object type: %s", object.getClass().getName()));
    }
 
    /**
     * Return true if this result is considered to be empty, otherwise false
     *
     * @return true if this result is considered to be empty, otherwise false
     */
    public boolean isEmpty() {
        return value().isEmpty();
    }

    /**
     * Return true if this result is considered to be blank (containing at most just
     * whitespace), otherwise false
     *
     * @return true if this result is considered to be blank (containing at most
     *         just whitespace), otherwise false
     */
    public boolean isBlank() {
        return value().trim().isEmpty();
    }

    /**
     * Return this instance if it is redacted according to {@link #isRedacted()},
     * otherwise return a new, redacted instance with the same underlying object
     *
     * @return this instance if it is redacted according to {@link #isRedacted()},
     *         otherwise return a new, redacted instance with the same underlying
     *         object
     */
    public RedactableObject redactedVersion() {
        return redacted ? this : new RedactableObject(object, true);
    }
 
    @Override
    public String toString() {
        // be sure to redact information as required
        return redactedText();
    }


    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof RedactableObject))
            return false;
        RedactableObject other = (RedactableObject) obj;
        /*
         * Note that differences in redaction cause inequality
         */
        return redacted == other.redacted && object.equals(other.object);
    }


    @Override
    public int hashCode() {
        return object.hashCode();
    }
} 
Code Block
languagejava
titleorg.apache.kafka.common.security.oauthbearer.smo.ModuleOptionSubstituter
collapsetrue
package org.apache.kafka.common.security.oauthbearer.smo;

/**
 * The interface that pluggable module option substituter classes must
 * implement.
 *
 * @see SubstitutableModuleOptionsCallbackHandler
 */
public interface ModuleOptionSubstituter {
    /**
     * Perform the substitution of the given type on the given options using the
     * given modifiers and value
     *
     * @param type
     *            the (always non-null) type of substitution to perform
     * @param modifiers
     *            the (always non-null but potentially empty) modifiers to apply, if
     *            any. They are presented exactly as they appear in the
     *            configuration, with no whitespace trimming applied.
     * @param value
     *            the always non-null (but potentially empty) value
     * @param substitutableModuleOptions
     *            the module options and their current substitution state
     * @return the (always non-null) result of performing the substitution
     * @throws IOException
     *             if the substitution cannot be performed
     */
    RedactableObject doSubstitution(String type, List<String> modifiers, String value,
            SubstitutableModuleOptions substitutableModuleOptions) throws IOException;
}
Code Block
languagejava
titleorg.apache.kafka.common.security.oauthbearer.smo.SubstituterHelper
collapsetrue
package org.apache.kafka.common.security.oauthbearer.smo;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * A template {@code ModuleOptionSubstituter} that handles the following
 * modifiers:
 * <ul>
 * <li>{@code redact} -- when enabled, values are stored such that they are
 * prevented from being logged</li>
 * <li>{@code notBlank} -- when enabled, blank (only whitespace) or non-existent
 * values are replaced by default values. Implies {@code notEmpty}.</li>
 * <li>{@code notEmpty} -- when enabled, either explicitly or via
 * {@code notBlank}, empty {@code ""} or non-existent values are replaced by
 * default values.</li>
 * <li>{@code fromOption} -- when enabled, provides a level of indirection such
 * that the option's value is taken as an option name from which the actual
 * value will be retrieved</li>
 * <li>{@code defaultValue=<value>} -- when enabled, the provided value is used
 * as a default value in case the requested value either does not exist or is
 * disallowed via {@code notBlank} or {@code notEmpty}</li>
 * <li>{@code defaultOption=<value>} -- when enabled, the indicated option is
 * evaluated as a default value in case the requested value either does not
 * exist or is disallowed via {@code notBlank} or {@code notEmpty}</li>
 * </ul>
 *
 * Flags (modifiers without an equal sign) are trimmed, so "{@code redact}" and
 * "{@code  redact }" are recognized as being the same. Arguments (modifiers
 * with an equal sign) have their name trimmed but not their value, so
 * "{@code name=value}" and "{@code  name = value }" are both recognized as
 * setting the {@code name} argument (though their values do not match due to
 * whitespace differences).
 * <p>
 * It is an error to set the same named argument multiple times (even if the
 * values are the same). Redundantly specifying the same flag is acceptable.
 * <p>
 * Flags and arguments are presented to the substitution's implementation via
 * the
 * {@link #retrieveResult(String, String, boolean, SubstitutableModuleOptions, Set, Map)}
 * method.
 */
public abstract class SubstituterHelper implements ModuleOptionSubstituter {
    /**
     * Retrieve the substitution result associated with the given key, if any,
     * otherwise null
     *
     * @param type
     *            the (always non-null) type of substitution to perform
     * @param key
     *            the required key
     * @param redact
     *            if the result must be redacted regardless of any information to
     *            the contrary
     * @param substitutableModuleOptions
     *            the module options and their current substitution state
     * @param flags
     *            the flags specified, if any, beyond the standard {@code redact},
     *            {@code notBlank}, {@code notEmpty}, and {@code fromOption} flags
     * @param args
     *            the arguments specified, if any, beyond the standard
     *            {@code defaultValue} and {@code defaultOption} arguments
     * @return the substitution result associated with the given key, if any,
     *         otherwise null
     * @throws IOException
     *             if the request cannot be performed such that the use of a default
     *             value would be inappropriate
     */
    public abstract RedactableObject retrieveResult(String type, String key, boolean redact,
            SubstitutableModuleOptions substitutableModuleOptions, Set<String> flags, Map<String, String> args)
            throws IOException;


    @Override
    public RedactableObject doSubstitution(String type, List<String> modifiers, String value,
            SubstitutableModuleOptions substitutableModuleOptions) throws IOException {
        // etc...
    }
 
    // etc...
}
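
The flag and argument rules described in the SubstituterHelper Javadoc above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the helper's actual implementation: the class name ModifierParsingSketch and its fields are hypothetical, and signalling a repeated argument with an IOException is an assumption (the Javadoc says only that it is an error).

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical illustration only; not part of the proposed API. */
final class ModifierParsingSketch {
    /** Flags (modifiers without an equal sign), trimmed; repeats collapse harmlessly. */
    final Set<String> flags = new LinkedHashSet<>();
    /** Arguments (name=value); the name is trimmed, the value is kept verbatim. */
    final Map<String, String> args = new LinkedHashMap<>();

    ModifierParsingSketch(List<String> modifiers) throws IOException {
        for (String modifier : modifiers) {
            int equalSignIndex = modifier.indexOf('=');
            if (equalSignIndex < 0) {
                // a flag: whitespace around it is not significant
                flags.add(modifier.trim());
                continue;
            }
            String name = modifier.substring(0, equalSignIndex).trim();
            String value = modifier.substring(equalSignIndex + 1); // deliberately not trimmed
            // assumption: a repeated argument name is rejected even if the value is identical
            if (args.containsKey(name))
                throw new IOException("Argument specified more than once: " + name);
            args.put(name, value);
        }
    }

    public static void main(String[] unused) throws IOException {
        ModifierParsingSketch parsed = new ModifierParsingSketch(
                Arrays.asList(" redact ", "redact", " defaultValue = none "));
        System.out.println("flags=" + parsed.flags);
        System.out.println("args=" + parsed.args);
    }
}
```

In this sketch the two spellings of the redact flag collapse into one entry, while the defaultValue argument keeps the whitespace around its value.
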
Code Block
languagejava
titleorg.apache.kafka.common.security.oauthbearer.smo.OptionSubstituter
collapsetrue
package org.apache.kafka.common.security.oauthbearer.smo;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * A {@code ModuleOptionSubstituter} that handles module option substitution.
 * <p>
 * Note that the {@code fromOption} modifier, if present, still adds an
 * additional level of indirection: the value indicates the option whose value,
 * in turn, indicates the option text to retrieve.
 */
public class OptionSubstituter implements ModuleOptionSubstituter {
    private final SubstituterHelper substituterHelper;
    public OptionSubstituter() {
        substituterHelper = new SubstituterHelper() {
            @Override
            public RedactableObject retrieveResult(String type, String key, boolean redact,
                    SubstitutableModuleOptions substitutableModuleOptions, Set<String> flags, Map<String, String> args)
                    throws IOException {
                if (!flags.isEmpty() || !args.isEmpty())
                    throw new IOException(String.format("Unknown extra flags/args: %s; %s", flags, args));
                RedactableObject substitutionResult = SubstitutableModuleOptionsCallbackHandler
                        .getSubstitutionResult(substitutableModuleOptions, key, false);
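                // guard against a missing option, which is assumed to be reported as null here
                if (substitutionResult == null)
                    return null;
                return redact ? substitutionResult.redactedVersion() : substitutionResult;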
            }
        };
    }
    @Override
    public RedactableObject doSubstitution(String type, List<String> modifiers, String value,
            SubstitutableModuleOptions substitutableModuleOptions) throws IOException {
        return substituterHelper.doSubstitution(type, modifiers, value, substitutableModuleOptions);
    }
}
Code Block
languagejava
titleorg.apache.kafka.common.security.oauthbearer.smo.FileContentSubstituter
collapsetrue
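package org.apache.kafka.common.security.oauthbearer.smo;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributeView;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.List;
import java.util.Map;
import java.util.Set;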
 
/**
 * A {@code ModuleOptionSubstituter} that handles file content substitution.
 */
public class FileContentSubstituter implements ModuleOptionSubstituter {
    private final SubstituterHelper substituterHelper;
 
    public FileContentSubstituter() {
        substituterHelper = new SubstituterHelper() {
            @Override
            public RedactableObject retrieveResult(String type, String key, boolean redact,
                    SubstitutableModuleOptions substitutableModuleOptions, Set<String> flags, Map<String, String> args)
                    throws IOException {
                if (!flags.isEmpty() || !args.isEmpty())
                    throw new IOException(String.format("Unknown extra flags/args: %s; %s", flags, args));
                Path path = Paths.get(key);
                if (!Files.exists(path))
                    return null;
                BasicFileAttributes fileAttributes = Files.getFileAttributeView(path, BasicFileAttributeView.class)
                        .readAttributes();
                if (!fileAttributes.isRegularFile())
                    return null;
                long fileSize = fileAttributes.size();
                int maxSize = 1024 * 1024;
                if (fileSize > maxSize) {
                    throw new IOException(String.format("Type=%s: key=%s: file size exceeds max of %d: %d", type, key,
                            maxSize, fileSize));
                }
                String retval = new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
                return new RedactableObject(retval, redact);
            }
        };
    }

    @Override
    public RedactableObject doSubstitution(String type, List<String> modifiers, String value,
            SubstitutableModuleOptions substitutableModuleOptions) throws IOException {
        return substituterHelper.doSubstitution(type, modifiers, value, substitutableModuleOptions);
    }
}
Code Block
languagejava
titleorg.apache.kafka.common.security.oauthbearer.smo.SystemPropertySubstituter
collapsetrue
package org.apache.kafka.common.security.oauthbearer.smo;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
 
/**
 * A {@code ModuleOptionSubstituter} that handles system property substitution.
 * The JAAS config file already supports system property substitution via the
 * '<code>${</code>' and '<code>}</code>' delimiters, but we support it here for syntactic
 * consistency as well as to add the functionality associated with modifiers.
 */
public class SystemPropertySubstituter implements ModuleOptionSubstituter {
    private final SubstituterHelper substituterHelper;

    public SystemPropertySubstituter() {
        substituterHelper = new SubstituterHelper() {
            @Override
            public RedactableObject retrieveResult(String type, String key, boolean redact,
                    SubstitutableModuleOptions substitutableModuleOptions, Set<String> flags, Map<String, String> args)
                    throws IOException {
                if (!flags.isEmpty() || !args.isEmpty())
                    throw new IOException(String.format("Unknown extra flags/args: %s; %s", flags, args));
                String retval = System.getProperty(key);
                return retval != null ? new RedactableObject(retval, redact) : null;
            }
        };
    }

    @Override
    public RedactableObject doSubstitution(String type, List<String> modifiers, String value,
            SubstitutableModuleOptions substitutableModuleOptions) throws IOException {
        return substituterHelper.doSubstitution(type, modifiers, value, substitutableModuleOptions);
    }
}
Code Block
languagejava
titleorg.apache.kafka.common.security.oauthbearer.smo.EnvironmentVariableSubstituter
collapsetrue
package org.apache.kafka.common.security.oauthbearer.smo;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
 
/**
 * A {@code ModuleOptionSubstituter} that handles environment variable
 * substitution
 */
public class EnvironmentVariableSubstituter implements ModuleOptionSubstituter {
    private final SubstituterHelper substituterHelper;


    public EnvironmentVariableSubstituter() {
        substituterHelper = new SubstituterHelper() {
            @Override
            public RedactableObject retrieveResult(String type, String key, boolean redact,
                    SubstitutableModuleOptions substitutableModuleOptions, Set<String> flags, Map<String, String> args)
                    throws IOException {
                if (!flags.isEmpty() || !args.isEmpty())
                    throw new IOException(String.format("Unknown extra flags/args: %s; %s", flags, args));
                String retval = System.getenv(key);
                return retval != null ? new RedactableObject(retval, redact) : null;
            }
        };
    }

    @Override
    public RedactableObject doSubstitution(String type, List<String> modifiers, String value,
            SubstitutableModuleOptions substitutableModuleOptions) throws IOException {
        return substituterHelper.doSubstitution(type, modifiers, value, substitutableModuleOptions);
    }
}

Given the above code, and assuming the existence of the following mapping in the module options map:

    thePassword="$[file|redact|notBlank|=/path/to/secrets/the_secret]"

we can retrieve the contents of the file as follows:

Code Block
languagejava
titleRetrieving Values via a Callback
SubstitutableModuleOptions options = new SubstitutableModuleOptions(moduleOptionsMap);
boolean requiredToExist = true;
SubstitutableModuleOptionsCallback callback = new SubstitutableModuleOptionsCallback(options, "thePassword", requiredToExist);
CallbackHandler callbackHandler = new SubstitutableModuleOptionsCallbackHandler();
callbackHandler.handle(new Callback[] {callback});
String thePassword = callback.substitutionResult().value();

Alternatively, there is a convenience method that allows us to avoid using a callback if we wish:

Code Block
languagejava
titleRetrieving Values without a Callback
SubstitutableModuleOptions options = new SubstitutableModuleOptions(moduleOptionsMap);
boolean requiredToExist = true;
String thePassword = SubstitutableModuleOptionsCallbackHandler.getSubstitutionResult(options,
        "thePassword", requiredToExist).value();

The initial set of supported substitution types and their specifiable constraints are as follows:

| Type | Description | Specifiable Constraints | Notes |
| --- | --- | --- | --- |
| file | File content substitution | notBlank, notEmpty, redact, fromOption, defaultValue=<value>, defaultOption=<optionName> | It is an error if the file does not exist or is not readable unless defaultValue or defaultOption is specified. If a defaultValue is specified and the previously-determined value is missing or violates a notBlank or notEmpty constraint, the literal default value is used instead and is itself checked against those constraints. If a defaultOption is specified, the value defined by the specified option is used and checked in the same way. If the default option's value depends on substitutions that were marked redact then redact is implied. The fromOption modifier indicates that the value, instead of being the file to read, is the name of an option whose value is to be taken as the filename. This makes it possible to generate filenames from multiple substitutions rather than having to specify them literally. |
| envvar | Environment variable substitution | same as above | It is an error if the environment variable does not exist unless defaultValue or defaultOption is specified. If a defaultValue is specified and the previously-determined value is missing or violates a notBlank or notEmpty constraint, the literal default value is used instead and is itself checked against those constraints. If a defaultOption is specified, the value defined by the specified option is used and checked in the same way. If the default option's value depends on substitutions that were marked redact then redact is implied. The fromOption modifier indicates that the value, instead of being the environment variable to read, is the name of an option whose value is to be taken as the environment variable name. This makes it possible to generate environment variable names from multiple substitutions rather than having to specify them literally. |
| option | Another module option value substitution | same as above | It is an error if the option does not exist unless defaultValue or defaultOption is specified. If a defaultValue is specified and the previously-determined value is missing or violates a notBlank or notEmpty constraint, the literal default value is used instead and is itself checked against those constraints. If a defaultOption is specified, the value defined by the specified option is used and checked in the same way. If the default option's value depends on substitutions that were marked redact then redact is implied. The fromOption modifier indicates that the value, instead of being the option to read, is the name of an option whose value is to be taken as the option name. This makes it possible to generate option names from multiple substitutions rather than having to specify them literally. |
| sysprop | System property substitution | same as above | It is an error if the system property does not exist unless defaultValue or defaultOption is specified. If a defaultValue is specified and the previously-determined value is missing or violates a notBlank or notEmpty constraint, the literal default value is used instead and is itself checked against those constraints. If a defaultOption is specified, the value defined by the specified option is used and checked in the same way. If the default option's value depends on substitutions that were marked redact then redact is implied. The fromOption modifier indicates that the value, instead of being the system property to read, is the name of an option whose value is to be taken as the system property name. This makes it possible to generate system property names from multiple substitutions rather than having to specify them literally. |
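
The interaction between the notBlank and notEmpty constraints and a literal default value might be sketched as below. This is a simplified illustration rather than the proposed implementation: the class DefaultResolutionSketch and its methods are hypothetical, only defaultValue is modelled (defaultOption would first require resolving another option), and treating a default that itself violates the constraints as an error is an assumption.

```java
import java.io.IOException;

/** Hypothetical illustration only; not part of the proposed API. */
final class DefaultResolutionSketch {
    /** True if the value is missing or violates the requested constraint. */
    static boolean disallowed(String value, boolean notBlank, boolean notEmpty) {
        if (value == null)
            return true; // a non-existent value is never acceptable
        if (notBlank)
            return value.trim().isEmpty(); // notBlank implies notEmpty
        return notEmpty && value.isEmpty();
    }

    /**
     * Return the retrieved value if it is acceptable, otherwise the default if
     * one was given (non-null) and is acceptable; otherwise fail.
     */
    static String resolve(String retrieved, String defaultValue, boolean notBlank, boolean notEmpty)
            throws IOException {
        if (!disallowed(retrieved, notBlank, notEmpty))
            return retrieved;
        // assumption: the default must satisfy the same constraints
        if (!disallowed(defaultValue, notBlank, notEmpty))
            return defaultValue;
        throw new IOException("No acceptable value and no acceptable default value");
    }

    public static void main(String[] unused) throws IOException {
        // a blank value is rejected by notBlank, so the default is used
        System.out.println(resolve("   ", "fallback", true, false));
        // without constraints an empty value is acceptable as-is
        System.out.println("[" + resolve("", "fallback", false, false) + "]");
    }
}
```
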

To add new substitutions simply define a module option of the following form:

typeModuleOptionSubstituter = "fully.qualified.class.name"

For example:

fooModuleOptionSubstituter = "org.example.FooSubstituter"

The indicated class must implement the org.apache.kafka.common.security.oauthbearer.smo.ModuleOptionSubstituter interface.
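
As a rough sketch of how such declarations could be discovered, the following scans a module options map for names ending in ModuleOptionSubstituter and records the substitution type against the class name. The class SubstituterRegistrySketch is hypothetical, the options are assumed to be plain strings, and loading the class and verifying that it implements the interface is deliberately left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration only; not part of the proposed API. */
final class SubstituterRegistrySketch {
    static final String SUFFIX = "ModuleOptionSubstituter";

    /** Map each declared substitution type to its fully qualified class name. */
    static Map<String, String> declaredSubstituterClassNames(Map<String, String> moduleOptions) {
        Map<String, String> classNameByType = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : moduleOptions.entrySet()) {
            String optionName = entry.getKey();
            // require a non-empty type in front of the suffix
            if (optionName.endsWith(SUFFIX) && optionName.length() > SUFFIX.length()) {
                String type = optionName.substring(0, optionName.length() - SUFFIX.length());
                classNameByType.put(type, entry.getValue());
            }
        }
        return classNameByType;
    }

    public static void main(String[] unused) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("fooModuleOptionSubstituter", "org.example.FooSubstituter");
        options.put("thePassword", "not a substituter declaration");
        System.out.println(declaredSubstituterClassNames(options));
    }
}
```
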

Invoke the substitution with text in an option like this:

$[foo/optional/modifiers/=optionalValue]

 

Note that the above substitution functionality and the related classes and interfaces are not necessarily specific to OAuth Bearer Tokens, so an open question is whether this substitution-related code belongs in a sub-package of the main SASL/OAUTHBEARER one or if it should live somewhere else.

 

Token Retrieval


Token retrieval occurs on the client side of the SASL negotiation (in the producer/consumer, or on the broker when SASL/OAUTHBEARER is the inter-broker protocol), and the org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule class is the LoginModule implementation that creates and invokes an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenRetriever to perform the retrieval.  We provide the org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtRetriever class as a sample implementation that also provides value in development and testing situations.
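
For orientation, an unsecured JWT in the sense of RFC 7519 is a header declaring the algorithm none, a claims payload, and an empty signature, joined by periods. The sketch below builds such a string. It is not the OAuthBearerUnsecuredJwtRetriever implementation: the class UnsecuredJwtSketch and the claim values are hypothetical, and it is only assumed here that the sample retriever produces tokens of this general shape.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical illustration only; not part of the proposed API. */
final class UnsecuredJwtSketch {
    /** Build header.payload. with an empty signature part, per RFC 7519 unsecured JWTs. */
    static String unsecuredJwt(String claimsJson) {
        Base64.Encoder encoder = Base64.getUrlEncoder().withoutPadding();
        String header = encoder.encodeToString("{\"alg\":\"none\"}".getBytes(StandardCharsets.UTF_8));
        String payload = encoder.encodeToString(claimsJson.getBytes(StandardCharsets.UTF_8));
        // the trailing period is followed by nothing: there is no signature
        return header + "." + payload + ".";
    }

    public static void main(String[] unused) {
        // illustrative claims only; a real token would carry whatever the deployment requires
        System.out.println(unsecuredJwt("{\"sub\":\"exampleUser\",\"exp\":1516239022}"));
    }
}
```

Because such a token carries no signature, anyone can forge it, which is why this form suits development and testing rather than production use.
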

...

Exceptions

...

Unchecked exceptions can be added or deleted without breaking binary compatibility, and Kafka uses unchecked exceptions exclusively, so

...

OAuthBearerException is unchecked.  We considered making OAuthBearerException a checked exception to get the compiler help provided by the use of checked exceptions

...

because the set of exception types and the size of the overall code base are both small

...

such that the risk that we would want to change the type(s) of exception(s) thrown is minimal.  Nonetheless, we agreed the benefit of consistency and eliminating the risk of breaking binary compatibility outweighed the benefit of the compiler help provided by checked exceptions.

Token Refresh

...

We considered associating refresh-related properties (such as the minimum refresh period in milliseconds) with the ExpiringCredential rather than the ExpiringCredentialRefreshingLogin instance because the ExpiringCredentialRefreshingLogin instance couldn't know which of the potentially multiple login modules actually applies (i.e. which is the one associated with the inter-broker protocol); it wouldn't always know how to find the JAAS config options, so it wouldn't always know how to get the refresh configuration.  There was a problem with this approach, though: we can't invoke login() on the LoginContext and get an ExpiringCredential instance without a CallbackHandler, so we needed to know the type of CallbackHandler to instantiate – and there is no way to know that.  It simply made sense to give the ExpiringCredentialRefreshingLogin instance the ability to discover the correct login module in all cases and be able to ask it for the CallbackHandler instance; hence we created the ExpiringCredentialLoginModule interface.

...

.