Status

Current state: "Accepted"

Discussion thread: here

JIRA: here

...

  • Channel wrapper for TransportLayer and AuthenticationLayer providing necessary handshake and authentication methods and also read(ByteBuffer buf), write(ByteBuffer buf), write(ByteBuffer[] buf).
  • TransportLayer is an interface for the network transport layer.
  • PlainTextTransportLayer provides plain text socket channel methods.
  • SSLTransportLayer provides SSL handshake and read/write methods.
  • Authenticator is an interface providing client/server authentication.
  • SaslServerAuthenticationLayer implements AuthenticationLayer and provides authentication methods for the server side.
  • SaslClientAuthenticationLayer implements AuthenticationLayer and provides client side authentication.


  • User: This class will be used to get the remoteUserId and add it to the Session Object (https://issues.apache.org/jira/browse/KAFKA-1683)
  • KafkaPrincipalToLocalPlugin: This is a pluggable class with a default implementation which translates a Kerberos principal which looks like "testuser/node1.test.com@EXAMPLE.COM" to "testuser". Users can provide their own customized version of PrincipalToLocalPlugin.
  • AuthUtils: This class will consist of any utilities needed for SASL and other auth related methods.
  • KerberosLoginFactory: It will use the JAAS config to log in and generate a subject.
  • Protocol accepts the protocol type, which is one of the following:
    • PLAINTEXT (non-authenticated, non-encrypted)
      • This channel will behave exactly as communication channels did in previous releases.
    • SSL
      • SSL implementation. The authenticated principal in the session will be taken from the certificate presented or from the peer host.
    • SASL+PLAINTEXT
      • SASL authentication will be used over a plaintext channel. Once SASL authentication is established between client and server, the session will have the client’s principal as the authenticated user. There won’t be any wire encryption in this case, as all channel communication will be over plain text.
    • SASL+SSL
      • SSL will be established first, and SASL authentication will be done over SSL. Once SASL authentication is established, the user’s principal will be used as the authenticated user. This option is useful if users want to use SASL authentication (for example Kerberos) with wire encryption.

          

         

  • SecurityConfig: a config file providing the SecurityProtocol, the SSL config and the SASL mechanisms.
  • BlockingChannel interface changes as it accepts the Protocol to create appropriate channels.
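
The default translation described for KafkaPrincipalToLocalPlugin above (from "testuser/node1.test.com@EXAMPLE.COM" to "testuser") could look roughly like the sketch below. The class and method names are hypothetical and are not part of the proposed interface; the sketch only assumes that the short name is everything before the first "/" or "@".

```java
// Hypothetical sketch: the class and method names are illustrative only,
// not the plugin interface proposed in this document.
final class PrincipalToLocalSketch {

    /** Returns the primary (short) name of a Kerberos principal such as "user/host@REALM". */
    static String toLocal(String principal) {
        if (principal == null || principal.isEmpty()) {
            throw new IllegalArgumentException("principal must not be empty");
        }
        int end = principal.length();
        int at = principal.indexOf('@');
        if (at >= 0) {
            end = at;            // drop the realm
        }
        int slash = principal.indexOf('/');
        if (slash >= 0 && slash < end) {
            end = slash;         // drop the instance (host) component
        }
        return principal.substring(0, end);
    }

    public static void main(String[] args) {
        System.out.println(toLocal("testuser/node1.test.com@EXAMPLE.COM")); // prints "testuser"
    }
}
```

A customized plugin could replace this rule with, for example, a lookup table or realm-specific mapping.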

Proposed Changes

We will be using SASL to provide authentication and SSL to provide encryption in connection-oriented protocols.

 

As part of the SASL implementation we will be using the JAAS config to read the Kerberos ticket and authenticate. More info on the JAAS config:

http://docs.oracle.com/javase/7/docs/technotes/guides/security/jaas/JAASRefGuide.html

The proposed JAAS login config file will look like this.

Users need to pass -Djava.security.auth.login.config=kafka_jaas.conf as part of the JVM params.

This JAAS file along with Login.java can be used to log in to LDAP, Kerberos, etc.

Here are some details on LdapLoginModule for JAAS: https://docs.oracle.com/javase/8/docs/jre/api/security/jaas/spec/com/sun/security/auth/module/LdapLoginModule.html

Code Block
languagejava
titleJaas Config
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/keytabs/kafka.keytab"
storeKey=true
useTicketCache=false
// serviceName will be used to connect to other brokers for replica management and also controller requests.
// This should be set to whatever principal the Kafka brokers are running as.
serviceName="kafka"
principal="kafka/_HOST@EXAMPLE.COM";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/keytabs/kafka.keytab"
storeKey=true
useTicketCache=false
serviceName="zookeeper"
principal="kafka@EXAMPLE.COM";
};


The KafkaServer section will be used to authenticate the Kafka broker against Kerberos, and the Client section will be used for zkClient to access a Kerberos-enabled ZooKeeper cluster.

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/keytabs/kafkaclient.keytab"
storeKey=true
useTicketCache=false
serviceName="kafka"
principal="kafkaproducer/_HOST@EXAMPLE.COM";
};
 
The above config is for any client (producer, consumer) connecting to a Kerberos-enabled Kafka cluster.
Here serviceName must match the principal name used under KafkaServer.
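
As a rough illustration of how a section such as KafkaClient can be looked up once java.security.auth.login.config points at the JAAS file, the sketch below uses the standard javax.security.auth.login.Configuration API. It only parses the file and reads the options; it does not perform a Kerberos login. The class name, the temporary file, and setting the system property from code (instead of via -D on the command line) are assumptions made to keep the example self-contained.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

// Hypothetical helper, not part of the proposal: parses a JAAS file and returns one section's options.
final class JaasSectionSketch {

    // Same content as the KafkaClient section shown above.
    static final String SAMPLE =
        "KafkaClient {\n"
        + "  com.sun.security.auth.module.Krb5LoginModule required\n"
        + "  useKeyTab=true\n"
        + "  keyTab=\"/keytabs/kafkaclient.keytab\"\n"
        + "  storeKey=true\n"
        + "  useTicketCache=false\n"
        + "  serviceName=\"kafka\"\n"
        + "  principal=\"kafkaproducer/_HOST@EXAMPLE.COM\";\n"
        + "};\n";

    static Map<String, ?> optionsOf(String jaasText, String section) {
        try {
            Path file = Files.createTempFile("kafka_jaas", ".conf");
            file.toFile().deleteOnExit();
            Files.write(file, jaasText.getBytes(StandardCharsets.UTF_8));
            // Equivalent to passing -Djava.security.auth.login.config=<file> as a JVM param.
            System.setProperty("java.security.auth.login.config", file.toString());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Configuration config = Configuration.getConfiguration();
        config.refresh(); // re-read the file in case a configuration was loaded earlier
        AppConfigurationEntry[] entries = config.getAppConfigurationEntry(section);
        if (entries == null || entries.length == 0) {
            throw new IllegalStateException("No JAAS section named " + section);
        }
        return entries[0].getOptions();
    }

    public static void main(String[] args) {
        Map<String, ?> options = optionsOf(SAMPLE, "KafkaClient");
        System.out.println("serviceName = " + options.get("serviceName"));
    }
}
```

A client could compare the serviceName read this way with the principal name the brokers run as before attempting to authenticate.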
 

Channel

Code Block
languagejava
package org.apache.kafka.common.network;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.channels.SocketChannel;

// Proposed signatures only; most method bodies are omitted.
public class Channel implements ScatteringByteChannel, GatheringByteChannel {
    private TransportLayer transportLayer;
    private Authenticator authenticator;

    public Channel(TransportLayer transportLayer, Authenticator authenticator) throws IOException;

    /**
     * Returns the user principal for the session.
     * In case of PLAINTEXT and no authentication, returns ANONYMOUS as the userPrincipal.
     * If SSL is used without any SASL authentication, returns SSLSession.peerPrincipal.
     */
    public UserPrincipal userPrincipal() {
        return authenticator.userPrincipal();
    }

    /**
     * Starts the transportLayer handshake.
     * If the transportLayer handshake finishes, then initiates AuthenticationLayer.authenticate.
     * @return 0 if handshake and authentication finish, otherwise the appropriate SelectionKey.OP
     */
    public int connect(boolean read, boolean write) throws IOException;

    public void blockingConnect(long timeout) throws IOException;

    @Override
    public int write(ByteBuffer src) throws IOException;

    @Override
    public int read(ByteBuffer dst) throws IOException;

    /* Returns the socketChannel. */
    public SocketChannel socketChannel();

    @Override
    public long write(ByteBuffer[] srcs) throws IOException;

    @Override
    public long write(ByteBuffer[] srcs, int offset, int length) throws IOException;

    @Override
    public long read(ByteBuffer[] dsts) throws IOException;

    @Override
    public long read(ByteBuffer[] dsts, int offset, int length) throws IOException;

    public boolean finishConnect() throws IOException;

    public DataInputStream getInputStream() throws IOException;

    public DataOutputStream getOutputStream() throws IOException;

    public void close() throws IOException;
}
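
The return-value contract of connect(read, write) described above (0 when handshake and authentication are done, otherwise the SelectionKey operation to wait for) suggests a caller loop along the following lines. This is only a sketch: Connectable and ScriptedChannel are hypothetical stand-ins that replay a fixed sequence of results, and in a real selector loop each call would follow a select() that reports actual readiness.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Hypothetical sketch of a caller driving connect() to completion; not the proposed Channel class.
final class ConnectLoopSketch {

    /** Stand-in for the proposed Channel.connect(read, write) signature. */
    interface Connectable {
        int connect(boolean read, boolean write);
    }

    /** Fake channel that replays a scripted sequence of connect() results. */
    static final class ScriptedChannel implements Connectable {
        private final Deque<Integer> results;

        ScriptedChannel(Integer... results) {
            this.results = new ArrayDeque<>(Arrays.asList(results));
        }

        @Override
        public int connect(boolean read, boolean write) {
            return results.isEmpty() ? 0 : results.poll();
        }
    }

    /** Calls connect() until it returns 0 and reports how many calls were needed. */
    static int driveToCompletion(Connectable channel) {
        int calls = 0;
        int interestOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
        do {
            boolean readable = (interestOps & SelectionKey.OP_READ) != 0;
            boolean writable = (interestOps & SelectionKey.OP_WRITE) != 0;
            // Readiness is assumed here; a real loop would wait on a Selector for interestOps first.
            interestOps = channel.connect(readable, writable);
            calls++;
        } while (interestOps != 0);
        return calls;
    }

    /** Example run: the handshake wants a write, then a read, then completes. */
    static int demo() {
        return driveToCompletion(
            new ScriptedChannel(SelectionKey.OP_WRITE, SelectionKey.OP_READ, 0));
    }

    public static void main(String[] args) {
        System.out.println("connect() calls needed: " + demo());
    }
}
```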

...

2) Checks if the service principal is the same as the one the KafkaBroker is running with.

3) KafkaBroker accepts/rejects the client's token.

4) Returns a response to the client indicating whether it is authenticated or not.

5) Once the client is authenticated, it can send Kafka requests.

 

Security Config

SecurityConfig will be shared across clients and brokers. If it is not provided, communication channels fall back to PLAINTEXT. Here are the proposed configs:

Code Block
languagejava
sasl.authentication.mechanism (KERBEROS will be supported for revision1) 
ssl.protocol
ssl.cipher.suites
ssl.enabled.protocols
ssl.keystore.type
ssl.keystore.location
ssl.keystore.password
ssl.key.password
ssl.truststore.type
ssl.truststore.location
ssl.truststore.password
ssl.client.require.cert
ssl.keymanager.algorithm
ssl.trustmanager.algorithm
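
The PLAINTEXT fallback mentioned above could be resolved along these lines. The sketch models the four protocol types listed earlier in this document as an enum; the config key "security.protocol" and all class and method names are assumptions for illustration, since the proposed config list does not name a key for the protocol.

```java
import java.util.Locale;
import java.util.Properties;

// Hypothetical sketch: neither this class nor the "security.protocol" key is defined by the proposal.
final class SecurityProtocolSketch {

    enum Protocol {
        PLAINTEXT(false, false),
        SSL(false, true),
        SASL_PLAINTEXT(true, false),
        SASL_SSL(true, true);

        final boolean usesSasl;   // SASL authentication is performed on the channel
        final boolean encrypted;  // traffic is wire-encrypted via SSL

        Protocol(boolean usesSasl, boolean encrypted) {
            this.usesSasl = usesSasl;
            this.encrypted = encrypted;
        }
    }

    static final String PROTOCOL_KEY = "security.protocol"; // assumed key name

    /** Maps names such as "SASL+SSL" to a Protocol; a missing or blank name falls back to PLAINTEXT. */
    static Protocol fromName(String name) {
        if (name == null || name.trim().isEmpty()) {
            return Protocol.PLAINTEXT;
        }
        return Protocol.valueOf(name.trim().toUpperCase(Locale.ROOT).replace('+', '_'));
    }

    /** Resolves the protocol from a config; a null config also falls back to PLAINTEXT. */
    static Protocol resolve(Properties config) {
        return fromName(config == null ? null : config.getProperty(PROTOCOL_KEY));
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        System.out.println(resolve(config));            // no protocol configured
        config.setProperty(PROTOCOL_KEY, "SASL+SSL");
        System.out.println(resolve(config));
    }
}
```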
 

 


Compatibility, Deprecation, and Migration Plan

...

1) Users should configure -Djava.security.auth.login.config=jaas_file.conf after setting kerberosauthentication.enable to true on the server and also on clients. Any objections to this approach?

...

3) Similarly, Producer and Consumer call KerberosLoginManager.init() with the "KafkaClient" section if kerberosauthentication.enable is set to true, and also pass the Protocol to BlockingChannel. Any alternatives or objections to this approach?

...