...
Kafka currently supports SASL authentication using the SASL/PLAIN mechanism, and KIP-84 addresses the addition of SASL/SCRAM. Credential verification in SASL/PLAIN servers is currently based on hard-coded credentials in the JAAS configuration, similar to the Digest-MD5 configuration in Zookeeper. This is useful as a sample, but it is not suitable for production use since clear passwords are stored on disk. KIP-84 proposes to add the SCRAM mechanism with Zookeeper as the password store. In production installations where Zookeeper is not secure (e.g. Kafka on the cloud), an alternative password store may be required.
...
This KIP proposes to enable customization of SASL servers and clients using configurable callback handlers. Configurable callback handlers for SASL/PLAIN and SASL/SCRAM will enable credential providers to be replaced in a simple and consistent way. In addition, configurable callback handlers for both servers and clients make it easier to configure new or custom SASL mechanisms that are not implemented in Kafka.
...
A new configuration property sasl.callback.handlers will be added to enable new callback handlers to be specified for brokers and clients. This will be a list of classes that implement the org.apache.kafka.common.security.auth.AuthenticateCallbackHandler interface. A different handler may be provided for each enabled mechanism.
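As a rough illustration of the property described above, the sketch below builds broker properties that name one handler class per enabled mechanism. The property name sasl.callback.handlers comes from this KIP; the com.example handler class names and the comma-separated list format are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

final class CallbackHandlerConfigSketch {
    // Property name proposed in this KIP.
    static final String SASL_CALLBACK_HANDLERS = "sasl.callback.handlers";

    /** Builds broker properties listing one handler class per enabled mechanism. */
    static Properties brokerProperties() {
        Properties props = new Properties();
        props.setProperty("sasl.enabled.mechanisms", "PLAIN,SCRAM-SHA-256");
        // Hypothetical handler classes; each would implement AuthenticateCallbackHandler.
        props.setProperty(SASL_CALLBACK_HANDLERS,
            "com.example.PlainServerCallbackHandler,com.example.ScramServerCallbackHandler");
        return props;
    }

    /** Splits the list value into class names (comma-separated format is assumed). */
    static List<String> handlerClassNames(Properties props) {
        String value = props.getProperty(SASL_CALLBACK_HANDLERS, "");
        List<String> names = new ArrayList<>();
        for (String name : value.split(",")) {
            String trimmed = name.trim();
            if (!trimmed.isEmpty())
                names.add(trimmed);
        }
        return names;
    }
}
```

Each listed class would then be matched to a mechanism through the mechanisms it reports as supported.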
...
The callback handler interface AuthenticateCallbackHandler will extend the standard javax.security.auth.callback.CallbackHandler interface, enabling the handler to be passed directly to SaslServer/SaslClient implementations. The callback handler configured for a mechanism must handle the callbacks described below:
...
Callback handlers which require additional options at runtime (e.g. the URL of a credential server) may include arguments in the JAAS configuration using the config file or the sasl.jaas.config property (KIP-85). This is similar to the way the keytab location is configured for GSSAPI. Client callback handlers can retrieve the Subject using Subject.getSubject(AccessController.getContext()) to obtain credentials populated by the login module.
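To make the JAAS option lookup concrete, here is a minimal sketch that reads a handler-specific option from a list of JAAS configuration entries, assuming the handler receives them as a List<AppConfigurationEntry> in its configure method. It uses only JDK classes; the helper class and the option name credential.server.url are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;

final class JaasOptionSketch {
    /**
     * Returns the option value from the first entry for the given login module
     * that sets it, or null if no matching entry sets the option.
     */
    static String option(List<AppConfigurationEntry> entries, String loginModule, String key) {
        for (AppConfigurationEntry entry : entries) {
            if (entry.getLoginModuleName().equals(loginModule)) {
                Object value = entry.getOptions().get(key);
                if (value != null)
                    return value.toString();
            }
        }
        return null;
    }

    /** Builds an entry equivalent to the JAAS line: loginModule required key="value"; */
    static AppConfigurationEntry entry(String loginModule, String key, String value) {
        Map<String, String> options = new HashMap<>();
        options.put(key, value);
        return new AppConfigurationEntry(loginModule, LoginModuleControlFlag.REQUIRED, options);
    }
}
```

A handler could call option(jaasConfigEntries, loginModuleName, "credential.server.url") once during configuration and keep the result for later callbacks.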
```java
package org.apache.kafka.common.security.auth;

import org.apache.kafka.common.network.Mode;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.login.AppConfigurationEntry;

public interface AuthenticateCallbackHandler extends CallbackHandler {
    /**
     * Configures the callback handler for the specified SASL mechanism.
     */
    void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries);

    /**
     * Returns the connection mode supported by this callback handler.
     */
    Mode mode();

    /**
     * Returns the SASL mechanisms supported by this callback handler.
     */
    Collection<String> supportedSaslMechanisms();

    /**
     * Closes this instance.
     */
    void close();
}
```
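Because the interface extends the standard CallbackHandler, a handler can be handed straight to the JDK SASL factory. The sketch below shows that wiring with a stand-in handler and the JDK's built-in DIGEST-MD5 server; the class names, the "kafka" protocol name and the "localhost" server name are placeholders, and this is not the broker's actual creation path.

```java
import java.io.IOException;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;

final class SaslServerWiringSketch {
    /** Stand-in handler: records the user name and rejects every other callback. */
    static final class NameOnlyHandler implements CallbackHandler {
        String username;

        @Override
        public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
            for (Callback callback : callbacks) {
                if (callback instanceof NameCallback)
                    username = ((NameCallback) callback).getDefaultName();
                else
                    throw new UnsupportedCallbackException(callback);
            }
        }
    }

    /** Creates a SASL server for the mechanism, passing the handler through unchanged. */
    static SaslServer createServer(String mechanism, CallbackHandler handler) {
        try {
            // "kafka" and "localhost" are placeholder protocol and server names.
            return Sasl.createSaslServer(mechanism, "kafka", "localhost", null, handler);
        } catch (SaslException e) {
            throw new IllegalStateException("Could not create SASL server for " + mechanism, e);
        }
    }
}
```

The handler is only invoked later, when the server evaluates client responses, so any callback it does not support surfaces during authentication rather than at creation time.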
...
SASL/SCRAM servers using the SaslServer implementation included in Kafka must handle NameCallback and ScramCredentialCallback. The username for authentication is provided in NameCallback, similar to other mechanisms in the JRE (e.g. Digest-MD5). The callback handler must return the SCRAM credential for the user if credentials are available for the username for the configured SCRAM mechanism.
...
Define a new class that implements AuthenticateCallbackHandler and handles NameCallback and PlainAuthenticateCallback, and add the class to the broker's sasl.callback.handlers property. A single instance of this callback handler will be created for the broker. The configured callback handler is responsible for validating the password provided by clients, and it may use an external authentication server to do so.
...
Set the broker callback handler to a class that implements AuthenticateCallbackHandler and handles NameCallback and ScramCredentialCallback. SCRAM credentials from a custom store can be returned by the callback handler.
...
Callbacks defined for the mechanism in the Java implementation must be handled by custom callback handlers if the behaviour differs from the default callbacks in Kafka.
...
Configure callbacks for different mechanisms on different listeners in the broker
KIP-103 introduced support for multiple listeners in the broker for the same security protocol. This allows brokers to configure different SASL mechanisms for internal and external traffic. The listener name prefix can be applied to sasl.callback.handlers to define different callback handlers for each of the listeners.
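The prefix lookup can be sketched as follows, assuming the usual Kafka convention that a listener-specific key has the form listener.name.<lowercase listener name>.<property> and that the unprefixed value is used as a fallback. The helper class and the handler class names in the usage note are hypothetical.

```java
import java.util.Locale;
import java.util.Map;

final class ListenerConfigSketch {
    // Property name proposed in this KIP.
    static final String SASL_CALLBACK_HANDLERS = "sasl.callback.handlers";

    /** Returns the listener-specific config prefix, e.g. "listener.name.internal.". */
    static String listenerPrefix(String listenerName) {
        return "listener.name." + listenerName.toLowerCase(Locale.ROOT) + ".";
    }

    /** Uses the listener-specific value when present, else the broker-wide value. */
    static String handlersFor(Map<String, String> configs, String listenerName) {
        String prefixed = configs.get(listenerPrefix(listenerName) + SASL_CALLBACK_HANDLERS);
        return prefixed != null ? prefixed : configs.get(SASL_CALLBACK_HANDLERS);
    }
}
```

With listener.name.external.sasl.callback.handlers set to com.example.ExternalHandler and sasl.callback.handlers set to com.example.DefaultHandler, the EXTERNAL listener would resolve to the former and every other listener to the latter.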
```java
public class PlainServerCallbackHandler implements AuthenticateCallbackHandler {
    private List<AppConfigurationEntry> jaasConfigEntries;

    @Override
    public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
        this.jaasConfigEntries = jaasConfigEntries;
    }

    @Override
    public Mode mode() {
        return Mode.SERVER;
    }

    @Override
    public Collection<String> supportedSaslMechanisms() {
        return Arrays.asList(PlainSaslServer.PLAIN_MECHANISM);
    }

    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        String username = null;
        for (Callback callback : callbacks) {
            if (callback instanceof NameCallback)
                username = ((NameCallback) callback).getDefaultName();
            else if (callback instanceof PlainAuthenticateCallback) {
                PlainAuthenticateCallback plainCallback = (PlainAuthenticateCallback) callback;
                boolean authenticated = authenticate(username, plainCallback.password());
                plainCallback.authenticated(authenticated);
            } else
                throw new UnsupportedCallbackException(callback);
        }
    }

    protected boolean authenticate(String username, char[] password) throws IOException {
        if (username == null)
            return false;
        else {
            // Passwords are stored as "user_<name>" options of the PLAIN login module.
            String expectedPassword = JaasContext.configEntryOption(jaasConfigEntries, "user_" + username, PlainLoginModule.class.getName());
            // An unknown user has no option set; reject instead of failing on null.
            return expectedPassword != null && Arrays.equals(password, expectedPassword.toCharArray());
        }
    }

    @Override
    public void close() throws KafkaException {
    }
}
```
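The example above compares passwords with Arrays.equals, which can return as soon as it finds a mismatch. A custom handler might prefer a comparison whose duration does not depend on where the mismatch occurs. The following is a minimal sketch using the JDK's MessageDigest.isEqual; the helper class is hypothetical and not part of this KIP.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

final class PasswordCompareSketch {
    /** Compares passwords without stopping at the first mismatching byte. */
    static boolean matches(char[] provided, String expected) {
        // A missing password or an unknown user never matches.
        if (provided == null || expected == null)
            return false;
        byte[] providedBytes = new String(provided).getBytes(StandardCharsets.UTF_8);
        byte[] expectedBytes = expected.getBytes(StandardCharsets.UTF_8);
        return MessageDigest.isEqual(providedBytes, expectedBytes);
    }
}
```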
...
```java
public class ScramServerCallbackHandler implements AuthenticateCallbackHandler {
    @Override
    public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
    }

    @Override
    public Mode mode() {
        return Mode.SERVER;
    }

    @Override
    public Collection<String> supportedSaslMechanisms() {
        return ScramMechanism.mechanismNames();
    }

    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        String username = null;
        for (Callback callback : callbacks) {
            if (callback instanceof NameCallback)
                username = ((NameCallback) callback).getDefaultName();
            else if (callback instanceof ScramCredentialCallback)
                ((ScramCredentialCallback) callback).scramCredential(credential(username));
            else
                throw new UnsupportedCallbackException(callback);
        }
    }

    protected ScramCredential credential(String username) {
        // Placeholder: look up and return the SCRAM credential for the user from the
        // custom credential store. Returning null here means no credential is available.
        return null;
    }

    @Override
    public void close() {
    }
}
```
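For readers building a custom credential store, the sketch below shows how the values behind a SCRAM credential (salt, stored key, server key and iteration count) can be derived from a clear password following RFC 5802, here for SCRAM-SHA-256. It uses only JDK classes; the class is a hypothetical stand-in rather than Kafka's own credential type, and it skips the SASLprep normalization that a full implementation would apply to the password.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import javax.crypto.Mac;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;

final class ScramCredentialSketch {
    final byte[] salt;
    final byte[] storedKey;
    final byte[] serverKey;
    final int iterations;

    private ScramCredentialSketch(byte[] salt, byte[] storedKey, byte[] serverKey, int iterations) {
        this.salt = salt;
        this.storedKey = storedKey;
        this.serverKey = serverKey;
        this.iterations = iterations;
    }

    /** Derives SCRAM-SHA-256 credential values from a clear password (RFC 5802). */
    static ScramCredentialSketch derive(char[] password, byte[] salt, int iterations) {
        try {
            // SaltedPassword := Hi(password, salt, i), which is PBKDF2 with HMAC-SHA-256.
            PBEKeySpec spec = new PBEKeySpec(password, salt, iterations, 256);
            byte[] saltedPassword = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256")
                .generateSecret(spec).getEncoded();
            // StoredKey := H(HMAC(SaltedPassword, "Client Key"))
            byte[] clientKey = hmac(saltedPassword, "Client Key");
            byte[] storedKey = MessageDigest.getInstance("SHA-256").digest(clientKey);
            // ServerKey := HMAC(SaltedPassword, "Server Key")
            byte[] serverKey = hmac(saltedPassword, "Server Key");
            return new ScramCredentialSketch(salt.clone(), storedKey, serverKey, iterations);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("SCRAM credential derivation failed", e);
        }
    }

    private static byte[] hmac(byte[] key, String message) throws GeneralSecurityException {
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(key, "HmacSHA256"));
        return mac.doFinal(message.getBytes(StandardCharsets.US_ASCII));
    }
}
```

Only the derived values need to be stored, so a store built this way never has to keep the clear password.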
...