Current state: "Accepted"
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Kafka currently supports non-configurable SASL extensions in its SCRAM authentication protocol for delegation token validation.
It would be useful to provide configurable SASL extensions for the OAuthBearer authentication mechanism as well, such that clients could attach arbitrary data for the principal authenticating into Kafka. Even though the JWT token standard supports customizable fields (in the form of claims), there are use cases where the client is unable to add additional ones (e.g: Kafka client receives a signed JWT token from a third-party).
This way, a custom principal can hold information derived from the authentication mechanism, which could prove useful for better tracing and troubleshooting, for example. This can be done in a way which allows for easier extendability in future SASL mechanisms.
It is worth noting that these extensions would lack a digital signature and therefore should not be used for critical use-cases where security is a concern.
New JAAS config option for default, unsecured bearer tokens - `unsecuredLoginExtension_<extensionname>` (as shown in the "Example" paragraph). The name "auth" is not supported as a custom extension name with any SASL/OAUTHBEARER mechanism, including the unsecured one, since it is reserved by the specification for what is normally sent in the HTTP Authorization header. An attempt to use it will result in an exception on the client. There are also additional regex validations for extension name and values to ensure they conform to the SASL/OAUTHBEARER standard (specifically, https://tools.ietf.org/html/rfc7628#section-3.1)
The server can further validate the extensions via its pluggable callback handler.
`OAuthBearerExtensionsValidatorCallback` - callback for OAuth extension validation, providing access to the token
package org.apache.kafka.common.security.oauthbearer; /** * A {@code Callback} for use by the {@code SaslServer} implementation when it * needs to validate the SASL extensions for the OAUTHBEARER mechanism * Callback handlers should use the {@link #validate(String)} * method to communicate valid extensions back to the SASL server. * Callback handlers should use the * {@link #error(String, String)} method to communicate validation errors back to * the SASL Server. * As per RFC-7628 (https://tools.ietf.org/html/rfc7628#section-3.1), unknown extensions must be ignored by the server. * The callback handler implementation should simply ignore unknown extensions, * not calling {@link #error(String, String)} nor {@link #validate(String)}. * Callback handlers should communicate other problems by raising an {@code IOException}. * <p> * The OAuth bearer token is provided in the callback for better context in extension validation. * It is very important that token validation is done in its own {@link OAuthBearerValidatorCallback} * irregardless of provided extensions, as they are inherently insecure. */ public class OAuthBearerExtensionsValidatorCallback implements Callback { public OAuthBearerExtensionsValidatorCallback(OAuthBearerToken token, SaslExtensions extensions) /** * @return {@link OAuthBearerToken} the OAuth bearer token of the client */ public OAuthBearerToken token() /** * @return {@link SaslExtensions} consisting of the unvalidated extension names and values that were sent by the client */ public SaslExtensions inputExtensions() /** * @return an unmodifiable {@link Map} consisting of the validated and recognized by the server extension names and values */ public Map<String, String> validatedExtensions() /** * @return An immutable {@link Map} consisting of the name->error messages of extensions which failed validation */ public Map<String, String> invalidExtensions() /** * Validates a specific extension in the original {@code inputExtensions} map * @param extensionName - the name of the extension which was validated */ public void validate(String extensionName) /** * Set the error value for a specific extension key-value pair if validation has failed * * @param invalidExtensionName * the mandatory extension name which caused the validation failure * @param errorMessage * error message describing why the validation failed */ public void error(String invalidExtensionName, String errorMessage) } |
`SaslExtensionsCallback` - generic callback to hold extensions
package org.apache.kafka.common.security.auth; public class SaslExtensionsCallback implements Callback { /** * Returns a {@link SaslExtensions} consisting of the extension names and values that are sent by the client to * the server in the initial client SASL authentication message. */ public SaslExtensions extensions() /** * Sets the SASL extensions on this callback. */ public void extensions(SaslExtensions extensions) } |
`SaslExtensions` - class for holding extensions data
package org.apache.kafka.common.security.auth; /** * A simple value object class holding customizable SASL extensions */ public class SaslExtensions { public SaslExtensions(Map<String, String> extensionMap) /** * Returns an <strong>immutable</strong> map of the extension names and their values */ public Map<String, String> map() } |
The default `OAuthBearerLoginModule` and the `OAuthBearerSaslClient` will be changed to request the extensions from their callback handler. For backwards compatibility it is not necessary for the callback handler to support `SaslExtensionsCallback`. Any UnsupportedCallbackException that is thrown will be ignored and no extensions will be added.
Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.
Create a new public `SaslExtensions` class that takes most of the generalizable logic from `ScramExtensions`. `ScramExtensions` will extend `SaslExtensions`
Create a new public `SaslExtensionsCallback` class which will be similar to `ScramExtensionsCallback`. `ScramExtensionsCallback` will NOT extend `SaslExtensionsCallback` since it will not support the new `SaslExtensions` class.
Create a new public `OAuthBearerExtensionsValidatorCallback` class.
A user would make use of the changes in this KIP in the following way:
Add the extension names to your JAAS configuration in the client
KafkaClient { org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required unsecuredLoginStringClaim_sub="thePrincipalName" unsecuredLoginExtension_traceId="123" unsecuredLoginExtension_logLevel="WARN"; }; |
A custom principal builder can then make use of the new extension
public class CustomPrincipalBuilder implements KafkaPrincipalBuilder { @Override public KafkaPrincipal build(AuthenticationContext context) { if (context instanceof SaslAuthenticationContext) { SaslServer saslServer = ((SaslAuthenticationContext) context).server(); String traceId = saslServer.getNegotiatedPropery("traceId"); return new CustomPrincipal("", saslServer, traceId); } throw new Exception(); } } |
As such, I decided it is best we limit the scope of this KIP while still implementing it in a way which would support future extensions by other SASL mechanisms