...

  • Channel is a wrapper for TransportLayer and AuthenticationLayer that provides the necessary handshake and authentication methods as well as read(ByteBuffer buf), write(ByteBuffer buf), and write(ByteBuffer[] buf).
  • TransportLayer is an interface for the network transport layer.
  • PlainTextTransportLayer provides plain-text socket channel methods.
  • SSLTransportLayer provides SSL handshake and read/write methods.
  • AuthenticationLayer (Authenticator) is an interface providing client/server authentication.
  • SaslServerAuthenticationLayer implements AuthenticationLayer and provides server-side authentication methods.
  • SaslClientAuthenticationLayer implements AuthenticationLayer and provides client-side authentication methods.


  • User: This class will be used to get the remoteUserId and add it to the Session object (https://issues.apache.org/jira/browse/KAFKA-1683).
  • KafkaPrincipalToLocalPlugin: This is a pluggable class with a default implementation that translates a Kerberos principal such as "testuser/node1.test.com@EXAMPLE.COM" to "testuser". Users can provide their own customized version of PrincipalToLocalPlugin.
  • AuthUtils: This class will consist of any utilities needed for SASL and other auth-related methods.
  • KerberosLoginFactory: It will use the JAAS config to log in and generate a subject.
  • Protocol accepts the protocol type (PLAINTEXT, SSL).
  • SecurityConfig is a config file providing the SecurityProtocol, SSL config, and SASL mechanisms.
  • BlockingChannel: the interface changes so that it accepts the Protocol and creates the appropriate channels.
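
The principal translation described for KafkaPrincipalToLocalPlugin can be sketched as follows. This is only an illustration of the default mapping mentioned above (drop the host and realm parts); the class and method names `PrincipalToLocalSketch` and `toLocal` are hypothetical and are not part of the proposal.

```java
final class PrincipalToLocalSketch {

    /**
     * Hypothetical helper: maps a Kerberos principal of the form
     * primary[/instance][@REALM] to its primary part.
     */
    static String toLocal(String principal) {
        if (principal == null || principal.isEmpty()) {
            throw new IllegalArgumentException("principal must not be empty");
        }
        int end = principal.length();
        int at = principal.indexOf('@');      // start of the realm, if any
        if (at >= 0) {
            end = at;
        }
        int slash = principal.indexOf('/');   // start of the instance (host), if any
        if (slash >= 0 && slash < end) {
            end = slash;
        }
        return principal.substring(0, end);
    }

    public static void main(String[] args) {
        // prints "testuser"
        System.out.println(toLocal("testuser/node1.test.com@EXAMPLE.COM"));
    }
}
```

A customized plugin could replace this rule, for example to keep the realm or to map service principals to a shared local user.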

...

Users need to pass -Djava.security.auth.login.config=kafka_jaas.conf as part of the JVM parameters.

This JAAS file, along with Login.java, can be used to log in to LDAP, Kerberos, etc.

Details on LdapLoginModule for JAAS: https://docs.oracle.com/javase/8/docs/jre/api/security/jaas/spec/com/sun/security/auth/module/LdapLoginModule.html

Code Block
languagejava
titleJaas Config
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/keytabs/kafka.keytab"
storeKey=true
useTicketCache=false
// serviceName is used to connect to other brokers for replica management and for controller requests.
// It should be set to the principal that the Kafka brokers run as.
serviceName="kafka"
principal="kafka/_HOST@EXAMPLE.COM";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/keytabs/kafka.keytab"
storeKey=true
useTicketCache=false
serviceName="zookeeper"
principal="kafka@EXAMPLE.COM";
};


// The KafkaServer section is used to authenticate the Kafka broker against Kerberos,
// and the Client section is used by zkClient to access a Kerberos-enabled ZooKeeper cluster.

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/keytabs/kafkaclient.keytab"
storeKey=true
useTicketCache=false
serviceName="kafka"
principal="kafkaproducer/_HOST@EXAMPLE.COM";
};
 
// The above config is for any client (producer, consumer) connecting to a Kerberos-enabled Kafka cluster.
// Here serviceName must match the service principal name used under KafkaServer ("kafka" in this example).
 

...

Code Block
languagejava
public class SSLTransportLayer implements TransportLayer {
	public SSLTransportLayer(SocketChannel socketChannel, SSLEngine sslEngine) throws IOException 
}  
 

...

 Authenticator

Code Block
languagejava
public interface Authenticator {

    /**
     * Closes any resources
     *
     * @throws IOException if any I/O error occurs
     */
    void close() throws IOException;

    /**
     * Initializes the authenticator.
     *
     * @throws IOException if any I/O error occurs
     */
    void init() throws IOException;

    /**
     * Returns UserPrincipal after authentication is established
     */
    UserPrincipal userPrincipal();


    /**
     * Performs authentication in a non-blocking way and returns the SelectionKey interest op
     * (for example SelectionKey.OP_READ or SelectionKey.OP_WRITE) if further communication is needed.
     */
    int authenticate(boolean read, boolean write) throws IOException;

    /**
     * Returns true if authentication is complete; otherwise returns false.
     */
    boolean isComplete();

}
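
One way a caller might drive the non-blocking `authenticate` contract is sketched below. It is an assumption-laden illustration, not the proposal's implementation: the interface is trimmed to the two methods the loop needs, `TwoStepAuthenticator` and `drive` are hypothetical names, and channel readiness is simulated rather than taken from a real `Selector`.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;

final class AuthenticateLoopSketch {

    /** Trimmed-down copy of the Authenticator interface above (only what the loop needs). */
    interface Authenticator {
        int authenticate(boolean read, boolean write) throws IOException;
        boolean isComplete();
    }

    /** Hypothetical authenticator: one write followed by one read, standing in for a SASL exchange. */
    static final class TwoStepAuthenticator implements Authenticator {
        private int step = 0;

        @Override
        public int authenticate(boolean read, boolean write) {
            if (step == 0 && write) {
                step = 1;                       // token "sent", now wait for the reply
                return SelectionKey.OP_READ;
            }
            if (step == 1 && read) {
                step = 2;                       // reply "consumed", nothing more to do
                return 0;
            }
            // Channel was not ready for the pending step: ask for the same op again.
            return step == 0 ? SelectionKey.OP_WRITE : SelectionKey.OP_READ;
        }

        @Override
        public boolean isComplete() {
            return step == 2;
        }
    }

    /**
     * Calls authenticate until isComplete() is true and returns the number of calls.
     * Readiness is simulated: the channel is assumed ready for whatever op was last requested.
     * A real server would register the returned ops on a SelectionKey and wait in Selector.select().
     */
    static int drive(Authenticator authenticator) throws IOException {
        int interestOps = SelectionKey.OP_WRITE;   // assumed starting point: this side speaks first
        int calls = 0;
        while (!authenticator.isComplete()) {
            boolean readable = (interestOps & SelectionKey.OP_READ) != 0;
            boolean writable = (interestOps & SelectionKey.OP_WRITE) != 0;
            interestOps = authenticator.authenticate(readable, writable);
            calls++;
        }
        return calls;
    }

    public static void main(String[] args) throws IOException {
        // prints "Authenticated after 2 call(s)"
        System.out.println("Authenticated after " + drive(new TwoStepAuthenticator()) + " call(s)");
    }
}
```

Returning the next interest op lets the network thread keep serving other connections between handshake steps instead of blocking on one peer.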
 
 

...

DefaultAuthenticator

Code Block
languagejava
public class DefaultAuthenticator implements Authenticator {

    TransportLayer transportLayer;

    public DefaultAuthenticator(TransportLayer transportLayer) {
        this.transportLayer = transportLayer;
    }

    public void init() {}

    public int authenticate(boolean read, boolean write) throws IOException {
        return 0;
    }

    /** Returns the peer host in case of SSL. */
    public UserPrincipal userPrincipal() {
        return new UserPrincipal(transportLayer.getPeerPrincipal().toString());
    }

    public void close() throws IOException {}

    public boolean isComplete() {
        return true;
    }
}
  
 

SaslServerAuthenticator

Code Block
languagejava
public class SaslServerAuthenticator implements Authenticator {

    public SaslServerAuthenticator(final Subject subject, TransportLayer transportLayer) {
    }
}
  
 

SaslClientAuthenticator

Code Block
languagejava
public class SaslClientAuthenticator implements Authenticator {

    public SaslClientAuthenticator(final Subject subject, TransportLayer transportLayer) {
    }
}
  
 

Callbacks for Login, SaslServer, SaslClient

    Callbacks for the above modules will help in obtaining the user's authentication information.

AuthUtils

Code Block
languagejava
public class AuthUtils {
    /**
     * Constructs a JAAS configuration object from the Kafka JAAS configuration file.
     * @param jaasConfigFilePath path to the Kafka JAAS configuration file
     * @return JAAS configuration object
     */
    public static Configuration getConfiguration(String jaasConfigFilePath)
 
    public static String getJaasConfig(String loginContextName, String key) throws IOException
 
    public static String getDefaultRealm()
 
}
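
The two JAAS helpers above could be built on the JDK's `Configuration.getInstance("JavaLoginConfig", ...)` API, roughly as in the sketch below. The details are assumptions: `getJaasConfig` here takes the `Configuration` explicitly instead of using the JVM-wide one, the class name `JaasConfigSketch` and the `demoServiceName` helper are hypothetical, and the demo writes a small JAAS file to a temporary location rather than reading kafka_jaas.conf. Loading the file only parses it; no Kerberos login is attempted.

```java
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.NoSuchAlgorithmException;
import java.security.URIParameter;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

final class JaasConfigSketch {

    /** Parses the JAAS file at the given path into a Configuration object. */
    static Configuration getConfiguration(String jaasConfigFilePath) throws NoSuchAlgorithmException {
        URI uri = Paths.get(jaasConfigFilePath).toUri();
        return Configuration.getInstance("JavaLoginConfig", new URIParameter(uri));
    }

    /** Returns the value of an option (for example serviceName) from the named section, or null if absent. */
    static String getJaasConfig(Configuration config, String loginContextName, String key) throws IOException {
        AppConfigurationEntry[] entries = config.getAppConfigurationEntry(loginContextName);
        if (entries == null) {
            throw new IOException("No JAAS section named " + loginContextName);
        }
        for (AppConfigurationEntry entry : entries) {
            Object value = entry.getOptions().get(key);
            if (value != null) {
                return value.toString();
            }
        }
        return null;
    }

    /** Demo helper: writes a sample KafkaClient section to a temp file and reads serviceName back. */
    static String demoServiceName() throws Exception {
        String jaas = String.join("\n",
                "KafkaClient {",
                "  com.sun.security.auth.module.Krb5LoginModule required",
                "  useKeyTab=true",
                "  keyTab=\"/keytabs/kafkaclient.keytab\"",
                "  serviceName=\"kafka\"",
                "  principal=\"kafkaproducer/_HOST@EXAMPLE.COM\";",
                "};",
                "");
        Path file = Files.createTempFile("kafka_jaas", ".conf");
        try {
            Files.write(file, jaas.getBytes(StandardCharsets.UTF_8));
            Configuration config = getConfiguration(file.toString());
            return getJaasConfig(config, "KafkaClient", "serviceName");
        } finally {
            Files.deleteIfExists(file);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("serviceName = " + demoServiceName());
    }
}
```

When the JVM is started with -Djava.security.auth.login.config, `Configuration.getConfiguration()` returns the same kind of object without an explicit path.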
 
 

...