...

  • Binary log format

    • No

  • The network protocol and api behavior

    • The APIs will now perform authorization, so clients will see a new exception if they are not authorized for an operation.

  • Any class in the public packages under clients; configuration, especially client configuration

    • <TDB>

  • Monitoring

    • No

  • Command line tools and arguments

    • Yes. Create topic and alter topic will have optional --acl and --owner options, which allow users to specify the location of their acl json file and the topic owner's username. Describe topic will display both the owner and the acl info.

  • Anything else that will likely break existing users in some way when they upgrade
    • Yes, TopicConfigs will now store 2 additional fields: owner and acls.  

    • We should test that this does not break backwards compatibility.

Proposed Changes

Authorizer

 

Code Block
languagescala
package kafka.security.auth
/**
 * Top level interface that all pluggable authorizers must implement. Kafka server will read the "authorizer.class.name"
 * config value at startup time, create an instance of the specified class and call its initialize method.
 * authorizer.class.name must be a class that implements this interface.
 * If authorizer.class.name has no value specified, no authorization will be performed.
 *
 * From that point onwards, every client request will first be routed to the authorize method and the request will only be
 * authorized if the method returns true.
 */
trait Authorizer {
  /**
   * Guaranteed to be called before any authorize call is made.
   */
  def initialize(kafkaConfig: KafkaConfig, topicConfigCache: TopicConfigCache): Unit

  /**
   * @param session The session of the client making the request.
   * @param operation Type of operation the client is trying to perform on the resource.
   * @param resource Resource the client is trying to access.
   * @return true if the operation is authorized, false otherwise.
   */
  def authorize(session: Session, operation: Operation, resource: String): Boolean
}

Session

This is session from and https://reviews.apache.org/r/27204/. One added assumption is that on non-secure connections the session will have its principal set to an object whose name() method returns "Anonymous".

Code Block
languagescala
object RequestChannel extends Logging {
  case class Session(principal: Principal, host: String)
}

Operation

Code Block
languagescala
package kafka.security.auth


/**
 * Different operations a client may perform on kafka resources.
 */
public enum Operation {
   READ,
   WRITE,
   CREATE,
   DELETE,
   EDIT,
   DESCRIBE,
   SEND_CONTROL_MESSAGE
}

PermissionType

Deny will take precedence over Allow in competing acls, i.e. if 2 Acls are defined, one that allows an operation from all hosts and one that denies the operation from host1, the operation from host1 will be denied.

Code Block
languagescala
titlePermissionType
public enum PermissionType {
   ALLOW,
   DENY
}
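
The precedence rule above can be illustrated with a minimal Scala sketch. It is not the proposed implementation: `Rule` is a hypothetical, simplified stand-in for the Acl class (no principal, a single operation name as a string), and `isAllowed` is an illustrative helper name.

```scala
object DenyPrecedenceSketch {
  sealed trait PermissionType
  case object Allow extends PermissionType
  case object Deny extends PermissionType

  // Hypothetical, simplified rule: one operation name and a set of hosts.
  final case class Rule(permissionType: PermissionType, hosts: Set[String], operation: String)

  // A host set containing "*" matches every host.
  private def matches(rule: Rule, host: String, operation: String): Boolean =
    rule.operation == operation && (rule.hosts.contains("*") || rule.hosts.contains(host))

  // Deny wins: any matching Deny rejects the request; otherwise a matching Allow is required.
  def isAllowed(rules: Set[Rule], host: String, operation: String): Boolean = {
    val matching = rules.filter(matches(_, host, operation))
    !matching.exists(_.permissionType == Deny) && matching.exists(_.permissionType == Allow)
  }
}
```

With one rule allowing READ from all hosts and one denying READ from host1, `isAllowed` rejects READ from host1 and accepts it from any other host.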

AuthorizationException

Code Block
languagescala
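/**
 * Exception thrown when a principal is not authorized to perform an operation.
 * @param principal name of the principal that was denied.
 * @param operation operation the principal tried to perform.
 * @param host host the request came from.
 * @param resource resource the principal tried to access.
 */
class AuthorizationException(principal: String, operation: Operation, host: String, resource: String) extends RuntimeException {
}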

SimpleAclAuthorizer

  • Out of the box implementation of the Authorizer.
  • Self contained, with no dependencies on any other vendor or provider.
  • For cluster actions that do not apply to a specific topic, like CREATE, we have 2 options. We can add a broker config called broker.acls which will point to a json file. This file will be available on all broker hosts, and the authorizer will read the acls on initialization and keep refreshing them every X minutes. Any changes will require re-distribution of the acl json file. Alternatively, we can add a zookeeper path /brokers/acls and store the acl json as data. The authorizer can refresh the acls from the json every X minutes. In the absence of broker acls the authorizer will fail open; in other words, it will allow all users from all hosts to perform all cluster actions.
  • Will contain an AclCache that will cache the broker acls and topic specific acls.
  • When an Acl is missing, this implementation will always fail open for backward compatibility.
  • For clients that do not use any authentication: if the authorizer finds an Acl for either a broker action or a topic but the session does not have a principal, or the principal is set to "Anonymous", it will fail closed. For example, if the client uses an ssl channel and chooses not to authenticate, it will still have access to all topics that have no acls attached or have an "allow anonymous access" acl, but it won't have access to topics that allow access only to specific users.
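
The "refresh every X minutes" behaviour described above could be sketched as follows. This is only an illustration under assumptions: `Refreshing`, `load`, `refreshMs` and `now` are hypothetical names, `load` stands in for reading the acl json from a file or a zookeeper node, and the clock is injected so that the refresh can be exercised without sleeping.

```scala
object RefreshingAclSketch {
  // Hypothetical helper: caches a value and reloads it once refreshMs has elapsed.
  final class Refreshing[A](load: () => A, refreshMs: Long, now: () => Long) {
    private var value: A = load()     // initial load happens at construction time
    private var loadedAt: Long = now()

    // Returns the cached value, reloading it first if it is older than refreshMs.
    def get: A = synchronized {
      if (now() - loadedAt >= refreshMs) {
        value = load()
        loadedAt = now()
      }
      value
    }
  }
}
```

Reloading lazily on access keeps the sketch free of background threads; a scheduled refresh would be an equally valid design.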

Acl

Code Block
languagescala
package kafka.security.auth


/**
 * An instance of this class will represent an acl that can express following statement.
 * <pre>
 * Principal P has permissionType (Allow or Deny) on Operations READ,WRITE from hosts H1,H2.
 * </pre>
 * These acls instances will be attached to a resource like a topic or cluster.
 * @param principal A value of "Anonymous" indicates all users.
 * @param permissionType allow/deny
 * @param hosts A value of * indicates all hosts.
 * @param operations A value of ALL indicates all operations.
 */
case class Acl(principal: String, permissionType: PermissionType, hosts: Set[String], operations: Set[Operation])

Example Topic Acl Json

Code Block
languagescala
{
  "version": 1,
  "acls": [
    {
      "principal": "alice",
      "permissionType": "ALLOW",
      "operations": [
        "READ",
        "WRITE"
      ],
      "hosts": [
        "host1",
        "host2"
      ]
    },
    {
      "principal": "bob",
      "permissionType": "ALLOW",
      "operations": [
        "READ"
      ],
      "hosts": [
        "*"
      ]
    },
    {
      "principal": "bob",
      "permissionType": "DENY",
      "operations": [
        "READ"
      ],
      "hosts": [
        "host1",
        "host2"
      ]
    }
  ]
}

AclCache

Code Block
languagescala
titleAclCache
package kafka.security.auth
/**
 * A cache layer for all acls which also acts as the converter that converts json acls to actual acl instances.
 * @param clusterAclFilePath local file path to the json file that describes cluster acls.
 * @param topicConfigCache instance of topicConfigCache that will be used to read topic owner and topic acls.
 */
class AclCache(clusterAclFilePath: String, topicConfigCache: TopicConfigCache)  {
  // Signatures only; method bodies are not part of this proposal.
  /**
   * Get acls for the topic.
   * @param topic name of the topic.
   * @return acls attached to the topic.
   */
  def getAcls(topic: String): Set[Acl]
  /**
   * Get topic owner.
   * @param topic name of the topic.
   * @return username of the topic owner.
   */
  def getOwner(topic: String): String
  /**
   * Get cluster acls.
   * @return acls that apply to cluster actions.
   */
  def getClusterAcl(): Set[Acl]
}

TopicConfig

Code Block
languagescala
package kafka.security.auth
/**
 * An instance that represents the topic config stored in zookeeper during topic creation/modification.
 */
class TopicConfig(val version: Int, val owner: String, val logConfig: LogConfig, val acls: Set[Acl])
 

Changes to existing classes

  • KafkaServer will initialize the authorizer based on the value of the authorizer.class.name config.
  • KafkaAPI will have an additional field, authorizer, which will be passed by KafkaServer at the time of server initialization. KafkaAPI will call authorizer.authorize for all requests that need to be authorized.
  • TopicCommandOptions will have an added option called acls, using which a client can specify an acl json file during topic creation/modification, and another extra param called owner to specify the topic owner.
  • TopicConfig will have 2 additional fields: owner and a list of ACLs.
  • KafkaConfig will have 3 additional configurations.
    • authorizer.class.name: FQCN of the authorizer class to be used. The provided class must implement the Authorizer interface.
    • kafka.superusers: list of users that will be given superuser access. These users will have access to everything. Users should set this to the user the kafka broker processes are running as, to avoid duplicating configuration for every single topic, like ALLOW REPLICATION to BROKER_USER for TOPIC from ALL hosts.
    • broker.acl.file.path: path of the acl json file on the broker hosts. The acl json file should have cluster related acls, like which users other than the superusers have access to CREATE topic. If this value is not set the authorizer will allow all users to perform broker operations like CREATE.
  • Depending on whether we decide to authorize admin actions or not, we may have to create a new API "authorize" at the broker layer to authorize admin actions.
  • I have considered zookeeper node ACLs as out of scope for this document; if we decide to make them part of this KIP we will have to change a bunch of other classes to support that.

Data Flows

Authentication and session initialization details are out of scope of this document. We will assume that authentication is done before any authorization happens and that the session instance is properly initialized. As mentioned above, we assume that on secure connections the session has its principal set to the authenticated user, and on non-secure connections it is set to a special principal whose name() function returns "Anonymous".

Initialize Authorizer

Since this is a pluggable architecture, users can easily replace the default provider implementation by writing their own custom provider and providing that class's FQCN as the value of the config authorizer.class.name. On server initialization, KafkaServer will read the value of authorizer.class.name, create an instance of the specified class and call its initialize method with the KafkaConfig and TopicConfigCache parameters. This instance will be passed as a constructor argument to KafkaAPI. SimpleAclAuthorizer will initialize its AclCache by providing the value of broker.acl.file.path and the TopicConfigCache as constructor parameters. During initialization AclCache will read and parse the supplied json file to construct the cluster ACL.

If the value of authorizer.class.name is null, in secure mode the cluster will fail with a ConfigException. In non-secure mode, in the absence of a config value for authorizer.class.name, the server will allow all requests to all topics, even if a topic has acls configured. This is done purely for backwards compatibility, and it will be a security hole. To avoid this we can always default to SimpleAclAuthorizer, which will only allow access to topics that have an acl configured to allow access for the Anonymous user; this will be the default acl for all topics created without any acls.
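
As a rough sketch of the plug-in loading step described above, assuming the configuration is available as a plain `Map[String, String]`: the `Authorizer` trait here is a reduced stand-in for the proposed interface, and `AllowAllAuthorizer` and `load` are hypothetical names used only for illustration. The ConfigException behaviour for secure mode is not modelled.

```scala
object AuthorizerLoaderSketch {
  // Reduced stand-in for the proposed Authorizer trait.
  trait Authorizer {
    def initialize(config: Map[String, String]): Unit
    def authorize(principal: String, operation: String, resource: String): Boolean
  }

  // Hypothetical example plug-in that allows everything.
  class AllowAllAuthorizer extends Authorizer {
    var initialized = false
    def initialize(config: Map[String, String]): Unit = { initialized = true }
    def authorize(principal: String, operation: String, resource: String): Boolean = true
  }

  // Returns None when authorizer.class.name is unset or empty, i.e. no authorization is performed.
  def load(config: Map[String, String]): Option[Authorizer] =
    config.get("authorizer.class.name").filter(_.nonEmpty).map { className =>
      // Instantiate the configured class through its no-argument constructor.
      val instance = Class.forName(className)
        .getDeclaredConstructor().newInstance().asInstanceOf[Authorizer]
      instance.initialize(config) // initialize is called before any authorize call
      instance
    }
}
```

The plug-in class needs a public no-argument constructor for this approach to work, which is why all state is passed through `initialize` rather than through constructor parameters.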

Topic Creation and Modification

Currently all topic creation/modification/deletion actions are performed using KafkaAdminUtil which mostly interacts directly with zookeeper instead of forwarding requests to a broker host. Given all the code is executed on client side there is no easy way to perform authorization. 

We will add two extra params and modify the topic config stored in zookeeper:

  • --acl to the topic creation CLI, using which the user can specify the location of a json file that contains topic acls.
  • --owner, using which the user can specify the username of the topic owner. The default value will be the process owner, and whenever we enable kerberos it will default to the JAAS logged-in username. The topic owner will always have access to all topic related actions.
  • As part of TopicConfig we will also store this list of Acls and the owner, which will be used for all topic operation authorization. If no acls are specified we will assume the user wants to allow access to all users, and we will create an acl with "allow user Anonymous to perform all operations from all hosts".

Kafka API Authorization

For each API that needs authorization, the KafkaApi class will first hand off the request to the authorizer's authorize method with the session, operation and resource params. If the function returns false, KafkaApi will throw an AuthorizationException. The following pseudocode describes at a high level what the implementation will look like:

Code Block
languagescala
 
authorize(session, operation, resource) {
    principal = session.principal
    remoteAddress = session.host

    if (principal is one of the superusers) {
        return true
    }

    if (topicOperations.contains(operation)) {
        owner = AclCache.getOwner(resource)
        acls = AclCache.getAcls(resource) // get cached acls from acl store for the topic.
    } else {
        owner = none // cluster actions have no owner
        acls = AclCache.getClusterAcl()
    }

    if (acls.isEmpty || acls.contains(allowAccessToEveryoneAcl) || owner == principal)
        return true

    // if any deny acl matches the principal, remoteAddress and operation, deny the access.
    // otherwise check if any allow acl authorizes the operation and return true or false based on that.
}
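
The pseudocode can be turned into a small runnable Scala sketch. It makes several assumptions that are not part of the proposal: operations are plain strings, the acls and owner are passed in directly instead of being looked up in a cache, `superUsers` is a parameter, and a matching Deny overrides even an "allow everyone" acl (the pseudocode returns true as soon as it finds that acl).

```scala
object SimpleAuthorizeSketch {
  sealed trait PermissionType
  case object Allow extends PermissionType
  case object Deny extends PermissionType

  // Simplified versions of the proposed classes; operations are plain strings here.
  final case class Acl(principal: String, permissionType: PermissionType,
                       hosts: Set[String], operations: Set[String])
  final case class Session(principal: String, host: String)

  // "Anonymous" as acl principal means all users, "*" all hosts, "ALL" all operations.
  private def matches(acl: Acl, s: Session, op: String): Boolean =
    (acl.principal == "Anonymous" || acl.principal == s.principal) &&
      (acl.hosts.contains("*") || acl.hosts.contains(s.host)) &&
      (acl.operations.contains("ALL") || acl.operations.contains(op))

  def authorize(session: Session, operation: String, acls: Set[Acl],
                owner: Option[String], superUsers: Set[String]): Boolean =
    if (superUsers.contains(session.principal)) true   // superusers bypass all acls
    else if (owner.contains(session.principal)) true   // the topic owner always has access
    else if (acls.isEmpty) true                        // fail open when no acl is defined
    else {
      val matching = acls.filter(matches(_, session, operation))
      // Deny takes precedence; otherwise at least one matching Allow is required.
      !matching.exists(_.permissionType == Deny) && matching.exists(_.permissionType == Allow)
    }
}
```

Applied to the acls from the example topic acl json, this sketch lets bob READ from host3 but not from host1, and lets alice READ and WRITE only from host1 and host2.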

 

...