Status

Current state: Under Discussion

Discussion thread: <TBD> and Kafka Security Proposal

JIRA: KAFKA-1688

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Security proposal for Kafka

As more enterprises have started using Kafka, there is an increasing demand for authorization of who can publish to or consume from topics. Authorization can be based on different available session attributes or context, like user, IP, common name in certificate, etc. Having an extensible authorization interface will help us implement the core requirements in the initial phase and make Kafka enterprise ready. Having a pluggable interface will enable other security-focused products to provide more advanced, enterprise-grade implementations.

Public Interfaces

 

A public interface is any change to the following:
  • Binary log format

    • No

  • The network protocol and api behavior

    • The APIs will now do authorizations so the clients will see a new exception if they are not authorized for an operation.

  • Any class in the public packages under clients; configuration, especially client configuration

    • No.

  • Monitoring

    • No

  • Command line tools and arguments

    • Describe topic will display acl info. 

  • Anything else that will likely break existing users in some way when they upgrade
    • No.

Proposed Changes

Public Interfaces and classes

Authorizer

 

package kafka.security.auth

import kafka.network.RequestChannel.Session
import kafka.server.KafkaConfig

/**
 * Top level interface that all pluggable authorizers must implement. Kafka server will read the "authorizer.class.name"
 * config value at startup time, create an instance of the specified class and call its initialize method.
 * authorizer.class.name must be a class that implements this interface.
 * If authorizer.class.name has no value specified, no authorization will be performed.
 *
 * From that point onwards, every client request will first be routed to authorize method and the request will only be
 * authorized if the method returns true.
 */
trait Authorizer {
  /**
   * Guaranteed to be called before any authorize call is made.
   */
  def initialize(kafkaConfig: KafkaConfig): Unit
  
  /**
   * @param session The session of the client making the request.
   * @param operation Type of operation client is trying to perform on resource.
   * @param resource Resource the client is trying to access.
   * @return true if the operation is authorized, false otherwise.
   */
  def authorize(session: Session, operation: Operation, resource: Resource): Boolean

  /**
   * add the acls to resource, this is an additive operation so existing acls will not be overwritten, instead these new
   * acls will be added to existing acls.
   * @param acls set of acls to add to existing acls
   * @param resource the resource to which these acls should be attached.
   */
  def addAcls(acls: Set[Acl], resource: Resource): Unit

  /**
   * remove these acls from the resource.
   * @param acls set of acls to be removed.
   * @param resource resource from which the acls should be removed.
   * @return true if some acl got removed, false if no acl was removed.
   */
  def removeAcls(acls: Set[Acl], resource: Resource): Boolean

  /**
   * remove a resource along with all of its acls from acl store.
   * @param resource
   * @return
   */
  def removeAcls(resource: Resource): Boolean

  /**
   * get set of acls for this resource
   * @param resource
   * @return empty set if no acls are found, otherwise the acls for the resource.
   */
  def getAcls(resource: Resource): Set[Acl]

  /**
   * get the acls for this principal.
   * @param principal
   * @return empty set if no acls exist for this principal, otherwise the acls for the principal.
   */
  def getAcls(principal: KafkaPrincipal): Set[Acl]
}
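
The ACL management half of the trait (addAcls, removeAcls, getAcls) can be illustrated with a minimal in-memory sketch. This is only an illustration of the documented semantics, not the proposed implementation: the case classes below are simplified stand-ins for the proposal's types (permissionType and operations are plain strings here), and InMemoryAclStore is a hypothetical name.

```scala
import scala.collection.mutable

object InMemoryAclStoreSketch {
  // Simplified stand-ins for the proposal's types (assumptions, not the real Kafka classes).
  case class KafkaPrincipal(principalType: String, name: String)
  case class Resource(name: String)
  case class Acl(principal: KafkaPrincipal, permissionType: String,
                 hosts: Set[String], operations: Set[String])

  // Hypothetical store mirroring only the ACL-management methods of Authorizer.
  class InMemoryAclStore {
    private val aclsByResource = mutable.Map.empty[Resource, Set[Acl]]

    // Additive: acls already attached to the resource are kept.
    def addAcls(acls: Set[Acl], resource: Resource): Unit =
      aclsByResource(resource) = getAcls(resource) ++ acls

    // Returns true only if at least one of the given acls was actually present.
    def removeAcls(acls: Set[Acl], resource: Resource): Boolean = {
      val existing = getAcls(resource)
      val remaining = existing -- acls
      if (remaining.isEmpty) aclsByResource.remove(resource)
      else aclsByResource(resource) = remaining
      remaining.size < existing.size
    }

    // Removes the resource together with all of its acls.
    def removeAcls(resource: Resource): Boolean =
      aclsByResource.remove(resource).isDefined

    // Empty set when the resource has no acls.
    def getAcls(resource: Resource): Set[Acl] =
      aclsByResource.getOrElse(resource, Set.empty[Acl])

    // All acls, across resources, that name this principal.
    def getAcls(principal: KafkaPrincipal): Set[Acl] =
      aclsByResource.values.flatten.filter(_.principal == principal).toSet
  }
}
```

A real authorizer would back this map with durable storage; the sketch only shows that addAcls merges with existing acls and that removeAcls reports whether anything changed.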

Session

This is session from and https://reviews.apache.org/r/27204/. One added assumption is that on non-secure connections the session will have its principal set to an object whose name() method returns "Anonymous".

object RequestChannel extends Logging {
	case class Session(principal: Principal, host: String)
}

KafkaPrincipal

/**
 * 
 * @param principalType type of principal (e.g. the default implementation will support user and group, but a custom authorizer can add more types).
 * @param name name of the principal
 */
case class KafkaPrincipal(principalType: String, name: String) extends Principal 

Operation

package kafka.security.auth

/**
 * Different operations a client may perform on kafka resources.
 */
public enum Operation {
   READ, 
   WRITE,
   CREATE,
   DELETE,
   ALTER,
   DESCRIBE,
   CLUSTER_ACTION
}


Operation        Resource             API
READ             Topics               Fetch, JoinGroup, OffsetCommit
WRITE            Topics               Producer
CREATE           Cluster              KIP-4
DELETE           Topics               KIP-4
ALTER            Topics               KIP-4
DESCRIBE         Topics and Cluster   getOffSet, getTopicMetaData, getConsumerMetaData, listAllTopics (KIP-4), getTopicInfo (KIP-4)
CLUSTER_ACTION   Topics and Cluster   LeaderAndIsr, StopReplica, UpdateMetadata, ControlledShutdown, HeartBeat

PermissionType

Deny will take precedence over Allow in competing acls. For example, if two acls are defined, one that allows an operation from all hosts and one that denies the operation from host1, the operation from host1 will be denied.

public enum PermissionType {
   ALLOW,
   DENY
}

Resource

/**
 * Represents a Resource, the name can be a topic name or kafka_cluster for default authorizer implementation.
 */
case class Resource(name: String)

 

AuthorizationException

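/**
 * Exception thrown when a principal is not authorized to perform an operation.
 * @param principal the principal that was denied.
 * @param operation the operation that was attempted.
 * @param host the host the request came from.
 * @param resource the resource the principal tried to access.
 */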
class AuthorizationException(principal: String, operation: Operation, host: String, resource: String) extends RuntimeException {
}

Acl

package kafka.security.auth


/**
 * An instance of this class will represent an acl that can express following statement.
 * <pre>
 * Principal P has permissionType PT on Operations O1,O2 from hosts H1,H2.
 * </pre>
 * @param principal A value of *:* indicates all users.
 * @param permissionType
 * @param hosts A value of * indicates all hosts.
 * @param operations A value of ALL indicates all operations.
 */
case class Acl(principal: KafkaPrincipal, permissionType: PermissionType, hosts: Set[String], operations: Set[Operation])

Example ACL JSON that will be stored in ZooKeeper

{"version": 1, 
"acls": [
  {
    "principal": "alice",
    "permissionType": "ALLOW",
    "operations": [
      "READ",
      "WRITE"
    ],
    "hosts": [
      "host1",
      "host2"
    ]
  },
  {
    "principal": "bob",
    "permissionType": "ALLOW",
    "operations": [
      "READ"
    ],
    "hosts": [
      "*"
    ]
  },
  {
    "principal": "bob",
    "permissionType": "DENY",
    "operations": [
      "READ"
    ],
    "hosts": [
      "host1",
      "host2"
    ]
  }
]
}

 

SimpleAclAuthorizer

 

  • Out-of-the-box implementation of the Authorizer.
  • Self-contained, with no dependencies on any other vendor or provider.
  • Will contain an ACLCache that caches the broker acls and topic-specific acls.
  • When no Acl is attached to a resource, this implementation will always fail open (allow all requests) for backward compatibility.
  • It will use zookeeper as the storage layer for acls. Acls will be stored in the json format described above under /kafka-acls/<resource-name>.

Changes to existing classes

  • KafkaServer will initialize the authorizer based on value of authorizer.class.name config. 

  • KafkaAPI will have an additional field, authorizer, which will be passed by KafkaServer at the time of server initialization. KafkaAPI will call authorizer.authorize for all requests that need to be authorized.

  • KafkaConfig will have the following additional configurations.
    • authorizer.class.name: FQCN of the authorizer class to be used. The provided class must implement the Authorizer interface.
    • kafka.superusers: list of users that will be given superuser access. These users will have access to everything. Users should set this to the user the kafka broker processes are running as, to avoid duplicate configuration for every single topic like ALLOW REPLICATION to BROKER_USER for TOPIC from ALL hosts.
  • I have considered zookeeper node ACLs as out of scope for this document. If we decide to make them part of this KIP, we will have to change ZKUtils so it can set acls on all zkNodes. I already have an implementation for this (not tested yet); however, we will have to wait for KIP-4 to be merged.

Data Flows

Authentication and session initialization details are out of scope of this document. We will assume that authentication is done before any authorization happens and that the session instance is properly initialized. As mentioned above, we assume that on secure connections the session has its principal set to the authenticated user, and on non-secure connections it is set to a special principal whose name() function returns "Anonymous".

Initialize Authorizer

Since this is a pluggable architecture, users can easily replace the default provider implementation by writing their own custom provider and providing that class's FQCN as the value of the authorizer.class.name config. On server initialization, KafkaServer will read the value of authorizer.class.name, create an instance of the specified class, and call its initialize method with the KafkaConfig parameter. This instance will be passed as a constructor argument to KafkaAPI.

If the value of authorizer.class.name is null, in secure mode the cluster will fail with a ConfigException. In non-secure mode, in the absence of a config value for authorizer.class.name, the server will allow all requests to all topics, even if a topic has acls configured. This is done purely for backwards compatibility, and it is a security hole. To avoid this we can always default to SimpleAclAuthorizer, which will only allow access to topics that have an acl configured to allow access for Anonymous users.
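
The initialization step described above could be sketched roughly as follows. This is a hedged illustration only: a plain Map[String, String] stands in for KafkaConfig, the Authorizer trait is reduced to its initialize method, and loadAuthorizer and AllowAllAuthorizer are hypothetical names. The secure-mode ConfigException is not modeled; an unset config simply yields None.

```scala
object AuthorizerLoaderSketch {
  // Stand-in for the proposal's Authorizer trait, reduced to initialize (assumption).
  trait Authorizer {
    def initialize(config: Map[String, String]): Unit
  }

  // Hypothetical implementation used only to exercise the loader.
  class AllowAllAuthorizer extends Authorizer {
    var initialized = false
    def initialize(config: Map[String, String]): Unit = {
      initialized = true
    }
  }

  // Returns None when authorizer.class.name is unset or empty,
  // otherwise an instance created by reflection and already initialized.
  def loadAuthorizer(config: Map[String, String]): Option[Authorizer] =
    config.get("authorizer.class.name").filter(_.nonEmpty).map { className =>
      val instance = Class.forName(className)
        .getDeclaredConstructor()
        .newInstance()
        .asInstanceOf[Authorizer]
      // initialize is guaranteed to run before any authorize call.
      instance.initialize(config)
      instance
    }
}
```

The sketch requires the configured class to have a public no-argument constructor, which is an assumption of this example rather than something the proposal states.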

Acl Management

We can either modify the existing topics CLI or create a new CLI tool just for acl management. 

Following is the help message from the script which lists all possible options:

  • kafka-acl.sh --help
    A tool to manage acls for kafka cluster and topics. Following are the options
    --topic <topicname> name of topic whose acls you want to add/remove.
    --cluster this will add/edit cluster level acls where you can control which users can create topics or list all topics for the cluster or send control messages to brokers.
    --add indicates you are trying to add Acl
    --remove indicates you are trying to remove acl
    --grantprincipal <principalType: principalName> a comma separated list of principalType: principalName where the currently supported principalType is "user". When this option is not present but --granthost is specified, it defaults to * : *, which is a wildcard that matches all types and all users.
    --granthost <host1,host2> a comma separated list of hosts from which the --grantprincipal is allowed the actions. When this option is not present but --grantprincipal is specified, it defaults to *, which is a wildcard matching all hosts.
    --revokeprincipal <principalType: principalName> a comma separated list of principalType: principalName where the currently supported principalType is "user". When this option is not present but --revokehost is specified, it defaults to * : *, which is a wildcard that matches all types and all users. Revoke should only be used to limit the access added by some other grants. A revoke without an accompanying grant is meaningless, as by default only principals in grant lists are allowed access based on their acls and everyone else is denied.
    --revokehost <host1,host2> a comma separated list of hosts from which the --revokeprincipal will be denied actions. When this option is not present but --revokeprincipal is specified, it defaults to *, which is a wildcard matching all hosts.
    --removeAll removes all acls for a topic, only works with topic and not with cluster.
    --list list all acls
    --principal <principalType: principalName> Used only with --list to list all acls for a principal

Kafka API Authorization

For each API that needs authorization, the KafkaApi class will first hand off the request to the authorizer's authorize method with the session, operation and resource parameters. If the method returns false, KafkaApi will throw an AuthorizationException. The following pseudocode describes at a high level what the implementation will look like:

SimpleAuthorizer
authorize(session, operation, resource) {
    principal = session.principal
    remoteAddress = session.host
    if (principal is one of the superusers) {
        return true
    }

    acls = getAcls(resource) // served from a cache
    if (acls.isEmpty || acls.contains(allowAccessToEveryoneAcl)) return true

    // If any deny acl is configured for this principal/host/operation combination, deny the access.
    if (any deny acl in acls matches principal, remoteAddress, operation) return false
    // Otherwise check whether any acl explicitly allows this principal/host/operation combination.
    if (any allow acl in acls matches principal, remoteAddress, operation) return true

    // No explicit allow acl was found, so deny the access.
    return false
}
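
A runnable approximation of the pseudocode above, as a sketch under stated assumptions: the types are simplified stand-ins (operations are plain strings, the acl store is an immutable Map rather than a cached ZooKeeper lookup), matches is a hypothetical helper, and the allowAccessToEveryoneAcl shortcut is left out, so a wildcard allow acl is treated like any other allow acl and can still be overridden by a deny.

```scala
object SimpleAuthorizeSketch {
  // Simplified stand-ins for the proposal's types (assumptions for illustration).
  sealed trait PermissionType
  case object Allow extends PermissionType
  case object Deny extends PermissionType

  case class KafkaPrincipal(principalType: String, name: String)
  case class Session(principal: KafkaPrincipal, host: String)
  case class Resource(name: String)
  case class Acl(principal: KafkaPrincipal, permissionType: PermissionType,
                 hosts: Set[String], operations: Set[String])

  val Wildcard = "*"
  val AllOperations = "ALL"

  // True when the acl applies to this principal, host and operation, honoring wildcards.
  def matches(acl: Acl, session: Session, operation: String): Boolean = {
    val principalOk = acl.principal == session.principal ||
      acl.principal == KafkaPrincipal(Wildcard, Wildcard)
    val hostOk = acl.hosts.contains(Wildcard) || acl.hosts.contains(session.host)
    val operationOk = acl.operations.contains(AllOperations) ||
      acl.operations.contains(operation)
    principalOk && hostOk && operationOk
  }

  def authorize(session: Session, operation: String, resource: Resource,
                superUsers: Set[KafkaPrincipal],
                aclStore: Map[Resource, Set[Acl]]): Boolean = {
    // Superusers bypass all acl checks.
    if (superUsers.contains(session.principal)) return true

    val acls = aclStore.getOrElse(resource, Set.empty[Acl])
    // Fail open when nothing is configured for the resource.
    if (acls.isEmpty) return true

    val applicable = acls.filter(matches(_, session, operation))
    // Deny wins over Allow; without an explicit Allow the request is rejected.
    !applicable.exists(_.permissionType == Deny) &&
      applicable.exists(_.permissionType == Allow)
  }
}
```

With an acl that allows bob to READ from all hosts and another that denies bob READ from host1, this sketch authorizes a READ from host2 and rejects one from host1, matching the Deny-over-Allow rule described under PermissionType.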

Open Questions

  • What should be the default behavior for scenarios like "no acls found", which will be the case for all topics created before the security release? Generally in secure setups the de facto behavior is to fail closed, but this forces users to add an "Allow everyone from all hosts" acl to all of their existing topics.
  • What should be the storage for the out-of-the-box Authorizer implementation? Zookeeper seems like a good fit here, as acls seldom change and we can have a caching layer with a TTL of 1 hour. Jun has suggested using a topic (just like we use one for consumer offset storage), which would allow mirror maker to replicate acls without any changes. However, this means that for any custom authorizer the acls will not be replicated by mirror maker. It might be better to update mirror maker to explicitly use the getAcls and addAcls APIs so acls are replicated no matter which authorizer implementation is chosen.
  • Do we want to modify the existing CLI with all the acl options, or just create a new acl management CLI?
  • Do we want to support Deny as a permissionType? This adds support for complex acls like "allow access to user1 from all hosts but host1,host2" at the expense of simplicity. I don't think anyone feels strongly about keeping or removing this, so it would be great to make a quick decision.
  • Do we want a 1:1 mapping from Operation to API, or a categorization as listed above? If we decide to go with a 1:1 mapping, I think it is better to make it an open string instead of an enum.
  • Should we even allow defining acls for operation type CLUSTER_ACTION, or only allow super users to perform these actions?

Compatibility, Deprecation, and Migration Plan

What impact (if any) will there be on existing users?

This shouldn't affect any existing users.

If we are changing behavior how will we phase out the older behavior?

No phase-out is needed. The default implementation would maintain all existing behavior.

If we need special migration tools, describe them here.

Mirror maker will have to start using the new acl management tool.

When will we remove the existing behavior?

No

Rejected Alternatives

We originally proposed to store the acls as part of TopicConfig, with no ACL management APIs exposed. This had the advantages of a simpler implementation, fewer public APIs, out-of-the-box support for mirror maker, cleanup of acls on topic deletion, and reuse of some of the existing infrastructure for propagating topic config changes. However, this approach had the drawback of mixing acls with topic config, which seems like breaking separation of concerns, and it could have confused users of a custom authorizer, since a custom authorizer could completely ignore the acls set using topic config. To overcome this, we moved to exposing ACL management APIs as public APIs that every authorizer must implement, and to requiring every authorizer to maintain its own ACL storage outside of topic config.
