Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: renames, cleanup for consistency

...

As part of the KIP-500 effort to remove Kafka's ZooKeeper dependency, we need a broker-side API to alter these settings.

Public Interfaces

...

describeUserScramCredentials

describeScramUsers describeUserScramCredentials will describe the currently configured SCRAM usersuser credentials.  For security reasons, it will not list their passwords.

Code Block
languagejava
public enum ScramMechanism {
    UNKNOWN(0),
    HMAC_SHA_256(1),
    HMAC_SHA_512(2);

    byte type;

    private ScramMechanism(byte type) {
        this.type = type;
    }
}

public class ScramMechanismInfoScramCredentialInfo {
    private final ScramMechanism mechanism;
    private final int iterations;
}

public class ScramUserDescriptionUserScramCredentialsDescription {
    private final String name;
    private final List<ScramMechanismInfo>List<ScramCredentialInfo> infos;
}

public class DescribeScramUsersOptionsDescribeScramUserCredentialsOptions extends AbstractOptions<DescribeScramUsersOptions>AbstractOptions<DescribeScramUserCredentialsOptions> { }

// Describe all users.
default DescribeScramUsersResultDescribeScramUserCredentialsResult describeScramUsersdescribeScramUserCredentials() {
    return describeScramUsersdescribeScramUserCredentials(null);
}

default DescribeScramUsersResultDescribeScramUserCredentialsResult describeScramUsersdescribeScramUserCredentials(List<String> users) {
    return describeScramUsersdescribeScramUserCredentials(users, new DescribeScramUsersOptionsDescribeScramUserCredentialsOptions());
}

DescribeScramUsersResultDescribeScramUserCredentialsResult describeScramUsersdescribeScramUserCredentials(DescribeScramUsersOptionsDescribeScramUserCredentialsOptions options);

public class DescribeScramUsersResultDescribeScramUserCredentialsResult {
    public KafkaFuture<Map<String, ScramUserDescription>>UserScramCredentialsDescription>> all();
}

describeScramUsers describeScramUserCredentials will be implemented by a new RPC.

Code Block
{ 
  "apiKey": 50,
  "type": "request",
  "name": "DescribeScramUsersRequestDescribeUserScramCredentialsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Users", "type": "[]UserName", "versions": "0+", "nullableVersions": "0+",
      "about": "The users to describe, or null to describe all users.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The user name." }
    ]},
  ]
}

{ 
  "apiKey": 50, 
  "type": "response",
  "name": "DescribeScramUsersResponseDescribeUserScramCredentialsResponse", 
  "validVersions": "0", 
  "flexibleVersions": "0+", 
  "fields": [ 
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota
 violation, or zero if the request did not violate any quota." },
    { "name": "Error", "type": "int16", "versions": "0+",
      "about": "The message-level error code." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The message-level error message." },
    { "name": "UsersUserScramCredentials", "type": "[]ScramUserUserScramCredential", "versions": "0+",
      "about": "The users' SCRAM userscredentials.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The user name." },
      { "name": "MechanismInfosCredentialInfos", "type": "[]ScramUserMechanismInfoCredentialInfo", "versions": "0+",
        "about": "The mechanisms mechanism and related information associated with the user's SCRAM credential.", "fields": [
        { "name": "Mechanism", "type": "int8", "versions": "0+",
          "about": "The SCRAM mechanism." },
        { "name": "Iterations", "type": "int32", "versions": "0+",
          "about": "The number of iterations used in the SCRAM mechanismcredential." }
      ]}
    ]}
  ]
}

It will require DESCRIBE  permissions permissions on the CLUSTER resource.  It will return CLUSTER_AUTHORIZATION_FAILED if the user has insufficient permissions.

It will be will be sent to the controller, and will return NOT_CONTROLLER if the receiving broker is not the controller.

...

AlterScramUserCredentials

alterScramUsersalterScramUserCredentials will create or change SCRAM usersuser credentials.

Alterations will create the given user credential if it doesn't exist, or alter it if it does.

Code Block
interface ScramCredentialAlterationpublic abstract class UserScramCredentialAlteration {
    private final String user;
}

public class ScramCredentialAdditionUserScramCredentialUpsertion extends ScramCredentialAlterationUserScramCredentialAlteration {
    private final ScramMechanismInfoScramCredentialInfo info;
    private final byte[] salt;
    private final byte[] password;

