...

As part of the KIP-500 effort to remove Kafka's ZooKeeper dependency, we need a broker-side API to alter these settings.

Public Interfaces

...

describeScramUsers

describeScramUsers will describe the currently configured SCRAM users.  For security reasons, it will not list their passwords.

Code Block
languagejava
public enum ScramMechanism {
    // The casts are required because the constructor takes a byte.
    UNKNOWN((byte) 0),
    HMAC_SHA_256((byte) 1),
    HMAC_SHA_512((byte) 2);

    private final byte type;

    private ScramMechanism(byte type) {
        this.type = type;
    }
}

public class ScramMechanismInfo {
    private final ScramMechanism mechanism;
    private final int iterations;
}

public class ScramUserDescription {
    private final String name;
    private final List<ScramMechanismInfo> infos;
}

public class DescribeScramUsersOptions extends AbstractOptions<DescribeScramUsersOptions> { }

// Describe all users.
default DescribeScramUsersResult describeScramUsers() {
    return describeScramUsers(null);
}

// Describe the given users, or all users if the list is null.
default DescribeScramUsersResult describeScramUsers(List<String> users) {
    return describeScramUsers(users, new DescribeScramUsersOptions());
}

DescribeScramUsersResult describeScramUsers(List<String> users, DescribeScramUsersOptions options);

public class DescribeScramUsersResult {
    public KafkaFuture<Map<String, ScramUserDescription>> all();
}
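
As a rough illustration of how a caller might consume this API, here is a minimal, self-contained sketch. It is not the proposed implementation: `CompletableFuture` stands in for `KafkaFuture`, the in-memory `describeScramUsers(store, users)` helper stands in for the broker, and `fromType` is a hypothetical helper for decoding the mechanism byte. A null user list is treated as "describe all users", as described above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class DescribeScramUsersSketch {
    enum ScramMechanism {
        UNKNOWN((byte) 0),
        HMAC_SHA_256((byte) 1),
        HMAC_SHA_512((byte) 2);

        final byte type;

        ScramMechanism(byte type) {
            this.type = type;
        }

        // Hypothetical helper: maps a wire value back to a mechanism, or UNKNOWN.
        static ScramMechanism fromType(byte type) {
            for (ScramMechanism mechanism : values()) {
                if (mechanism.type == type) {
                    return mechanism;
                }
            }
            return UNKNOWN;
        }
    }

    static final class ScramMechanismInfo {
        final ScramMechanism mechanism;
        final int iterations;

        ScramMechanismInfo(ScramMechanism mechanism, int iterations) {
            this.mechanism = mechanism;
            this.iterations = iterations;
        }
    }

    static final class ScramUserDescription {
        final String name;
        final List<ScramMechanismInfo> infos;

        ScramUserDescription(String name, List<ScramMechanismInfo> infos) {
            this.name = name;
            this.infos = infos;
        }
    }

    // Stand-in for the broker side: a null user list selects every user.
    static CompletableFuture<Map<String, ScramUserDescription>> describeScramUsers(
            Map<String, ScramUserDescription> store, List<String> users) {
        Map<String, ScramUserDescription> result = new LinkedHashMap<>();
        for (Map.Entry<String, ScramUserDescription> entry : store.entrySet()) {
            if (users == null || users.contains(entry.getKey())) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return CompletableFuture.completedFuture(result);
    }

    public static void main(String[] args) {
        Map<String, ScramUserDescription> store = new LinkedHashMap<>();
        store.put("alice", new ScramUserDescription("alice", Collections.singletonList(
            new ScramMechanismInfo(ScramMechanism.fromType((byte) 1), 4096))));
        store.put("bob", new ScramUserDescription("bob", Collections.singletonList(
            new ScramMechanismInfo(ScramMechanism.fromType((byte) 2), 8192))));

        // Only mechanisms and iteration counts are exposed; passwords never are.
        for (ScramUserDescription user : describeScramUsers(store, Arrays.asList("alice")).join().values()) {
            for (ScramMechanismInfo info : user.infos) {
                System.out.println(user.name + " " + info.mechanism + " " + info.iterations);
            }
        }
    }
}
```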

describeScramUsers will be implemented by a new RPC.

Code Block
{ 
  "apiKey": 50,
  "type": "request",
  "name": "DescribeScramUsersRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Users", "type": "[]UserName", "versions": "0+", "nullableVersions": "0+",
      "about": "The users to describe, or null to describe all users.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The user name." }
    ]}
  ]
}

{ 
  "apiKey": 50, 
  "type": "response",
  "name": "DescribeScramUsersResponse", 
  "validVersions": "0", 
  "flexibleVersions": "0+", 
  "fields": [ 
    { "name": "Error", "type": "int16", "versions": "0+",
      "about": "The message-level error code." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The message-level error message." },
    { "name": "Users", "type": "[]ScramUser", "versions": "0+",
      "about": "The SCRAM users.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The user name." },
      { "name": "MechanismInfos", "type": "[]ScramUserMechanismInfo", "versions": "0+",
        "about": "The SCRAM mechanisms configured for the user.", "fields": [
        { "name": "Mechanism", "type": "int8", "versions": "0+",
          "about": "The SCRAM mechanism." },
        { "name": "Iterations", "type": "int32", "versions": "0+",
          "about": "The number of iterations used in the SCRAM mechanism." }
      ]}
    ]}
  ]
}

It will require ALTER permissions on the CLUSTER resource.  It will return CLUSTER_AUTHORIZATION_FAILED if the user has insufficient permissions.

It will be sent to the controller, and will return NOT_CONTROLLER if the receiving broker is not the controller.

AlterScramUsers

alterScramUsers will create or change SCRAM users.

Alterations will create the given user if it doesn't exist, or alter it if it does.

Code Block
public interface ScramCredentialAlteration { }

public class ScramCredential {
    private final ScramMechanismInfo info;
    private final byte[] salt;
    private final byte[] password;

    // There will be one constructor that randomly generates a salt, and one that accepts a pre-defined salt.
}

public class ScramUserAlteration {
    private final String user;
}

public class ScramCredentialAddition implements ScramCredentialAlteration {
    private final ScramMechanismInfo info;
    private final byte[] salt;
    private final byte[] password;

    // There will be one constructor that randomly generates a salt, and one that accepts a pre-defined salt.
}

public class ScramCredentialDeletion implements ScramCredentialAlteration {
    private final ScramMechanism mechanism;
}

public class DeletedScramCredential {
    private final ScramMechanismInfo info;
}

public class AlterScramUsersOptions extends AbstractOptions<AlterScramUsersOptions> {}

default AlterScramUsersResult alterScramUsers(List<ScramUserAlteration> alterations) {
    return alterScramUsers(alterations, new AlterScramUsersOptions());
}

AlterScramUsersResult alterScramUsers(List<ScramUserAlteration> alterations,
                                      AlterScramUsersOptions options);

public class AlterScramUsersResult {
    public KafkaFuture<Void> all();
    public Map<String, KafkaFuture<Void>> results();
}
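
To make the salt and password fields of ScramCredential more concrete, the following is a minimal sketch of how a client could generate a random salt and derive a salted password. It relies on the fact that SCRAM's Hi() function (RFC 5802) is PBKDF2 with HMAC as the pseudorandom function and an output as long as the hash. The class name, the 16-byte salt length, and the method names are assumptions made for illustration; SASLprep normalization of the password is omitted.

```java
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;

final class ScramSaltSketch {
    private static final SecureRandom RANDOM = new SecureRandom();

    // Generates a random salt. The 16-byte length is an assumption, not part of the proposal.
    static byte[] randomSalt() {
        byte[] salt = new byte[16];
        RANDOM.nextBytes(salt);
        return salt;
    }

    // Derives the salted password with PBKDF2-HMAC-SHA-256 or PBKDF2-HMAC-SHA-512.
    static byte[] saltedPassword(char[] password, byte[] salt, int iterations, boolean sha512) {
        String algorithm = sha512 ? "PBKDF2WithHmacSHA512" : "PBKDF2WithHmacSHA256";
        int keyLengthBits = sha512 ? 512 : 256;
        PBEKeySpec spec = new PBEKeySpec(password, salt, iterations, keyLengthBits);
        try {
            return SecretKeyFactory.getInstance(algorithm).generateSecret(spec).getEncoded();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("PBKDF2 is unavailable on this JVM", e);
        } finally {
            // Wipe the internal copy of the password held by the key spec.
            spec.clearPassword();
        }
    }

    public static void main(String[] args) {
        byte[] salt = randomSalt();
        byte[] salted = saltedPassword("pencil".toCharArray(), salt, 4096, false);
        System.out.println("salt bytes: " + salt.length + ", salted password bytes: " + salted.length);
    }
}
```

Only the salt, the iteration count, and the salted password would need to leave the client in such a scheme; the plaintext password stays local.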

If any of the operations associated with a single user can't be done, none of the operations will be done.  On the other hand, operations could succeed for one user but fail for another, different user.

If a user doesn't exist and we add a credential for them, they will be created.  If a user exists and we remove their last credential, they will be deleted.
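
The per-user semantics described above can be sketched with a small in-memory model. This is only an illustration under stated assumptions: `AlterScramUsersSketch`, `UserAlteration`, and the string error names are hypothetical stand-ins, and the real controller-side logic may differ. Each user's changes are applied to a copy first, so a failure leaves that user untouched while other users in the same batch can still succeed.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class AlterScramUsersSketch {
    enum Mechanism { HMAC_SHA_256, HMAC_SHA_512 }

    // Hypothetical stand-in for one user's entry in the request.
    static final class UserAlteration {
        final String user;
        final List<Mechanism> deletions;
        final Map<Mechanism, Integer> additions; // mechanism -> iterations

        UserAlteration(String user, List<Mechanism> deletions, Map<Mechanism, Integer> additions) {
            this.user = user;
            this.deletions = deletions;
            this.additions = additions;
        }
    }

    // user -> (mechanism -> iterations)
    final Map<String, Map<Mechanism, Integer>> users = new HashMap<>();

    // Applies all operations for one user, or none of them.
    String apply(UserAlteration alteration) {
        Map<Mechanism, Integer> updated = new EnumMap<>(Mechanism.class);
        Map<Mechanism, Integer> current = users.get(alteration.user);
        if (current != null) {
            updated.putAll(current);
        }
        for (Mechanism mechanism : alteration.deletions) {
            if (updated.remove(mechanism) == null) {
                return "RESOURCE_NOT_FOUND"; // the stored state was never modified
            }
        }
        updated.putAll(alteration.additions);
        if (updated.isEmpty()) {
            users.remove(alteration.user); // removing the last credential deletes the user
        } else {
            users.put(alteration.user, updated); // adding a credential creates the user if needed
        }
        return "NONE";
    }

    // Users are processed independently: one may fail while another succeeds.
    Map<String, String> applyAll(List<UserAlteration> alterations) {
        Map<String, String> results = new LinkedHashMap<>();
        for (UserAlteration alteration : alterations) {
            results.put(alteration.user, apply(alteration));
        }
        return results;
    }

    public static void main(String[] args) {
        AlterScramUsersSketch store = new AlterScramUsersSketch();
        System.out.println(store.applyAll(Arrays.asList(
            new UserAlteration("alice", Collections.<Mechanism>emptyList(),
                Collections.singletonMap(Mechanism.HMAC_SHA_256, 4096)),
            new UserAlteration("bob", Arrays.asList(Mechanism.HMAC_SHA_512),
                Collections.<Mechanism, Integer>emptyMap()))));
        System.out.println(store.users.keySet());
    }
}
```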

The AlterScramUsersRequest and AlterScramUsersResponse implement the new API.

Code Block
{ 
  "apiKey": 51, 
  "type": "request",
  "name": "AlterScramUsersRequest",
  "validVersions": "0", 
  "flexibleVersions": "0+", 
  "fields": [ 
    { "name": "Alterations", "type": "[]ScramUserAlteration", "versions": "0+",
      "about": "The SCRAM users to create or alter.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The user name." },
      { "name": "Deletions", "type": "[]ScramCredentialDeletion", "versions": "0+",
        "about": "The SCRAM credentials to delete.", "fields": [
        { "name": "Mechanism", "type": "int8", "versions": "0+",
          "about": "The SCRAM mechanism." }
      ]},
      { "name": "Additions", "type": "[]ScramCredentialAddition", "versions": "0+",
        "about": "The SCRAM credentials to add.", "fields": [
        { "name": "Mechanism", "type": "int8", "versions": "0+",
          "about": "The SCRAM mechanism." },
        { "name": "Iterations", "type": "int32", "versions": "0+",
          "about": "The number of iterations, or -1 to use the server default." },
        { "name": "Salt", "type": "bytes", "versions": "0+",
          "about": "A random salt generated by the client." },
        { "name": "SaltedPassword", "type": "bytes", "versions": "0+",
          "about": "The salted password." }
      ]}
    ]}
  ]
}

{ 
  "apiKey": 51, 
  "type": "response",
  "name": "AlterScramUsersResponse",
  "validVersions": "0", 
  "flexibleVersions": "0+", 
  "fields": [ 
    { "name": "Results", "type": "[]AlterScramUsersResult", "versions": "0+",
      "about": "The results for alterations, in the same order as the request.", "fields": [
      { "name": "User", "type": "string", "versions": "0+",
        "about": "The user name." },
      { "name": "ErrorCode", "type": "int8", "versions": "0+",
        "about": "The error code." },
      { "name": "ErrorString", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The error message, if any." }
    ]}
  ]
}

An addition will return INVALID_REQUEST if it specifies an empty user name, an invalid number of iterations, or a duplicate user name.  Note that if the number of iterations is set to -1, the server-side default will be used.

A removal will return an error code, RESOURCE_NOT_FOUND, if it was instructed to delete a credential that did not exist.

The RPC will require ALTER on CLUSTER.  It will return CLUSTER_AUTHORIZATION_FAILED if the user has insufficient permissions.  It will be sent to the controller, and will return NOT_CONTROLLER if the receiving broker is not the controller.
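
The validation rules for additions could look roughly like the sketch below. Several details are assumptions, since the text does not pin them down: the server default of 4096 iterations, treating any explicit iteration count below 1 as invalid, flagging every occurrence of a duplicated user name, and all class and method names. Results are returned in the same order as the input, mirroring the response layout.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class AdditionValidationSketch {
    // Hypothetical server-side default, used when the client sends -1.
    static final int SERVER_DEFAULT_ITERATIONS = 4096;

    // Simplified stand-in for one requested addition.
    static final class Addition {
        final String user;
        final int iterations;

        Addition(String user, int iterations) {
            this.user = user;
            this.iterations = iterations;
        }
    }

    // Resolves -1 to the server default; other values pass through unchanged.
    static int effectiveIterations(int requested) {
        return requested == -1 ? SERVER_DEFAULT_ITERATIONS : requested;
    }

    // Returns one error name per addition, in request order.
    static List<String> validateAll(List<Addition> additions) {
        Map<String, Integer> occurrences = new HashMap<>();
        for (Addition addition : additions) {
            occurrences.merge(addition.user, 1, Integer::sum);
        }
        List<String> results = new ArrayList<>();
        for (Addition addition : additions) {
            boolean emptyName = addition.user.isEmpty();
            boolean badIterations = addition.iterations != -1 && addition.iterations < 1;
            boolean duplicate = occurrences.get(addition.user) > 1;
            results.add(emptyName || badIterations || duplicate ? "INVALID_REQUEST" : "NONE");
        }
        return results;
    }

    public static void main(String[] args) {
        List<Addition> additions = Arrays.asList(
            new Addition("alice", 4096),
            new Addition("", 4096),
            new Addition("bob", 0),
            new Addition("carol", -1));
        System.out.println(validateAll(additions));
        System.out.println(effectiveIterations(-1));
    }
}
```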

Command-Line Changes

We will extend the kafka-configs.sh command so that it is possible to set a SCRAM configuration without using --zookeeper.  The command-line syntax will be unchanged, except that users will now be able to pass --bootstrap-server instead of --zookeeper.

...