...

SASL/PLAIN servers that use the SaslServer implementation included in Kafka must handle NameCallback and PlainAuthenticateCallback. The username for authentication is provided in NameCallback, as in other mechanisms in the JRE (e.g. Digest-MD5). The password supplied by the client during SASL authentication is provided in PlainAuthenticateCallback. The callback handler sets the authenticated flag in the callback after verifying the username and password.

The SASL/PLAIN server callback handler is asked to handle NameCallback and PlainAuthenticateCallback, in that order. See the sample SASL/PLAIN callback handler for an example.

Code Block
languagejava
titleorg.apache.kafka.common.security.plain.PlainAuthenticateCallback
package org.apache.kafka.common.security.plain;
import javax.security.auth.callback.Callback;

public class PlainAuthenticateCallback implements Callback {
    private final char[] password;
    private boolean authenticated;
    public PlainAuthenticateCallback(char[] password) {
        this.password = password;
    }
    public char[] password() {
        return password;
    }
    public boolean authenticated() {
        return this.authenticated;
    }
    public void authenticated(boolean authenticated) {
        this.authenticated = authenticated;
    }
}
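To illustrate the callback order described above, the following is a minimal sketch of how a SASL/PLAIN server might invoke a callback handler. It is not Kafka's implementation. PlainAuthenticateCallback is re-declared locally so that the example compiles without Kafka on the classpath, and the class PlainCallbackDemo, its hard-coded handler, and the user alice with password alice-secret are all hypothetical.

```java
import java.io.IOException;
import java.util.Arrays;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.UnsupportedCallbackException;

// Local copy of the callback shown above, so the sketch compiles without Kafka.
class PlainAuthenticateCallback implements Callback {
    private final char[] password;
    private boolean authenticated;
    PlainAuthenticateCallback(char[] password) { this.password = password; }
    char[] password() { return password; }
    boolean authenticated() { return authenticated; }
    void authenticated(boolean authenticated) { this.authenticated = authenticated; }
}

class PlainCallbackDemo {
    // Hypothetical handler that knows a single hard-coded user (assumption for illustration).
    static final CallbackHandler HANDLER = callbacks -> {
        String username = null;
        for (Callback callback : callbacks) {
            if (callback instanceof NameCallback)
                username = ((NameCallback) callback).getDefaultName();
            else if (callback instanceof PlainAuthenticateCallback) {
                PlainAuthenticateCallback plain = (PlainAuthenticateCallback) callback;
                boolean ok = "alice".equals(username)
                        && Arrays.equals(plain.password(), "alice-secret".toCharArray());
                plain.authenticated(ok);
            } else
                throw new UnsupportedCallbackException(callback);
        }
    };

    // Mimics the server side: NameCallback first, then PlainAuthenticateCallback.
    static boolean authenticate(String username, String password) {
        NameCallback nameCallback = new NameCallback("username", username);
        PlainAuthenticateCallback authCallback =
                new PlainAuthenticateCallback(password.toCharArray());
        try {
            HANDLER.handle(new Callback[] {nameCallback, authCallback});
        } catch (IOException | UnsupportedCallbackException e) {
            throw new IllegalStateException("Callback handling failed", e);
        }
        // The handler reports its decision through the flag on the callback.
        return authCallback.authenticated();
    }

    public static void main(String[] args) {
        System.out.println(authenticate("alice", "alice-secret"));
        System.out.println(authenticate("alice", "wrong"));
    }
}
```

The server never compares passwords itself in this sketch; it only reads the authenticated flag that the handler set, which is what allows the verification to be delegated elsewhere.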

...

SASL/SCRAM servers that use the SaslServer implementation included in Kafka must handle NameCallback and ScramCredentialCallback. The username for authentication is provided in NameCallback, as in other mechanisms in the JRE (e.g. Digest-MD5). The callback handler must return the SCRAM credential for the user if credentials are available for that username and the configured SCRAM mechanism.

The SASL/SCRAM server callback handler is asked to handle NameCallback and ScramCredentialCallback, in that order. See the sample SASL/SCRAM callback handler for an example.

Code Block
languagejava
titleorg.apache.kafka.common.security.scram.ScramCredentialCallback
package org.apache.kafka.common.security.scram;
import javax.security.auth.callback.Callback;
public class ScramCredentialCallback implements Callback {
    private ScramCredential scramCredential;
    public ScramCredential scramCredential() {
        return scramCredential;
    }
    public void scramCredential(ScramCredential scramCredential) {
        this.scramCredential = scramCredential;
    }
}

Code Block
languagejava
titleorg.apache.kafka.common.security.scram.ScramCredential
package org.apache.kafka.common.security.scram;
public class ScramCredential {
    private final byte[] salt;
    private final byte[] serverKey;
    private final byte[] storedKey;
    private final int iterations;
    public ScramCredential(byte[] salt, byte[] storedKey, byte[] serverKey, int iterations) {
        this.salt = salt;
        this.serverKey = serverKey;
        this.storedKey = storedKey;
        this.iterations = iterations;
    }
    public byte[] salt() {
        return salt;
    }
    public byte[] serverKey() {
        return serverKey;
    }
    public byte[] storedKey() {
        return storedKey;
    }
    public int iterations() {
        return iterations;
    }
}
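As an illustration of what the fields of ScramCredential hold, the following sketch derives the salt-dependent keys for SCRAM-SHA-256 following the definitions in RFC 5802 (SaltedPassword, ClientKey, StoredKey, ServerKey). It is not Kafka's credential generator. The class ScramCredentialSketch and its generate() helper are hypothetical, and the sketch assumes the password has already been normalized as the SCRAM specification requires.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import javax.crypto.Mac;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical stand-in with the same four fields as ScramCredential above.
class ScramCredentialSketch {
    final byte[] salt;
    final byte[] storedKey;
    final byte[] serverKey;
    final int iterations;

    ScramCredentialSketch(byte[] salt, byte[] storedKey, byte[] serverKey, int iterations) {
        this.salt = salt;
        this.storedKey = storedKey;
        this.serverKey = serverKey;
        this.iterations = iterations;
    }

    // Derives the SCRAM-SHA-256 keys for a password (assumed already normalized).
    static ScramCredentialSketch generate(char[] password, byte[] salt, int iterations) {
        try {
            // SaltedPassword := Hi(password, salt, i), i.e. PBKDF2 with HMAC-SHA-256
            // and an output length equal to the hash length (256 bits).
            PBEKeySpec spec = new PBEKeySpec(password, salt, iterations, 256);
            byte[] saltedPassword = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256")
                    .generateSecret(spec).getEncoded();
            // ClientKey := HMAC(SaltedPassword, "Client Key"); StoredKey := H(ClientKey)
            byte[] clientKey = hmac(saltedPassword, "Client Key");
            byte[] storedKey = MessageDigest.getInstance("SHA-256").digest(clientKey);
            // ServerKey := HMAC(SaltedPassword, "Server Key")
            byte[] serverKey = hmac(saltedPassword, "Server Key");
            return new ScramCredentialSketch(salt, storedKey, serverKey, iterations);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("SCRAM-SHA-256 primitives unavailable", e);
        }
    }

    private static byte[] hmac(byte[] key, String message) throws GeneralSecurityException {
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(key, "HmacSHA256"));
        return mac.doFinal(message.getBytes(StandardCharsets.UTF_8));
    }
}
```

Only the salt, the iteration count, and the two derived keys are kept; neither the password nor SaltedPassword needs to be stored, which is why a SCRAM callback handler returns a credential rather than verifying a password.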

 

SASL/PLAIN callbacks allow passwords to be verified by an external authentication server without requiring Kafka to know the actual passwords. The callback checks whether the (user, password) combination provided during the SASL/PLAIN exchange is valid. SASL/SCRAM callbacks, in contrast, need the salted credentials to perform SCRAM authentication, so the credentials are requested in the callback.

 

Proposed Changes

ChannelBuilder will create an instance of each configured callback handler using the default constructor. For mechanisms without a callback handler override, the existing default callback handlers (SaslServerCallbackHandler/SaslClientCallbackHandler) will be created. Callback handler instances will be created once for each enabled mechanism in ChannelBuilder, instead of once per connection. This enables callback handlers that use external authentication servers to cache credentials or reuse connections if required. SaslClientCallbackHandler will be modified to obtain the Subject using Subject.getSubject(AccessController.getContext()) to avoid the current per-connection state.
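Because one handler instance serves every connection for a mechanism, state held in the handler can be shared across connections. The following sketch shows one way a handler might cache lookups against an external credential source. The class CachingCredentialStore and the externalLookup function are hypothetical, the credential is treated as an opaque string, and cache expiry and invalidation are deliberately left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical helper that a shared callback handler instance could own.
class CachingCredentialStore {
    // Thread-safe, since connections on different threads share one handler.
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final Function<String, String> externalLookup;
    private final AtomicInteger externalCalls = new AtomicInteger();

    // externalLookup stands in for a call to an external server; it returns
    // null when the user is unknown.
    CachingCredentialStore(Function<String, String> externalLookup) {
        this.externalLookup = externalLookup;
    }

    // Returns the cached credential, consulting the external source on a miss.
    // Unknown users (null results) are not cached and are looked up again.
    String credential(String username) {
        return cache.computeIfAbsent(username, user -> {
            externalCalls.incrementAndGet();
            return externalLookup.apply(user);
        });
    }

    // Number of times the external source was consulted.
    int externalCalls() {
        return externalCalls.get();
    }
}
```

With per-connection handler instances this cache would be empty for every new connection; with one instance per mechanism, repeated logins by the same user reach the external source only once in this sketch.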

...

Callbacks defined for the mechanism in the Java implementation must be handled by custom callback handlers if the behaviour differs from the default callbacks in Kafka.

 

Sample Callback Handler for SASL/PLAIN

Code Block
languagejava
titleSample SASL/PLAIN Callback Handler
public class PlainServerCallbackHandler implements AuthCallbackHandler {
    @Override
    public void configure(Map<String, ?> configs, String mechanism) {
    }
    @Override
    public Mode mode() {
        return Mode.SERVER;
    }
    @Override
    public Collection<String> supportedSaslMechanisms() {
        return Arrays.asList(PlainSaslServer.PLAIN_MECHANISM);
    }
    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        String username = null;
        for (Callback callback: callbacks) {
            if (callback instanceof NameCallback)
                username = ((NameCallback) callback).getDefaultName();
            else if (callback instanceof PlainAuthenticateCallback) {
                PlainAuthenticateCallback plainCallback = (PlainAuthenticateCallback) callback;
                boolean authenticated = authenticate(username, plainCallback.password());
                plainCallback.authenticated(authenticated);
            } else
                throw new UnsupportedCallbackException(callback);
        }
    }
    protected boolean authenticate(String username, char[] password) throws IOException {
        if (username == null)
            return false;
        else {
            String expectedPassword = JaasUtils.jaasConfig(LoginType.SERVER.contextName(), "user_" + username, PlainLoginModule.class.getName());
            // Reject users without a configured password instead of failing with a NullPointerException
            return expectedPassword != null && Arrays.equals(password, expectedPassword.toCharArray());
        }
    }
    @Override
    public void close() throws KafkaException {
    }
}

...

For custom SASL/PLAIN authentication, override authenticate() with a custom implementation that verifies the given password for the username.
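A minimal sketch of such an override is shown here, assuming the password check is delegated to an external authentication server. The interface ExternalAuthClient, the class ExternalPlainHandler, and the stand-in base class PlainHandlerStandIn (which reproduces only the authenticate() signature of the sample handler so that the example compiles without Kafka) are all hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical client for an external authentication server.
interface ExternalAuthClient {
    boolean verify(String username, char[] password) throws IOException;
}

// Stand-in for the sample PlainServerCallbackHandler; only the
// authenticate() hook is reproduced here.
class PlainHandlerStandIn {
    protected boolean authenticate(String username, char[] password) throws IOException {
        return false;
    }
}

// Custom handler that delegates the password check to the external server.
class ExternalPlainHandler extends PlainHandlerStandIn {
    private final ExternalAuthClient client;

    ExternalPlainHandler(ExternalAuthClient client) {
        this.client = client;
    }

    @Override
    protected boolean authenticate(String username, char[] password) throws IOException {
        // Reject incomplete requests without contacting the external server.
        if (username == null || password == null)
            return false;
        return client.verify(username, password);
    }

    // Convenience wrapper for the usage example; not part of the sample handler.
    boolean tryLogin(String username, String password) {
        try {
            return authenticate(username, password == null ? null : password.toCharArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In this arrangement the handler holds no passwords of its own; the external server alone decides whether the (user, password) combination is valid.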

Sample Callback Handler for SASL/SCRAM

Code Block
languagejava
titleSample SASL/SCRAM Callback Handler
public class ScramServerCallbackHandler implements AuthCallbackHandler {
    @Override
    public void configure(Map<String, ?> configs, String mechanism) {
    }
    @Override
    public Mode mode() {
        return Mode.SERVER;
    }
    @Override
    public Collection<String> supportedSaslMechanisms() {
        return ScramMechanism.mechanismNames();
    }
    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        String username = null;
        for (Callback callback : callbacks) {
            if (callback instanceof NameCallback)
                username = ((NameCallback) callback).getDefaultName();
            else if (callback instanceof ScramCredentialCallback)
                ((ScramCredentialCallback) callback).scramCredential(credential(username));
            else
                throw new UnsupportedCallbackException(callback);
        }
    }
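    protected ScramCredential credential(String username) {
        // Placeholder: look up and return the SCRAM credential for the user from the credential store.
        // This sample returns null (no credential available) so that the class compiles.
        return null;
    }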
    @Override
    public void close() {
    }
}

...