Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-4454

Released: <Kafka Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka allows users to plug in a custom PrincipalBuilder and a custom Authorizer by specifying the fully qualified names of the corresponding classes in the config.

When a Kafka broker receives request bytes from a client over the socket channel, it reconstructs the request and hands it over to the request handler threads (a.k.a. API threads) for processing. While doing so, the broker constructs a Session object that holds a KafkaPrincipal and the client's socket IP address. The KafkaPrincipal includes the type of the client principal ("User" as of now) and the name of the Principal generated by the PrincipalBuilder. The Authorizer interface includes an authorize(...) method that is invoked on every request received by the broker. If the broker has a custom Authorizer configured, it delegates the authorize(...) call to the custom implementation. The authorize(...) method takes the Session object, the requested Operation, and the Resource on which the operation is requested as parameters, and returns true or false depending on the configured ACLs:

def authorize(session: Session, operation: Operation, resource: Resource): Boolean

However, the principals generated by the plugged-in PrincipalBuilder may contain additional custom fields, and the user's Authorizer implementation may need to access those fields in order to enforce ACLs correctly for those principals. Unfortunately, Kafka currently extracts only the name of the Principal when constructing the Session object, as shown below, and loses the additional information at runtime:

val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress) // (custom fields in principal if any are not passed through)

It is important to note that Java's Principal API is opaque, and different Kafka service providers can have custom implementations of the Principal interface with additional features as per their requirements. Since Kafka allows users to plug in a custom PrincipalBuilder and a custom Authorizer, it does not make sense to extract only the name of the Principal and ignore the other fields in the generated Principal (which may be required by the custom Authorizer).
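The information loss described above can be illustrated with a minimal, self-contained sketch. Everything in it is an assumption made for illustration: `TenantPrincipal` and its `tenantId` field are hypothetical, and `NameOnlyPrincipal` is a simplified stand-in for a principal that is rebuilt from `getName()` alone, not the actual Kafka class.

```java
import java.security.Principal;

// Hypothetical custom principal, as a plugged-in PrincipalBuilder might produce.
final class TenantPrincipal implements Principal {
    private final String name;
    private final String tenantId; // extra field a custom Authorizer may need

    TenantPrincipal(String name, String tenantId) {
        this.name = name;
        this.tenantId = tenantId;
    }

    @Override
    public String getName() {
        return name;
    }

    String getTenantId() {
        return tenantId;
    }
}

// Simplified stand-in (not Kafka code) for a principal that keeps only a type and a name.
final class NameOnlyPrincipal implements Principal {
    private final String principalType;
    private final String name;

    NameOnlyPrincipal(String principalType, String name) {
        this.principalType = principalType;
        this.name = name;
    }

    @Override
    public String getName() {
        return name;
    }

    String getPrincipalType() {
        return principalType;
    }
}

final class PrincipalLossDemo {
    // Mirrors the Session construction shown above: only getName() is carried over.
    static Principal toSessionPrincipal(Principal channelPrincipal) {
        return new NameOnlyPrincipal("User", channelPrincipal.getName());
    }

    public static void main(String[] args) {
        Principal fromChannel = new TenantPrincipal("alice", "tenant-42");
        Principal inSession = toSessionPrincipal(fromChannel);
        System.out.println(inSession.getName());                  // prints "alice"
        System.out.println(inSession instanceof TenantPrincipal); // prints "false"
    }
}
```

In this sketch the name survives the conversion, but the `tenantId` is no longer reachable from the object an Authorizer would receive.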

This issue can be addressed if Kafka stores the original Principal object when it processes the incoming request, before handing it over to the API threads. The Authorizer will then be able to access this Principal object and use it to verify the ACLs.

Public Interfaces

This KIP introduces a new constructor in the KafkaPrincipal class that takes an additional parameter of type java.security.Principal. It also introduces a new API in PrincipalBuilder that builds a Java Principal from a passed-in map of configs. kafka-acls.sh would take two optional command line parameters specifying the PrincipalBuilder class name and the properties used to create the PrincipalBuilder.

Beyond the changes described above, this KIP makes no changes to client or server public APIs.

Proposed Changes

  • Add a new field to KafkaPrincipal called "channelPrincipal" of type java.security.Principal

  • Add a new constructor to KafkaPrincipal that takes in an additional parameter of type java.security.Principal as follows:

    public class KafkaPrincipal implements Principal {
        ...

        private Principal channelPrincipal;

        // New constructor
        public KafkaPrincipal(String principalType, String name, Principal channelPrincipal) {
            if (principalType == null || name == null) {
                throw new IllegalArgumentException("principalType and name can not be null");
            }
            this.principalType = principalType;
            this.name = name;
            this.channelPrincipal = channelPrincipal;
        }

        // Existing constructor, which now delegates with a null channelPrincipal
        public KafkaPrincipal(String principalType, String name) {
            this(principalType, name, null);
        }

        ...

        public Principal getChannelPrincipal() {
            return this.channelPrincipal;
        }
    }
  • The Authorizer can access this principal object as follows:

    public boolean authorize(RequestChannel.Session session, Operation operation, Resource resource) {
        ...

        KafkaPrincipal kafkaPrincipal = session.principal();
        // User_Defined_Principal should implement java.security.Principal
        User_Defined_Principal userPrincipal = (User_Defined_Principal) kafkaPrincipal.getChannelPrincipal();

        ...
    }
  • Changes to kafka-acls.sh

    • kafka-acls.sh will allow users to specify a custom PrincipalBuilder class using a new command line parameter "--principalBuilder", and PrincipalBuilder configs using a new command line parameter "--principalBuilder-properties".
    • The "--allow-principal" parameter will take a list of properties as follows:

      bin/kafka-acls.sh ...... --principalBuilder <PrincipalBuilder-class> --principalBuilder-properties <PrincipalBuilder-properties> --add --allow-principal <principal-properties> --allow-principal <principal-properties> ...... --operations Read,Write --topic Test-topic
    • Add a new API to PrincipalBuilder:

      public interface PrincipalBuilder extends Configurable {
      ...
      
        /**
         * Build a Principal using the provided configs.
         *
         * @param  principalConfigs  configs used to create the Principal
         * @return Principal
         */
        Principal buildPrincipal(Map<String, ?> principalConfigs);
      
      ...
      }
    • The specified PrincipalBuilder class will be responsible for building the Principal using the <principal-properties>.
    • The Principal generated by this PrincipalBuilder can then be included in KafkaPrincipal using the new constructor specified above.
    • The "--principalBuilder" and "--principalBuilder-properties" parameters are optional. If they are not specified, kafka-acls.sh works as it does today.
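The proposed changes above can be sketched end to end as follows. This is a minimal illustration under stated assumptions, not the KIP's implementation: `SketchKafkaPrincipal` is a stand-in that mirrors the proposed KafkaPrincipal constructors, while `ServicePrincipal`, `SketchPrincipalBuilder`, `SketchAuthorizer`, and the config keys "name" and "team" are all hypothetical.

```java
import java.security.Principal;
import java.util.HashMap;
import java.util.Map;

// Stand-in mirroring the proposed KafkaPrincipal constructors; not the real Kafka class.
final class SketchKafkaPrincipal implements Principal {
    private final String principalType;
    private final String name;
    private final Principal channelPrincipal; // null when the two-argument constructor is used

    SketchKafkaPrincipal(String principalType, String name, Principal channelPrincipal) {
        if (principalType == null || name == null) {
            throw new IllegalArgumentException("principalType and name can not be null");
        }
        this.principalType = principalType;
        this.name = name;
        this.channelPrincipal = channelPrincipal;
    }

    SketchKafkaPrincipal(String principalType, String name) {
        this(principalType, name, null);
    }

    @Override
    public String getName() {
        return name;
    }

    Principal getChannelPrincipal() {
        return channelPrincipal;
    }
}

// Hypothetical custom principal carrying one extra field.
final class ServicePrincipal implements Principal {
    private final String name;
    private final String team;

    ServicePrincipal(String name, String team) {
        this.name = name;
        this.team = team;
    }

    @Override
    public String getName() {
        return name;
    }

    String getTeam() {
        return team;
    }
}

// Hypothetical builder showing the shape of buildPrincipal(Map<String, ?>).
final class SketchPrincipalBuilder {
    Principal buildPrincipal(Map<String, ?> principalConfigs) {
        // The keys "name" and "team" are assumptions made for this sketch.
        Object name = principalConfigs.get("name");
        Object team = principalConfigs.get("team");
        if (name == null || team == null) {
            throw new IllegalArgumentException("name and team are required");
        }
        return new ServicePrincipal(name.toString(), team.toString());
    }
}

final class SketchAuthorizer {
    // Allows only principals whose channel principal is a ServicePrincipal of the given team.
    static boolean isAllowed(SketchKafkaPrincipal principal, String requiredTeam) {
        Principal channel = principal.getChannelPrincipal();
        if (!(channel instanceof ServicePrincipal)) {
            return false; // null or an unexpected type: this sketch chooses to deny
        }
        return requiredTeam.equals(((ServicePrincipal) channel).getTeam());
    }

    public static void main(String[] args) {
        Map<String, String> configs = new HashMap<>();
        configs.put("name", "XYZ");
        configs.put("team", "payments");
        Principal built = new SketchPrincipalBuilder().buildPrincipal(configs);
        SketchKafkaPrincipal principal = new SketchKafkaPrincipal("User", built.getName(), built);
        System.out.println(isAllowed(principal, "payments")); // prints "true"
    }
}
```

Because the existing two-argument constructor passes null for the channel principal, an Authorizer written against the new accessor would need to handle a null or unexpected type before casting. The `instanceof` check here is one way to do that; denying in that case is a design choice of this sketch, not something the KIP prescribes.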

Compatibility, Deprecation, and Migration Plan

This KIP is a pure addition to existing functionality and does not include any backward incompatible changes.

Test Plan

- Unit tests to validate that new changes work as expected without affecting the existing behavior.

Rejected Alternatives

In this alternative, kafka-acls.sh would allow users to specify a custom PrincipalBuilder using a new command line parameter "--principalBuilder" and PrincipalBuilder configs using a new command line parameter "--principalBuilder-properties". Users could use these to build their custom Principal (one that implements java.security.Principal). A new API would be added to the PrincipalBuilder interface:

public interface PrincipalBuilder extends Configurable {
    ...

    /**
     * Build a Principal using name.
     *
     * @param name Principal name
     * @return Principal
     */
    Principal buildPrincipal(String name);

    ...
}

This PrincipalBuilder API will then be used to generate a Principal using the names specified in --allow-principal and --deny-principal parameters. This Principal can be included in KafkaPrincipal using the new constructor specified above.

This alternative was rejected for the following reasons:

  1. Since the Principal is built using "--principalBuilder-properties", users can specify only one type of Principal (using --allow-principal / --deny-principal) at a time.

  2. If users want to specify multiple types of Principals, they will have to run kafka-acls.sh multiple times with different "--principalBuilder-properties", even if the Principals have the same name. For example, there can be a service Principal with name "XYZ" and a user Principal with name "XYZ".

For these reasons, this alternative is less user-friendly and less intuitive than the proposed approach.
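The limitation of the rejected design can be sketched as follows. This is an illustration under assumptions, not code from the KIP: `TypedPrincipal`, `NameOnlyBuilder`, and the config key "type" are hypothetical. The point is that when the principal type comes from the builder's configuration and `buildPrincipal` receives only a name, every principal built in one invocation shares the same type.

```java
import java.security.Principal;
import java.util.Collections;
import java.util.Map;

// Hypothetical principal that has a type in addition to its name.
final class TypedPrincipal implements Principal {
    private final String type;
    private final String name;

    TypedPrincipal(String type, String name) {
        this.type = type;
        this.name = name;
    }

    @Override
    public String getName() {
        return name;
    }

    String getType() {
        return type;
    }
}

// Hypothetical builder for the rejected design: the type is fixed when the builder is configured.
final class NameOnlyBuilder {
    private String type = "user";

    // The key "type" is an assumption standing in for --principalBuilder-properties.
    void configure(Map<String, ?> configs) {
        Object configuredType = configs.get("type");
        if (configuredType != null) {
            type = configuredType.toString();
        }
    }

    // Only the name varies per call, so all principals from one configuration share a type.
    Principal buildPrincipal(String name) {
        return new TypedPrincipal(type, name);
    }
}

final class RejectedAlternativeDemo {
    public static void main(String[] args) {
        NameOnlyBuilder builder = new NameOnlyBuilder();

        // First "run": every --allow-principal name becomes a service principal.
        builder.configure(Collections.singletonMap("type", "service"));
        TypedPrincipal service = (TypedPrincipal) builder.buildPrincipal("XYZ");

        // A second "run" with different properties is needed for the user principal "XYZ".
        builder.configure(Collections.singletonMap("type", "user"));
        TypedPrincipal user = (TypedPrincipal) builder.buildPrincipal("XYZ");

        System.out.println(service.getType() + " " + service.getName()); // prints "service XYZ"
        System.out.println(user.getType() + " " + user.getName());       // prints "user XYZ"
    }
}
```

With the map-based `buildPrincipal(Map<String, ?> principalConfigs)` from the proposed changes, each `--allow-principal` entry can carry its own properties, so both principals could be described in a single invocation.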
