
 

Status

Current state: Under Discussion

Discussion thread: <TBD> and Kafka Security Proposal

JIRA: KAFKA-1688

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Security proposal for Kafka

As more enterprises have started using Kafka, there is an increasing demand for authorization of who can publish to or consume from topics. Authorization can be based on different available session attributes or context, such as user, IP, or common name in a certificate. Having an extensible authorization interface will help us implement the core requirements in the initial phase and make Kafka enterprise ready. Having a pluggable interface will enable other security-focused products to provide more advanced, enterprise-grade implementations.

Public Interfaces

 

A public interface is any change to the following:
  • Binary log format

    • No

  • The network protocol and api behavior

    • No

  • Any class in the public packages under clients, and configuration, especially client configuration

    • <TBD>

  • Monitoring

    • No

  • Command line tools and arguments

    • Yes, the create topic command will have an optional acls option

  • Anything else that will likely break existing users in some way when they upgrade
    • Yes, TopicMetadata will now store 2 additional fields: owner and acls.

Proposed Changes

Authorizer

 

package kafka.security.auth
/**
 * Top-level interface that all pluggable authorizers must implement. Kafka server will read the "authorizer.class.name"
 * config value at startup time, create an instance of the specified class and call its initialize method.
 * authorizer.class.name must be a class that implements this interface.
 * If authorizer.class.name has no value specified, no authorization will be performed.
 *
 * From that point onwards, every client request will first be routed to the authorize method and the request will only
 * be authorized if the method returns true.
 */
trait Authorizer {
  /**
   * Guaranteed to be called before any authorize call is made.
   */
  def initialize(kafkaConfig: KafkaConfig): Unit
  
  /**
   * @param session The session being authenticated.
   * @param operation Type of operation the client is trying to perform on the resource.
   * @param resource Resource the client is trying to access.
   * @return true if the operation is allowed on the resource for this session, false otherwise.
   */
  def authorize(session: Session, operation: Operation, resource: String): Boolean
}
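To illustrate what a plugin written against this interface might look like, here is a minimal sketch. Everything outside the Authorizer trait itself is an assumption made for the example: the KafkaConfig, Session and Operation definitions are simplified stand-ins so the snippet compiles on its own, ReadOnlyAuthorizer is a hypothetical plugin, and "super.users" is an invented config key. The principal name is read through java.security.Principal's getName.

```scala
import java.security.Principal

object AuthorizerSketch {
  // Simplified stand-ins for types defined elsewhere in this proposal or in Kafka itself.
  final case class KafkaConfig(props: Map[String, String])
  final case class Session(principal: Principal, host: String)
  sealed trait Operation
  case object READ extends Operation
  case object WRITE extends Operation

  // The interface as proposed above.
  trait Authorizer {
    def initialize(kafkaConfig: KafkaConfig): Unit
    def authorize(session: Session, operation: Operation, resource: String): Boolean
  }

  // Hypothetical plugin: every principal may read, only configured super users may do anything else.
  class ReadOnlyAuthorizer extends Authorizer {
    private var superUsers: Set[String] = Set.empty

    override def initialize(kafkaConfig: KafkaConfig): Unit = {
      // "super.users" is an invented key used only for this illustration.
      superUsers = kafkaConfig.props.get("super.users")
        .map(_.split(",").map(_.trim).filter(_.nonEmpty).toSet)
        .getOrElse(Set.empty)
    }

    override def authorize(session: Session, operation: Operation, resource: String): Boolean =
      operation == READ || superUsers.contains(session.principal.getName)
  }

  // Helper to build a Principal with a fixed name.
  def principal(name: String): Principal = new Principal {
    override def getName: String = name
  }
}
```

With "super.users" set to "alice, bob", a session for carol would be allowed to READ but not to WRITE, while a session for bob would be allowed both.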

Session

This is the Session class from https://reviews.apache.org/r/27204/. One added assumption is that on non-secure connections the session will have its principal set to an object whose name() method returns "Anonymous".

object RequestChannel extends Logging {
	case class Session(principal: Principal, host: String)
}

Operation

package kafka.security.auth


/**
 * Different operations a client may perform on kafka resources.
 */
public enum Operation {
   READ,
   WRITE,
   CREATE,
   DELETE,
   CONFIGURE,
   DESCRIBE,
   REPLICATE
}

AuthorizationException

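/**
 * Exception thrown when a principal is not authorized to perform an operation.
 * @param principal The principal that was denied.
 * @param operation The operation the principal tried to perform.
 * @param host The host the request came from.
 * @param resource The resource the principal tried to access.
 */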
class AuthorizationException(principal: String, operation: Operation, host: String, resource: String) extends RuntimeException {
}

SimpleAclAuthorizer

  • Out-of-the-box implementation of the Authorizer.
  • Self-contained, with no dependencies on any other vendor or provider.
  • Will get the location of the acl property file from the kafka config. Will read the topic-specific acls from the topic metadata stored in zookeeper.
  • Will contain an ACLStore that stores the acls from the property file and leverages the topic metadata cache to get topic-specific acls.

Acl

package kafka.security.auth


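/**
 * An instance of this class represents an acl that can express the following statement.
 * <pre>
 * Principal P is allowed Operations READ,WRITE on Resource R from hosts H1,H2.
 * </pre>
 * @param principal The principal the acl applies to.
 * @param resource The resource the acl applies to.
 * @param allowedHosts Hosts the principal may connect from. A value of * indicates that access is allowed from all hosts.
 * @param allowedOperations Operations the principal may perform on the resource.
 */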
class Acl(principal: String, resource: String, allowedHosts: Set[String], allowedOperations: Set[Operation])
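As a rough illustration of how such an acl could be evaluated, the sketch below adds a hypothetical allows method. The method, the WildcardHost constant, the case class form, and the Scala Enumeration standing in for the Java Operation enum are all assumptions made so the snippet is self-contained; the proposal itself does not specify matching rules beyond the statement in the comment above.

```scala
object AclSketch {
  // Scala stand-in for the Java Operation enum in this proposal.
  object Operation extends Enumeration {
    val READ, WRITE, CREATE, DELETE, CONFIGURE, DESCRIBE, REPLICATE = Value
  }

  // "*" in allowedHosts means access is allowed from all hosts.
  val WildcardHost = "*"

  // Same fields as the proposed Acl class, declared as a case class so they are accessible.
  final case class Acl(principal: String,
                       resource: String,
                       allowedHosts: Set[String],
                       allowedOperations: Set[Operation.Value]) {

    // Hypothetical helper: true when this acl grants `operation` on `resource`
    // to `principal` connecting from `host`.
    def allows(principal: String, operation: Operation.Value, resource: String, host: String): Boolean =
      this.principal == principal &&
        this.resource == resource &&
        allowedOperations.contains(operation) &&
        (allowedHosts.contains(WildcardHost) || allowedHosts.contains(host))
  }
}
```

For example, Acl("alice", "orders", Set("H1", "H2"), Set(READ, WRITE)) would allow alice to READ orders from H1, but not from H3, and would not allow DELETE from any host.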

Changes to existing classes

  • KafkaServer will initialize the authorizer based on the value of the authorizer.class.name config.
  • KafkaAPI will have an additional field, authorizer, which will be passed in by KafkaServer at the time of server initialization. KafkaAPI will call authorizer.authorize for all requests that need to be authorized.
  • TopicCommandOptions will have an added option called acls, with which a client can specify an acl property file during topic creation/modification.
  • TopicMetadata will have two additional fields: owner and a list of ACLs.
  • Depending on whether we decide to authorize admin actions, we may have to create a new API "authorize" at the broker layer to authorize admin actions.
  • I have considered zookeeper node ACLs out of scope for this document. If we decide to make them part of this KIP, we will have to change a number of other classes to support that.

Data Flows

Authentication and session initialization details are out of scope for this document. We will assume that authentication is done before any authorization happens and that the session instance is properly initialized. As mentioned above, we assume that on secure connections the session has its principal set to the authenticated user, and on non-secure connections it is set to a special principal whose name() function returns "Anonymous".

Initialize Authorizer

Since this is a pluggable architecture, users can easily replace the default provider implementation by writing their own custom provider and providing that class's FQCN as the value of the config authorizer.class.name. On the kafka server side, during server initialization, KafkaServer will read the value of authorizer.class.name, create an instance of the specified class and call its initialize method with kafkaConfig as the parameter. This instance will be passed as a constructor argument to KafkaAPI.

During initialization SimpleAclAuthorizer will read the value of acl.file.path, which should hold the location of a property file that contains the cluster acls. SimpleAclAuthorizer will initialize its ACLStore using this property file. ACLStore will also read all the topic metadata and cache the topic acls and owner for all topics.

If the value of authorizer.class.name is null, in secure mode the cluster will fail with ConfigException. In non-secure mode, in the absence of a config value for authorizer.class.name, the server will allow all requests to all topics, even if a topic has configured acls. This is done purely for backwards compatibility and it will be a security hole. To avoid this we can always default to SimpleAclAuthorizer, which will allow access only to topics that have an acl configured to allow access for the Anonymous user; this will be the default acl for all topics created without any acls.
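The initialization steps described in this section could be sketched roughly as follows. This is an illustration under stated assumptions, not the proposed implementation: KafkaConfig, Authorizer and ConfigException are simplified stand-ins, loadAuthorizer and NoOpAuthorizer are hypothetical names, and the sketch follows the first behaviour described above (fail in secure mode, allow everything otherwise) rather than the alternative of always defaulting to SimpleAclAuthorizer.

```scala
object AuthorizerLoaderSketch {
  // Simplified stand-ins so the example is self-contained.
  final case class KafkaConfig(props: Map[String, String])

  trait Authorizer {
    def initialize(kafkaConfig: KafkaConfig): Unit
  }

  class ConfigException(message: String) extends RuntimeException(message)

  // Trivial implementation so the loader has something to instantiate.
  class NoOpAuthorizer extends Authorizer {
    @volatile var initialized: Boolean = false
    override def initialize(kafkaConfig: KafkaConfig): Unit = {
      initialized = true
    }
  }

  // Returns Some(authorizer) when a class is configured. Returns None in non-secure mode
  // without a configured class, which the caller treats as "allow all requests".
  def loadAuthorizer(config: KafkaConfig, secureMode: Boolean): Option[Authorizer] =
    config.props.get("authorizer.class.name").map(_.trim).filter(_.nonEmpty) match {
      case Some(className) =>
        // Instantiate via the public no-argument constructor, then initialize before first use.
        val instance = Class.forName(className)
          .getDeclaredConstructor()
          .newInstance()
          .asInstanceOf[Authorizer]
        instance.initialize(config)
        Some(instance)
      case None if secureMode =>
        throw new ConfigException("authorizer.class.name must be set in secure mode")
      case None =>
        None
    }
}
```

A class name that cannot be found surfaces as a ClassNotFoundException, and a class that does not implement the interface as a ClassCastException, so a misconfiguration stops the server at startup instead of silently disabling authorization.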

Topic Creation and Modification

Currently all topic creation/modification/deletion actions are performed using KafkaAdminUtil, which mostly interacts directly with zookeeper instead of forwarding requests to a broker host. Given that all the code is executed on the client side, there is no easy way to perform authorization. The simplest way to support authorization right now is to create a broker API "authorize" that will be called by AdminUtil to authorize an operation. In the absence of such an API we cannot perform any authorization, as the user can modify the client-side jar and replace our authorization implementation with their own Authorizer implementation, which can just ignore all the acls and always return true.

Regardless of our decision to support authorization on admin operations, we will add an extra param --acl to the topic creation CLI, with which the user can specify the location of a property file that contains topic acls. As part of TopicMetadata we will also store a list of Acls and an owner, which will be used for authorizing all topic operations. If no acls are specified, we will assume the user wants to allow access to all users and we will create an acl with "allow user Anonymous to perform all operations from all hosts".

Kafka API Authorization

For each API that needs authorization, the KafkaApi class will first hand off the request to the authorizer's authorize method with the session, operation and resource params. If the function returns false, KafkaApi will throw an AuthorizationException. The following pseudocode describes at a high level what the implementation will look like:

 
authorize(session, operation, resource) {
    principal = session.principal
    remoteAddress = session.host

    owner = null // only topics have an owner
    if (topicOperations.contains(operation)) {
        owner = aclStore.getOwner(resource)
        acls = aclStore.getAclsForTopic(resource) // get cached acls from acl store for the topic.
    } else {
        acls = aclStore.getClusterAcl(resource)
    }
    if (acls.contains(allowAccessToEveryoneAcl) || owner == principal)
        return true
    // otherwise, return true if any acl allows principal to perform operation from remoteAddress, else return false.
}
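For readers who prefer something executable, the pseudocode can be fleshed out along these lines. This is a sketch under several assumptions that the proposal does not settle: the in-memory AclStore, the split between topic and cluster operations in topicOperations, a Session that carries the principal name as a plain string, and the treatment of the default "allow user Anonymous to perform all operations from all hosts" acl as the allow-everyone acl.

```scala
object SimpleAuthorizeSketch {
  // Scala stand-in for the Java Operation enum in this proposal.
  object Operation extends Enumeration {
    val READ, WRITE, CREATE, DELETE, CONFIGURE, DESCRIBE, REPLICATE = Value
  }
  import Operation._

  // Simplified stand-ins: the session carries the principal name directly.
  final case class Session(principalName: String, host: String)
  final case class Acl(principal: String, resource: String,
                       allowedHosts: Set[String], allowedOperations: Set[Operation.Value])

  val Wildcard = "*"

  // Assumed split between topic-level and cluster-level operations.
  val topicOperations: Set[Operation.Value] = Set(READ, WRITE, DELETE, CONFIGURE, DESCRIBE)

  // The default acl described for topics created without any acls.
  def allowEveryone(resource: String): Acl =
    Acl("Anonymous", resource, Set(Wildcard), Operation.values.toSet)

  // Hypothetical in-memory ACLStore.
  final class AclStore(topicOwners: Map[String, String],
                       topicAcls: Map[String, Set[Acl]],
                       clusterAcls: Set[Acl]) {
    def getOwner(topic: String): Option[String] = topicOwners.get(topic)
    def getAclsForTopic(topic: String): Set[Acl] = topicAcls.getOrElse(topic, Set.empty)
    def getClusterAcl(resource: String): Set[Acl] = clusterAcls.filter(_.resource == resource)
  }

  def authorize(store: AclStore, session: Session, operation: Operation.Value, resource: String): Boolean = {
    val isTopicOp = topicOperations.contains(operation)
    // Only topics have an owner; cluster-level resources do not.
    val owner = if (isTopicOp) store.getOwner(resource) else None
    val acls = if (isTopicOp) store.getAclsForTopic(resource) else store.getClusterAcl(resource)

    acls.contains(allowEveryone(resource)) ||
      owner.contains(session.principalName) ||
      acls.exists { acl =>
        acl.principal == session.principalName &&
          acl.allowedOperations.contains(operation) &&
          (acl.allowedHosts.contains(Wildcard) || acl.allowedHosts.contains(session.host))
      }
  }
}
```

The checks run from cheapest to most specific: the allow-everyone acl and the owner check short-circuit before any per-acl matching, and a request that matches none of them is denied.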

 

Open Questions

  • Do we want to support authorization of KafkaAdminUtil operations? If yes, are we open to adding a new server-side API to proxy the authorize call, or do we want to wait until KIP-4 is committed and merged in? Is there an alternative approach?

  • What do acls on zookeeper nodes look like, given that all our admin APIs are currently performed directly from the client?
  • Do we want to support group acls as part of this authorizer? Do we want to support principal to local user mapping? If yes, we need to add plugins for UserToGroupMapper and PrincipalToUserMapper.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?

    • This shouldn't affect any existing users

  • If we are changing behavior how will we phase out the older behavior?
    • No. The default implementation would maintain all existing usability behavior
  • If we need special migration tools, describe them here.
    • No
  • When will we remove the existing behavior?
    • No

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
