You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 6 Next »

Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The goal is to add kerberos capability to Kafka brokers , to start a Kafka broker with valid
kerberos ticket and accept sasl connections from client with a valid kerberos ticket.

Public Interfaces

  • Channel is a wrapper for SocketChannel providing necessary handshake methods and also read(ByteBuffer buf) , write(ByteBuffer buf), write(ByteBuffer[] buf).
  • GSSChannel in similar to the work done for SSL here https://issues.apache.org/jira/browse/KAFKA-1684  , provides the necessary read, write operations .
  • GSSServerChannel extends GSSChannel to provide server side handshake methods
  • GSSBlockingClientChannel extends GSSChannel to provide client side blocking handshake methods. This will be used by BlockingChannel.scala
  • GSSClientChannel extends GSSChannel to provide non-blocking client side handshake methods. This will be used by new producer and consumer .

  • User: This class will be used to get the remoteUserId and add it to the Session Object (https://issues.apache.org/jira/browse/KAFKA-1683)
  • KafkaPrincipalToLocalPlugin: This is a pluggable class with a default implementation which translates a kerberos principal which looks like "testuser/node1.test.com@EXAMPLE.COM" to "testuser" . Users can provide a their own customized version of PrincipalToLocalPlugin.
  • AuthUtils: This class will consists of any utilities needed for SASL and other auth related methods.
  • KerberosLoginManager: This is a singleton object . It will use jaas config to login and generates a subject. 
  • Protocol accepts the protocol type (PLAINTEXT, KERBEROS, SSL)
  • BlockingChannel interface changes as it accepts the Protocol to create appropriate channels.

Proposed Changes

we will be using GSS-API to provide authentication and data security services in connection oriented protocols. 

 

As part of Kerberos/GSS-API implementation we will be using JAAS config to read kerberos ticket and authenticate. More info on JAAS Config

http://docs.oracle.com/javase/7/docs/technotes/guides/security/jaas/JAASRefGuide.html

Proposed JAAS Login config file will look like this.

Users needs to pass -Djava.security.auth.login.config=kafka_jaas.conf as part of JVM params .

Jaas Config
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/keytabs/kafka.keytab"
storeKey=true
useTicketCache=false
serviceName="kafka" // this will be used to connect to other brokers for replica management and also controller requests. This should be set to whatever principal that kafka brokers are running.
principal="kafka/_HOST@EXAMPLE.COM";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/vagrant/keytabs/storm.keytab"
storeKey=true
useTicketCache=false
serviceName="zookeeper"
principal="kafka@EXAMPLE.COM";
}


KafkaServer will be used to authenticate Kafka broker against kerberos
and Client section will be used for zkClient to access kerberos enabled zookeeper cluster.

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/keytabs/kafka.keytab"
storeKey=true
useTicketCache=true
serviceName="kafka"
principal="kafkaproducer/_HOST@EXAMPLE.COM";
};
 
The above config is for any client ( producer, consumer) connecting to kerberos enabled Kafka cluster.
Here serviceName must match the principal name used under KafkaServer.
 

Channel

package org.apache.kafka.common.network;
 
public class Channel implements ReadableByteChannel, GatheringByteChannel {
   private UserPrincipal userPrincipal = new UserPrincipal("ANONYMOUS");
   
   public Channel(SocketChannel socketChannel) throws IOException 
  
   /**
     * returns user principal for the session
     * Incase of PLAINTEXT returns ANONYMOUS as the UserPrincipal
     */
    public UserPrincipal getUserPrincipal() {
        return userPrincipal;
    }
    
    @Override
    public int write(ByteBuffer src) throws IOException
 
    @Override
    public int read(ByteBuffer dst) throws IOException
 
    /* returns the socketChannel */
    public SocketChannel getIOChannel()
 
	/* incase of PLAINTEXT channel this always returns true */
    public boolean isHandshakeComplete()
 
     /**
     * Performs SSL or GSS-API handshake hence is a no-op for the non-secure
     * implementation
     * @param read Unused in non-secure implementation
     * @param write Unused in non-secure implementation
     * @return Always return 0
     * @throws IOException
    */
    public int handshake(boolean read, boolean write) throws IOException {
        return 0;
    }


}

 

GSSChannel

package org.apache.kafka.common.network;
 
public class GSSChannel extends Channel {


 public GSSChannel(SocketChannel socketChannel, Subject subject) throws IOException 


@Overrride
public UserPrincipal getUserPrincipal()


 @Override
 public int handshake(boolean read, boolean write) throws IOException 


 @Override
 public int read(ByteBuffer dst) throws IOException 


 @Override
 public int write(ByteBuffer src) throws IOException 



