
Status

Current state: Accepted

Discussion thread: Kafka Security Proposal

...

As more enterprises have started using Kafka, there is an increasing demand for authorization of who can publish to or consume from topics. Authorization can be based on different available session attributes or context, like user, IP, common name in the certificate, etc. Having an extendable authorization interface will help us implement the core requirements in the initial phase and make it enterprise ready. Having a pluggable interface will enable other security focused products to provide more advanced and enterprise grade implementations.

 

Public Interfaces

 

A public interface is any change to the following:
  • Binary log format

    • No

  • The network protocol and api behavior

    • The APIs will now perform authorization, so clients will see a new exception if they are not authorized for an operation.

  • Any class in the public packages under clients; configuration, especially client configuration

    • No.

  • Monitoring

    • No

  • Command line tools and arguments

    • Yes, describe topic will display acl info. 

  • Anything else that will likely break existing users in some way when they upgrade
    • No.

Proposed Changes

Public Interfaces and classes

Authorizer

 

Code Block
languagescala
 package kafka.security.auth

import kafka.network.RequestChannel.Session
import kafka.server.KafkaConfig

/**
 * Top level interface that all pluggable authorizers must implement. Kafka server will read the "authorizer.class.name" config
 * value at startup time, create an instance of the specified class and call its initialize method.
 * authorizer.class.name must be the FQCN of a class that implements this interface.
 * If authorizer.class.name has no value specified, no authorization will be performed.
 *
 * From that point onwards, every client request will first be routed to authorize method and the request will only be
 * authorized if the method returns true.
 */
trait Authorizer {
  /**
   * Guaranteed to be called before any authorize call is made.
   */
  def initialize(kafkaConfig: KafkaConfig): Unit
  
  /**
   * @param session The session being authenticated.
   * @param operation Type of operation client is trying to perform on resource.
   * @param resource Resource the client is trying to access.
   * @return true if the session is authorized to perform the operation on the resource, false otherwise.
   */
  def authorize(session: Session, operation: Operation, resource: Resource): Boolean

  /**
   * add the acls to resource, this is an additive operation so existing acls will not be overwritten, instead these new
   * acls will be added to existing acls.
   * @param acls set of acls to add to existing acls
   * @param resource the resource to which these acls should be attached.
   */
  def addAcls(acls: Set[Acl], resource: Resource): Unit

  /**
   * remove these acls from the resource.
   * @param acls set of acls to be removed.
   * @param resource resource from which the acls should be removed.
   * @return true if some acl got removed, false if no acl was removed.
   */
  def removeAcls(acls: Set[Acl], resource: Resource): Boolean

  /**
   * remove a resource along with all of its acls from acl store.
   * @param resource
   * @return
   */
  def removeAcls(resource: Resource): Boolean

  /**
   * get set of acls for this resource
   * @param resource
   * @return empty set if no acls are found, otherwise the acls for the resource.
   */
  def getAcls(resource: Resource): Set[Acl]

  /**
   * get the acls for this principal.
   * @param principal
   * @return empty set if no acls exist for this principal, otherwise the acls for the principal.
   */
  def getAcls(principal: KafkaPrincipal): Set[Acl]
}
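
To illustrate how the plugin point is meant to be used, here is a minimal sketch of a custom implementation of this trait. The class name and its allow-authenticated-only policy are purely illustrative assumptions; the SimpleAclAuthorizer described later is the actual out of the box implementation.

Code Block
languagescala
titleCustom Authorizer skeleton (illustrative)
package kafka.security.auth

import kafka.network.RequestChannel.Session
import kafka.server.KafkaConfig

/**
 * Illustrative only: a trivial authorizer that allows any authenticated session and denies
 * anonymous ones, and does not support acl management. A real implementation would back
 * these methods with an acl store.
 */
class AllowAuthenticatedAuthorizer extends Authorizer {

  override def initialize(kafkaConfig: KafkaConfig): Unit = {
    // a real authorizer would read its own configuration here
  }

  // assumes the anonymous principal's name resolves to "Anonymous", as described for Session below
  override def authorize(session: Session, operation: Operation, resource: Resource): Boolean =
    session.principal.getName != "Anonymous"

  override def addAcls(acls: Set[Acl], resource: Resource): Unit = ()

  override def removeAcls(acls: Set[Acl], resource: Resource): Boolean = false

  override def removeAcls(resource: Resource): Boolean = false

  override def getAcls(resource: Resource): Set[Acl] = Set.empty[Acl]

  override def getAcls(principal: KafkaPrincipal): Set[Acl] = Set.empty[Acl]
}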

Session

This is the Session class from https://reviews.apache.org/r/27204/. One added assumption is that on non-secure connections the session will have its principal set to an object whose name() method will return "Anonymous".

Code Block
languagescala
import java.security.Principal
import kafka.utils.Logging

object RequestChannel extends Logging {
  case class Session(principal: Principal, host: String)
}

KafkaPrincipal

Code Block
languagescala
titleKafkaPrincipal
package kafka.security.auth

import java.security.Principal

/**
 * @param principalType type of principal (e.g. the default implementation will support user and group, but custom authorizers can add more types).
 * @param name name of the principal
 */
case class KafkaPrincipal(principalType: String, name: String) extends Principal {
  override def getName: String = name
}
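
A small illustrative sketch of how a principal and a session fit together, assuming the classes above. Treating the anonymous placeholder as a KafkaPrincipal of type "user" is an assumption; the only stated requirement is that its name resolves to "Anonymous".

Code Block
languagescala
titleSession construction (illustrative)
import kafka.network.RequestChannel
import kafka.security.auth.KafkaPrincipal

object SessionExample {
  // authenticated principal on a secure connection
  val alice: KafkaPrincipal = KafkaPrincipal("user", "alice")

  // assumed placeholder principal for non-secure connections
  val anonymous: KafkaPrincipal = KafkaPrincipal("user", "Anonymous")

  // the session the broker would hand to Authorizer.authorize
  val session = RequestChannel.Session(alice, "host1")
}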

ResourceType

Code Block
languagescala
titleResourceType
package kafka.security.auth

/**
 * Types of resources.
 */
public enum ResourceType {
   CLUSTER,
   TOPIC,
   CONSUMER_GROUP
}

Resource

Code Block
languagescala
titleResource
/**
 * @param resourceType type of Resource.
 * @param name name of the resource, for cluster this will always be "kafka-cluster", for topic it will be name of the topic, for group it will be group name.
 */
case class Resource(resourceType: ResourceType, name: String) 
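
For illustration, the naming convention described above would produce resources like these (the topic and group names are made up, and the sketch assumes ResourceType exposes the constants shown earlier):

Code Block
languagescala
titleResource naming (illustrative)
import kafka.security.auth.{Resource, ResourceType}

object ResourceExamples {
  val cluster = Resource(ResourceType.CLUSTER, "kafka-cluster")     // cluster resource always uses this name
  val topic   = Resource(ResourceType.TOPIC, "test-topic")          // topic resource is named after the topic
  val group   = Resource(ResourceType.CONSUMER_GROUP, "test-group") // group resource is named after the group
}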

Operation

Code Block
languagescala
package kafka.security.auth


/**
 * Different operations a client may perform on kafka resources.
 */
public enum Operation {
   READ, 
   WRITE,
   CREATE,
   DELETE,
   ALTER,
   DESCRIBE,
   CLUSTER_ACTION
}


 

Operation      | Resource             | APIs
READ           | Topic, ConsumerGroup | Fetch, JoinGroup, OffsetFetchRequest, HeartBeat, CommitOffset
WRITE          | Topic                | Produce
CREATE         | Cluster              | KIP-4
DELETE         | Topic                | KIP-4
ALTER          | Topic                | KIP-4
DESCRIBE       | Topic, Cluster       | GetOffSet, GetTopicMetaData, GetConsumerMetaData, ListAllTopics (KIP-4), GetTopicInfo (KIP-4)
CLUSTER_ACTION | Cluster              | LeaderAndIsr, StopReplica, UpdateMetadata, ControlledShutdown

 

In order to produce to a topic, the principal will require WRITE on the TOPIC. 

In order to consume from a topic using the new consumer API, the principal will need: READ on TOPIC and READ on CONSUMER-GROUP.

In order to edit topic configs or add acls, the principal will require ALTER on the TOPIC.
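
A sketch of how a broker-side handler might enforce these rules with the interface above; the class and method names are illustrative and not the actual KafkaApis code, and AuthorizationException is the exception defined further below.

Code Block
languagescala
titleRequired operations for produce and consume (illustrative)
import kafka.network.RequestChannel.Session
import kafka.security.auth._

// Illustrative only: `authorizer` is assumed to be the configured Authorizer instance.
class RequestGuard(authorizer: Authorizer) {

  // producing requires WRITE on the topic
  def checkProduce(session: Session, topic: String): Unit =
    if (!authorizer.authorize(session, Operation.WRITE, Resource(ResourceType.TOPIC, topic)))
      throw new AuthorizationException("not authorized to WRITE topic " + topic)

  // consuming with the new consumer requires READ on the topic and READ on the consumer group
  def checkConsume(session: Session, topic: String, group: String): Unit =
    if (!authorizer.authorize(session, Operation.READ, Resource(ResourceType.TOPIC, topic)) ||
        !authorizer.authorize(session, Operation.READ, Resource(ResourceType.CONSUMER_GROUP, group)))
      throw new AuthorizationException("not authorized to READ topic " + topic + " with group " + group)
}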

PermissionType

Code Block
languagescala
titlePermissionType
public enum PermissionType { ALLOW, DENY }

AuthorizationException

Code Block
languagescala
/** 
 * Exception thrown when a principal is not authorized to perform an operation. 
 * @param message 
 */ 
class AuthorizationException(message: String) extends RuntimeException(message) { }

Acl

Code Block
languagescala
package kafka.security.auth 
/** 
 * An instance of this class will represent an acl that can express following statement. 
 * <pre> Principals P1,P2 have permissionType PT on Operations O1,O2 from hosts H1,H2. </pre> 
 * @param principals A value of user:* indicates all users. 
 * @param permissionType ALLOW or DENY.
 * @param hosts A value of * indicates all hosts. 
 * @param operations A value of ALL indicates all operations. 
 */ 
case class Acl(principals: Set[KafkaPrincipal], permissionType: PermissionType, hosts: Set[String], operations: Set[Operation])
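
For illustration, the first acl from the json example further below could be constructed like this (the topic name in the commented usage line is made up):

Code Block
languagescala
titleAcl construction (illustrative)
import kafka.security.auth._

object AclExample {
  // "alice and the kafka-devs group may READ and WRITE from host1 and host2"
  val acl = Acl(
    principals     = Set(KafkaPrincipal("user", "alice"), KafkaPrincipal("group", "kafka-devs")),
    permissionType = PermissionType.ALLOW,
    hosts          = Set("host1", "host2"),
    operations     = Set(Operation.READ, Operation.WRITE)
  )

  // Hypothetical usage of the management API on an authorizer instance:
  // authorizer.addAcls(Set(acl), Resource(ResourceType.TOPIC, "test-topic"))
}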

Default implementation

SimpleAclAuthorizer

  • Out of the box implementation of the Authorizer.

  • Self contained, with no dependencies on any other vendor or provider.

  • Will use zookeeper as the storage layer for acls. Acls will be stored in the json format described below under /kafka-acls/resource-type/<resource-name>, and cached so the acl store does not have to be read on every request.

  • Deny will take precedence over Allow in competing acls, i.e. if 2 acls are defined, one that allows an operation from all hosts and one that denies the operation from host1, the operation from host1 will be denied.

  • When no acl is attached to a resource, this implementation will always fail closed (deny all requests).

  • When any acl is attached to a resource, only users that are in the allowed list will have access. All users with no explicit allow acls will be denied access by default.

  • Will allow principals that have READ or WRITE permission the DESCRIBE operation as well, without having to specify explicit acls.

  • The authorize method will follow this pseudocode:

    Code Block
    titleSimpleAclAuthorizer authorize pseudocode
    authorize(session, operation, resource) { 
      principal = session.principal 
      remoteAddress = session.host 
     
      if(principal is one of the superuser) { 
        return true 
      } 
      acls = getAcl(resource) //will have a cache. 
      if(acls.isEmpty || acls.contains(allowAccessToEveryoneAcl)) 
        return true 
      
      if any deny acls are configured for this principal/host/operation combination, deny the access. 
     
      otherwise check if any acl exists that explicitly allows this operation for this principal/host and return true if some acl is found. 
     
      no explicit allow acls were found so deny the access. 
    }

    Example Acl Json That will be stored in zookeeper

    Code Block
    languagejs
    {"version": 1, 
     "acls": [ 
        { 
          "principals": ["user:alice", "group:kafka-devs"], 
          "permissionType": "ALLOW", 
          "operations": [ "READ", "WRITE" ], 
          "hosts": [ "host1", "host2" ] 
        }, 
        { 
          "principals": ["user:bob", "user:*"], 
          "permissionType": "ALLOW", 
          "operations": [ "READ" ], 
          "hosts": [ "*" ] 
        }, 
        { 
          "principals": ["user:bob"], 
          "permissionType": "DENY", 
          "operations": [ "READ" ], 
          "hosts": [ "host1", "host2" ] 
        } 
      ] 
    }

Changes to existing classes

  • KafkaServer will initialize the authorizer based on the value of the "authorizer.class.name" config.

  • KafkaApis will have an additional field authorizer, which will be passed by KafkaServer at the time of server initialization. KafkaApis will call authorizer.authorize for all requests that need to be authorized.

  • If authorize returns false, KafkaApis will throw an AuthorizationException.

  • KafkaConfig will have the following additional configurations (an illustrative server.properties snippet follows this list):

    • authorizer.class.name: FQCN of the authorizer class to be used. The provided class must implement the Authorizer interface.
    • kafka.superusers: list of users that will be given superuser access. These users will have access to everything. Users should set this to the user the kafka broker processes run as, to avoid duplicate configuration for every single topic like ALLOW REPLICATION to BROKER_USER for TOPIC from ALL hosts.
    • authorizer.config.path: path to a properties file that contains authorizer specific configuration. For the SimpleAclAuthorizer implementation this file can contain the following 2 configs:

      • zookeeper.url: comma separated list of zookeeper host:port pairs that the default authorizer should use to store all the acls. Useful when the acl zookeeper store needs to be different from the kafka zookeeper.

      • allow.everyone: The default authorizer implementation denies access to everyone and expects specific allow acls to be defined to grant access. If this flag is set to true it will allow access to everyone unless an explicit deny is found to revoke access.

  • Zookeeper node ACLs are considered out of scope for this document. If we decide to make them part of this KIP we will have to change ZKUtils so it can set acls on all zkNodes. I already have an implementation for this (not tested yet), however we will have to wait for KIP-4 to be merged.

  • Depending on whether we decide to authorize admin actions or not, we may have to create a new broker API "authorize" to authorize admin actions.
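
To make the wiring concrete, here is a sketch of how these settings might look in a broker server.properties file. The fully qualified class name, the superuser principal and the file location are illustrative assumptions, not final values.

Code Block
titleserver.properties (illustrative)
# pluggable authorizer; SimpleAclAuthorizer is the out of the box implementation
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

# principals that bypass all authorization checks (the value format here is an assumption)
kafka.superusers=user:kafka

# authorizer specific configuration, e.g. zookeeper.url and allow.everyone for SimpleAclAuthorizer
authorizer.config.path=/etc/kafka/authorizer.properties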

Data Flows

Authentication and session initialization details are out of scope of this document. We will assume that authentication is done before any authorization happens and that the session instance is properly initialized. As mentioned above, we assume that on a secure connection the session has its principal set to the authenticated user, and on non-secure connections it is set to a special principal whose name() method returns "Anonymous".

Initialize Authorizer

Since this is a pluggable architecture, users can easily replace the default provider implementation by writing their own custom provider and providing that class's FQCN as the value of the config authorizer.class.name. On the kafka server side, during server initialization KafkaServer will read the value of authorizer.class.name, create an instance of the specified class and call its initialize method with the KafkaConfig as the parameter. This instance will be passed as a constructor argument to KafkaApis. During initialization SimpleAclAuthorizer will read its own configuration from the file pointed to by authorizer.config.path and load the acls stored in zookeeper under /kafka-acls.

If the value of "authorizer.class.name" is null, in secure mode the cluster will fail with a ConfigException. In non secure mode, in the absence of a config value for "authorizer.class.name", the server will allow all requests to all topics, even if the topic has configured acls. This is done purely for backwards compatibility and it will be a security hole. To avoid this we can always default to SimpleAclAuthorizer, which will allow access only to topics that have acls configured to allow access for Anonymous users.
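
A minimal sketch of this initialization flow, assuming the Authorizer trait defined earlier; the helper name and the way the config value is obtained are illustrative, not the actual KafkaServer code.

Code Block
languagescala
titleAuthorizer initialization (illustrative)
import kafka.security.auth.Authorizer
import kafka.server.KafkaConfig

object AuthorizerFactory {
  // Hypothetical helper: reflectively instantiate and initialize the configured authorizer.
  // Returns None when authorizer.class.name is not set, in which case no authorization is performed.
  def createAuthorizer(kafkaConfig: KafkaConfig, authorizerClassName: String): Option[Authorizer] = {
    if (authorizerClassName == null || authorizerClassName.trim.isEmpty) {
      None
    } else {
      val authorizer = Class.forName(authorizerClassName).newInstance().asInstanceOf[Authorizer]
      authorizer.initialize(kafkaConfig) // guaranteed to be called before any authorize call
      Some(authorizer)
    }
  }
}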

Topic Creation and Modification

Currently all topic creation/modification/deletion actions are performed using KafkaAdminUtil, which mostly interacts directly with zookeeper instead of forwarding requests to a broker host. Given all the code is executed on the client side, there is no easy way to perform authorization. The simplest way to support authorization right now would be to create a broker API "authorize" that is called by AdminUtil to authorize an operation. In the absence of such an API we cannot perform any authorization, as the user can modify the client side jar and replace our authorization implementation with their own Authorizer implementation which can just ignore all the acls and always return true.

Kafka API Authorization

For each API that needs authorization KafkaApis class will first hand off the request to authorizer's authorize method.

  • SimpleAclAuthorizer will read the acls attached to the resource from its acl store (using its cache).
  • If the acls allow the operation, SimpleAclAuthorizer will return true, otherwise it will return false.
  • If authorization fails, an AuthorizationException will be thrown.

    Open Questions

    ...

    Do we want to support authorization of KafkaAdminUtil operations? If yes, are we open to adding a new server side API to proxy the authorize call, or do we want to wait until KIP-4 is committed and merged?

    ...


    Acl Management (CLI)

    Please see Kafka Authorization Command Line Interface


    Out of scope

    * Admin APIs (e.g. Create/Alter/Delete) will not invoke the authorizer until KIP-4 is done.

    * Setting correct acls on zookeeper nodes. 

    ...

    Compatibility, Deprecation, and Migration Plan

    What impact (if any) will there be on existing users?

    This shouldn't affect any existing users

    If we are changing behavior how will we phase out the older behavior?

    No. The default implementation will maintain all existing behavior.

    If we need special migration tools, describe them here.

    ...

    Mirror maker will have to start using the new acl management tool.

    When will we remove the existing behavior?

    No

    Rejected Alternatives

    If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.

    We originally proposed to store the acls as part of TopicConfig, with no ACL management APIs exposed. This had the advantages of a simpler implementation, fewer public APIs and classes (Acl, KafkaPrincipal and Resource were all private), out of the box support for mirror maker, cleanup of acls on topic deletion, and reuse of the existing infrastructure for propagating topic config changes. However, this approach had the drawback of mixing acls with topic config, which breaks separation of concerns, and it could have confused users of custom authorizers, since a custom authorizer could completely ignore the acls set through topic config. To overcome this we moved to exposing ACL management APIs as public APIs that every authorizer must implement, with each authorizer maintaining its own ACL storage outside of topic config.