...
SASL/PLAIN servers using the SaslServer implementation included in Kafka must handle NameCallback and PlainAuthenticateCallback. The username for authentication is provided in NameCallback, similar to other mechanisms in the JRE (e.g. Digest-MD5). The password sent by the client during SASL authentication is provided in PlainAuthenticateCallback. The callback handler sets the authenticated flag in the callback after verifying the username and password.
The SASL/PLAIN server callback handler is requested to handle NameCallback and PlainAuthenticateCallback in that order. See the sample SASL/PLAIN callback handler for an example.
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.common.security.plain;

import javax.security.auth.callback.Callback;

public class PlainAuthenticateCallback implements Callback {
    private final char[] password;
    private boolean authenticated;

    public PlainAuthenticateCallback(char[] password) {
        this.password = password;
    }

    public char[] password() {
        return password;
    }

    public boolean authenticated() {
        return this.authenticated;
    }

    public void authenticated(boolean authenticated) {
        this.authenticated = authenticated;
    }
} |
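To make the callback order concrete, the following is a minimal sketch of how a server-side caller might hand both callbacks to a handler and read back the result. It is not Kafka's SaslServer code: `PlainExchangeSketch`, `verify()`, `toyHandler()` and the hard-coded user are hypothetical names introduced for illustration, and the nested `PlainAuthenticateCallback` is a local copy of the class above so that the example compiles on its own.

```java
import java.io.IOException;
import java.util.Arrays;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.UnsupportedCallbackException;

class PlainExchangeSketch {

    // Local copy of the callback shown above, so the sketch is self-contained.
    static class PlainAuthenticateCallback implements Callback {
        private final char[] password;
        private boolean authenticated;

        PlainAuthenticateCallback(char[] password) { this.password = password; }
        char[] password() { return password; }
        boolean authenticated() { return authenticated; }
        void authenticated(boolean authenticated) { this.authenticated = authenticated; }
    }

    // Hypothetical server-side step: pass NameCallback first, then
    // PlainAuthenticateCallback, and read the flag set by the handler.
    static boolean verify(CallbackHandler handler, String username, char[] password)
            throws IOException, UnsupportedCallbackException {
        NameCallback nameCallback = new NameCallback("username", username);
        PlainAuthenticateCallback authCallback = new PlainAuthenticateCallback(password);
        handler.handle(new Callback[] {nameCallback, authCallback});
        return authCallback.authenticated();
    }

    // Toy handler that accepts one hard-coded user; for illustration only.
    static CallbackHandler toyHandler() {
        return callbacks -> {
            String username = null;
            for (Callback callback : callbacks) {
                if (callback instanceof NameCallback) {
                    username = ((NameCallback) callback).getDefaultName();
                } else if (callback instanceof PlainAuthenticateCallback) {
                    PlainAuthenticateCallback plain = (PlainAuthenticateCallback) callback;
                    plain.authenticated("alice".equals(username)
                            && Arrays.equals(plain.password(), "alice-secret".toCharArray()));
                } else {
                    throw new UnsupportedCallbackException(callback);
                }
            }
        };
    }

    public static void main(String[] args) throws Exception {
        System.out.println(verify(toyHandler(), "alice", "alice-secret".toCharArray()));
        System.out.println(verify(toyHandler(), "alice", "wrong".toCharArray()));
    }
}
```

Because the handler only learns the username from NameCallback, the order of the two callbacks matters: a handler that sees PlainAuthenticateCallback first would have no username to check the password against.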
...
SASL/SCRAM servers using the SaslServer implementation included in Kafka must handle NameCallback and ScramCredentialCallback. The username for authentication is provided in NameCallback, similar to other mechanisms in the JRE (e.g. Digest-MD5). The callback handler must return the SCRAM credential for the user if credentials are available for the username for the configured SCRAM mechanism.
The SASL/SCRAM server callback handler is requested to handle NameCallback and ScramCredentialCallback in that order. See the sample SASL/SCRAM callback handler for an example.
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.common.security.scram;

import javax.security.auth.callback.Callback;

public class ScramCredentialCallback implements Callback {
    private ScramCredential scramCredential;

    public ScramCredential scramCredential() {
        return scramCredential;
    }

    public void scramCredential(ScramCredential scramCredential) {
        this.scramCredential = scramCredential;
    }
} |
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.common.security.scram;

public class ScramCredential {
    private final byte[] salt;
    private final byte[] serverKey;
    private final byte[] storedKey;
    private final int iterations;

    public ScramCredential(byte[] salt, byte[] storedKey, byte[] serverKey, int iterations) {
        this.salt = salt;
        this.serverKey = serverKey;
        this.storedKey = storedKey;
        this.iterations = iterations;
    }

    public byte[] salt() {
        return salt;
    }

    public byte[] serverKey() {
        return serverKey;
    }

    public byte[] storedKey() {
        return storedKey;
    }

    public int iterations() {
        return iterations;
    }
} |
SASL/PLAIN callbacks enable authentication of passwords using an external authentication server without requiring Kafka to have knowledge of the actual passwords. The callback checks whether the (user, password) combination provided during the SASL/PLAIN exchange is valid. SASL/SCRAM callbacks, in contrast, require the salted credentials in order to perform SCRAM authentication, so the credentials themselves are requested in the callback.
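As an illustration of what the salted credentials contain, the sketch below derives the stored key and server key from a password following the SCRAM definitions in RFC 5802, using SHA-256. It is an assumption-laden example rather than Kafka's own credential code: `ScramCredentialSketch`, `Derived` and `derive()` are hypothetical names, `Derived` is a local stand-in mirroring the fields of ScramCredential, the fixed salt is for demonstration only, and the SASLprep normalization that RFC 5802 requires for passwords is skipped.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;

class ScramCredentialSketch {

    // Local stand-in with the same four fields as ScramCredential.
    static class Derived {
        final byte[] salt;
        final byte[] storedKey;
        final byte[] serverKey;
        final int iterations;

        Derived(byte[] salt, byte[] storedKey, byte[] serverKey, int iterations) {
            this.salt = salt;
            this.storedKey = storedKey;
            this.serverKey = serverKey;
            this.iterations = iterations;
        }
    }

    // HMAC-SHA-256 of a fixed ASCII label, keyed with the given bytes.
    static byte[] hmac(byte[] key, String label) throws Exception {
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(key, "HmacSHA256"));
        return mac.doFinal(label.getBytes(StandardCharsets.UTF_8));
    }

    static Derived derive(char[] password, byte[] salt, int iterations) throws Exception {
        // SaltedPassword := Hi(password, salt, i), which is PBKDF2 with HMAC-SHA-256.
        // Assumption: the password needs no SASLprep normalization (e.g. plain ASCII).
        PBEKeySpec spec = new PBEKeySpec(password, salt, iterations, 256);
        byte[] saltedPassword = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256")
                .generateSecret(spec).getEncoded();
        // ClientKey := HMAC(SaltedPassword, "Client Key"); StoredKey := H(ClientKey)
        byte[] clientKey = hmac(saltedPassword, "Client Key");
        byte[] storedKey = MessageDigest.getInstance("SHA-256").digest(clientKey);
        // ServerKey := HMAC(SaltedPassword, "Server Key")
        byte[] serverKey = hmac(saltedPassword, "Server Key");
        return new Derived(salt, storedKey, serverKey, iterations);
    }

    public static void main(String[] args) throws Exception {
        // Demonstration values only; a real salt would be random per user.
        byte[] salt = "example-salt".getBytes(StandardCharsets.UTF_8);
        Derived derived = derive("example-password".toCharArray(), salt, 4096);
        System.out.println("storedKey=" + Base64.getEncoder().encodeToString(derived.storedKey));
        System.out.println("serverKey=" + Base64.getEncoder().encodeToString(derived.serverKey));
    }
}
```

Only the salt, the two derived keys and the iteration count need to be stored, which is why the callback returns a credential object instead of checking a password.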
Proposed Changes
ChannelBuilder will create an instance of each configured callback handler using the default constructor. For mechanisms without a callback handler override, the existing default callback handlers (SaslServerCallbackHandler/SaslClientCallbackHandler) will be created. Callback handler instances will be created once for each enabled mechanism in ChannelBuilder, instead of per-connection. This enables callback handlers using external authentication servers to cache credentials or reuse connections if required. SaslClientCallbackHandler will be modified to obtain Subject using Subject.getSubject(AccessController.getContext()) to avoid the current per-connection state.
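Since one handler instance is shared by all connections of a mechanism, any state it keeps has to be safe for concurrent use. The following sketch shows one way a handler might cache credentials fetched from an external store. `CachingCredentialLookup`, its `externalLookup` function and the call counter are hypothetical and exist only for this example; the sketch also omits expiry and invalidation, which a real cache would need.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical helper a shared callback handler could delegate to.
class CachingCredentialLookup<C> {
    // Thread-safe map, because connections may authenticate concurrently.
    private final Map<String, C> cache = new ConcurrentHashMap<>();
    private final Function<String, C> externalLookup;
    private final AtomicInteger externalCalls = new AtomicInteger();

    CachingCredentialLookup(Function<String, C> externalLookup) {
        this.externalLookup = externalLookup;
    }

    // Returns the cached credential, querying the external store on a miss.
    // A null result (unknown user) is not cached by computeIfAbsent.
    C credential(String username) {
        return cache.computeIfAbsent(username, name -> {
            externalCalls.incrementAndGet();
            return externalLookup.apply(name);
        });
    }

    // Number of lookups that reached the external store.
    int externalCalls() {
        return externalCalls.get();
    }

    public static void main(String[] args) {
        CachingCredentialLookup<String> lookup =
                new CachingCredentialLookup<>(name -> "credential-for-" + name);
        lookup.credential("alice");
        lookup.credential("alice");
        System.out.println("external calls: " + lookup.externalCalls());
    }
}
```

With per-connection handler instances, a cache like this would be discarded after every connection, which is the limitation that creating handlers once per mechanism removes.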
...
Callbacks defined for the mechanism in the Java implementation must be handled by custom callback handlers if the behaviour differs from the default callbacks in Kafka.
Code Block | ||||
---|---|---|---|---|
| ||||
public class PlainServerCallbackHandler implements AuthCallbackHandler {

    @Override
    public void configure(Map<String, ?> configs, String mechanism) {
    }

    @Override
    public Mode mode() {
        return Mode.SERVER;
    }

    @Override
    public Collection<String> supportedSaslMechanisms() {
        return Arrays.asList(PlainSaslServer.PLAIN_MECHANISM);
    }

    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        // NameCallback arrives first, so username is set before the password is checked
        String username = null;
        for (Callback callback: callbacks) {
            if (callback instanceof NameCallback)
                username = ((NameCallback) callback).getDefaultName();
            else if (callback instanceof PlainAuthenticateCallback) {
                PlainAuthenticateCallback plainCallback = (PlainAuthenticateCallback) callback;
                boolean authenticated = authenticate(username, plainCallback.password());
                plainCallback.authenticated(authenticated);
            } else
                throw new UnsupportedCallbackException(callback);
        }
    }

    protected boolean authenticate(String username, char[] password) throws IOException {
        if (username == null)
            return false;
        else {
            String expectedPassword = JaasUtils.jaasConfig(LoginType.SERVER.contextName(), "user_" + username, PlainLoginModule.class.getName());
            // Guard against a missing password entry for the user
            return expectedPassword != null && Arrays.equals(password, expectedPassword.toCharArray());
        }
    }

    @Override
    public void close() throws KafkaException {
    }
} |
...
For custom SASL/PLAIN authentication, override authenticate() with a custom implementation that verifies the given password for username.
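A rough sketch of such an override follows, delegating the check to an external service. Everything outside `authenticate()` is assumed for the example: `PasswordVerifier` is a hypothetical client interface, the nested `PlainServerCallbackHandler` is a stand-in that exposes only the `authenticate()` hook of the sample handler, and the verifier is passed through a constructor purely to keep the sketch short. In Kafka the handler is instantiated through its default constructor, so the verifier would more likely be created in `configure()`.

```java
import java.io.IOException;
import java.util.Arrays;

class ExternalPlainAuthSketch {

    // Stand-in for the sample handler above; only the overridable hook is modelled.
    static class PlainServerCallbackHandler {
        protected boolean authenticate(String username, char[] password) throws IOException {
            return false;
        }
    }

    // Hypothetical client for an external authentication server.
    interface PasswordVerifier {
        boolean verify(String username, char[] password) throws IOException;
    }

    // Custom handler: Kafka never sees stored passwords, it only asks the
    // external service whether the (user, password) pair is valid.
    static class ExternalPlainCallbackHandler extends PlainServerCallbackHandler {
        private final PasswordVerifier verifier;

        ExternalPlainCallbackHandler(PasswordVerifier verifier) {
            this.verifier = verifier;
        }

        @Override
        protected boolean authenticate(String username, char[] password) throws IOException {
            if (username == null || password == null)
                return false;
            return verifier.verify(username, password);
        }
    }

    public static void main(String[] args) throws IOException {
        // In-memory verifier standing in for a remote service.
        PasswordVerifier verifier = (user, pw) ->
                "alice".equals(user) && Arrays.equals(pw, "alice-secret".toCharArray());
        ExternalPlainCallbackHandler handler = new ExternalPlainCallbackHandler(verifier);
        System.out.println(handler.authenticate("alice", "alice-secret".toCharArray()));
    }
}
```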
Code Block | ||||
---|---|---|---|---|
| ||||
public class ScramServerCallbackHandler implements AuthCallbackHandler {

    @Override
    public void configure(Map<String, ?> configs, String mechanism) {
    }

    @Override
    public Mode mode() {
        return Mode.SERVER;
    }

    @Override
    public Collection<String> supportedSaslMechanisms() {
        return ScramMechanism.mechanismNames();
    }

    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        // NameCallback arrives first, so username is set before the credential is looked up
        String username = null;
        for (Callback callback : callbacks) {
            if (callback instanceof NameCallback)
                username = ((NameCallback) callback).getDefaultName();
            else if (callback instanceof ScramCredentialCallback)
                ((ScramCredentialCallback) callback).scramCredential(credential(username));
            else
                throw new UnsupportedCallbackException(callback);
        }
    }

    protected ScramCredential credential(String username) {
        // Return SCRAM credential from credential store
        return null; // placeholder so the sample compiles; replace with a real lookup
    }

    @Override
    public void close() {
    }
} |
...