...
Note that the access token can be made available to the broker for authorization decisions due to KIP-189 (by exposing the access token on the SaslServer
implementation), but detailed discussion of this possibility is outside the scope of this proposal. It is noted, however, that if access tokens are somehow used for authorization decisions, it is conceivable due to the long-lived nature of Kafka connections that authorization decisions will sometimes be made using expired access tokens. For example, it is up to the broker to validate the token upon authentication, but the token will not be replaced for that particular connection as long as it remains intact; if the token expires in an hour then authorization decisions for that first hour will be made using the still-valid token, but after an hour the expired token would remain associated with the connection, and authorization decisions from that point forward for that particular connection would be made using the expired token. This would have to be addressed via a separate KIP if it turns out to be problematic, but that seems unlikely (code signing certificates that have been timestamped remain valid after their expiration, for example, and access tokens are indeed timestamped).
Note also that the implementation of flexible, substitution-aware configuration that was originally proposed in an early draft of this KIP was deemed more generally useful and has been separated out into its own KIP-269 Substitution Within Configuration Values, which is now a prerequisite for this one.
Public Interfaces
Note that most of the implementation of this KIP will be public-facing. The following sections define the various parts, and each includes an overall UML diagram as well as important code details (with Javadoc).
...
See Rejected Alternatives: Exceptions
We define a small exception hierarchy to cover the various cases related to the SASL/OAUTHBEARER code.
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer; /** * Base class for all exceptions thrown by the SASL/OAUTHBEARER code. Unlike the * unchecked {@link KafkaException} hierarchy, this hierarchy can be checked to * obtain compiler help because instances are never exposed to the core Kafka * code base and there is minimal risk of changing the set of thrown checked * exceptions and impacting contract/source code compatibility given the small * size of this code base. */ public abstract class OAuthBearerException extends ExceptionKafkaException { // etc... } |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer; /** * Exception thrown when there is a problem with the configuration (an invalid * option in a JAAS config, for example). */ public class OAuthBearerConfigException extends OAuthBearerException { // etc... } |
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer; /** * JSON Web Token Claims * * @see <a href="https://www.iana.org/assignments/jwt/jwt.xhtml">JSON Web Token * Claims</a> */ public enum JwtClaim { ISSUER("iss"), SUBJECT("sub"), AUDIENCE("aud"), EXPIRATION_TIME("exp"), NOT_BEFORE("nbf"), ISSUED_AT("iat"), JWT_ID("jti"); private String claimName; private JwtClaim(String claimName) { this.claimName = claimName; } public String claimName() { return claimName; } } |
Substitutable Module Options
See Rejected Alternatives: Substitutable Module Options
The mechanics of token retrieval and token validation (both described later) will differ across OAuth deployments. For example, for token retrieval, each deployment will inject credentials to the token endpoint differently, and the parameters sent to the token endpoint may also differ. Token validation will also differ because OAuth supports multiple methods of validation. Configuration of the retrieval and validation mechanisms, which are done via JAAS configuration, must therefore be flexible. In particular, while we collaboratively use instances implementing javax.security.auth.Callback
and javax.security.auth.CallbackHandler
to retrieve information, we can't know in advance what information will be required in order to retrieve or validate a token; a username and password might – or might not – be required, for example, and retrieval and validation will likely require much more as well. We also don't know where this information will come from: a file? an environment variable? a database? Somewhere else? We need implementations of the Callback
and CallbackHandler
interfaces that are just as flexible as we need the JAAS configuration to be.
JAAS config options (each of which is an individual element of the space-separated ModuleOptions list), in combination with appropriate Callback
and CallbackHandler
implementations, will support arbitrarily complex retrieval and substitution inside the option value. The JAAS Configuration spec already supports system property substitution via the ${system.property}
syntax; we will implement support for arbitrarily complex substitutions. For example, the following would support substitution of the contents of a file (which is a common way to store secrets, especially within containers) into the option value:
thePassword="$[file|redact|notBlank|defaultOption=fileDefault|=/path/to/secrets/the_secret]"
There are several features here that deserve comment:
- The "
$[
" and "]
" delimiters are the signal to perform a substitution (we can't use "${
" and "}
" because that is already defined by the JAAS Configuration spec to mean system property substitution). Note that we will not allow substitution within a substitution, and in fact it is not needed as described below. We will also support "$[[
" and "]]
" as delimiters (all the way up to 5 brackets, actually) to allow "$[
" and "]
" to appear in text without causing substitution. - Immediately inside the opening delimiter is the type of substitution followed by any optional modifiers we wish to apply. In the above, we identify this as a file substitution and we indicate three modifiers: the resulting value should never be logged (i.e. store it such that its value wil be redacted when logged); the contents of the file must not be blank (meaning it must not be empty or only contain whitespace); if the file does not exist or its contents are blank then use the value of the "
fileDefault
" option (which could itself have substitutions). It is an error if any constraints implied by the modifiers are violated. Any punctuation character except the equal sign (=
) can be used to delimit the modifiers. - Immediately after the type of substitution and any optional modifiers is an equal sign ("
=
") followed by the value (which in the case of the "file
" type is interpreted as the filename to read); then ultimately the closing delimiter appears.
This scheme is flexible and powerful; it handles most cases, but it remains relatively easy to create and read. Importantly, the types of replacements can be expanded in the future without breaking compatibility. See the Javadoc for the org.apache.kafka.common.security.oauthbearer.smo.SubstitutableModuleOptionsCallbackHandler
class for more details.
How would we retrieve the above value? We define the following representation of the module options and their substitution state along with associated Callback
and CallbackHandler
implementations and the org.apache.kafka.common.security.oauthbearer.smo.RedactableObject
classs for holding the results. We also define several built-in substitutions and a mechanism for adding new ones.
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer.smo;
/**
* Holds state regarding which {@code LoginModule} <a href=
* "https://docs.oracle.com/javase/9/docs/api/javax/security/auth/login/Configuration.html">module
* options</a> have been substituted -- both completely (via
* {@link #substitutionResults}) as well as in-progress (via
* {@link #withOptionEvaluationCausingCircularReference(String)}). Instances of
* this class are thread-safe.
*
* @see SubstitutableModuleOptionsCallbackHandler
* @see SubstitutableModuleOptionsCallback
* @see RedactableObject
*/
public class SubstitutableModuleOptions {
private final Map<String, String> moduleOptionsMap;
private final ConcurrentHashMap<String, RedactableObject> substitutionResults;
private final SubstitutableModuleOptions delegate;
private final String optionCausingCircularReference;
/**
* Constructor
*
* @param moduleOptionsMap
* the mandatory map representation of the <a href=
* "https://docs.oracle.com/javase/9/docs/api/javax/security/auth/login/Configuration.html">module
* options</a>
*/
public SubstitutableModuleOptions(Map<String, String> moduleOptionsMap) {
this.moduleOptionsMap = Collections.unmodifiableMap(Objects.requireNonNull(moduleOptionsMap));
this.substitutionResults = new ConcurrentHashMap<>();
this.delegate = null;
this.optionCausingCircularReference = null;
}
/**
* Constructor to create a new instance based on the given instance except that
* the given option is marked as causing a circular reference if an attempt is
* made to evaluate it
*
* @param delegate
* the mandatory instance from which the view will be created
* @param optionCausingCircularReference
* the mandatory option that causes a circular reference if it is
* evaluated
* @see #withOptionEvaluationCausingCircularReference(String)
* @see #optionEvaluationCausesCircularReference(String)
*/
public SubstitutableModuleOptions(SubstitutableModuleOptions delegate, String optionCausingCircularReference) {
this.moduleOptionsMap = delegate.moduleOptionsMap;
this.substitutionResults = delegate.substitutionResults;
this.delegate = delegate;
this.optionCausingCircularReference = optionCausingCircularReference;
}
/**
* Return (an unmodifiable copy of) the original module options map provided
* during construction
*
* @return (an unmodifiable copy of) the original module options map provided
* during construction
*/
public Map<String, String> moduleOptionsMap() {
return moduleOptionsMap;
}
/**
* Convenience method to indicate if a particular option name exists, returning
* true if it does, otherwise false
*
* @param optionName
* the mandatory option name
* @return true if the indicated option exists, otherwise false
*/
public boolean optionExists(String optionName) {
return moduleOptionsMap.containsKey(Objects.requireNonNull(optionName));
}
/**
* Create and return a new view of this instance where in that context the given
* option is in the process of being evaluated such that an attempt to evaluate
* its substitution value again within the same context would cause a circular
* reference
*
* @param optionName
* the mandatory option name
* @return a new view of this instance where in that context the given option is
* in the process of being evaluated such that an attempt to evaluate
* its substitution value again within the same context would cause a
* circular reference
* @see #optionEvaluationCausesCircularReference(String)
*/
public SubstitutableModuleOptions withOptionEvaluationCausingCircularReference(String optionName) {
return new SubstitutableModuleOptions(this, Objects.requireNonNull(optionName));
}
/**
* Indicate if, in the context of this instance's view, the given option is in
* the process of being evaluated; return true if an attempt to evaluate the
* option's substitution value would cause a circular reference, otherwise false
*
* @param optionName
* the mandatory option name
* @return true if an attempt to evaluate the option's substitution value would
* cause a circular reference, otherwise false
* @see #withOptionEvaluationCausingCircularReference(String)
*/
public boolean optionEvaluationCausesCircularReference(String optionName) {
return Objects.requireNonNull(optionName).equals(optionCausingCircularReference)
|| (delegate != null && delegate.optionEvaluationCausesCircularReference(optionName));
}
/**
* Return an unmodifiable map identifying which module options have been
* processed for substitution and the result (if any). A module option is
* guaranteed to have been processed for substitution and its name will appear
* as a key in the returned map only after
* {@link #setSubstitutionResult(String, RedactableObject)} is called.
*
* @return an unmodifiable map identifying which module options have been
* processed for substitution and the result (if any)
*/
public Map<String, RedactableObject> substitutionResults() {
return Collections.unmodifiableMap(substitutionResults);
}
/**
* Identify that the option with the given name has had substitution performed
* for it yielding the given result. This method is idempotent; invoking it with
* a substitution result equal to the current substitution result (as defined by
* {@code Object.equals()}) has no effect. The substitution result for an option
* cannot be changed (again, as defined by {@code Object.equals()}) once it has
* been set; an attempt to do so will raise an exception.
*
* @param optionName
* the mandatory option name, which must exist in the map returned by
* {@link #moduleOptionsMap()}
* @param substitutionResult
* the mandatory substitution result to set
*/
public void setSubstitutionResult(String optionName, RedactableObject substitutionResult) {
if (!moduleOptionsMap.containsKey(Objects.requireNonNull(optionName))) {
throw new IllegalArgumentException(String.format("Unknown module option name: %s", optionName));
}
RedactableObject priorSubstitutionResult = substitutionResults.putIfAbsent(optionName,
Objects.requireNonNull(substitutionResult));
if (priorSubstitutionResult != null && !priorSubstitutionResult.equals(substitutionResult))
throw new IllegalArgumentException(String
.format("Cannot change the substitution result for the module option with name '%s'", optionName));
}
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer.smo;
/**
* A {@code Callback} related to introspection requests against a JAAS
* configuration
*
* @see SubstitutableModuleOptionsCallbackHandler
* @see SubstitutableModuleOptions
*/
public class SubstitutableModuleOptionsCallback implements Callback {
private final SubstitutableModuleOptions substitutableModuleOptions;
private final String optionName;
private final boolean optionRequiredToExist;
/**
* Constructor
*
* @param substitutableModuleOptions
* the mandatory substitutable module options
* @param optionName
* the requested option
* @param optionRequiredToExist
* if true then the requested option is required to exist
*/
public SubstitutableModuleOptionsCallback(SubstitutableModuleOptions substitutableModuleOptions, String optionName,
boolean optionRequiredToExist) {
this.substitutableModuleOptions = Objects.requireNonNull(substitutableModuleOptions);
this.optionName = Objects.requireNonNull(optionName);
this.optionRequiredToExist = optionRequiredToExist;
}
/**
* Return the substitutable module options provided at construction time
*
* @return the substitutable module options provided at construction time
*/
public SubstitutableModuleOptions substitutableModuleOptions() {
return substitutableModuleOptions;
}
/**
* Return the option name provided at construction time
*
* @return the option name provided at construction time
*/
public String optionName() {
return optionName;
}
/**
* Return true if the the requested option is required to exist, otherwise false
*
* @return true if the the requested option is required to exist, otherwise
* false
*/
public boolean optionRequiredToExist() {
return optionRequiredToExist;
}
/**
* Identify that the option identified by {@link #optionName()}, on the instance
* returned by {@link #substitutableModuleOptions()}, has had substitution
* performed for it yielding the given result. This method is idempotent;
* invoking it with a substitution result equal to the option's current
* substitution result (as defined by {@code Object.equals()}) has no effect.
* The substitution result for an option cannot be changed (again, as defined by
* {@code Object.equals()}) once it has been set; an attempt to do so will raise
* an exception.
*
* @param substitutionResult
* the mandatory substitution result to set
*/
public void setSubstitutionResult(RedactableObject substitutionResult) {
substitutableModuleOptions.setSubstitutionResult(optionName, Objects.requireNonNull(substitutionResult));
}
/**
* Return the substitution result, if any has been set, otherwise null. Note
* that the result may not have been set via a call to
* {@link #setSubstitutionResult(RedactableObject)}; it is possible that the
* result was already set prior to construction of this instance, in which case
* that result will be returned here.
*
* @return the substitution result, if any has been set, otherwise null
*/
public RedactableObject substitutionResult() {
return substitutableModuleOptions.substitutionResults().get(optionName);
}
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer.smo;
/**
* A {@code CallbackHandler} that handles introspection requests against a JAAS
* configuration and supports substitution into option values via delimited text
* of the following form:
*
* <pre>
* <OPENING_DELIMITER><TYPE><OPTIONAL_MODIFIERS>=<OPTIONAL_VALUE><CLOSING_DELIMITER>
* </pre>
*
* Where the above elements are defined as follows:
*
* <pre>
* OPENING_DELIMITER: $[, $[[, $[[[, $[[[[, or $[[[[[
* CLLOSING_DELIMITER: ], ]], ]]], ]]]], or ]]]]] (number of brackets must match)
*
* TYPE: everything up to (but not including) the first punctuation character
*
* OPTIONAL_MODIFIERS: the optional section immediately after the TYPE, starting with any
* punctuation character except for the equal sign (=), and ending
* with the same punctuation character followed immediately by an
* equal sign. The same punctuation character delimits individual
* modifiers, which come in two flavors: flags, which do not contain
* an equal sign, and name=value arguments, which do.
*
* OPTIONAL_VALUE: the optional section immediately after the modifier section-delimiting
* punctuation character (if any) and the equal sign (=). It is not trimmed.
* </pre>
*
* For example:
*
* <pre>
* $[envvar=THE_ENV_VAR]
* $[envvar/notBlank/redact/=THE_ENV_VAR]
* $[envvar/defaultValue = theDefaultValue/=THE_ENV_VAR]
* $[envvar/defaultOption = theOptionName/=THE_ENV_VAR]
* $[file|redact|notBlank|=/the/path/to/the/file]
* </pre>
*
* Working left to right, once the delimiter is defined for a module option (for
* example, {@code $[} and {@code ]}), only that delimiter is recognized for the
* rest of that option (and it is always recognized as meaning substitution for
* the rest of that module option). A special "empty" substitution does nothing
* except, when it appears to the left of every other occurrence of matching
* delimiters, it serves to force the delimiter for that module option to the
* one indicated . For example, to force the delimiter to {@code $[[} and
* {@code ]]} (and prevent {@code $[} and {@code ]} from causing substitution)
* for a module option:
*
* <pre>
* optionName = "$[[]]These $[ and ] delimiters do not cause substitution"
* </pre>
*
* The following substitutions are supported, though it is straightforward to
* add others (see below):
*
* <ul>
* <li>{@code envvar}: substitute the value of an environment variable</li>
* <li>{@code sysprop}: substitute the value of a system property</li>
* <li>{@code file}: substitute the contents of a file</li>
* <li>{@code option}: substitute the contents of another option</li>
* </ul>
*
* The above substitutions support the following modifiers:
*
* <ul>
* <li>{@code redact}: prevent values from being logged</li>
* <li>{@code notEmpty}: the value must not be empty</li>
* <li>{@code notBlank}: the value must not be blank (i.e. consisting only of
* whitespace); implies {@code notEmpty}.</li>
* <li>{@code defaultValue=<value>}: substitute the given literal value if the
* substitution cannot otherwise be made (either because the value does not
* exist or the determined value was disallowed because it was empty or blank).
* The substituted value must satisfy any modifiers that act as
* constraints.</li>
* <li>{@code defaultOption=<optionName>}: substitute the value of the indicated
* option if the substitution cannot otherwise be made (either because the value
* does not exist or the determined value was disallowed because it was empty or
* blank). The substituted value must satisfy any modifiers that act as
* constraints.</li>
* <li>{@code fromOption}: provides a level of indirection so that the value,
* instead of always being literally specified (i.e. read this file, or this
* environment variable), can be determined via some other option. This allows,
* for example, the filename, system property name, etc. to be generated
* potentially from multiple substitutions concatenated together.</li>
* </ul>
*
* To add new substitutions simply define a module option of the following form:
*
* <pre>
* typeModuleOptionSubstituter = "fully.qualified.class.name"
* </pre>
*
* For example:
*
* <pre>
* fooModuleOptionSubstituter = "org.example.FooSubstituter"
* </pre>
*
* The indicated class must implement {@link ModuleOptionSubstituter}, and you
* can invoke the substitution like this:
*
* <pre>
* $[foo/optional/modifiers/=optionalValue]
* </pre>
*
* @see SubstitutableModuleOptionsCallback
* @see SubstitutableModuleOptions
* @see ModuleOptionSubstituter
*/
public class SubstitutableModuleOptionsCallbackHandler implements AuthenticateCallbackHandler {
// etc...
@Override
public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
for (Callback callback : callbacks) {
if (callback instanceof SubstitutableModuleOptionsCallback) {
SubstitutableModuleOptionsCallback substitutableModuleOptionsCallback = (SubstitutableModuleOptionsCallback) callback;
RedactableObject substitutionResult = getSubstitutionResult(
substitutableModuleOptionsCallback.substitutableModuleOptions(),
substitutableModuleOptionsCallback.optionName(),
substitutableModuleOptionsCallback.optionRequiredToExist(), forceDebugLogForTesting);
if (substitutionObject != null)
// depend on idempotence here because the substitution object has already been
// set on the underlying SubstitutableModuleOptions instance
substitutableModuleOptionsCallback.setSubstitutionResult(substitutionObject);
} else
throw new UnsupportedCallbackException(callback,
String.format("Unrecognized Callback type: %s", callback.getClass().getName()));
}
}
/**
* Perform substitution without using callbacks and a callback handler.
*
* @param substitutableModuleOptions
* the mandatory substitutable module options to query
* @param optionName
* the mandatory requested option name
* @param optionRequiredToExist
* if true then the requested option is required to exist
* @return the given option's substitution result, after any required
* substitution is applied, or null if the option does not exist and it
* was not required to exist
* @throws IOException
* if a required substitution cannot be performed, including if the
* given (or any other) required option does not exist
*/
public static RedactableObject getSubstitutionResult(SubstitutableModuleOptions substitutableModuleOptions,
String optionName, boolean optionRequiredToExist) throws IOException {
// etc...
}
/**
* Return the server configuration provided during
* {@link #configure(Map, String, List)}, if any, otherwise null
*
* @return the server configuration provided during
* {@link #configure(Map, String, List)}, if any, otherwise null
*/
public Map<String, ?> serverConfig() {
return serverConfig;
}
/**
* Return the SASL mechanism provided during
* {@link #configure(Map, String, List)}, if any, otherwise null
*
* @return the SASL mechanism provided during
* {@link #configure(Map, String, List)}, if any, otherwise null
*/
public String mechanism() {
return mechanism;
}
/**
* Return the JAAS login module configurations provided during
* {@link #configure(Map, String, List)}, if any, otherwise null
*
* @return the JAAS login module configurations provided during
* {@link #configure(Map, String, List)}, if any, otherwise null
*/
public List<AppConfigurationEntry> jaasConfigEntries() {
return jaasConfigEntries;
}
// etc...
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer.smo;
/**
* An object whose text value can be redacted
*/
public class RedactableObject {
static final String REDACTED = "[redacted]";
private final Object object;
private final boolean redacted;
/**
* Constructor
*
* @param object
* the mandatory object
* @param redact
* when true the object's value will be redacted in
* {@link #redactedText()}
*/
public RedactableObject(Object object, boolean redact) {
this.object = Objects.requireNonNull(object);
this.redacted = redact;
}
/**
* Return the (always non-null) underlying object provided during instance
* construction
*
* @return the (always non-null) underlying object provided during instance
* construction
*/
public Object object() {
return object;
}
/**
* Return true if this instance contains information that will be redacted when
* {@link #redactedText()} is invoked, otherwise false
*
* @return true if this instance contains information that will be redacted when
* {@link #redactedText()} is invoked, otherwise false
*/
public boolean isRedacted() {
return redacted;
}
/**
* Return the redacted text for this instance, if redaction is required,
* otherwise return the {@link #value()}
*
* @return the redacted text for this instance, if redaction is required,
* otherwise return the {@link #value()}
*/
public String redactedText() {
return redacted ? REDACTED : value();
}
/**
* Return the {@code String} value of this instance, including information that
* would otherwise be redacted
*
* @return the {@code String} value of this instance, including information that
* would otherwise be redacted
*/
public String value() {
if (object instanceof String)
return (String) object;
throw new IllegalStateException(
String.format("Unknown substitution result object type: %s", object.getClass().getName()));
}
/**
* Return true if this result is considered to be empty, otherwise false
*
* @return true if this result is considered to be empty, otherwise false
*/
public boolean isEmpty() {
return value().isEmpty();
}
/**
* Return true if this result is considered to be blank (containing at most just
* whitespace), otherwise false
*
* @return true if this result is considered to be blank (containing at most
* just whitespace), otherwise false
*/
public boolean isBlank() {
return value().trim().isEmpty();
}
/**
* Return this instance if it is redacted according to {@link #isRedacted()},
* otherwise return a new, redacted instance with the same underlying object
*
* @return this instance if it is redacted according to {@link #isRedacted()},
* otherwise return a new, redacted instance with the same underlying
* object
*/
public RedactableObject redactedVersion() {
return redacted ? this : new RedactableObject(object, true);
}
@Override
public String toString() {
// be sure to redact information as required
return redactedText();
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof RedactableObject))
return false;
RedactableObject other = (RedactableObject) obj;
/*
* Note that differences in redaction cause inequality
*/
return redacted == other.redacted && object.equals(other.object);
}
@Override
public int hashCode() {
return object.hashCode();
}
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer.smo;
/**
* The interface that pluggable module option substituter classes must
* implement.
*
* @see SubstitutableModuleOptionsCallbackHandler
*/
public interface ModuleOptionSubstituter {
/**
* Perform the substitution of the given type on the given options using the
* given modifiers and value
*
* @param type
* the (always non-null) type of substitution to perform
* @param modifiers
* the (always non-null but potentially empty) modifiers to apply, if
* any. They are presented exactly as they appear in the
* configuration, with no whitespace trimming applied.
* @param value
* the always non-null (but potentially empty) value
* @param substitutableModuleOptions
* the module options and their current substitution state
* @return the (always non-null) result of performing the substitution
* @throws IOException
* if the substitution cannot be performed
*/
RedactableObject doSubstitution(String type, List<String> modifiers, String value,
SubstitutableModuleOptions substitutableModuleOptions) throws IOException;
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer.smo;
/**
* A template {@code ModuleOptionSubstituter} that handles the following
* modifiers:
* <ul>
* <li>{@code redact} -- when enabled, values are stored such that they are
* prevented from being logged</li>
* <li>{@code notBlank} -- when enabled, blank (only whitespace) or non-existent
* values are replaced by default values. Implies {@code notEmpty}.</li>
* <li>{@code notEmpty} -- when enabled, either explicitly or via
* {@code notBlank}, empty {@code ""} or non-existent values are replaced by
* default values.</li>
* <li>{@code fromOption} -- when enabled, provides a level of indirection such
* that the option's value is taken as an option name from which the actual
* value will be retrieved</li>
* <li>{@code defaultValue=<value>} -- when enabled, the provided value is used
* as a default value in case the requested value either does not exist or is
* disallowed via {@code notBlank} or {@code notEmpty}</li>
* <li>{@code defaultOption=<value>} -- when enabled, the indicated option is
* evaluated as a default value in case the requested value either does not
* exist or is disallowed via {@code notBlank} or {@code notEmpty}</li>
* </ul>
*
* Flags (modifiers without an equal sign) are trimmed, so "{@code redact}" and
* "{@code redact }" are recognized as being the same. Arguments (modifiers
* with an equal sign) have their name trimmed but not their value, so
* "{@code name=value}" and "{@code name = value }" are both recognized as
* setting the {@code name} argument (though their values do not match due to
* whitespace differences).
* <p>
* It is an error to set the same named argument multiple times (even if the
* values are the same). Redundantly specifying the same flag is acceptable.
* <p>
* Flags and arguments are presented to the substitution's implementation via
* the
* {@link #retrieveResult(String, String, boolean, SubstitutableModuleOptions, Set, Map)}
* method.
*/
public abstract class SubstituterHelper implements ModuleOptionSubstituter {
/**
* Retrieve the substitution result associated with the given key, if any,
* otherwise null
*
* @param type
* the (always non-null) type of substitution to perform
* @param key
* the required key
* @param redact
* if the result must be redacted regardless of any information to
* the contrary
* @param substitutableModuleOptions
* the module options and their current substitution state
* @param flags
* the flags specified, if any, beyond the standard {@code redact},
* {@code notBlank}, {@code notEmpty}, and {@code fromOption} flags
* @param args
* the arguments specified, if any, beyond the standard
* {@code defaultValue} and {@code defaultOption} arguments
* @return the substitution result associated with the given key, if any,
* otherwise null
* @throws IOException
* if the request cannot be performed such that the use of a default
* value would be inappropriate
*/
public abstract RedactableObject retrieveResult(String type, String key, boolean redact,
SubstitutableModuleOptions substitutableModuleOptions, Set<String> flags, Map<String, String> args)
throws IOException;
@Override
public RedactableObject doSubstitution(String type, List<String> modifiers, String value,
SubstitutableModuleOptions substitutableModuleOptions) throws IOException {
// etc...
}
// etc...
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer.smo;
/**
* A {@code ModuleOptionSubstituter} that handles module option substitution.
* <p>
* Note that the {@code fromOption} modifier, if present, still adds an
* additional level of indirection: the value indicates the option whose value,
* in turn, indicates the option text to retrieve.
*/
public class OptionSubstituter implements ModuleOptionSubstituter {
private final SubstituterHelper substituterHelper;
public OptionSubstituter() {
substituterHelper = new SubstituterHelper() {
@Override
public RedactableObject retrieveResult(String type, String key, boolean redact,
SubstitutableModuleOptions substitutableModuleOptions, Set<String> flags, Map<String, String> args)
throws IOException {
if (!flags.isEmpty() || !args.isEmpty())
throw new IOException(String.format("Unknown extra flags/args: %s; %s", flags, args));
RedactableObject substitutionResult = SubstitutableModuleOptionsCallbackHandler
.getSubstitutionResult(substitutableModuleOptions, key, false);
return redact ? substitutionResult.redactedVersion() : substitutionResult;
}
};
}
@Override
public RedactableObject doSubstitution(String type, List<String> modifiers, String value,
SubstitutableModuleOptions substitutableModuleOptions) throws IOException {
return substituterHelper.doSubstitution(type, modifiers, value, substitutableModuleOptions);
}
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer.smo;
/**
* A {@code ModuleOptionSubstituter} that handles file content substitution.
*/
public class FileContentSubstituter implements ModuleOptionSubstituter {
private final SubstituterHelper substituterHelper;
public FileContentSubstituter() {
substituterHelper = new SubstituterHelper() {
@Override
public RedactableObject retrieveResult(String type, String key, boolean redact,
SubstitutableModuleOptions substitutableModuleOptions, Set<String> flags, Map<String, String> args)
throws IOException {
if (!flags.isEmpty() || !args.isEmpty())
throw new IOException(String.format("Unknown extra flags/args: %s; %s", flags, args));
Path path = Paths.get(key);
if (!Files.exists(path))
return null;
BasicFileAttributes fileAttributes = Files.getFileAttributeView(path, BasicFileAttributeView.class)
.readAttributes();
if (!fileAttributes.isRegularFile())
return null;
long fileSize = fileAttributes.size();
int maxSize = 1024 * 1024;
if (fileSize > maxSize) {
throw new IOException(String.format("Type=%s: key=%s: file size exceeds max of %d: %d", type, key,
maxSize, fileSize));
}
String retval = new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
return new RedactableObject(retval, redact);
}
};
}
@Override
public RedactableObject doSubstitution(String type, List<String> modifiers, String value,
SubstitutableModuleOptions substitutableModuleOptions) throws IOException {
return substituterHelper.doSubstitution(type, modifiers, value, substitutableModuleOptions);
}
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer.smo;
/**
* A {@code ModuleOptionSubstituter} that handles system property substitution.
* The JAAS config file already supports system property substitution via the
* '{@code ${}' and '}' delimiters, but we support it here for syntactic
* consistency as well as to add the functionality associated with modifiers.
*/
public class SystemPropertySubstituter implements ModuleOptionSubstituter {
private final SubstituterHelper substituterHelper;
public SystemPropertySubstituter() {
substituterHelper = new SubstituterHelper() {
@Override
public RedactableObject retrieveResult(String type, String key, boolean redact,
SubstitutableModuleOptions substitutableModuleOptions, Set<String> flags, Map<String, String> args)
throws IOException {
if (!flags.isEmpty() || !args.isEmpty())
throw new IOException(String.format("Unknown extra flags/args: %s; %s", flags, args));
String retval = System.getProperty(key);
return retval != null ? new RedactableObject(retval, redact) : null;
}
};
}
@Override
public RedactableObject doSubstitution(String type, List<String> modifiers, String value,
SubstitutableModuleOptions substitutableModuleOptions) throws IOException {
return substituterHelper.doSubstitution(type, modifiers, value, substitutableModuleOptions);
}
} |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.common.security.oauthbearer.smo;
/**
* A {@code ModuleOptionSubstituter} that handles environment variable
* substitution
*/
public class EnvironmentVariableSubstituter implements ModuleOptionSubstituter {
private final SubstituterHelper substituterHelper;
public EnvironmentVariableSubstituter() {
substituterHelper = new SubstituterHelper() {
@Override
public RedactableObject retrieveResult(String type, String key, boolean redact,
SubstitutableModuleOptions substitutableModuleOptions, Set<String> flags, Map<String, String> args)
throws IOException {
if (!flags.isEmpty() || !args.isEmpty())
throw new IOException(String.format("Unknown extra flags/args: %s; %s", flags, args));
String retval = System.getenv(key);
return retval != null ? new RedactableObject(retval, redact) : null;
}
};
}
@Override
public RedactableObject doSubstitution(String type, List<String> modifiers, String value,
SubstitutableModuleOptions substitutableModuleOptions) throws IOException {
return substituterHelper.doSubstitution(type, modifiers, value, substitutableModuleOptions);
}
} |
Given the above code, and assuming the existence of the following mapping in the module options map:
thePassword="$[file|redact|notBlank|=/path/to/secrets/the_secret]"
we can retrieve the contents of the file as follows:
Code Block | ||||
---|---|---|---|---|
| ||||
SubstitutableModuleOptions options = new SubstitutableModuleOptions(moduleOptionsMap);
boolean requiredToExist = true;
SubstitutableModuleOptionsCallback callback = new SubstitutableModuleOptionsCallback(options, "thePassword", requiredToExist);
CallbackHandler callbackHandler = new SubstitutableModuleOptionsCallbackHandler()
callbackHandler.handle(new Callback[] {callback});
String thePassword = callback.substitutionResult().value(); |
Alternatively, there is a convenience method that allows us to avoid using a callback if we wish:
Code Block | ||||
---|---|---|---|---|
| ||||
SubstitutableModuleOptions options = new SubstitutableModuleOptions(moduleOptionsMap);
boolean requiredToExist = true;
String thePassword = SubstitutableModuleOptionsCallbackHandler.getSubstitutionResult(substitutableModuleOptions,
"thePassword", requiredToExist).value(); |
The initial set of supported substitution types and their specifiable constraints are as follows:
Type | Description | Specifiable Constraints | Notes |
---|---|---|---|
file | File content substitution |
| It is an error if the file does not exist or is not readable unless defaultValue or defaultOption specified. If a defaultValue is specified then the literal default value specified will be used and checked against any notBlank or notEmpty constraints that exist if those constraints are violated by the previously-determined value. If a defaultOption is specified then the value defined by the specified option will be used and checked against any notBlank or notEmpty constraints that exist if those constraints are violated by the previously-determined value. If the default option's value depends on substitutions that were marked redact then redact is implied. The fromOption modifier indicates that the value, instead of being the file to read, is instead the name of an option whose value is to be taken as the filename. This provides the ability to generate filenames from multiple substitutions as opposed to being forced to literally specify it. |
envvar | Environment variable substitution | same as above | It is an error if the environment variable does not exist unless defaultValue or defaultOption specified. If a defaultValue is specified then the literal default value specified will be used and checked against any notBlank or notEmpty constraints that exist if those constraints are violated by the previously-determined value. If a defaultOption is specified then the value defined by the specified option will be used and checked against any notBlank or notEmpty constraints that exist if those constraints are violated by the previously-determined value. If the default option's value depends on substitutions that were marked redact then redact is implied. The fromOption modifier indicates that the value, instead of being the environment variable to read, is instead the name of an option whose value is to be taken as the environment variable name. This provides the ability to generate environment variable names from multiple substitutions as opposed to being forced to literally specify it. |
option | Another module option value substitution | same as above | It is an error if the option does not exist unless defaultValue or defaultOption specified. If a defaultValue is specified then the literal default value specified will be used and checked against any notBlank or notEmpty constraints that exist if those constraints are violated by the previously-determined value. If a defaultOption is specified then the value defined by the specified option will be used and checked against any notBlank or notEmpty constraints that exist if those constraints are violated by the previously-determined value. If the default option's value depends on substitutions that were marked redact then redact is implied. The fromOption modifier indicates that the value, instead of being the option to read, is instead the name of an option whose value is to be taken as the option name. This provides the ability to generate option names from multiple substitutions as opposed to being forced to literally specify it. |
sysprop | System property substitution | same as above | It is an error if the system property does not exist unless defaultValue or defaultOption specified. If a defaultValue is specified then the literal default value specified will be used and checked against any notBlank or notEmpty constraints that exist if those constraints are violated by the previously-determined value. If a defaultOption is specified then the value defined by the specified option will be used and checked against any notBlank or notEmpty constraints that exist if those constraints are violated by the previously-determined value. If the default option's value depends on substitutions that were marked redact then redact is implied. The fromOption modifier indicates that the value, instead of being the system property to read, is instead the name of an option whose value is to be taken as the system property name. This provides the ability to generate system property names from multiple substitutions as opposed to being forced to literally specify it. |
To add new substitutions simply define a module option of the following form:
typeModuleOptionSubstituter = "fully.qualified.class.name"
For example:
fooModuleOptionSubstituter = "org.example.FooSubstituter"
The indicated class must implement the org.apache.kafka.common.security.oauthbearer.smo.ModuleOptionSubstituter
interface.
Invoke the substitution with text in an option like this:
$[foo/optional/modifiers/=optionalValue]
Note that the above substitution functionality and the related classes and interfaces are not necessarily specific to OAuth Bearer Tokens, so an open question is whether this substitution-related code belongs in a sub-package of the main SASL/OAUTHBEARER one or if it should live somewhere else.
Token Retrieval
Token Retrieval
Token retrieval occurs on the client side of the SASL negotiation (in the producer/consumer, or on the broker when SASL/OAUTHBEARER is the inter-broker protocol), and the org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
class is the LoginModule
implementation that creates and invokes an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenRetriever
to perform the retrieval. We provide the org.apache.kafka.common.security.oauthbearer.OAuthBearerUnsecuredJwtRetriever
class as a sample implementation that also provides value in development and testing situations.
...
Exceptions
Anchor | ||||
---|---|---|---|---|
|
...
Unchecked exceptions can be added or deleted without breaking binary compatibility, and Kafka uses unchecked exceptions exclusively, so
...
OAuthBearerException
is unchecked. We considered making OAuthBearerException
a checked exception to get the compiler help provided by the use of checked exceptions
...
because the set of exception types and the size of the overall code base are both small
...
such that the risk that we would want to change the type(s) of exception(s) thrown is minimal. Nonetheless, we agreed the benefit of consistency and eliminating the risk of breaking binary compatibility outweighed the benefit of the compiler help provided by checked exceptions.
Token Refresh
Anchor | ||||
---|---|---|---|---|
|
...
We considered associating refresh-related properties (such as the minimum refresh period in milliseconds) with the ExpiringCredential
rather than the ExpiringCredentialRefreshingLogin
instance because the ExpiringCredentialRefreshingLogin
instance couldn't know which of the potentially multiple login modules actually applies (i.e. which is the one associated with the inter-broker protocol); it wouldn't always know how to find the JAAS config options, so it wouldn't always know how to get the refresh configuration. There was problem with this aproach, though: we can't invoke login()
on the LoginContext
and get an ExpiringCredential
instance without a CallbackHandler
, so we needed to know the type of CallbackHandler
to instantiate – and there is no way to know that. It simply made sense to give the ExpiringCredentialRefreshingLogin
instance the ability to discover the correct login module in all cases and be able to ask it for the CallbackHandler
instance; hence we created the ExpiringCredentialLoginModule
interface.
...
.