    // There will be one constructor that randomly generates a salt, and one that accepts a pre-defined salt.
}

public class ScramCredentialDeletionUserScramCredentialDeletion extends ScramCredentialAlterationUserScramCredentialAlteration {
    private final ScramMechanism mechanism;
}

public class DeletedScramCredential {
    private final ScramMechanismInfo info;
}

public class AlterScramUsersOptions extends AbstractOptions<AlterScramUsersOptions>AlterScramUserCredentialsOptions extends AbstractOptions<AlterScramUserCredentialsOptions> {}

default AlterScramUsersResultAlterScramUserCredentialsResult alterScramUsersalterScramUserCredentials(List<ScramUserAlteration>List<UserScramCredentialAlteration> alterations) {
    return alterScramUsersalterScramUserCredentials(deletions, alterations, new AlterScramUsersOptionsAlterScramUserCredentialsOptions());
}

AlterScramUsersResultAlterScramUserCredentialsResult alterScramUsersalterScramUserCredentials(List<ScramUserAlteration>List<UserScramCredentialAlteration> alterations,
                                      AlterScramUsersOptionsAlterScramUserCredentialsOptions options);

public class AlterScramCredentialsResultAlterUserScramCredentialsResult {
    public KafkaFuture<Void> all();
    public Map<String, KafkaFuture<Void>> results();
}

...

If a user doesn't exist and we add a credential for them, they will be created.  If a user exists and we remove their last credential, they will be deleted.

The AlterScramUsersRequest AlterScramUserCredentialsRequest and AlterScramUsersResponse AlterScramUserCredentialsResponse implement the new API.

Code Block
languagejava
{ 
  "apiKey": 51, 
  "type": "request",
  "name": "AlterScramUsersRequestAlterUserScramCredentialsRequest",
  "validVersions": "0", 
  "flexibleVersions": "0+", 
  "fields": [ 
    { "name": "AlterationsDeletions", "type": "[]ScramUserAlterationScramCredentialDeletion", "versions": "0+",
      "about": "The SCRAM userscredentials to create or alterremove.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The user name." },
      { "name": "DeletionsMechanism", "type": "[]ScramCredentialDeletionint8", "versions": "0+",
        "about": "The credentialsSCRAM to deletemechanism.", "fields": [ }
    ]},
    { "name": "MechanismUpsertions", "type": "int8[]ScramCredentialUpsertion", "versions": "0+",
          "about": "The SCRAM mechanism." }
      ]}, credentials to update/insert.", "fields": [
      { "name": "AdditionsName", "type": "[]ScramCredentialAdditionstring", "versions": "0+",
        "about": "The SCRAM credentials to adduser name." },
        { "name": "Mechanism", "type": "int8", "versions": "0+",
          "about": "The SCRAM mechanism." },
        { "name": "Iterations", "type": "int32", "versions": "0+",
          "about": "The number of iterations, or -1 to use the server default." },
        { "name": "Salt", "type": "bytes", "versions": "0+", ",
          "about": "A random salt generated by the client." },
        { "name": "SaltedPassword", "type": "bytes", "versions": "0+", ",
          "about": "The salted password." }
      ] "The salted password." }
    ]}
  ]
}

{ 
  "apiKey": 51, 
  "type": "response",
  "name": "AlterScramUsersResponseAlterUserScramCredentialsResponse",
  "validVersions": "0", 
  "flexibleVersions": "0+", 
  "fields": [ 
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Results", "type": "[]AlterScramUsersResultAlterUserScramCredentialsResult", "versions": "0+",
      "about": "The results for deletions and alterations, inone theper same order as the requestaffected user.", "fields": [
        { "name": "User", "type": "string", "versions": "0+",
          "about": "The user name." },
        { "name": "ErrorCode", "type": "int8int16", "versions": "0+",
          "about": "The error code." },
        { "name": "ErrorStringErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
          "about": "The error message, if any." }
    ]}  
  ]       
}   

An addition will return INVALID_REQUEST if an empty user name is passed, or an invalid number of iterations, or a duplicate user name.  Note that if the number of iterations is set to -1, the server-side default will be used.

...