Comment: Remove requirement that altering when authenticated via delegation token be disallowed – it is now acceptable.

Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-10259

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

As part of the KIP-500 effort to remove Kafka's ZooKeeper dependency, we need a broker-side API to alter these settings.

Public Interfaces

...

describeUserScramCredentials

describeUserScramCredentials will describe the currently configured SCRAM user credentials.  For security reasons, it will not list their passwords.

Code Block
languagejava
public enum ScramMechanism {
    UNKNOWN((byte) 0),
    SCRAM_SHA_256((byte) 1),
    SCRAM_SHA_512((byte) 2);

    private final byte type;
    private final String mechanismName;

    public static ScramMechanism fromType(byte type) {
        // etc...
    }

    public static ScramMechanism fromMechanismName(String mechanismName) {
        // etc...
    }

    public String mechanismName() {
        // etc...
    }

    public byte type() {
        // etc...
    }

    private ScramMechanism(byte type) {
        // etc...
    }
}

public class ScramCredentialInfo {
    private final ScramMechanism mechanism;
    private final int iterations;
}

public class UserScramCredentialsDescription {
    private final String name;
    private final List<ScramCredentialInfo> infos;
}

public class DescribeScramUserCredentialsOptions extends AbstractOptions<DescribeScramUserCredentialsOptions> { }

// interface Admin: Describe all users.
default DescribeScramUserCredentialsResult describeScramUserCredentials() {
    return describeScramUserCredentials(null);
}

// interface Admin: Describe indicated users (null or empty implies all).
default DescribeScramUserCredentialsResult describeScramUserCredentials(List<String> users) {
    return describeScramUserCredentials(users, new DescribeScramUserCredentialsOptions());
}

// interface Admin
DescribeScramUserCredentialsResult describeScramUserCredentials(List<String> users, DescribeScramUserCredentialsOptions options);

public class DescribeScramUserCredentialsResult {
    // completes successfully only if users() does and every described user does
    public KafkaFuture<Map<String, UserScramCredentialsDescription>> all();

    // completes successfully if request was authorized and the list of users described is determined
    public KafkaFuture<List<String>> users();

    // completes successfully if the individual user is described successfully
    public KafkaFuture<UserScramCredentialsDescription> description(String userName);
}
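The lookup methods on the mechanism enum are left as "etc..." above. The following is a minimal, self-contained sketch of how they might behave. The name ScramMechanismSketch, the fallback to UNKNOWN for unrecognized input, and the name string given to UNKNOWN are assumptions made for illustration and are not specified by this proposal.

```java
import java.util.Arrays;

// Illustrative sketch only; not the proposed class itself.
enum ScramMechanismSketch {
    UNKNOWN((byte) 0, "UNKNOWN"),            // name string for UNKNOWN is an assumption
    SCRAM_SHA_256((byte) 1, "SCRAM-SHA-256"),
    SCRAM_SHA_512((byte) 2, "SCRAM-SHA-512");

    private final byte type;
    private final String mechanismName;

    ScramMechanismSketch(byte type, String mechanismName) {
        this.type = type;
        this.mechanismName = mechanismName;
    }

    // Maps a wire-format type byte to a mechanism.
    // Assumption: unrecognized values fall back to UNKNOWN instead of throwing.
    static ScramMechanismSketch fromType(byte type) {
        return Arrays.stream(values())
                .filter(m -> m.type == type)
                .findFirst()
                .orElse(UNKNOWN);
    }

    // Maps a SASL mechanism name such as "SCRAM-SHA-256" to a mechanism.
    // UNKNOWN is never matched by name; it is only the fallback.
    static ScramMechanismSketch fromMechanismName(String mechanismName) {
        return Arrays.stream(values())
                .filter(m -> m != UNKNOWN && m.mechanismName.equals(mechanismName))
                .findFirst()
                .orElse(UNKNOWN);
    }

    byte type() {
        return type;
    }

    String mechanismName() {
        return mechanismName;
    }
}
```

Under these assumptions, a client that receives a type byte it does not recognize can still represent the credential instead of failing the whole describe call.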

describeScramUserCredentials will be implemented by a new RPC.

Code Block
{
  "apiKey": 50,
  "type": "request",
  "name": "DescribeUserScramCredentialsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Users", "type": "[]UserName", "versions": "0+", "nullableVersions": "0+",
      "about": "The users to describe, or null/empty to describe all users.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The user name." }
    ]}
  ]
}

{
  "apiKey": 50,
  "type": "response",
  "name": "DescribeUserScramCredentialsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The message-level error code, 0 except for user authorization or infrastructure issues." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The message-level error message, if any." },
    { "name": "Results", "type": "[]DescribeUserScramCredentialsResult", "versions": "0+",
      "about": "The results for descriptions, one per user.", "fields": [
      { "name": "User", "type": "string", "versions": "0+",
        "about": "The user name." },
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The user-level error code." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The user-level error message, if any." },
      { "name": "CredentialInfos", "type": "[]CredentialInfo", "versions": "0+",
        "about": "The mechanism and related information associated with the user's SCRAM credentials.", "fields": [
        { "name": "Mechanism", "type": "int8", "versions": "0+",
          "about": "The SCRAM mechanism." },
        { "name": "Iterations", "type": "int32", "versions": "0+",
          "about": "The number of iterations used in the SCRAM credential." }
      ]}
    ]}
  ]
}

It will require DESCRIBE permissions on the CLUSTER resource.  It will return CLUSTER_AUTHORIZATION_FAILED if the user has insufficient permissions.

It will return RESOURCE_NOT_FOUND if a user is requested to be described but that user does not exist/has no credentials.  Note that DescribeScramUserCredentialsResult will not consider such a username to be part of its users() result list, and this RESOURCE_NOT_FOUND error by itself will not prevent the future returned by all() from completing successfully.

It will return DUPLICATE_RESOURCE if a user is requested to be described twice.  Note that DescribeScramUserCredentialsResult will consider such a username to be part of its users() result list, and this will cause the future returned by all() to complete exceptionally.
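To make the interaction between users(), description() and all() concrete, here is a small sketch of the semantics described above. It uses java.util.concurrent.CompletableFuture as a stand-in for KafkaFuture, a plain String as a stand-in for UserScramCredentialsDescription, and a map from user name to error name as a stand-in for the per-user results in the response. The class name DescribeResultSketch and all of these stand-ins are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Illustrative sketch only; models the error semantics, not the real result class.
final class DescribeResultSketch {
    // user name -> error name; "NONE" means the user was described successfully
    private final Map<String, String> perUserError;

    DescribeResultSketch(Map<String, String> perUserError) {
        this.perUserError = new LinkedHashMap<>(perUserError);
    }

    // Users reported as RESOURCE_NOT_FOUND are not part of the users() list.
    CompletableFuture<List<String>> users() {
        List<String> described = new ArrayList<>();
        perUserError.forEach((user, error) -> {
            if (!"RESOURCE_NOT_FOUND".equals(error)) {
                described.add(user);
            }
        });
        return CompletableFuture.completedFuture(described);
    }

    // Completes exceptionally when the user-level error is anything other than NONE.
    CompletableFuture<String> description(String user) {
        CompletableFuture<String> future = new CompletableFuture<>();
        String error = perUserError.get(user);
        if ("NONE".equals(error)) {
            future.complete("description of " + user);
        } else {
            future.completeExceptionally(new IllegalStateException(error + ": " + user));
        }
        return future;
    }

    // Succeeds only if every user listed by users() was described successfully.
    CompletableFuture<Map<String, String>> all() {
        return users().thenApply(list -> {
            Map<String, String> descriptions = new LinkedHashMap<>();
            for (String user : list) {
                descriptions.put(user, description(user).join()); // throws if that user failed
            }
            return descriptions;
        });
    }
}
```

In this sketch a missing user is simply skipped by all(), while a duplicated user stays in users() and therefore fails all(), which mirrors the two cases described above.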

AlterScramUserCredentials

alterScramUserCredentials will create or change SCRAM user credentials.

Alterations will create the given user credential if it doesn't exist, or alter it if it does.

Code Block
public abstract class UserScramCredentialAlteration {
    private final String user;
}

public class UserScramCredentialUpsertion extends UserScramCredentialAlteration {
    private final ScramCredentialInfo info;
    private final byte[] salt;
    private final byte[] password;

    // There will be one constructor that randomly generates a salt, and one that accepts a pre-defined salt.
}

public class UserScramCredentialDeletion extends UserScramCredentialAlteration {
    private final ScramMechanism mechanism;
}

public class AlterScramUserCredentialsOptions extends AbstractOptions<AlterScramUserCredentialsOptions> {}

// interface Admin
default AlterScramUserCredentialsResult alterScramUserCredentials(List<UserScramCredentialAlteration> alterations) {
    return alterScramUserCredentials(alterations, new AlterScramUserCredentialsOptions());
}

// interface Admin
AlterScramUserCredentialsResult alterScramUserCredentials(List<UserScramCredentialAlteration> alterations,
                                      AlterScramUserCredentialsOptions options);

public class AlterScramUserCredentialsResult {
    public Map<String, KafkaFuture<Void>> values();
    public KafkaFuture<Void> all(); // completes successfully only if everything in values() does
}

If any of the operations associated with a single user can't be done, none of the operations will be done.  On the other hand, operations could succeed for one user but fail for another, different user.

If a user doesn't exist and we add a credential for them, they will be created.  If a user exists and we remove their last credential, they will be deleted.
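The per-user all-or-nothing rule and the implicit creation and deletion of users can be sketched with a small in-memory model. Everything here is hypothetical and for illustration only: the class ScramCredentialStoreSketch, the Op type, and the use of plain strings for mechanisms and error names are assumptions, not part of the proposed API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only; an in-memory model of the alteration semantics.
final class ScramCredentialStoreSketch {
    // One operation on one user's credential; iterations == null marks a deletion.
    static final class Op {
        final String user;
        final String mechanism;
        final Integer iterations;

        private Op(String user, String mechanism, Integer iterations) {
            this.user = user;
            this.mechanism = mechanism;
            this.iterations = iterations;
        }

        static Op upsert(String user, String mechanism, int iterations) {
            return new Op(user, mechanism, iterations);
        }

        static Op delete(String user, String mechanism) {
            return new Op(user, mechanism, null);
        }
    }

    // user -> (mechanism -> iterations); a user exists only while it has a credential
    private final Map<String, Map<String, Integer>> credentials = new HashMap<>();

    // Applies the operations grouped by user and returns user -> error name ("NONE" on success).
    Map<String, String> apply(List<Op> ops) {
        Map<String, List<Op>> byUser = new LinkedHashMap<>();
        for (Op op : ops) {
            byUser.computeIfAbsent(op.user, u -> new ArrayList<>()).add(op);
        }
        Map<String, String> results = new LinkedHashMap<>();
        for (Map.Entry<String, List<Op>> entry : byUser.entrySet()) {
            String user = entry.getKey();
            // Work on a copy so that a failure leaves this user's stored state untouched.
            Map<String, Integer> working = new HashMap<>(
                    credentials.getOrDefault(user, Collections.<String, Integer>emptyMap()));
            String error = "NONE";
            for (Op op : entry.getValue()) {
                if (op.iterations == null) {
                    if (working.remove(op.mechanism) == null) {
                        error = "RESOURCE_NOT_FOUND";
                        break;
                    }
                } else {
                    working.put(op.mechanism, op.iterations);
                }
            }
            if ("NONE".equals(error)) {
                if (working.isEmpty()) {
                    credentials.remove(user); // last credential removed: the user is deleted
                } else {
                    credentials.put(user, working); // first credential added: the user is created
                }
            }
            results.put(user, error);
        }
        return results;
    }

    boolean exists(String user) {
        return credentials.containsKey(user);
    }

    Set<String> mechanisms(String user) {
        return credentials.getOrDefault(user, Collections.<String, Integer>emptyMap()).keySet();
    }
}
```

Applying changes to a per-user working copy is one simple way to get the "none of the operations will be done" behavior for a failing user while still letting other users in the same request succeed.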

The AlterUserScramCredentialsRequest and AlterUserScramCredentialsResponse implement the new API.

Code Block
languagejava
{
  "apiKey": 51,
  "type": "request",
  "name": "AlterUserScramCredentialsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Deletions", "type": "[]ScramCredentialDeletion", "versions": "0+",
      "about": "The SCRAM credentials to remove.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The user name." },
      { "name": "Mechanism", "type": "int8", "versions": "0+",
        "about": "The SCRAM mechanism." }
    ]},
    { "name": "Upsertions", "type": "[]ScramCredentialUpsertion", "versions": "0+",
      "about": "The SCRAM credentials to update/insert.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The user name." },
      { "name": "Mechanism", "type": "int8", "versions": "0+",
        "about": "The SCRAM mechanism." },
      { "name": "Iterations", "type": "int32", "versions": "0+",
        "about": "The number of iterations." },
      { "name": "Salt", "type": "bytes", "versions": "0+",
        "about": "A random salt generated by the client." },
      { "name": "SaltedPassword", "type": "bytes", "versions": "0+",
        "about": "The salted password." }
    ]}
  ]
}

{
  "apiKey": 51,
  "type": "response",
  "name": "AlterUserScramCredentialsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Results", "type": "[]AlterUserScramCredentialsResult", "versions": "0+",
      "about": "The results for deletions and alterations, one per affected user.", "fields": [
      { "name": "User", "type": "string", "versions": "0+",
        "about": "The user name." },
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The error message, if any." }
    ]}
  ]
}
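The request carries a client-generated Salt and a SaltedPassword and never the clear-text password. As a rough illustration of what a client would compute, the sketch below implements the Hi() function that RFC 5802 defines for SCRAM, using only the JDK. The class name SaltedPasswordSketch, the 16-byte salt length, and the choice to pass the HMAC algorithm as a JCA name are assumptions for illustration; this proposal does not prescribe them.

```java
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Illustrative sketch only; shows how a salted password could be derived client-side.
final class SaltedPasswordSketch {
    // Hi(str, salt, i) from RFC 5802:
    //   U1 = HMAC(str, salt + INT(1)), Uk = HMAC(str, Uk-1), Hi = U1 XOR U2 XOR ... XOR Ui
    // hmacAlgorithm is a JCA name such as "HmacSHA256" or "HmacSHA512".
    static byte[] hi(String hmacAlgorithm, byte[] password, byte[] salt, int iterations) {
        try {
            Mac mac = Mac.getInstance(hmacAlgorithm);
            mac.init(new SecretKeySpec(password, hmacAlgorithm));
            mac.update(salt);
            mac.update(new byte[] {0, 0, 0, 1}); // INT(1) as a 4-byte big-endian integer
            byte[] u = mac.doFinal();            // U1; doFinal also resets the Mac for reuse
            byte[] result = u.clone();
            for (int i = 2; i <= iterations; i++) {
                u = mac.doFinal(u);
                for (int j = 0; j < result.length; j++) {
                    result[j] ^= u[j];
                }
            }
            return result;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HMAC algorithm unavailable: " + hmacAlgorithm, e);
        }
    }

    // Assumption: 16 random bytes; the proposal only says the salt is randomly generated.
    static byte[] randomSalt() {
        byte[] salt = new byte[16];
        new SecureRandom().nextBytes(salt);
        return salt;
    }
}
```

For example, `SaltedPasswordSketch.hi("HmacSHA256", passwordBytes, SaltedPasswordSketch.randomSalt(), 8192)` would yield the bytes for the SaltedPassword field of a SCRAM-SHA-256 upsertion, with the same salt and iteration count sent alongside it.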

An addition will return UNACCEPTABLE_CREDENTIAL if an empty user name or an invalid number of iterations (less than the minimum required for the mechanism or more than a hard-coded max of 16,384) is passed.

DUPLICATE_RESOURCE will be returned if a user appears as both an upsertion and a deletion in the same request.

UNSUPPORTED_SASL_MECHANISM will be returned if the broker does not recognize the requested SASL mechanism.

A removal will return an error code, RESOURCE_NOT_FOUND, if it was instructed to delete a credential that did not exist.
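The upsertion-side checks listed above can be summarized in a small validation sketch. The class AlterValidationSketch, the use of strings for error names, the order in which the checks run, and the per-mechanism minimum of 4096 iterations are all assumptions made for illustration; the text only states that a per-mechanism minimum exists and that the maximum is 16,384.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only; the order of the checks is an assumption.
final class AlterValidationSketch {
    static final int MAX_ITERATIONS = 16384; // hard-coded maximum stated in the text

    // Assumed minimums per mechanism; hypothetical values for this sketch.
    static final Map<String, Integer> MIN_ITERATIONS = new HashMap<>();
    static {
        MIN_ITERATIONS.put("SCRAM-SHA-256", 4096);
        MIN_ITERATIONS.put("SCRAM-SHA-512", 4096);
    }

    // Returns the error name for one upsertion, or "NONE" if it is acceptable.
    // deletedUsers holds the users that also appear as deletions in the same request.
    static String validateUpsertion(String user, String mechanism, int iterations,
                                    Set<String> deletedUsers) {
        if (user == null || user.isEmpty()) {
            return "UNACCEPTABLE_CREDENTIAL";
        }
        if (deletedUsers.contains(user)) {
            return "DUPLICATE_RESOURCE";
        }
        Integer min = MIN_ITERATIONS.get(mechanism);
        if (min == null) {
            return "UNSUPPORTED_SASL_MECHANISM";
        }
        if (iterations < min || iterations > MAX_ITERATIONS) {
            return "UNACCEPTABLE_CREDENTIAL";
        }
        return "NONE";
    }
}
```

RESOURCE_NOT_FOUND is not covered here because it depends on the stored credentials and not on the request alone.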

The RPC will require ALTER on CLUSTER.  It will return CLUSTER_AUTHORIZATION_FAILED if the user has insufficient permissions.

It will be sent to the controller and will return NOT_CONTROLLER if the receiving broker is not the controller.

Command-Line Changes

We will extend the kafka-configs.sh command so that it is possible to set a SCRAM configuration without using --zookeeper.  The command-line syntax will be unchanged, except that users will now be able to pass --bootstrap-server instead of --zookeeper.

As mentioned earlier, this API does not return secrets.  Therefore, the salt, salted password, and so on will not be returned by a kafka-configs.sh --describe operation.  The describe operation will return only the presence of the user plus the algorithm and number of iterations used.  For example:

Code Block
$ bin/kafka-configs.sh --bootstrap-server localhost:9020 \
    --alter \
    --entity-type users \
    --entity-name alice \
    --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]'

Completed updating config for entity: user-principal 'alice'.

$ bin/kafka-configs.sh --bootstrap-server localhost:9020 --entity-type users --entity-name alice --describe

Configs for user-principal 'alice' are SCRAM-SHA-512=iterations=8192

Compatibility, Deprecation, and Migration Plan

...