 @Override
 public long write(ByteBuffer[] srcs, int offset, int length) throws IOException 


@Override
 public long write(ByteBuffer[] srcs) throws IOException


@Override
public int handshake(boolean write, boolean read) throws IOException


@Override
 public boolean isHandshakeComplete() 


}

 

GSSServerChannel

public class GSSServerChannel extends GSSChannel {
   public GSSServerChannel(SocketChannel socketChannel, Subject subject) throws IOException 
   
   @Override
    public int handshake(boolean read, boolean write) throws IOException
}
 
 

GSSBlockingClientChannel (performs a blocking handshake)


public class GSSBlockingClientChannel extends GSSChannel {
   public GSSBlockingClientChannel(SocketChannel socketChannel, Subject subject) throws IOException 
   
   @Override
    public int handshake(boolean read, boolean write) throws IOException
}
 
 

GSSClientChannel (non-blocking)

public class GSSClientChannel extends GSSChannel {
   public GSSClientChannel(SocketChannel socketChannel, Subject subject) throws IOException 
   
   @Override
    public int handshake(boolean read, boolean write) throws IOException
}
 
 

AuthUtils

public class AuthUtils {
   /**
     * Construct a JAAS configuration object per kafka jaas configuration file
     * @param storm_conf Storm configuration
     * @return JAAS configuration object
     */
    public static Configuration getConfiguration(String jaasConfigFilePath)
 
    public static String getJaasConfig(String loginContextName, String key) throws IOException
 
    public static String getDefaultRealm()
 
}
 
 

 

Login

package org.apache.kafka.common.security.kerberos;
 
public class Login {
private volatile Subject subject = null;
 
//logs in based on jaas conf and starts a thread to renew it .
public Login(final String loginContextName)
            throws LoginException 
 
// returns generated subject after the login.
public Subject getSubject() 


}
 

KerberosLoginManager

package kafka.common
 
object KerberosLoginManager {
   var kerberosLogin: Login 
 
   //does kerberos login and generates the subject
   def init()
 
   // returns kerberosLogin.getSubject()
   def subject()
 
   def shutdown()
   
}

 

Protocol

package kafka.common
 
/*
 * In case of PLAINTEXT encryptionEnable will be false and for SSL encryptionEnable will always be true
 * For KERBEROS clients can request encryption or not
 */
case class Protocol(securityProtocol: SecurityProtocol, encryptionEnable: Boolean)

BlockingChannel changes

class BlockingChannel( val host: String,
                       val port: Int,
                       val readBufferSize: Int,
                       val writeBufferSize: Int,
                       val readTimeoutMs: Int,
                       val protocol: Protocol = Protocol(SecurityProtocol.PLAINTEXT, false))
 


GSS-API Authentication exchange

KafkaClient

1) KafkaClient picks the principal it wants to use by looking at KafkaClient jaas config (example above).

2) Authenticates against KDC (kerberos) using the KafkaClient principal.

3) KafkaClient constructs the service principal name based on the jaas config serviceName

4) KafkaClient initiates challenge/response with KafkaBroker along with KafkaClient principal and service principal . Depending on the KafkaBroker response these challenge/response might continue until it receives COMPLETE from the KafkaBroker.

KafkaBroker

1) KafkaBroker will accept the connection and accepts the client and service principal

2) checks if the service principal is same as the one KafkaBroker running with.

3) KafkaBroker accepts/rejects the clients token

4) Returns the response to the client if its authenticated or not.

5) Once client is authenticated they can send Kafka requests.

Compatibility, Deprecation, and Migration Plan

As per previous security discussions and multiport work being done as part of this JIRA Unable to render Jira issues macro, execution error. ,

Users/Clients can still communicate with non-secure/non-sasl kafka brokers.

Open Questions

1) Introducing kerberos.enable=true for KafkaConfig , ProducerConfig, ConsumerConfig. Is this ok ?

2) Users should be configuring -Djava.security.auth.login.config=jass_file.conf . after setting kerberos.enable to true in server and also in clients. Any objections on this approach?

3) We need to pass  Protocol to ReplicaManager -> ReplicaFetcherManager -> ReplicaFetcherThread -> SimpleConsumer -> BlockingChannel.  This is a necessary as BlockingChannel can based on the 

Protocol.securityProtocol initiate the appropriate Channel described earlier.  Since KerberosLoginManager is a singleton , BlockingChannel can grab Subject from KerberosLoginManager and pass it onto

GSSClientChannel to do the handshakes as PrivilegedAction . Any alternatives or objections on current approach.

4) Similarly Producer and Consumer does KerberosLoginManager.init() with "KafkaClient" section  if kerberos.enable set to true and also passes Protocol to BlockingChannel.  Any alternatives or objections on this approach?

Rejected Alternatives

None




  • No labels