Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-4454
Released: <Kafka Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Kafka allows users to plug in a custom PrincipalBuilder and a custom Authorizer by specifying the fully qualified names of the corresponding classes in the config.
When a Kafka broker receives request bytes from a client over the socket channel, it reconstructs the request and hands it over to the request handler threads (a.k.a. API threads) for processing. While doing so, the broker constructs a Session object that holds a KafkaPrincipal and the client's socket IP address. The KafkaPrincipal includes the type of the client principal ("User" as of now) and the name of the Principal generated by the PrincipalBuilder. The Authorizer interface includes an authorize(...) method that is invoked for every request the broker receives. If the broker has a custom Authorizer configured, it delegates the authorize(...) call to the custom implementation. The authorize(...) method takes the Session object, the requested Operation, and the Resource on which the operation is requested as parameters, and returns true or false depending on the configured ACLs:
def authorize(session: Session, operation: Operation, resource: Resource): Boolean
However, the principals generated by the plugged-in PrincipalBuilder may contain additional custom fields, and the user's Authorizer implementation may need to access those fields in order to enforce ACLs correctly for those principals. Unfortunately, Kafka currently extracts only the name of the Principal when constructing the Session object, as shown below, and loses the additional information at runtime:
val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress) // (custom fields in principal if any are not passed through)
It is important to note that Java's Principal API is opaque, and different Kafka service providers can have custom implementations of the Principal interface with additional features as per their requirements. Since Kafka allows users to plug in a custom PrincipalBuilder and a custom Authorizer, it does not make sense to extract only the name of the Principal and ignore the other fields in the generated Principal (which may be required by the custom Authorizer).
This issue can be addressed if Kafka stores the original Principal object when it processes the incoming request, before handing it over to the API threads. The Authorizer will then be able to access this Principal object and use it to verify the ACLs.
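The loss of information described above can be illustrated with a small, self-contained sketch. It does not use Kafka classes: TenantPrincipal (with its "tenant" field), NameOnlyPrincipal, and PrincipalLossDemo are hypothetical names introduced only for this illustration, and copyNameOnly merely imitates the way the broker currently copies getName() into a new principal.

```java
import java.security.Principal;

// Hypothetical custom principal; "tenant" is an illustrative extra field.
final class TenantPrincipal implements Principal {
    private final String name;
    private final String tenant;

    TenantPrincipal(String name, String tenant) {
        this.name = name;
        this.tenant = tenant;
    }

    @Override
    public String getName() {
        return name;
    }

    String getTenant() {
        return tenant;
    }
}

// Simplified stand-in for a principal that carries only a name.
final class NameOnlyPrincipal implements Principal {
    private final String name;

    NameOnlyPrincipal(String name) {
        this.name = name;
    }

    @Override
    public String getName() {
        return name;
    }
}

final class PrincipalLossDemo {
    // Imitates the current behavior: only getName() survives the copy.
    static Principal copyNameOnly(Principal channelPrincipal) {
        return new NameOnlyPrincipal(channelPrincipal.getName());
    }

    public static void main(String[] args) {
        Principal built = new TenantPrincipal("alice", "finance");
        Principal seenByAuthorizer = copyNameOnly(built);
        // The name is preserved, but the object is no longer a TenantPrincipal,
        // so an authorizer has no way to read the tenant.
        System.out.println(seenByAuthorizer.getName());
        System.out.println(seenByAuthorizer instanceof TenantPrincipal);
    }
}
```

In this sketch an authorizer that receives seenByAuthorizer can still read the name but cannot recover the tenant, which is the gap this KIP aims to close.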
New or Changed Public Interfaces
This KIP introduces a new constructor in the KafkaPrincipal class that takes an additional parameter of type java.security.Principal. It also introduces a new API in PrincipalBuilder that builds a Java Principal from a passed-in map of configs. kafka-acls.sh will accept two optional command line parameters specifying the PrincipalBuilder class name and the properties used to create the PrincipalBuilder.
Beyond the changes listed above, this KIP makes no changes to client or server public APIs.
Proposed Changes
- Add a new field to KafkaPrincipal called "channelPrincipal" of type java.security.Principal.
- Add a new constructor to KafkaPrincipal that takes an additional parameter of type java.security.Principal, as follows:
    public class KafkaPrincipal implements Principal {
        ...
        private Principal channelPrincipal;

        // New Constructor
        public KafkaPrincipal(String principalType, String name, Principal channelPrincipal) {
            if (principalType == null || name == null) {
                throw new IllegalArgumentException("principalType and name can not be null");
            }
            this.principalType = principalType;
            this.name = name;
            this.channelPrincipal = channelPrincipal;
        }

        public KafkaPrincipal(String principalType, String name) {
            this(principalType, name, null);
        }
        ...
        public Principal getChannelPrincipal() {
            return this.channelPrincipal;
        }
    }
- The Authorizer can access this principal object as follows:
    public boolean authorize(RequestChannel.Session session, Operation operation, Resource resource) {
        ...
        KafkaPrincipal principal = session.principal();
        // User_Defined_Principal should implement java.security.Principal
        User_Defined_Principal channelPrincipal = (User_Defined_Principal) principal.getChannelPrincipal();
        ...
    }
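As a rough, runnable sketch of how a custom authorizer might consume the channel principal, the example below uses simplified stand-ins rather than the real Kafka classes. SketchKafkaPrincipal mirrors only the constructors and getter proposed above; RolePrincipal, its roles, and the role-based rule in RoleAuthorizerSketch are hypothetical and not part of this KIP. Because the two-argument constructor leaves the channel principal null, the sketch checks the type with instanceof instead of casting unconditionally.

```java
import java.security.Principal;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Simplified stand-in for KafkaPrincipal with the proposed constructors.
final class SketchKafkaPrincipal implements Principal {
    private final String principalType;
    private final String name;
    private final Principal channelPrincipal; // null when built with two arguments

    SketchKafkaPrincipal(String principalType, String name, Principal channelPrincipal) {
        if (principalType == null || name == null) {
            throw new IllegalArgumentException("principalType and name can not be null");
        }
        this.principalType = principalType;
        this.name = name;
        this.channelPrincipal = channelPrincipal;
    }

    SketchKafkaPrincipal(String principalType, String name) {
        this(principalType, name, null);
    }

    @Override
    public String getName() {
        return name;
    }

    String getPrincipalType() {
        return principalType;
    }

    Principal getChannelPrincipal() {
        return channelPrincipal;
    }
}

// Hypothetical custom principal that carries a set of roles.
final class RolePrincipal implements Principal {
    private final String name;
    private final Set<String> roles;

    RolePrincipal(String name, String... roles) {
        this.name = name;
        this.roles = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(roles)));
    }

    @Override
    public String getName() {
        return name;
    }

    Set<String> getRoles() {
        return roles;
    }
}

final class RoleAuthorizerSketch {
    // Hypothetical rule: allow only if the channel principal carries the role.
    static boolean authorize(SketchKafkaPrincipal principal, String requiredRole) {
        Principal channelPrincipal = principal.getChannelPrincipal();
        if (!(channelPrincipal instanceof RolePrincipal)) {
            return false; // null or an unexpected type: deny
        }
        return ((RolePrincipal) channelPrincipal).getRoles().contains(requiredRole);
    }
}
```

Denying when the channel principal is absent is only one possible policy; an implementation could instead fall back to name-based ACLs for principals created through the existing two-argument constructor.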
Changes to kafka-acls.sh
- kafka-acls.sh will allow users to specify a custom PrincipalBuilder class using a new command line parameter "--principalBuilder" and PrincipalBuilder configs using a new command line parameter "--principalBuilder-properties". The "--allow-principal" parameter will take a list of properties as follows:
bin/kafka-acls.sh ...... --principalBuilder <PrincipalBuilder-class> --principalBuilder-properties <PrincipalBuilder-properties> --add --allow-principal <principal-properties> --allow-principal <principal-properties> ...... --operations Read,Write --topic Test-topic
Add a new API to PrincipalBuilder:
    public interface PrincipalBuilder extends Configurable {
        ...
        /**
         * Build a Principal using the provided configs.
         *
         * @param principalConfigs configs used to create the Principal
         * @return Principal
         */
        Principal buildPrincipal(Map<String, ?> principalConfigs);
        ...
    }
- The specified PrincipalBuilder class will be responsible for building the Principal using the <principal-properties>.
- The Principal generated by this PrincipalBuilder can then be included in KafkaPrincipal using the new constructor specified above. The "--principalBuilder" and "--principalBuilder-properties" parameters are optional. If they are not specified, kafka-acls.sh will continue to work as it does today.
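A minimal sketch of what an implementation of the new buildPrincipal(Map<String, ?>) method could look like is given below. It is an assumption-laden illustration: PrincipalFromConfigs is a stand-in that declares only the new method (the real PrincipalBuilder has other methods as well), TypedPrincipal is a hypothetical principal class, and the "name" and "kind" keys are invented here, since this KIP does not define which keys <principal-properties> may contain or how the command line string is parsed into the map.

```java
import java.security.Principal;
import java.util.HashMap;
import java.util.Map;

// Stand-in declaring only the method this KIP adds to PrincipalBuilder.
interface PrincipalFromConfigs {
    Principal buildPrincipal(Map<String, ?> principalConfigs);
}

// Hypothetical principal that records a kind (e.g. "user" or "service").
final class TypedPrincipal implements Principal {
    private final String name;
    private final String kind;

    TypedPrincipal(String name, String kind) {
        this.name = name;
        this.kind = kind;
    }

    @Override
    public String getName() {
        return name;
    }

    String getKind() {
        return kind;
    }
}

final class TypedPrincipalBuilder implements PrincipalFromConfigs {
    @Override
    public Principal buildPrincipal(Map<String, ?> principalConfigs) {
        Object name = principalConfigs.get("name"); // hypothetical key
        Object kind = principalConfigs.get("kind"); // hypothetical key
        if (name == null) {
            throw new IllegalArgumentException("principal configs must contain 'name'");
        }
        // Default to "user" when no kind is supplied (an arbitrary choice here).
        return new TypedPrincipal(name.toString(), kind == null ? "user" : kind.toString());
    }
}

final class TypedPrincipalBuilderDemo {
    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("name", "XYZ");
        properties.put("kind", "service");
        TypedPrincipal principal =
            (TypedPrincipal) new TypedPrincipalBuilder().buildPrincipal(properties);
        System.out.println(principal.getKind() + ":" + principal.getName());
    }
}
```

Because each --allow-principal value would produce its own map, a single invocation could describe principals of different kinds, which is what the properties-based API is meant to enable.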
Compatibility, Deprecation, and Migration Plan
What impact (if any) will there be on existing users?
Existing users who implement the PrincipalBuilder interface will get a compile-time error and will have to implement the new API to resolve it.
Test Plan
- Unit tests to validate that the new changes work as expected without affecting existing behavior.
Rejected Alternatives
kafka-acls.sh will allow users to specify a custom PrincipalBuilder using a new command line parameter "--principalBuilder" and PrincipalBuilder configs using a new command line parameter "--principalBuilder-properties". Users can use these to build their custom Principal (that implements Java Principal). Add a new API to the PrincipalBuilder interface:
    public interface PrincipalBuilder extends Configurable {
        ...
        /**
         * Build a Principal using name.
         *
         * @param name Principal name
         * @return Principal
         */
        Principal buildPrincipal(String name);
        ...
    }
This PrincipalBuilder API will then be used to generate a Principal using the names specified in the --allow-principal and --deny-principal parameters. This Principal can be included in KafkaPrincipal using the new constructor specified above.
This alternative was rejected for the following reasons:
Since the Principal is built using the "--principalBuilder-properties", users can only specify one particular type of Principal(s) (using --allow-principal / --deny-principal) at a time. If users want to specify multiple types of Principals, they will have to run kafka-acls.sh multiple times with different "--principalBuilder-properties", even if the Principals have the same name. For example, we can have a service Principal with name "XYZ" and a user Principal with name "XYZ".
For these reasons, this alternative is less user-friendly and less intuitive.
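The limitation of the rejected name-based API can be sketched as follows. This is an illustration under assumptions, not code from the KIP: NameOnlyBuilder is a stand-in for a builder whose principal type is fixed once by the builder-level properties, KindPrincipal is a hypothetical principal class, and the "kind" key is invented for the example.

```java
import java.security.Principal;
import java.util.Collections;
import java.util.Map;

// Hypothetical principal that records a kind (e.g. "user" or "service").
final class KindPrincipal implements Principal {
    private final String name;
    private final String kind;

    KindPrincipal(String name, String kind) {
        this.name = name;
        this.kind = kind;
    }

    @Override
    public String getName() {
        return name;
    }

    String getKind() {
        return kind;
    }
}

// Stand-in for the rejected API shape: the kind is fixed when the builder
// is configured, and only the name varies from call to call.
final class NameOnlyBuilder {
    private String kind = "user";

    void configure(Map<String, ?> builderProperties) {
        Object configured = builderProperties.get("kind"); // hypothetical key
        if (configured != null) {
            kind = configured.toString();
        }
    }

    KindPrincipal buildPrincipal(String name) {
        return new KindPrincipal(name, kind);
    }
}

final class NameOnlyBuilderDemo {
    public static void main(String[] args) {
        NameOnlyBuilder builder = new NameOnlyBuilder();
        builder.configure(Collections.singletonMap("kind", "service"));
        // Every name passed in this "run" becomes a service principal, so a
        // user named "XYZ" cannot be expressed alongside a service named "XYZ".
        System.out.println(builder.buildPrincipal("XYZ").getKind());
        System.out.println(builder.buildPrincipal("ABC").getKind());
    }
}
```

To register both a service "XYZ" and a user "XYZ" with this shape, the builder would have to be configured twice, which corresponds to running kafka-acls.sh twice with different "--principalBuilder-properties".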