Status

Current state["DISCUSSION"]. 

Discussion thread: here

JIRA: KAFKA-1696 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

We introduced support for security in kafka version 0.9.0. using kerberos as authentication layer. Kafka is designed to work with a lot of producers and consumers so in a secure environment all these clients will need access to a keytab or a TGT to ensure they can communicate with a secure kafka broker. This has few disadvantages:

Please read http://carfield.com.hk:8080/document/distributed/hadoop-security-design.pdf HDFS section for more detailed explanation of all the disadvatages above. To address the problems listed above we propose to add support for delegation tokens to secure Kafka. Delegation tokens are shared secret between kafka brokers and clients so authentication can be done without having to go through KDC.

Delegation tokens will help processing frameworks to distribute workload to available workers in a secure environment without the added cost of distributing keytabs or TGT. i.e. In case of Storm, Storm’s master (nimbus) is the only node that needs a keytab. Using this keytab Nimbus will authenticate with kafka broker and acquire a delegation token. Nimbus can then distribute this delegation token to all of its worker hosts and all workers will be able to authenticate to kafka using tokens and will have all the access that nimbus keytab principal has.

Public Interfaces

Following new APIs and request/response classes will be added:

getDelegationToken(request: DelegationTokenRequest): DelegationTokenResponse

class DelegationTokenRequest(renewer: option(KafkaPrincipal) = None, maxLifeTime: long = -1)

class DelegationTokenResponse(owner:  KafkaPrincipal,  expiryTimeMillis: long, renewer: option(KafkaPrincipal) = None, maxLifeTime: long = -1, tokenId: String, hmac: byte[])  

renewDelegationToken(request: RenewDelegationTokenRequest): DelegationTokenResponse

class RenewDelegationTokenRequest(hmac: byte[], expiryTimeMillis: long) 

expireToken(request: ExpireTokenRequest)

class ExpireTokenRequest(hmac: byte[], expireAt: long  = Systemtime.currentTimeMillis) 

Proposed Changes

Token acquisition

Following steps describe how tokens can be acquired:


kafka_token_acquisition.png

Authentication using Token

 We will reuse the current SASL channel but for authentication using delegation we will use DIGEST-MD5.

kafka_authentication_using_tokens.png

Token renewal

token_renewal.png

Token expiration and cancellation

 If a token is not renewed by the token’s expiration time or if token is beyond the max life time, it will be deleted from all broker caches as well as from zookeeper. Alternatively an owner or renewer can issue a expiration/cancellation by following a similar process as renewal.

Invalidating all tokens

 In case of a password compromise scenario all the tokens can be deleted from zookeeper and this will result in all the tokens to be invalidated. We can provide a simple CLI tool for this. 

Command line tool

 We will provide a CLI to acquire delegation tokens, renew tokens and to invalidate/expire tokens.

Alternatives

Originally we considered to not have any shared Secret at config level. This required us to chose one of the 2 options