...
Callbacks defined for the mechanism in the Java implementation must be handled by custom callback handlers if the behaviour differs from the default callbacks in Kafka.
Sample Callback Handler for SASL/PLAIN
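```java
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
// Kafka imports (AuthCallbackHandler, Mode, PlainSaslServer, ...) omitted.

public class PlainServerCallbackHandler implements AuthCallbackHandler {
    @Override
    public void configure(Map<String, ?> configs, String mechanism) {
    }

    @Override
    public Mode mode() {
        return Mode.SERVER;
    }

    @Override
    public Collection<String> supportedSaslMechanisms() {
        return Arrays.asList(PlainSaslServer.PLAIN_MECHANISM);
    }

    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        String username = null;
        for (Callback callback : callbacks) {
            if (callback instanceof NameCallback)
                // The NameCallback carries the username and is handled before the password callback.
                username = ((NameCallback) callback).getDefaultName();
            else if (callback instanceof PlainAuthenticateCallback) {
                PlainAuthenticateCallback plainCallback = (PlainAuthenticateCallback) callback;
                boolean authenticated = authenticate(username, plainCallback.password());
                plainCallback.authenticated(authenticated);
            } else
                throw new UnsupportedCallbackException(callback);
        }
    }

    // Default check: compare against the "user_<username>" option of the server JAAS configuration.
    protected boolean authenticate(String username, char[] password) throws IOException {
        if (username == null)
            return false;
        else {
            String expectedPassword = JaasUtils.jaasConfig(LoginType.SERVER.contextName(), "user_" + username, PlainLoginModule.class.getName());
            // Guard against a missing entry (unknown user) instead of failing with a NullPointerException.
            // Arrays.equals stops at the first mismatch, so this comparison is not constant-time.
            return expectedPassword != null && Arrays.equals(password, expectedPassword.toCharArray());
        }
    }

    @Override
    public void close() throws KafkaException {
    }
}
```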
For custom SASL/PLAIN authentication, override `authenticate()` with a custom implementation that verifies the given `password` for `username`.
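One way to write such a check, as an added example that is not part of this proposal or of Kafka: passwords are kept as salted PBKDF2 hashes and compared in constant time, and unknown users cost the same work as known ones. `HashedPasswordVerifier`, its in-memory store, the iteration count and the sample users are assumptions; `authenticate()` has the same signature as in the sample handler above, so its body can serve as the override.

```java
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.security.SecureRandom;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;

// Added example (hypothetical class): authenticate() has the signature used on the page.
public class HashedPasswordVerifier {
    private static final int ITERATIONS = 210_000; // assumed work factor
    // Hypothetical store of username -> {salt, PBKDF2 hash}; no cleartext passwords.
    private final Map<String, byte[][]> store = new ConcurrentHashMap<>();
    // Decoy record so an unknown username costs the same PBKDF2 work as a known one.
    private final byte[][] decoy = newRecord("decoy-not-a-login".toCharArray());

    private static byte[] pbkdf2(char[] password, byte[] salt) {
        PBEKeySpec spec = new PBEKeySpec(password, salt, ITERATIONS, 256);
        try {
            return SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256")
                    .generateSecret(spec).getEncoded();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        } finally { spec.clearPassword(); }
    }

    private static byte[][] newRecord(char[] password) {
        byte[] salt = new byte[16];
        new SecureRandom().nextBytes(salt);
        return new byte[][] {salt, pbkdf2(password, salt)};
    }
    public void addUser(String username, char[] password) { store.put(username, newRecord(password)); }

    protected boolean authenticate(String username, char[] password) {
        if (username == null || password == null || password.length == 0) return false;
        byte[][] record = store.get(username);
        byte[][] used = record != null ? record : decoy;
        byte[] candidate = pbkdf2(password, used[0]);
        // MessageDigest.isEqual is a constant-time comparison; it always runs before the known-user check.
        return MessageDigest.isEqual(candidate, used[1]) && record != null;
    }

    public static void main(String[] args) {
        HashedPasswordVerifier v = new HashedPasswordVerifier();
        v.addUser("alice", "alice-secret".toCharArray()); // constructed sample credential
        System.out.println("correct password: " + v.authenticate("alice", "alice-secret".toCharArray()));
        System.out.println("wrong password:   " + v.authenticate("alice", "guess".toCharArray()));
        System.out.println("unknown user:     " + v.authenticate("mallory", "alice-secret".toCharArray()));
    }
}
```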
Sample Callback Handler for SASL/SCRAM
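```java
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
// Kafka imports (AuthCallbackHandler, Mode, ScramMechanism, ...) omitted.

public class ScramServerCallbackHandler implements AuthCallbackHandler {
    @Override
    public void configure(Map<String, ?> configs, String mechanism) {
    }

    @Override
    public Mode mode() {
        return Mode.SERVER;
    }

    @Override
    public Collection<String> supportedSaslMechanisms() {
        return ScramMechanism.mechanismNames();
    }

    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        String username = null;
        for (Callback callback : callbacks) {
            if (callback instanceof NameCallback)
                // The NameCallback carries the username and is handled before the credential callback.
                username = ((NameCallback) callback).getDefaultName();
            else if (callback instanceof ScramCredentialCallback)
                ((ScramCredentialCallback) callback).scramCredential(credential(username));
            else
                throw new UnsupportedCallbackException(callback);
        }
    }

    protected ScramCredential credential(String username) {
        // Return SCRAM credential from credential store
        // (placeholder body so the sample compiles)
        throw new UnsupportedOperationException("credential store lookup not implemented");
    }

    @Override
    public void close() {
    }
}
```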
To use a custom credential store for SCRAM, override `credential()` with an alternative method that obtains the credential from the custom store. If the custom credential store supports only a subset of the SCRAM mechanisms (e.g. only SCRAM-SHA-256), override `supportedSaslMechanisms()`.
Compatibility, Deprecation, and Migration Plan
...