...

Code Block
languagejava
titleorg.apache.kafka.common.security.auth.AuthCallbackHandler
package org.apache.kafka.common.security.auth;
import org.apache.kafka.common.network.Mode;
import java.util.Collection;
import java.util.Map;
import javax.security.auth.callback.CallbackHandler;

public interface AuthCallbackHandler extends CallbackHandler {
    /**
     * Configures the callback handler for the specified SASL mechanism.
     */
    void configure(Map<String, ?> configs, String saslMechanism);
    /**
     * Returns the connection mode supported by this callback handler.
     */
    Mode mode();
    /**
     * Returns the SASL mechanisms supported by this callback handler.
     */
    Collection<String> supportedSaslMechanisms();
    /**
     * Closes this instance.
     */
    void close();
}

 

SASL/PLAIN Server Callbacks

SASL/PLAIN servers using the SaslServer implementation included in Kafka must handle NameCallback and PlainAuthenticateCallback. The username for authentication is provided in NameCallback, as with other mechanisms in the JRE (e.g., Digest-MD5). The password supplied by the client during SASL authentication is provided in PlainAuthenticateCallback. The callback handler sets the authenticated flag in the callback after verifying the username and password.

Code Block
languagejava
titleorg.apache.kafka.common.security.plain.PlainAuthenticateCallback
package org.apache.kafka.common.security.plain;
import javax.security.auth.callback.Callback;

public class PlainAuthenticateCallback implements Callback {
    private final char[] password;
    private boolean authenticated;
    public PlainAuthenticateCallback(char[] password) {
        this.password = password;
    }
    public char[] password() {
        return password;
    }
    public boolean authenticated() {
        return this.authenticated;
    }
    public void authenticated(boolean authenticated) {
        this.authenticated = authenticated;
    }
}
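
The following is a minimal sketch of how a server-side handler might process these two callbacks. It is not the handler shipped with Kafka: `PlainServerHandlerSketch`, `InMemoryPlainHandler`, and the in-memory password map are hypothetical names introduced for illustration, and the nested `PlainAuthenticateCallback` is a stand-in that mirrors the class above so that the sketch compiles without Kafka on the classpath. It also assumes the server passes the client's username as the default name of the NameCallback.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.UnsupportedCallbackException;

final class PlainServerHandlerSketch {

    // Stand-in mirroring PlainAuthenticateCallback so the sketch is self-contained.
    static final class PlainAuthenticateCallback implements Callback {
        private final char[] password;
        private boolean authenticated;

        PlainAuthenticateCallback(char[] password) { this.password = password; }
        char[] password() { return password; }
        boolean authenticated() { return authenticated; }
        void authenticated(boolean authenticated) { this.authenticated = authenticated; }
    }

    // Hypothetical handler that verifies passwords against an in-memory map.
    static final class InMemoryPlainHandler implements CallbackHandler {
        private final Map<String, char[]> passwords = new HashMap<>();

        InMemoryPlainHandler(Map<String, String> users) {
            users.forEach((user, password) -> passwords.put(user, password.toCharArray()));
        }

        @Override
        public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
            String username = null;
            for (Callback callback : callbacks) {
                if (callback instanceof NameCallback) {
                    // Assumption: the username arrives as the callback's default name.
                    username = ((NameCallback) callback).getDefaultName();
                } else if (callback instanceof PlainAuthenticateCallback) {
                    PlainAuthenticateCallback plain = (PlainAuthenticateCallback) callback;
                    char[] expected = username == null ? null : passwords.get(username);
                    // Set the authenticated flag only if the user exists and the password matches.
                    plain.authenticated(expected != null && Arrays.equals(expected, plain.password()));
                } else {
                    throw new UnsupportedCallbackException(callback);
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        InMemoryPlainHandler handler =
            new InMemoryPlainHandler(Collections.singletonMap("alice", "alice-secret"));

        PlainAuthenticateCallback good = new PlainAuthenticateCallback("alice-secret".toCharArray());
        handler.handle(new Callback[] {new NameCallback("username", "alice"), good});
        System.out.println("correct password: " + good.authenticated());

        PlainAuthenticateCallback bad = new PlainAuthenticateCallback("wrong".toCharArray());
        handler.handle(new Callback[] {new NameCallback("username", "alice"), bad});
        System.out.println("wrong password: " + bad.authenticated());
    }
}
```

A handler backed by an external authentication server would presumably replace the map lookup with a call to that server, leaving the callback handling unchanged.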

SASL/SCRAM

...

Server Callbacks

SASL/SCRAM servers using the SaslServer implementation included in Kafka must handle NameCallback and ScramCredentialCallback. The username for authentication is provided in NameCallback, as with other mechanisms in the JRE (e.g., Digest-MD5). The callback handler must return the SCRAM credential for the user if credentials are available for that username and the configured SCRAM mechanism.

Code Block
languagejava
titleorg.apache.kafka.common.security.scram.ScramCredentialCallback
package org.apache.kafka.common.security.scram;
import javax.security.auth.callback.Callback;

public class ScramCredentialCallback implements Callback {
    private ScramCredential scramCredential;
    public ScramCredential scramCredential() {
        return scramCredential;
    }
    public void scramCredential(ScramCredential scramCredential) {
        this.scramCredential = scramCredential;
    }
}

 

Proposed Changes

ChannelBuilder will create an instance of each configured callback handler using the default constructor. For mechanisms without a callback handler override, the existing default callback handlers (SaslServerCallbackHandler/SaslClientCallbackHandler) will be created. Callback handler instances will be created once for each enabled mechanism in ChannelBuilder, instead of per-connection. This enables callback handlers using external authentication servers to cache credentials or reuse connections if required. SaslClientCallbackHandler will be modified to obtain Subject using Subject.getSubject(AccessController.getContext()) to avoid the current per-connection state.
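
As an illustration of the once-per-mechanism lifecycle described here, the sketch below instantiates one handler per mechanism through its default constructor and keeps the instances in a map that every connection would consult. It is not ChannelBuilder's code: `HandlerRegistrySketch`, `createHandlers`, and `NoOpHandler` are hypothetical names, and the mapping from mechanism name to class name is assumed to have been read from configuration already.

```java
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;

final class HandlerRegistrySketch {

    // Hypothetical handler standing in for a configured callback handler class.
    public static final class NoOpHandler implements CallbackHandler {
        public NoOpHandler() { }

        @Override
        public void handle(Callback[] callbacks) {
            // A real handler would process the callbacks here.
        }
    }

    // Creates exactly one handler per mechanism, using each class's default constructor.
    static Map<String, CallbackHandler> createHandlers(Map<String, String> classNameByMechanism)
            throws ReflectiveOperationException {
        Map<String, CallbackHandler> handlers = new HashMap<>();
        for (Map.Entry<String, String> entry : classNameByMechanism.entrySet()) {
            Class<? extends CallbackHandler> type =
                Class.forName(entry.getValue()).asSubclass(CallbackHandler.class);
            handlers.put(entry.getKey(), type.getDeclaredConstructor().newInstance());
        }
        return handlers;
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> configured = new HashMap<>();
        configured.put("PLAIN", NoOpHandler.class.getName());
        configured.put("SCRAM-SHA-256", NoOpHandler.class.getName());

        Map<String, CallbackHandler> handlers = createHandlers(configured);

        // Two connections using the same mechanism look up the same instance.
        CallbackHandler firstConnection = handlers.get("PLAIN");
        CallbackHandler secondConnection = handlers.get("PLAIN");
        System.out.println("same handler reused: " + (firstConnection == secondConnection));
    }
}
```

Because the instance is shared across connections, any state a handler caches (credentials, connections to an external server) would need to be safe for concurrent use.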

Scenarios 

...

Use an external authentication server for SASL/PLAIN authentication using the SaslServer implementation for PLAIN included in Kafka 

Define a new class that implements AuthCallbackHandler and handles NameCallback and PlainAuthenticateCallback, and add the class to the broker's sasl.callback.handlers property. A single instance of this callback handler will be created for the broker. The configured callback handler is responsible for validating the password provided by clients. SASL/SCRAM can be configured in a similar way with a callback handler that handles ScramCredentialCallback, and this handler may use an external authentication server.

Use custom credential store instead of Zookeeper for storing SCRAM credentials

Set the broker callback handler to a class that implements AuthCallbackHandler and handles NameCallback and ScramCredentialCallback. The callback handler can then return SCRAM credentials from a custom store.
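
A rough sketch of such a handler follows. Everything in it is illustrative rather than Kafka's implementation: `ScramStoreHandlerSketch` and `CustomStoreScramHandler` are hypothetical names, the custom store is modeled as a plain map, and the nested `ScramCredential` and `ScramCredentialCallback` are stand-ins (with assumed field names) so that the sketch compiles without Kafka. It assumes the username arrives as the default name of the NameCallback, and that leaving the credential unset signals that none is available.

```java
import java.util.Collections;
import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.UnsupportedCallbackException;

final class ScramStoreHandlerSketch {

    // Stand-in credential type; the field names are assumptions for this sketch.
    static final class ScramCredential {
        final byte[] salt;
        final byte[] storedKey;
        final byte[] serverKey;
        final int iterations;

        ScramCredential(byte[] salt, byte[] storedKey, byte[] serverKey, int iterations) {
            this.salt = salt;
            this.storedKey = storedKey;
            this.serverKey = serverKey;
            this.iterations = iterations;
        }
    }

    // Stand-in callback exposing a scramCredential getter and setter.
    static final class ScramCredentialCallback implements Callback {
        private ScramCredential scramCredential;

        ScramCredential scramCredential() { return scramCredential; }
        void scramCredential(ScramCredential scramCredential) { this.scramCredential = scramCredential; }
    }

    // Hypothetical handler that reads credentials from a custom store (here, a map).
    static final class CustomStoreScramHandler implements CallbackHandler {
        private final Map<String, ScramCredential> store;

        CustomStoreScramHandler(Map<String, ScramCredential> store) { this.store = store; }

        @Override
        public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
            String username = null;
            for (Callback callback : callbacks) {
                if (callback instanceof NameCallback) {
                    // Assumption: the username arrives as the callback's default name.
                    username = ((NameCallback) callback).getDefaultName();
                } else if (callback instanceof ScramCredentialCallback) {
                    // Leaves the credential null when the store has no entry for the user.
                    ScramCredential credential = username == null ? null : store.get(username);
                    ((ScramCredentialCallback) callback).scramCredential(credential);
                } else {
                    throw new UnsupportedCallbackException(callback);
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Placeholder bytes; a real store would hold values derived from the user's password.
        ScramCredential aliceCredential =
            new ScramCredential(new byte[] {1}, new byte[] {2}, new byte[] {3}, 4096);
        CustomStoreScramHandler handler =
            new CustomStoreScramHandler(Collections.singletonMap("alice", aliceCredential));

        ScramCredentialCallback callback = new ScramCredentialCallback();
        handler.handle(new Callback[] {new NameCallback("username", "alice"), callback});
        System.out.println("credential found: " + (callback.scramCredential() != null));
    }
}
```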

Use a custom SaslServer implementation for SCRAM

...