Status

Current state: Accepted

Discussion thread: here 

JIRA: KAFKA-6945

...

Public Interfaces

Protocol Changes

CreateDelegationTokenRequest

We will bump the version of the CreateDelegationToken API to include the owner details in CreateDelegationTokenRequest and the "Token requester" in CreateDelegationTokenResponse. For old versions the owner fields are skipped, and the owner is the same as the token request principal.

Code Block
linenumberstrue
{
  "apiKey": 38,
  "type": "request",
  "name": "CreateDelegationTokenRequest",
  // Version 1 is the same as version 0.
  //
  // Version 2 is the first flexible version.
  //
  // Version 3 adds the owner principal
  "validVersions": "0-3",
  "flexibleVersions": "2+",
  "fields": [
    { "name": "OwnerPrincipalType", "type": "string", "versions": "3+", "nullableVersions": "3+",
      "about": "The principal type of the owner of the token. If it's null it defaults to the token request principal." },
    { "name": "OwnerPrincipalName", "type": "string", "versions": "3+", "nullableVersions": "3+",
      "about": "The principal name of the owner of the token. If it's null it defaults to the token request principal." },
    { "name": "Renewers", "type": "[]CreatableRenewers", "versions": "0+",
      "about": "A list of those who are allowed to renew this token before it expires.", "fields": [
      { "name": "PrincipalType", "type": "string", "versions": "0+",
        "about": "The type of the Kafka principal." },
      { "name": "PrincipalName", "type": "string", "versions": "0+",
        "about": "The name of the Kafka principal." }
    ]},
    { "name": "MaxLifetimeMs", "type": "int64", "versions": "0+",
      "about": "The maximum lifetime of the token in milliseconds, or -1 to use the server side default." }
  ]
}


| Field | Description |
| --- | --- |
| Owner | The owner is a Kafka principal (principal type and name) that owns the token. If the owner is null, the token request principal is treated as the owner. |
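
The defaulting rule for the owner can be sketched as follows. This is a minimal illustration and not the broker's actual code: `Principal`, `OwnerResolution` and `resolveOwner` are hypothetical names introduced here, and the sketch assumes that the owner counts as absent whenever either of the two owner fields is null.

```java
import java.util.Objects;

// Hypothetical stand-in for a Kafka principal (type + name); not the real KafkaPrincipal class.
final class Principal {
    final String type;
    final String name;

    Principal(String type, String name) {
        this.type = Objects.requireNonNull(type);
        this.name = Objects.requireNonNull(name);
    }

    @Override
    public String toString() {
        return type + ":" + name;
    }
}

final class OwnerResolution {
    // Returns the effective token owner: the requested owner when both owner
    // fields are present, otherwise the principal that sent the request.
    // Assumption: a request from an old client (version < 3) carries no owner
    // fields, so both arguments arrive as null.
    static Principal resolveOwner(String ownerType, String ownerName, Principal requester) {
        if (ownerType == null || ownerName == null) {
            return requester;
        }
        return new Principal(ownerType, ownerName);
    }
}
```

With this shape, a version 0-2 request and a version 3 request with null owner fields both end up with the requester as owner, which matches the behaviour described for old versions.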


CreateDelegationTokenResponse

Code Block
linenumberstrue
{
  "apiKey": 38,
  "type": "response",
  "name": "CreateDelegationTokenResponse",
  // Starting in version 1, on quota violation, brokers send out responses before throttling.
  //
  // Version 2 is the first flexible version.
  //
  // Version 3 adds the token requester principal
  "validVersions": "0-3",
  "flexibleVersions": "2+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error, or zero if there was no error."},
    { "name": "PrincipalType", "type": "string", "versions": "0+",
      "about": "The principal type of the token owner." },
    { "name": "PrincipalName", "type": "string", "versions": "0+",
      "about": "The name of the token owner." },
    { "name": "TokenRequesterPrincipalType", "type": "string", "versions": "3+",
      "about": "The principal type of the requester of the token." },
    { "name": "TokenRequesterPrincipalName", "type": "string", "versions": "3+",
      "about": "The principal name of the requester of the token." },
    { "name": "IssueTimestampMs", "type": "int64", "versions": "0+",
      "about": "When this token was generated." },
    { "name": "ExpiryTimestampMs", "type": "int64", "versions": "0+",
      "about": "When this token expires." },
    { "name": "MaxTimestampMs", "type": "int64", "versions": "0+",
      "about": "The maximum lifetime of this token." },
    { "name": "TokenId", "type": "string", "versions": "0+",
      "about": "The token UUID." },
    { "name": "Hmac", "type": "bytes", "versions": "0+",
      "about": "HMAC of the delegation token." },
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }
  ]
}

| Field | Description |
| --- | --- |
| Token requester | The token requester is a Kafka principal (principal type and name) that requested this token. |


DescribeDelegationTokenResponse

We will bump the version of the DescribeDelegationToken API to include the "Token requester" info in the response details.


Code Block
linenumberstrue
{
  "apiKey": 41,
  "type": "response",
  "name": "DescribeDelegationTokenResponse",
  // Starting in version 1, on quota violation, brokers send out responses before throttling.
  // Version 2 adds flexible version support
  // Version 3 adds the token requester details
  "validVersions": "0-3",
  "flexibleVersions": "2+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "Tokens", "type": "[]DescribedDelegationToken", "versions": "0+",
      "about": "The tokens.", "fields": [
      { "name": "PrincipalType", "type": "string", "versions": "0+",
        "about": "The token principal type." },
      { "name": "PrincipalName", "type": "string", "versions": "0+",
        "about": "The token principal name." },
      { "name": "TokenRequesterPrincipalType", "type": "string", "versions": "3+",
        "about": "The principal type of the requester of the token." },
      { "name": "TokenRequesterPrincipalName", "type": "string", "versions": "3+",
        "about": "The principal name of the requester of the token." },
      { "name": "IssueTimestamp", "type": "int64", "versions": "0+",
        "about": "The token issue timestamp in milliseconds." },
      { "name": "ExpiryTimestamp", "type": "int64", "versions": "0+",
        "about": "The token expiry timestamp in milliseconds." },
      { "name": "MaxTimestamp", "type": "int64", "versions": "0+",
        "about": "The token maximum timestamp length in milliseconds." },
      { "name": "TokenId", "type": "string", "versions": "0+",
        "about": "The token ID." },
      { "name": "Hmac", "type": "bytes", "versions": "0+",
        "about": "The token HMAC." },
      { "name": "Renewers", "type": "[]DescribedDelegationTokenRenewer", "versions": "0+",
        "about": "Those who are able to renew this token before it expires.", "fields": [
        { "name": "PrincipalType", "type": "string", "versions": "0+",
          "about": "The renewer principal type" },
        { "name": "PrincipalName", "type": "string", "versions": "0+",
          "about": "The renewer principal name" }
      ]}
    ]},
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }
  ]
}


| Field | Description |
| --- | --- |
| Token requester | The token requester is a Kafka principal (principal type and name) that requested this token. |

AdminClient API Changes

The CreateDelegationTokenOptions class will be updated to take an "owner" principal as input.

Code Block
languagejava
linenumberstrue
AdminClient {
    // create delegation token with supplied options
    public abstract CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options);
}

public class CreateDelegationTokenOptions extends AbstractOptions<CreateDelegationTokenOptions> {
    private long maxLifeTimeMs = -1;
    private List<KafkaPrincipal> renewers = new LinkedList<>();
    private Optional<KafkaPrincipal> owner = Optional.empty(); // New

    public CreateDelegationTokenOptions owner(KafkaPrincipal owner) {  // New
        this.owner = Optional.of(owner);
        return this;
    }

    public CreateDelegationTokenOptions renewers(List<KafkaPrincipal> renewers) {
        this.renewers = renewers;
        return this;
    }

    public Optional<KafkaPrincipal> owner() {
        return owner;
    }

    public List<KafkaPrincipal> renewers() {
        return renewers;
    }

    public CreateDelegationTokenOptions maxlifeTimeMs(long maxLifeTimeMs) {
        this.maxLifeTimeMs = maxLifeTimeMs;
        return this;
    }

    public long maxlifeTimeMs() {
        return maxLifeTimeMs;
    }
}


Update the TokenInformation class to include the "token requester" details.

Code Block
languagejava
linenumberstrue
public class TokenInformation {

    private KafkaPrincipal owner;
    private KafkaPrincipal tokenRequester; // New
    private Collection<KafkaPrincipal> renewers;
    private long issueTimestamp;
    private long maxTimestamp;
    private long expiryTimestamp;
    private String tokenId;
    ....
    ....
    ....
}

ACL Changes

Resources

A new resource type called "User" will be added. It represents a user and is available to the Authorizer as a resource.

Operations

We would like to add two new operations, "CreateTokens" and "DescribeTokens", on the User resource, to allow users to create tokens for other users and to describe other users' tokens.

Owner, renewer and token requester principals can always renew, expire and describe their own tokens.

| Operation | Resource | API |
| --- | --- | --- |
| CreateTokens | User | createTokens for other users // New |
| DescribeTokens | User | describeTokens for others tokens // New |
| Describe | DelegationToken | describeTokens for a given tokenId // Existing |

DelegationTokenCommand Changes

We will allow the kafka-delegation-tokens.sh script, when run with the "--create" option, to take the owner principal from a new "--owner-principal" option.

Code Block
linenumberstrue
>> bin/kafka-delegation-tokens.sh --bootstrap-server broker1:9092 --create --owner-principal User:owner1 --renewer-principal User:renewer1 --max-life-time 1486750745585

AclCommand Changes

To represent the new User resource type we have to modify the AclCommand slightly and add a new option called --user-principal. This option names a user principal of principal type "User". By specifying it we can control (allow or deny) whether the token requester principal may create or describe tokens for that user principal.

For instance:

Code Block
linenumberstrue
>> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:tokenRequester --allow-host * --operation CreateTokens --user-principal "owner1"

Protocol Changes

The versions of the CreateAcls, DescribeAcls and DeleteAcls APIs will be bumped to avoid serialization errors with older brokers, which can't handle the newly added User resource type. This way the client can reject a request that the broker doesn't support.

Proposed Changes

Create/Renew Tokens:

Token requester users with 'CreateTokens' permission on the 'User' resource can create or renew tokens for other users which are authorized by the ACL. The token requester must be authenticated using any of the available secure channels (Kerberos, SCRAM, SSL) to create or renew tokens for other users. The token requester cannot use delegation token based authentication for creating or renewing tokens.
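
The create/renew rule can be sketched as a small check. This is only an illustration of the rule as stated in this section, not the broker's authorizer code: `CreateTokenCheck`, `mayCreateOrRenew` and the set of grants are hypothetical, and the sketch assumes that acting on one's own token needs no additional ACL.

```java
import java.util.Set;

// Sketch of the create/renew authorization rule; all names here are hypothetical.
final class CreateTokenCheck {
    /**
     * @param requester              principal asking for the token, e.g. "User:tokenRequester"
     * @param owner                  principal that will own the token, e.g. "User:owner1"
     * @param authenticatedWithToken true if the requester authenticated with a delegation token
     * @param createTokensGrants     owners for which the requester holds CreateTokens on the User resource
     */
    static boolean mayCreateOrRenew(String requester, String owner,
                                    boolean authenticatedWithToken,
                                    Set<String> createTokensGrants) {
        // Delegation token based authentication may not be used to create or renew tokens.
        if (authenticatedWithToken) {
            return false;
        }
        // Assumption: acting on one's own token needs no extra ACL.
        if (requester.equals(owner)) {
            return true;
        }
        // Otherwise the ACL must grant CreateTokens on the owner's User resource.
        return createTokensGrants.contains(owner);
    }
}
```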

...

Users with 'DescribeTokens' permission on the User resource can describe other users' tokens which are authorized by the ACL. A token can also be described if the user has the Describe permission on the DelegationToken resource.
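
The describe rule combines three paths: being a party to the token, holding DescribeTokens on the owner's User resource, or holding Describe on the DelegationToken resource. A minimal sketch under those assumptions follows; `DescribeTokenCheck`, `Token` and `mayDescribe` are hypothetical names, and principals are modelled as plain "Type:name" strings for brevity.

```java
import java.util.Collection;
import java.util.Set;

// Sketch of the describe rule; Token and mayDescribe are hypothetical names.
final class DescribeTokenCheck {
    static final class Token {
        final String tokenId;
        final String owner;
        final String tokenRequester;
        final Collection<String> renewers;

        Token(String tokenId, String owner, String tokenRequester, Collection<String> renewers) {
            this.tokenId = tokenId;
            this.owner = owner;
            this.tokenRequester = tokenRequester;
            this.renewers = renewers;
        }
    }

    /**
     * @param describeTokensGrants  owners for which the principal holds DescribeTokens on the User resource
     * @param describeTokenIdGrants token IDs for which the principal holds Describe on the DelegationToken resource
     */
    static boolean mayDescribe(String principal, Token token,
                               Set<String> describeTokensGrants,
                               Set<String> describeTokenIdGrants) {
        // Owners, renewers and token requesters can always describe their own tokens.
        if (principal.equals(token.owner)
                || principal.equals(token.tokenRequester)
                || token.renewers.contains(principal)) {
            return true;
        }
        // DescribeTokens on the owner's User resource.
        if (describeTokensGrants.contains(token.owner)) {
            return true;
        }
        // Describe on the DelegationToken resource for this token ID.
        return describeTokenIdGrants.contains(token.tokenId);
    }
}
```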

Token Details in Zookeeper

...