Comment: Changes as described in VOTE thread based on PR reviews

...

Code Block
languagejava
public enum ScramMechanism {
    UNKNOWN(0),
    SCRAM_SHA_256(1),
    SCRAM_SHA_512(2);

    private final byte type;
    private final String mechanismName;

    public static ScramMechanism fromType(byte type) {
        // etc...
    }

    public static ScramMechanism fromMechanismName(String mechanismName) {
        // etc...
    }

    public String mechanismName() {
        // etc...
    }

    public byte type() {
        // etc...
    }

    private ScramMechanism(byte type) {
        // etc...
    }
}

public class ScramCredentialInfo {
    private final ScramMechanism mechanism;
    private final int iterations;
}

public class UserScramCredentialsDescription {
    private final String name;
    private final List<ScramCredentialInfo> infos;
}

public class DescribeScramUserCredentialsOptions extends AbstractOptions<DescribeScramUserCredentialsOptions> { }

// interface Admin: Describe all users.
default DescribeScramUserCredentialsResult describeScramUserCredentials() {
    return describeScramUserCredentials(null);
}

// interface Admin: Describe indicated users (null or empty implies all).
default DescribeScramUserCredentialsResult describeScramUserCredentials(List<String> users) {
    return describeScramUserCredentials(users, new DescribeScramUserCredentialsOptions());
}

// interface Admin
DescribeScramUserCredentialsResult describeScramUserCredentials(List<String> users, DescribeScramUserCredentialsOptions options);

public class DescribeScramUserCredentialsResult {
    // completes successfully only if users() does and every described user does
    public KafkaFuture<Map<String, UserScramCredentialsDescription>> all();

    // completes successfully if request was authorized and the list of users described is determined
    public KafkaFuture<List<String>> users();

    // completes successfully if the individual user is described successfully
    public KafkaFuture<UserScramCredentialsDescription> description(String userName);
}

describeScramUserCredentials will be implemented by a new RPC.
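
The ScramMechanism enum above elides its method bodies. As a rough illustration only, the lookups could be filled in along the following lines. The class name ScramMechanismSketch, the explicit byte casts, the two-argument constructor, and the fallback to UNKNOWN for unrecognized inputs are assumptions made for this sketch, not something the proposal specifies; the mechanism names are the standard SASL names.

```java
// Hypothetical sketch; not the proposed class itself.
enum ScramMechanismSketch {
    UNKNOWN((byte) 0, "UNKNOWN"),
    SCRAM_SHA_256((byte) 1, "SCRAM-SHA-256"),
    SCRAM_SHA_512((byte) 2, "SCRAM-SHA-512");

    private final byte type;
    private final String mechanismName;

    ScramMechanismSketch(byte type, String mechanismName) {
        this.type = type;
        this.mechanismName = mechanismName;
    }

    // Maps a wire-level type byte to a mechanism.
    // Assumption: unrecognized values map to UNKNOWN rather than throwing.
    static ScramMechanismSketch fromType(byte type) {
        for (ScramMechanismSketch mechanism : values()) {
            if (mechanism.type == type) {
                return mechanism;
            }
        }
        return UNKNOWN;
    }

    // Maps a SASL mechanism name to a mechanism, with the same UNKNOWN fallback.
    static ScramMechanismSketch fromMechanismName(String mechanismName) {
        for (ScramMechanismSketch mechanism : values()) {
            if (mechanism.mechanismName.equals(mechanismName)) {
                return mechanism;
            }
        }
        return UNKNOWN;
    }

    byte type() {
        return type;
    }

    String mechanismName() {
        return mechanismName;
    }
}
```

Mapping unrecognized values to UNKNOWN lets a client describe credentials written with a mechanism it does not know about without failing the whole call.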

Code Block
{
  "apiKey": 50,
  "type": "request",
  "name": "DescribeUserScramCredentialsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Users", "type": "[]UserName", "versions": "0+", "nullableVersions": "0+",
      "about": "The users to describe, or null/empty to describe all users.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The user name." }
    ]}
  ]
}

{
  "apiKey": 50,
  "type": "response",
  "name": "DescribeUserScramCredentialsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The message-level error code, 0 except for user authorization or infrastructure issues." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The message-level error message, if any." },
    { "name": "Results", "type": "[]DescribeUserScramCredentialsResult", "versions": "0+",
      "about": "The results for descriptions, one per user.", "fields": [
      { "name": "User", "type": "string", "versions": "0+",
        "about": "The user name." },
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The user-level error code." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The user-level error message, if any." },
      { "name": "CredentialInfos", "type": "[]CredentialInfo", "versions": "0+",
        "about": "The mechanism and related information associated with the user's SCRAM credentials.", "fields": [
        { "name": "Mechanism", "type": "int8", "versions": "0+",
          "about": "The SCRAM mechanism." },
        { "name": "Iterations", "type": "int32", "versions": "0+",
          "about": "The number of iterations used in the SCRAM credential." }]}
    ]}
  ]
}

It will require DESCRIBE permissions on the CLUSTER resource.  It will return CLUSTER_AUTHORIZATION_FAILED if the user has insufficient permissions.

It will return RESOURCE_NOT_FOUND if a user is requested to be described but that user does not exist/has no credentials.  Note that DescribeScramUserCredentialsResult will not consider such a username to be part of its users() result list, and this RESOURCE_NOT_FOUND error by itself will not prevent the future returned by all() from completing successfully.

It will return DUPLICATE_RESOURCE if a user is requested to be described twice.  Note that DescribeScramUserCredentialsResult will consider such a username to be part of its users() result list, and this will cause the future returned by all() to complete exceptionally.

It will be sent to the controller, and will return NOT_CONTROLLER if the receiving broker is not the controller.
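
The interaction between the per-user error codes and the users() and all() futures can be modeled with a small sketch. This is a simplified model under stated assumptions, not the Admin client implementation: the class DescribeResultSketch and its methods are hypothetical, error codes are plain strings, and futures are reduced to a list and a boolean.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the result semantics described in the text.
final class DescribeResultSketch {
    static final String NONE = "NONE";
    static final String RESOURCE_NOT_FOUND = "RESOURCE_NOT_FOUND";

    // Per-user error code as returned in the response, in response order.
    private final Map<String, String> errorByUser;

    DescribeResultSketch(Map<String, String> errorByUser) {
        this.errorByUser = new LinkedHashMap<>(errorByUser);
    }

    // Users with RESOURCE_NOT_FOUND are left out of the list;
    // users with any other outcome (including DUPLICATE_RESOURCE) are kept.
    List<String> users() {
        List<String> described = new ArrayList<>();
        for (Map.Entry<String, String> entry : errorByUser.entrySet()) {
            if (!RESOURCE_NOT_FOUND.equals(entry.getValue())) {
                described.add(entry.getKey());
            }
        }
        return described;
    }

    // Models all(): true only if every user listed by users() has no error.
    boolean allSucceeds() {
        for (String user : users()) {
            if (!NONE.equals(errorByUser.get(user))) {
                return false;
            }
        }
        return true;
    }
}
```

In this model a missing user silently drops out of the result, while a duplicated user stays in the list and makes the aggregate fail, which mirrors the two cases described above.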

AlterScramUserCredentials

...

Code Block
public abstract class UserScramCredentialAlteration {
    private final String user;
}

public class UserScramCredentialUpsertion extends UserScramCredentialAlteration {
    private final ScramCredentialInfo info;
    private final byte[] salt;
    private final byte[] password;

    // There will be one constructor that randomly generates a salt, and one that accepts a pre-defined salt.
}

public class UserScramCredentialDeletion extends UserScramCredentialAlteration {
    private final ScramMechanism mechanism;
}

public class AlterScramUserCredentialsOptions extends AbstractOptions<AlterScramUserCredentialsOptions> {}


// interface Admin
default AlterScramUserCredentialsResult alterScramUserCredentials(List<UserScramCredentialAlteration> alterations) {
    return alterScramUserCredentials(alterations, new AlterScramUserCredentialsOptions());
}

// interface Admin
AlterScramUserCredentialsResult alterScramUserCredentials(List<UserScramCredentialAlteration> alterations,
                                      AlterScramUserCredentialsOptions options);

public class AlterScramUserCredentialsResult {
    public Map<String, KafkaFuture<Void>> values();
    public KafkaFuture<Void> all(); // completes successfully only if everything in values() does
}

If any of the operations associated with a single user can't be done, none of the operations will be done.  On the other hand, operations could succeed for one user but fail for another, different user.
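
The all-or-nothing rule per user can be illustrated with a minimal sketch. The names PerUserAtomicitySketch, Op, and appliedByUser are hypothetical, and each operation is reduced to a user name plus a flag saying whether it could be carried out; the real broker-side logic is not described in this section.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: one entry per requested upsertion or deletion.
final class PerUserAtomicitySketch {
    static final class Op {
        final String user;
        final boolean valid; // whether this single operation could be done

        Op(String user, boolean valid) {
            this.user = user;
            this.valid = valid;
        }
    }

    // Returns, per user, whether that user's operations are applied.
    // A user's operations are applied only if every one of them is valid;
    // the outcome for one user does not affect any other user.
    static Map<String, Boolean> appliedByUser(List<Op> ops) {
        Map<String, Boolean> applied = new LinkedHashMap<>();
        for (Op op : ops) {
            applied.merge(op.user, op.valid, Boolean::logicalAnd);
        }
        return applied;
    }
}
```

For example, if one of two operations for a user is invalid, the map reports false for that user, while an unrelated user with only valid operations is still reported as true.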

...

Code Block
languagejava
{
  "apiKey": 51,
  "type": "request",
  "name": "AlterUserScramCredentialsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Deletions", "type": "[]ScramCredentialDeletion", "versions": "0+",
      "about": "The SCRAM credentials to remove.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The user name." },
      { "name": "Mechanism", "type": "int8", "versions": "0+",
        "about": "The SCRAM mechanism." }
    ]},
    { "name": "Upsertions", "type": "[]ScramCredentialUpsertion", "versions": "0+",
      "about": "The SCRAM credentials to update/insert.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The user name." },
      { "name": "Mechanism", "type": "int8", "versions": "0+",
        "about": "The SCRAM mechanism." },
      { "name": "Iterations", "type": "int32", "versions": "0+",
        "about": "The number of iterations, or -1 to use the server default." },
      { "name": "Salt", "type": "bytes", "versions": "0+",
        "about": "A random salt generated by the client." },
      { "name": "SaltedPassword", "type": "bytes", "versions": "0+",
        "about": "The salted password." }
    ]}
  ]
}

{
  "apiKey": 51,
  "type": "response",
  "name": "AlterUserScramCredentialsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Results", "type": "[]AlterUserScramCredentialsResult", "versions": "0+",
      "about": "The results for deletions and alterations, one per affected user.", "fields": [
      { "name": "User", "type": "string", "versions": "0+",
        "about": "The user name." },
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The error message, if any." }
    ]}
  ]
}  
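
The request carries a Salt and a SaltedPassword rather than the clear-text password. In SCRAM (RFC 5802) the salted password is Hi(password, salt, iterations), which is essentially PBKDF2 with HMAC as the pseudo-random function and an output length equal to the hash length. The sketch below shows one way a client might compute it for SCRAM-SHA-256 with the JDK's PBKDF2 support. The class SaltedPasswordSketch is hypothetical, SASLprep normalization of the password is omitted, and nothing here is claimed to be the Kafka client's actual code.

```java
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;

// Hypothetical helper for illustration only.
final class SaltedPasswordSketch {
    // Generates a random salt of the requested length in bytes.
    static byte[] randomSalt(int length) {
        byte[] salt = new byte[length];
        new SecureRandom().nextBytes(salt);
        return salt;
    }

    // Computes a 32-byte salted password for SCRAM-SHA-256 via PBKDF2-HMAC-SHA256.
    // Assumption: the password is already normalized (no SASLprep step here).
    static byte[] saltedPassword(char[] password, byte[] salt, int iterations) {
        PBEKeySpec spec = new PBEKeySpec(password, salt, iterations, 256);
        try {
            return SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256")
                    .generateSecret(spec)
                    .getEncoded();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("PBKDF2WithHmacSHA256 is not available", e);
        } finally {
            spec.clearPassword(); // wipes the spec's internal copy of the password
        }
    }
}
```

For SCRAM-SHA-512 the same approach would use "PBKDF2WithHmacSHA512" and a 512-bit key length.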

An addition will return UNACCEPTABLE_CREDENTIAL if an empty user name is passed, or if an invalid number of iterations (less than the minimum required for the mechanism or more than a hard-coded max of 16,384) is passed.

DUPLICATE_RESOURCE will be returned if a user appears as both an upsertion and a deletion in the same request.
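
The two request-level checks above can be sketched as follows. The class AlterValidationSketch and its methods are hypothetical, error codes are modeled as strings, and the minimum of 4,096 iterations is an assumption made for the example (the text only says "the minimum required for the mechanism"); the maximum of 16,384 comes from the text.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical validation helper for illustration only.
final class AlterValidationSketch {
    static final int MAX_ITERATIONS = 16_384;        // hard-coded max from the text
    static final int ASSUMED_MIN_ITERATIONS = 4_096; // assumption, not from the text

    // Returns the error code for a single upsertion, or "NONE" if it passes.
    static String validateUpsertion(String user, int iterations) {
        if (user == null || user.isEmpty()) {
            return "UNACCEPTABLE_CREDENTIAL";
        }
        if (iterations < ASSUMED_MIN_ITERATIONS || iterations > MAX_ITERATIONS) {
            return "UNACCEPTABLE_CREDENTIAL";
        }
        return "NONE";
    }

    // Returns the users that appear as both a deletion and an upsertion;
    // each of these would be answered with DUPLICATE_RESOURCE.
    static Set<String> usersInBoth(List<String> deletionUsers, List<String> upsertionUsers) {
        Set<String> both = new LinkedHashSet<>(deletionUsers);
        both.retainAll(upsertionUsers);
        return both;
    }
}
```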

UNSUPPORTED_SASL_MECHANISM will be returned if the broker does not recognize the requested SASL mechanism.

A removal will return an error code, RESOURCE_NOT_FOUND, if it was instructed to delete a credential that did not exist.

The RPC will require ALTER on CLUSTER.  It will return CLUSTER_AUTHORIZATION_FAILED if the user has insufficient permissions.

It will disallow a user from altering any credential if the user authenticated with a delegation token and will return DELEGATION_TOKEN_REQUEST_NOT_ALLOWED in this case.

It will be sent to the controller, and will return NOT_CONTROLLER if the receiving broker is not the controller.

...