...
- Channel: a wrapper around TransportLayer and AuthenticationLayer that provides the necessary handshake and authentication methods, as well as read(ByteBuffer buf), write(ByteBuffer buf), and write(ByteBuffer[] buf).
- TransportLayer: an interface for the network transport layer.
- PlainTextTransportLayer: provides plain-text socket channel methods.
- SSLTransportLayer: provides SSL handshake and read/write methods.
- AuthenticationLayer (Authenticator): an interface for client/server authentication.
- SaslServerAuthenticationLayer: implements AuthenticationLayer and provides server-side authentication methods.
- SaslClientAuthenticationLayer: implements AuthenticationLayer and provides client-side authentication.
- User: This class will be used to get the remoteUserId and add it to the Session Object (https://issues.apache.org/jira/browse/KAFKA-1683)
- KafkaPrincipalToLocalPlugin: a pluggable class with a default implementation that translates a Kerberos principal such as "testuser/node1.test.com@EXAMPLE.COM" to "testuser". Users can provide their own customized version of PrincipalToLocalPlugin.
- AuthUtils: This class will consist of any utilities needed for SASL and other auth-related methods.
- KerberosLoginFactory: uses the JAAS config to log in and generate a Subject.
- Protocol: accepts the protocol type (PLAINTEXT, SSL).
- SecurityConfig: a config file providing the SecurityProtocol, SSL config, and SASL mechanisms.
- BlockingChannel: the interface changes so that it accepts the Protocol to create the appropriate channels.
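The principal-to-local translation described for KafkaPrincipalToLocalPlugin can be sketched as follows. This is only an illustration under assumptions: the class name `PrincipalToLocalSketch`, the method `toLocal`, and the regular expression are hypothetical and are not the plugin's actual API. It assumes the default rule simply keeps the primary component of a `primary[/instance][@REALM]` principal.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration; not the actual KafkaPrincipalToLocalPlugin API. */
final class PrincipalToLocalSketch {
    // Assumed principal shape: primary[/instance][@REALM]
    private static final Pattern PRINCIPAL =
        Pattern.compile("([^/@]+)(/([^/@]+))?(@([^/@]+))?");

    /** Returns the primary component of a Kerberos principal, e.g. "testuser". */
    static String toLocal(String principal) {
        Matcher m = PRINCIPAL.matcher(principal);
        if (!m.matches()) {
            throw new IllegalArgumentException("Malformed Kerberos principal: " + principal);
        }
        return m.group(1);
    }

    public static void main(String[] args) {
        // Prints the local name derived from the example principal in the list above.
        System.out.println(toLocal("testuser/node1.test.com@EXAMPLE.COM"));
    }
}
```

A customized plugin could replace the regular expression with site-specific mapping rules while keeping the same string-in, string-out shape.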
...
Users need to pass -Djava.security.auth.login.config=kafka_jaas.conf as part of the JVM params.
This JAAS file, along with Login.java, can be used to log in to LDAP, Kerberos, etc.
Details on LdapLoginModule for JAAS: https://docs.oracle.com/javase/8/docs/jre/api/security/jaas/spec/com/sun/security/auth/module/LdapLoginModule.html
Code Block |
---|
language | java |
---|
title | Jaas Config |
---|
|
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/keytabs/kafka.keytab"
storeKey=true
useTicketCache=false
serviceName="kafka" // Used to connect to other brokers for replica management and controller requests. Set this to the principal that the Kafka brokers run as.
principal="kafka/_HOST@EXAMPLE.COM";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/keytabs/kafka.keytab"
storeKey=true
useTicketCache=false
serviceName="zookeeper"
principal="kafka@EXAMPLE.COM";
};
// The KafkaServer section is used to authenticate the Kafka broker against Kerberos,
// and the Client section is used by zkClient to access a Kerberos-enabled ZooKeeper cluster.
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/keytabs/kafkaclient.keytab"
storeKey=true
useTicketCache=false
serviceName="kafka"
principal="kafkaproducer/_HOST@EXAMPLE.COM";
};
// The above config is for any client (producer, consumer) connecting to a Kerberos-enabled Kafka cluster.
// Here serviceName must match the principal name used under KafkaServer.
|
...
Code Block |
---|
|
public class SSLTransportLayer implements TransportLayer {
public SSLTransportLayer(SocketChannel socketChannel, SSLEngine sslEngine) throws IOException
}
|
...
Authenticator
Code Block |
---|
|
public interface Authenticator {
/**
* Closes any resources
*
* @throws IOException if any I/O error occurs
*/
void close() throws IOException;
/**
* Performs any initialization needed before authentication starts
*
* @throws IOException if any I/O error occurs
*/
void init() throws IOException;
/**
* Returns UserPrincipal after authentication is established
*/
UserPrincipal userPrincipal();
/**
* Performs authentication in a non-blocking way and returns a SelectionKey.OP_* interest op if further communication is needed
*/
int authenticate(boolean read, boolean write) throws IOException;
/**
* Returns true if authentication is complete, false otherwise
*/
boolean isComplete();
}
|
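The non-blocking contract of authenticate(read, write) can be illustrated with a toy handshake. This is a sketch under assumptions, not the proposed implementation: `StepAuthenticator` is a reduced, hypothetical version of the interface above, `TwoStepAuthenticator` performs no real I/O, and the convention that a return value of 0 means "no further communication needed" is inferred from DefaultAuthenticator returning 0.

```java
import java.nio.channels.SelectionKey;

final class AuthLoopSketch {
    /**
     * Reduced, hypothetical version of the Authenticator contract.
     * The real interface declares IOException; it is omitted here because no I/O happens.
     */
    interface StepAuthenticator {
        int authenticate(boolean read, boolean write);
        boolean isComplete();
    }

    /** Toy handshake: send one token, then wait for one reply. */
    static final class TwoStepAuthenticator implements StepAuthenticator {
        private boolean tokenSent = false;
        private boolean replyReceived = false;

        @Override
        public int authenticate(boolean read, boolean write) {
            if (!tokenSent) {
                if (!write) return SelectionKey.OP_WRITE; // wait until the channel is writable
                tokenSent = true;                         // pretend the token was written
                return SelectionKey.OP_READ;              // next, wait for the peer's reply
            }
            if (!replyReceived) {
                if (!read) return SelectionKey.OP_READ;   // reply has not arrived yet
                replyReceived = true;                     // pretend the reply was read
            }
            return 0; // assumed convention: 0 means no further interest ops
        }

        @Override
        public boolean isComplete() {
            return replyReceived;
        }
    }

    /** Drives the authenticator the way a selector loop might; returns the number of calls made. */
    static int drive(StepAuthenticator auth) {
        int interest = auth.authenticate(false, false);
        int calls = 1;
        while (!auth.isComplete()) {
            if (interest == 0) {
                throw new IllegalStateException("No interest ops requested but authentication is incomplete");
            }
            // Simulate the selector reporting exactly the readiness that was requested.
            boolean readable = (interest & SelectionKey.OP_READ) != 0;
            boolean writable = (interest & SelectionKey.OP_WRITE) != 0;
            interest = auth.authenticate(readable, writable);
            calls++;
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println("authenticate() calls: " + drive(new TwoStepAuthenticator()));
    }
}
```

In a real selector loop the returned value would presumably be registered as the key's interest set, and authenticate would be called again only when the channel becomes ready for that operation.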
...
DefaultAuthenticator
Code Block |
---|
|
public class DefaultAuthenticator implements Authenticator {
TransportLayer transportLayer;
public DefaultAuthenticator(TransportLayer transportLayer) {
this.transportLayer = transportLayer;
}
public void init() {}
public int authenticate(boolean read, boolean write) throws IOException {
return 0;
}
/** Returns the peer host in case of SSL */
public UserPrincipal userPrincipal() {
return new UserPrincipal(transportLayer.getPeerPrincipal().toString());
}
public void close() throws IOException {}
public boolean isComplete() {
return true;
}
}
|
SaslServerAuthenticator
Code Block |
---|
|
public class SaslServerAuthenticator implements Authenticator {
public SaslServerAuthenticator(final Subject subject, TransportLayer transportLayer) {
}
}
|
SaslClientAuthenticator
Code Block |
---|
|
public class SaslClientAuthenticator implements Authenticator {
public SaslClientAuthenticator(final Subject subject, TransportLayer transportLayer) {
}
}
|
Callbacks for Login, SaslServer, SaslClient
Callbacks for the above modules will help in obtaining the user's authentication information.
AuthUtils
Code Block |
---|
|
public class AuthUtils {
/**
* Constructs a JAAS configuration object from the Kafka JAAS configuration file
* @param jaasConfigFilePath path to the Kafka JAAS configuration file
* @return JAAS configuration object
*/
public static Configuration getConfiguration(String jaasConfigFilePath)
public static String getJaasConfig(String loginContextName, String key) throws IOException
public static String getDefaultRealm()
}
|
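One way getConfiguration and getJaasConfig could be backed by the JDK is sketched below. This is an assumption-laden illustration, not the proposed AuthUtils code: the class `JaasConfigSketch` and its methods `load`, `option`, and `writeSample` are hypothetical names, and the sample file is a shortened copy of the KafkaServer section shown earlier. It relies on the standard `JavaLoginConfig` configuration type, which only parses the file and does not contact Kerberos.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.NoSuchAlgorithmException;
import java.security.URIParameter;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

/** Hypothetical helper for illustration; not the actual AuthUtils implementation. */
final class JaasConfigSketch {
    /** Parses a JAAS file into a Configuration without touching the JVM-wide default. */
    static Configuration load(Path jaasFile) {
        try {
            return Configuration.getInstance("JavaLoginConfig", new URIParameter(jaasFile.toUri()));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("Could not load JAAS config from " + jaasFile, e);
        }
    }

    /** Returns the value of one option in the named section, or null if the option is absent. */
    static String option(Configuration conf, String loginContextName, String key) {
        AppConfigurationEntry[] entries = conf.getAppConfigurationEntry(loginContextName);
        if (entries == null) {
            throw new IllegalArgumentException("No JAAS section named " + loginContextName);
        }
        for (AppConfigurationEntry entry : entries) {
            Object value = entry.getOptions().get(key);
            if (value != null) {
                return value.toString();
            }
        }
        return null;
    }

    /** Writes a shortened KafkaServer section to a temp file so the sketch is self-contained. */
    static Path writeSample() {
        String text = String.join("\n",
            "KafkaServer {",
            "  com.sun.security.auth.module.Krb5LoginModule required",
            "  useKeyTab=true",
            "  keyTab=\"/keytabs/kafka.keytab\"",
            "  serviceName=\"kafka\"",
            "  principal=\"kafka/_HOST@EXAMPLE.COM\";",
            "};",
            "");
        try {
            Path file = Files.createTempFile("kafka_jaas", ".conf");
            Files.write(file, text.getBytes(StandardCharsets.UTF_8));
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Configuration conf = load(writeSample());
        System.out.println("serviceName = " + option(conf, "KafkaServer", "serviceName"));
        System.out.println("principal   = " + option(conf, "KafkaServer", "principal"));
    }
}
```

Loading from an explicit path keeps the lookup independent of the -Djava.security.auth.login.config system property, which may be convenient in tests; a production version would more likely read the file named by that property.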
...