Status
Current state: "Under Discussion"
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The goal is to add Kerberos SASL authentication capability to Kafka brokers: to start a Kafka broker with a valid
Kerberos ticket, accept SASL connections from clients with a valid Kerberos ticket, and provide SSL for encryption.
Public Interfaces
- Channel is a wrapper for TransportLayer and AuthenticationLayer, providing the necessary handshake and authentication methods as well as read(ByteBuffer buf), write(ByteBuffer buf) and write(ByteBuffer[] buf).
- GSSChannel is similar to the work done for SSL in https://issues.apache.org/jira/browse/KAFKA-1684 and provides the necessary read and write operations.
- GSSServerChannel extends GSSChannel to provide server side handshake methods
- GSSBlockingClientChannel extends GSSChannel to provide client side blocking handshake methods. This will be used by BlockingChannel.scala
- TransportLayer is an interface for network transportLayer.
- PlainTextTransportLayer provides plain text socket channel methods
- SSLTransportLayer provides the ssl handshake and read/write methods.
- AuthenticationLayer is an interface providing client/server authentication.
- SaslServerAuthenticationLayer implements AuthenticationLayer and provides authentication methods for the server side.
- SaslClientAuthenticationLayer implements AuthenticationLayer and provides client side authentication.
- GSSClientChannel extends GSSChannel to provide non-blocking client side handshake methods. This will be used by the new producer and consumer.
- User: This class will be used to get the remoteUserId and add it to the Session object (https://issues.apache.org/jira/browse/KAFKA-1683)
- KafkaPrincipalToLocalPlugin: This is a pluggable class with a default implementation which translates a kerberos principal such as "testuser/node1.test.com@EXAMPLE.COM" to "testuser". Users can provide their own customized version of PrincipalToLocalPlugin.
- AuthUtils: This class will consist of any utilities needed for SASL and other auth-related methods.
- KerberosLoginManager: This is a singleton object. It will use the JAAS config to log in and generate a Subject.
- Protocol accepts the protocol type (PLAINTEXT, KERBEROS, SSL)
- SecurityConfig: a config providing the SecurityProtocol, SSL config and SASL mechanisms.
- BlockingChannel: the BlockingChannel interface changes as it accepts the Protocol to create the appropriate channels.
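The default principal-to-local translation described for KafkaPrincipalToLocalPlugin can be sketched as follows. This is a minimal illustration only; the class name `DefaultPrincipalToLocal` and method `toLocal` are hypothetical, not part of the proposed API:

```java
// Hypothetical sketch of the default KafkaPrincipalToLocalPlugin behaviour:
// strip the host and realm parts of a kerberos principal, keeping the primary.
public class DefaultPrincipalToLocal {

    /** "testuser/node1.test.com@EXAMPLE.COM" -> "testuser" */
    public static String toLocal(String principal) {
        int end = principal.length();
        int slash = principal.indexOf('/');
        int at = principal.indexOf('@');
        if (slash >= 0) end = Math.min(end, slash);
        if (at >= 0) end = Math.min(end, at);
        return principal.substring(0, end);
    }
}
```

A custom plugin would replace this mapping, e.g. to consult an external user database instead of string parsing.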
Proposed Changes
We will be using SASL to provide authentication and SSL to provide encryption in connection-oriented protocols.
As part of the Kerberos/GSS-API SASL implementation we will be using a JAAS config to read the kerberos ticket and authenticate. More info on JAAS Config
...
```
KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/keytabs/kafka.keytab"
    storeKey=true
    useTicketCache=false
    serviceName="kafka"
    // This will be used to connect to other brokers for replica management and also
    // controller requests. It should be set to whatever principal the kafka brokers are running as.
    principal="kafka/_HOST@EXAMPLE.COM";
};

Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/keytabs/kafka.keytab"
    storeKey=true
    useTicketCache=false
    serviceName="zookeeper"
    principal="kafka@EXAMPLE.COM";
};
```

The KafkaServer section will be used to authenticate the Kafka broker against kerberos, and the Client section will be used by zkClient to access a kerberos-enabled zookeeper cluster.

```
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/keytabs/kafkaclient.keytab"
    storeKey=true
    useTicketCache=false
    serviceName="kafka"
    principal="kafkaproducer/_HOST@EXAMPLE.COM";
};
```

The above config is for any client (producer, consumer) connecting to a kerberos-enabled Kafka cluster. Here serviceName must match the principal name used under KafkaServer.
Channel
```java
package org.apache.kafka.common.network;

public class Channel implements ScatteringByteChannel, GatheringByteChannel {
    private UserPrincipal userPrincipal = new UserPrincipal("ANONYMOUS");
    private TransportLayer transportLayer;
    private Authenticator authenticator;

    public Channel(TransportLayer transportLayer, Authenticator authenticator) throws IOException

    /**
     * Returns the user principal for the session.
     * In case of PLAINTEXT and no authentication, returns ANONYMOUS as the userPrincipal.
     * If SSL is used without any SASL authentication, returns SSLSession.peerPrincipal.
     */
    public UserPrincipal userPrincipal() {
        return authenticator.userPrincipal();
    }

    /**
     * Starts the transportLayer handshake. If the transportLayer handshake
     * finishes, then initiates AuthenticationLayer.authenticate.
     * @return 0 if handshake and authentication finish, otherwise the appropriate SelectionKey.OP
     */
    public int connect(boolean read, boolean write) throws IOException

    public void blockingConnect(long timeout) throws IOException

    @Override
    public int write(ByteBuffer src) throws IOException

    @Override
    public int read(ByteBuffer dst) throws IOException

    /* returns the socketChannel */
    public SocketChannel socketChannel()

    @Override
    public long write(ByteBuffer[] srcs) throws IOException
```
GSSChannel
```java
package org.apache.kafka.common.network;

public class GSSChannel extends Channel {
    public GSSChannel(SocketChannel socketChannel, Subject subject) throws IOException

    @Override
    public UserPrincipal userPrincipal()

    @Override
    public int handshake(boolean read, boolean write) throws IOException

    @Override
    public int read(ByteBuffer dst) throws IOException

    @Override
    public int write(ByteBuffer src) throws IOException

    @Override
    public long write(ByteBuffer[] srcs, int offset, int length) throws IOException

    @Override
    public long write(ByteBuffer[] srcs) throws IOException

    @Override
    public boolean isHandshakeComplete()
}
```
GSSServerChannel
```java
public class GSSServerChannel extends GSSChannel {
    public GSSServerChannel(SocketChannel socketChannel, Subject subject) throws IOException

    @Override
    public int handshake(boolean read, boolean write) throws IOException
}
```
GSSBlockingClientChannel (performs a blocking handshake)
```java
public class GSSBlockingClientChannel extends GSSChannel {
    public GSSBlockingClientChannel(SocketChannel socketChannel, Subject subject) throws IOException

    @Override
    public int handshake(boolean read, boolean write) throws IOException
}
```
GSSClientChannel (non-blocking)
```java
public class GSSClientChannel extends GSSChannel {
    public GSSClientChannel(SocketChannel socketChannel, Subject subject) throws IOException

    @Override
    public int handshake(boolean read, boolean write) throws IOException
}
```
AuthUtils
```java
public class AuthUtils {
    /**
     * Construct a JAAS configuration object per the kafka jaas configuration file.
     * @param jaasConfigFilePath path to the jaas configuration file
     * @return JAAS configuration object
     */
    public static Configuration getConfiguration(String jaasConfigFilePath)

    public static String getJaasConfig(String loginContextName, String key) throws IOException

    public static String getDefaultRealm()
}
```
Login
```java
package org.apache.kafka.common.security.kerberos;

public class Login {
    private volatile Subject subject = null;

    // Logs in based on the jaas conf and starts a thread to renew the ticket.
    public Login(final String loginContextName) throws LoginException

    // Returns the generated subject after the login.
    public Subject getSubject()
}
```
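The LoginContext-to-Subject flow that the Login class wraps can be illustrated with the sketch below. Since Krb5LoginModule needs a live KDC and keytab, this sketch swaps in an in-memory stub LoginModule; `LoginSketch`, `StubLoginModule` and the principal name are made up for illustration and are not part of the proposal:

```java
import javax.security.auth.Subject;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import javax.security.auth.spi.LoginModule;
import java.security.Principal;
import java.util.Collections;
import java.util.Map;

public class LoginSketch {

    /** Stub standing in for Krb5LoginModule: adds a fixed principal on commit. */
    public static class StubLoginModule implements LoginModule {
        private Subject subject;
        public void initialize(Subject subject, CallbackHandler handler,
                               Map<String, ?> sharedState, Map<String, ?> options) {
            this.subject = subject;
        }
        public boolean login()  { return true; }
        public boolean commit() {
            subject.getPrincipals().add((Principal) () -> "kafka/broker1@EXAMPLE.COM");
            return true;
        }
        public boolean abort()  { return true; }
        public boolean logout() { subject.getPrincipals().clear(); return true; }
    }

    /** In-memory stand-in for AuthUtils.getConfiguration(jaasConfigFilePath). */
    static Configuration stubConfiguration() {
        return new Configuration() {
            @Override
            public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
                return new AppConfigurationEntry[] {
                    new AppConfigurationEntry(StubLoginModule.class.getName(),
                        AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
                        Collections.emptyMap())
                };
            }
        };
    }

    /** Mirrors Login: run the named login context and hand back the Subject. */
    public static Subject login(String loginContextName) throws LoginException {
        LoginContext lc = new LoginContext(loginContextName, null, null, stubConfiguration());
        lc.login();
        return lc.getSubject();
    }
}
```

In the real implementation the Configuration would come from the jaas file pointed at by -Djava.security.auth.login.config, and the ticket-renewal thread would periodically re-run the login.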
KerberosLoginManager
```scala
package kafka.common

object KerberosLoginManager {
  var kerberosLogin: Login

  // does the kerberos login and generates the subject
  def init()

  // returns kerberosLogin.getSubject()
  def subject()

  def shutdown()
}
```
Protocol
```scala
package kafka.common

/*
 * In case of PLAINTEXT encryptionEnable will be false and for SSL encryptionEnable will always be true.
 * For KERBEROS, clients can request encryption or not.
 */
case class Protocol(securityProtocol: SecurityProtocol, encryptionEnable: Boolean)
```
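The rules in the comment above can be made concrete. The following is an illustrative Java stand-in (the proposal defines Protocol as a Scala case class); `ProtocolSketch` and `validEncryptionFlag` are hypothetical names:

```java
// Illustrative encoding of the encryptionEnable rules: PLAINTEXT is never
// encrypted, SSL is always encrypted, and KERBEROS leaves it to the client.
public final class ProtocolSketch {
    public enum SecurityProtocol { PLAINTEXT, KERBEROS, SSL }

    /** Returns true when the encryptionEnable flag is consistent with the protocol. */
    public static boolean validEncryptionFlag(SecurityProtocol p, boolean encryptionEnable) {
        switch (p) {
            case PLAINTEXT: return !encryptionEnable; // PLAINTEXT is never encrypted
            case SSL:       return encryptionEnable;  // SSL is always encrypted
            default:        return true;              // KERBEROS: clients may request either
        }
    }
}
```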
BlockingChannel changes
```scala
class BlockingChannel( val host: String,
                       val port: Int,
                       val readBufferSize: Int,
                       val writeBufferSize: Int,
                       val readTimeoutMs: Int,
                       val protocol: Protocol = Protocol(SecurityProtocol.PLAINTEXT, false))
```
...
```java
    // (continuation of the Channel class above)
    public long write(ByteBuffer[] srcs, int offset, int length) throws IOException

    @Override
    public int read(ByteBuffer dst) throws IOException

    @Override
    public long read(ByteBuffer[] dsts) throws IOException

    @Override
    public long read(ByteBuffer[] dsts, int offset, int length) throws IOException

    public boolean finishConnect() throws IOException

    public DataInputStream getInputStream() throws IOException

    public DataOutputStream getOutputStream() throws IOException

    public void close() throws IOException
}
```
TransportLayer
```java
public interface TransportLayer {
    /**
     * Closes this channel.
     * @throws IOException if an I/O error occurs
     */
    void close() throws IOException;

    /**
     * Tells whether or not this channel is open.
     */
    boolean isOpen();

    /**
     * Writes a sequence of bytes to this channel from the given buffer.
     */
    int write(ByteBuffer src) throws IOException;

    long write(ByteBuffer[] srcs) throws IOException;

    long write(ByteBuffer[] srcs, int offset, int length) throws IOException;

    int read(ByteBuffer dst) throws IOException;

    long read(ByteBuffer[] dsts) throws IOException;

    long read(ByteBuffer[] dsts, int offset, int length) throws IOException;

    boolean isReady();

    boolean finishConnect() throws IOException;

    SocketChannel socketChannel();

    /**
     * Performs the SSL handshake, hence is a no-op for the non-secure implementation.
     * @param read Unused in non-secure implementation
     * @param write Unused in non-secure implementation
     * @return Always returns 0
     * @throws IOException
     */
    int handshake(boolean read, boolean write) throws IOException;

    DataInputStream inStream() throws IOException;

    DataOutputStream outStream() throws IOException;

    boolean flush(ByteBuffer buffer) throws IOException;

    Principal getPeerPrincipal();
}
```
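The plaintext contract of this interface (handshake is a no-op returning 0, read/write just move bytes) can be sketched with a toy in-memory transport. `InMemoryTransportLayer` is a hypothetical name and implements only a subset of the interface above, using a byte queue instead of a real SocketChannel:

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Toy in-memory transport illustrating the TransportLayer contract for the
// plaintext case: handshake() is a no-op returning 0 and the channel is
// ready immediately.
public class InMemoryTransportLayer {
    private final Deque<Byte> pipe = new ArrayDeque<>();
    private boolean open = true;

    /** Plaintext: nothing to negotiate, always returns 0. */
    public int handshake(boolean read, boolean write) { return 0; }

    public boolean isReady() { return true; }
    public boolean isOpen()  { return open; }
    public void close()      { open = false; }

    /** Drains src into the internal pipe, returning the number of bytes written. */
    public int write(ByteBuffer src) {
        int n = src.remaining();
        while (src.hasRemaining()) pipe.addLast(src.get());
        return n;
    }

    /** Fills dst from the internal pipe, returning the number of bytes read. */
    public int read(ByteBuffer dst) {
        int n = 0;
        while (dst.hasRemaining() && !pipe.isEmpty()) {
            dst.put(pipe.pollFirst());
            n++;
        }
        return n;
    }
}
```

An SSL implementation would differ exactly where this sketch is trivial: handshake() would drive SSLEngine wrap/unwrap until negotiation completes, and read/write would decrypt/encrypt.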
PlainTextTransportLayer
```java
public class PlainTextTransportLayer implements TransportLayer {
    public PlainTextTransportLayer(SocketChannel socketChannel) throws IOException
}
```
SSLTransportLayer
```java
public class SSLTransportLayer implements TransportLayer {
    public SSLTransportLayer(SocketChannel socketChannel, SSLEngine sslEngine) throws IOException
}
```
AuthenticationLayer
```java
public interface Authenticator {
    /**
     * Closes any resources.
     * @throws IOException if any I/O error occurs
     */
    void close() throws IOException;

    /**
     * Initializes the authenticator.
     * @throws IOException
     */
    void init() throws IOException;

    /**
     * Returns the UserPrincipal after authentication is established.
     */
    UserPrincipal userPrincipal();

    /**
     * Does authentication in a non-blocking way and returns the SelectionKey.OP
     * if further communication is needed.
     */
    int authenticate(boolean read, boolean write) throws IOException;

    /**
     * Returns true if authentication is complete, otherwise false.
     */
    boolean isComplete();
}
```
DefaultAuthenticationLayer
```java
public class DefaultAuthenticator implements Authenticator {
    TransportLayer transportLayer;

    public DefaultAuthenticator(TransportLayer transportLayer) {
        this.transportLayer = transportLayer;
    }

    public void init() {}

    public int authenticate(boolean read, boolean write) throws IOException {
        return 0;
    }

    /** Returns the peer host in case of SSL. */
    public UserPrincipal userPrincipal() {
        return new UserPrincipal(transportLayer.getPeerPrincipal().toString());
    }

    public void close() throws IOException {}

    public boolean isComplete() {
        return true;
    }
}
```
SASL Authentication exchange
KafkaClient
1) KafkaClient picks the principal it wants to use by looking at the KafkaClient jaas config (example above).
...
4) KafkaClient initiates a challenge/response exchange with the KafkaBroker along with the KafkaClient principal and service principal. Depending on the KafkaBroker response, this challenge/response exchange might continue until the client receives COMPLETE from the KafkaBroker.
KafkaBroker
1) KafkaBroker will accept the connection along with the client and service principal.
2) Checks if the service principal is the same as the one the KafkaBroker is running with.
3) KafkaBroker accepts/rejects the client's token.
4) Returns a response to the client indicating whether or not it is authenticated.
5) Once the client is authenticated, it can send Kafka requests.
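The challenge/response loop in the steps above can be demonstrated with the JDK's javax.security.sasl API. Since the GSSAPI mechanism needs a live KDC, this sketch swaps in DIGEST-MD5, which runs entirely in-process; the class name `SaslExchangeSketch`, the user "testuser", the password and the service/host names are invented for the demo and are not part of the proposal:

```java
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.RealmCallback;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslServer;

// Demonstrates the SASL challenge/response loop: the server issues challenges,
// the client answers, and both sides iterate until authentication completes.
public class SaslExchangeSketch {

    public static String authenticate() throws Exception {
        char[] password = "demo-secret".toCharArray();

        CallbackHandler clientHandler = callbacks -> {
            for (Callback cb : callbacks) {
                if (cb instanceof NameCallback) ((NameCallback) cb).setName("testuser");
                else if (cb instanceof PasswordCallback) ((PasswordCallback) cb).setPassword(password);
                else if (cb instanceof RealmCallback) ((RealmCallback) cb).setText(((RealmCallback) cb).getDefaultText());
            }
        };
        CallbackHandler serverHandler = callbacks -> {
            for (Callback cb : callbacks) {
                if (cb instanceof NameCallback) ((NameCallback) cb).setName(((NameCallback) cb).getDefaultName());
                else if (cb instanceof PasswordCallback) ((PasswordCallback) cb).setPassword(password);
                else if (cb instanceof RealmCallback) ((RealmCallback) cb).setText(((RealmCallback) cb).getDefaultText());
                else if (cb instanceof AuthorizeCallback) {
                    AuthorizeCallback ac = (AuthorizeCallback) cb;
                    ac.setAuthorized(ac.getAuthenticationID().equals(ac.getAuthorizationID()));
                }
            }
        };

        SaslServer server = Sasl.createSaslServer("DIGEST-MD5", "kafka", "localhost", null, serverHandler);
        SaslClient client = Sasl.createSaslClient(new String[]{"DIGEST-MD5"}, null,
                                                  "kafka", "localhost", null, clientHandler);

        // Exchange tokens until both sides report COMPLETE (steps 1-4 above);
        // the round limit guards against a stuck negotiation.
        byte[] challenge = server.evaluateResponse(new byte[0]);
        int rounds = 0;
        while ((!client.isComplete() || !server.isComplete()) && rounds++ < 10) {
            byte[] response = client.evaluateChallenge(challenge);
            if (response != null && !server.isComplete()) {
                challenge = server.evaluateResponse(response);
            }
        }
        return server.getAuthorizationID(); // authenticated identity; step 5 may proceed
    }
}
```

With GSSAPI the loop shape is identical; only the mechanism name, the JAAS login (Subject/PrivilegedAction) and the credentials change.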
Compatibility, Deprecation, and Migration Plan
As per previous security discussions and the multiport work being done as part of this JIRA:
Users/clients can still communicate with non-secure/non-SASL Kafka brokers.
Open Questions
1) Introducing kerberos.enable=true for KafkaConfig, ProducerConfig and ConsumerConfig. Is this ok?
2) Users should configure -Djava.security.auth.login.config=jaas_file.conf after setting kerberos.enable to true on the server and also in clients. Any objections to this approach?
3) We need to pass the Protocol down through ReplicaManager -> ReplicaFetcherManager -> ReplicaFetcherThread -> SimpleConsumer -> BlockingChannel. This is necessary as BlockingChannel, based on Protocol.securityProtocol, can initiate the appropriate Channel described earlier. Since KerberosLoginManager is a singleton, BlockingChannel can grab the Subject from KerberosLoginManager and pass it on to GSSClientChannel to do the handshake as a PrivilegedAction. Any alternatives or objections to the current approach?
4) Similarly, the Producer and Consumer do KerberosLoginManager.init() with the "KafkaClient" section if kerberos.enable is set to true and also pass the Protocol to BlockingChannel. Any alternatives or objections to this approach?
Rejected Alternatives
1) Using GSS-API as an authentication layer and security layer. This can only be used with Kerberos.
2) Using SASL to provide encryption of network data. After talking to other projects like HDFS (and HBASE as well), they noticed a 10x slowdown when encryption is enabled on SASL. Hence the decision to separate the transportLayer and authentication layer: with SASL used only for authentication, users can choose PLAINTEXT for no encryption or SSL for encryption and still be performant.
3) Using TLS for kerberos authentication (https://tools.ietf.org/html/rfc6251). In this case clients expect the server to be running with a "host/hostname@realm" keytab, and here "host" cannot be changed.
HDFS noticed security issues with TLS for kerberos as well.