Status

Current state: Under Discussion

Discussion thread:

JIRA: none yet

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The kafka-configs.sh command supports changing the SCRAM settings for users.  For example:

bin/kafka-configs.sh --zookeeper localhost:2181 \
    --alter \
    --entity-type users \
    --entity-name alice \
    --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]'

However, this command depends on Apache ZooKeeper.  There is no broker-based API to change these settings.

As part of the KIP-500 effort to remove Kafka's ZooKeeper dependency, we need a broker-side API to alter these settings.

Public Interfaces

listScramUsers

listScramUsers will list the currently configured SCRAM users.  For security reasons, it will not list their passwords.

public enum ScramMechanismType {
    UNKNOWN((byte) 0),
    HMAC_SHA_256((byte) 1),
    HMAC_SHA_512((byte) 2);

    final byte type;

    private ScramMechanismType(byte type) {
        this.type = type;
    }
}

public class ScramMechanism {
    private final ScramMechanismType type;
    private final int iterations;

    public ScramMechanism(ScramMechanismType type, int iterations) {
        this.type = type;
        this.iterations = iterations;
    }

    public ScramMechanismType type() {
        return type;
    }

    public int iterations() {
        return iterations;
    }
}

public class ScramUserListing {
    private final String name;
    private final List<ScramMechanism> mechanism;

    public ScramUserListing(String name, List<ScramMechanism> mechanism) {
        this.name = name;
        this.mechanism = mechanism;
    }

    public String name() {
        return name;
    }

    public List<ScramMechanism> mechanism() {
        return mechanism;
    }
}

public class ListScramUsersOptions extends AbstractOptions<ListScramUsersOptions> { }

default ListScramUsersResult listScramUsers() {
    return listScramUsers(new ListScramUsersOptions());
}

ListScramUsersResult listScramUsers(ListScramUsersOptions options);

public class ListScramUsersResult {
    public KafkaFuture<Map<String, ScramUserListing>> all();
}

listScramUsers will be implemented by a new RPC.

{ 
  "apiKey": 50,
  "type": "request",
  "name": "ListScramUsersRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
  ]
}

{
  "apiKey": 50,
  "type": "response",
  "name": "ListScramUsersResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Error", "type": "int16", "versions": "0+",
      "about": "The message-level error code." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The message-level error message." },
    { "name": "Users", "type": "[]ScramUser", "versions": "0+",
      "about": "The SCRAM users.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The user name." },
      { "name": "Mechanisms", "type": "[]ScramUserMechanism", "versions": "0+",
        "about": "The SCRAM mechanisms configured for the user.", "fields": [
        { "name": "Type", "type": "int8", "versions": "0+",
          "about": "The SCRAM mechanism type." },
        { "name": "Iterations", "type": "int32", "versions": "0+",
          "about": "The number of iterations used in the SCRAM mechanism." }
      ]}
    ]}
  ]
}

It will require ALTER permissions on the CLUSTER resource.  It will return CLUSTER_AUTHORIZATION_FAILED if the user has insufficient permissions.

It will be sent to the controller, and will return NOT_CONTROLLER if the receiving broker is not the controller.
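Because the response carries the mechanism type as a raw int8, a client has to map that byte back to ScramMechanismType. The following is a minimal sketch of one way to do so. The fromType helper and the wrapper class name are assumptions made for illustration and are not part of this proposal. Bytes that do not match a known mechanism are mapped to UNKNOWN, which lets an older client tolerate mechanisms added later.

```java
import java.util.HashMap;
import java.util.Map;

public class ScramMechanismTypes {
    /** Mirrors the proposed enum; fromType() is a hypothetical helper. */
    public enum ScramMechanismType {
        UNKNOWN((byte) 0),
        HMAC_SHA_256((byte) 1),
        HMAC_SHA_512((byte) 2);

        // Lookup table from wire value to enum constant, built once.
        private static final Map<Byte, ScramMechanismType> BY_TYPE = new HashMap<>();

        static {
            for (ScramMechanismType t : values()) {
                BY_TYPE.put(t.type, t);
            }
        }

        private final byte type;

        ScramMechanismType(byte type) {
            this.type = type;
        }

        public byte type() {
            return type;
        }

        /** Returns the matching constant, or UNKNOWN for an unrecognized byte. */
        public static ScramMechanismType fromType(byte type) {
            return BY_TYPE.getOrDefault(type, UNKNOWN);
        }
    }

    public static void main(String[] args) {
        System.out.println(ScramMechanismType.fromType((byte) 2));  // prints HMAC_SHA_512
        System.out.println(ScramMechanismType.fromType((byte) 42)); // prints UNKNOWN
    }
}
```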

alterScramUsers

alterScramUsers will delete, create, or change SCRAM users.

Deletions are done by user name.

Alterations will create the given user if it doesn't exist, or alter it if it does.

public class ScramUserDeletion {
    private final String user;

    public ScramUserDeletion(String user) {
        this.user = user;
    }

    public String user() {
        return user;
    }
}

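public class ScramCredential {
    private final ScramMechanism mechanism;
    private final byte[] salt;
    private final byte[] saltedPassword;
    public ScramCredential(ScramMechanism mechanism, byte[] salt, byte[] saltedPassword) {
        this.mechanism = mechanism;
        this.salt = salt;
        this.saltedPassword = saltedPassword;
    }
}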

public class ScramUserAlteration {
    private final String user;
    private final List<ScramCredential> credentials;

    public ScramUserAlteration(String user, List<ScramCredential> credentials) {
        this.user = user;
        this.credentials = credentials;
    }

    public String user() {
        return user;
    }

    public List<ScramCredential> credentials() {
        return credentials;
    }
}

public class AlterScramUsersOptions extends AbstractOptions<AlterScramUsersOptions> {}

default AlterScramUsersResult alterScramUsers(List<ScramUserDeletion> deletions,
                                              List<ScramUserAlteration> alterations) {
    return alterScramUsers(deletions, alterations, new AlterScramUsersOptions());
}

AlterScramUsersResult alterScramUsers(List<ScramUserDeletion> deletions,
                                      List<ScramUserAlteration> alterations,
                                      AlterScramUsersOptions options);

public class AlterScramUsersResult {
    public KafkaFuture<Void> all();
    public Map<String, KafkaFuture<Void>> results();
}
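ScramCredential carries a salt and a salted password rather than the plaintext password, so the client has to derive the salted password before calling alterScramUsers. The sketch below shows one way this might be done for SCRAM-SHA-256. It assumes the salted password is the Hi() function of RFC 5802, which is equivalent to PBKDF2 with HMAC as the pseudorandom function and an output length equal to the hash length. The class and method names are hypothetical, and the SASLprep normalization of the password that RFC 5802 calls for is omitted for brevity.

```java
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;

public class ScramSaltedPassword {
    /** Generates a random salt of the given length on the client. */
    public static byte[] newSalt(int length) {
        byte[] salt = new byte[length];
        new SecureRandom().nextBytes(salt);
        return salt;
    }

    /**
     * Derives a 32-byte salted password using PBKDF2 with HMAC-SHA-256.
     * Assumption: this matches Hi(password, salt, iterations) from RFC 5802.
     */
    public static byte[] saltedPassword(char[] password, byte[] salt, int iterations)
            throws GeneralSecurityException {
        // Key length is given in bits: 256 bits = one SHA-256 output block.
        PBEKeySpec spec = new PBEKeySpec(password, salt, iterations, 256);
        try {
            return SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256")
                    .generateSecret(spec)
                    .getEncoded();
        } finally {
            spec.clearPassword();
        }
    }

    public static void main(String[] args) throws GeneralSecurityException {
        byte[] salt = newSalt(16);
        byte[] salted = saltedPassword("alice-secret".toCharArray(), salt, 8192);
        System.out.println(salted.length); // prints 32
    }
}
```

For SCRAM-SHA-512 the same approach would use "PBKDF2WithHmacSHA512" and a 512-bit key length.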

The AlterScramUsersRequest and AlterScramUsersResponse implement the new API.

{
  "apiKey": 51,
  "type": "request",
  "name": "AlterScramUsersRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Deletions", "type": "[]ScramUserDeletion", "versions": "0+",
      "about": "The SCRAM users to remove.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+",
          "about": "The user name." }
      ]},
    { "name": "Alterations", "type": "[]ScramUserAlteration", "versions": "0+",
      "about": "The SCRAM users to alter.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The user name." },
      { "name": "Credentials", "type": "[]ScramCredential", "versions": "0+",
        "about": "The SCRAM credentials to configure.", "fields": [
        { "name": "MechanismType", "type": "int8", "versions": "0+",
          "about": "The mechanism type." },
        { "name": "MechanismIterations", "type": "int32", "versions": "0+",
          "about": "The number of iterations, or -1 to use the server default." },
        { "name": "Salt", "type": "bytes", "versions": "0+",
          "about": "A random salt generated by the client." },
        { "name": "SaltedPassword", "type": "bytes", "versions": "0+",
          "about": "The salted password." }
      ]}
    ]}
  ]
}

{ 
  "apiKey": 51, 
  "type": "response",
  "name": "AlterScramUsersResponse",
  "validVersions": "0", 
  "flexibleVersions": "0+", 
  "fields": [ 
    { "name": "Results", "type": "[]AlterScramUsersResult", "versions": "0+",
      "about": "The results for removals, followed by the results for alterations.", "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code." },
        { "name": "ErrorString", "type": "string", "versions": "0+", "nullableVersions": "0+",
          "about": "The error message, if any." }
    ]}  
  ]       
}   

A removal or alteration will return INVALID_REQUEST if the user name is empty, if the number of iterations is invalid, or if the same user name appears more than once in the request.  If the number of iterations is set to -1, the server-side default will be used.
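The validation rules above can be sketched as a small per-entry check. This is an illustration under stated assumptions, not the broker implementation. The Alteration class and the Error enum are hypothetical stand-ins. An iteration count is treated as valid if it is -1 or positive, since the proposal does not define the valid range. Duplicates are detected across deletions and alterations together, and only the later occurrence is flagged. Results are ordered with removals first and alterations second, matching the response schema.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class AlterScramUsersValidation {
    public enum Error { NONE, INVALID_REQUEST }

    /** Hypothetical stand-in: a user name plus the iteration count of each credential. */
    public static final class Alteration {
        final String user;
        final int[] iterations;

        public Alteration(String user, int... iterations) {
            this.user = user;
            this.iterations = iterations;
        }
    }

    /** Assumption: -1 selects the server default; otherwise the count must be positive. */
    static boolean validIterations(int iterations) {
        return iterations == -1 || iterations > 0;
    }

    /** Rejects empty names and names already seen earlier in the same request. */
    private static Error checkName(String user, Set<String> seen) {
        if (user == null || user.isEmpty() || !seen.add(user)) {
            return Error.INVALID_REQUEST;
        }
        return Error.NONE;
    }

    /** Returns one result per deletion, followed by one result per alteration. */
    public static List<Error> validate(List<String> deletions, List<Alteration> alterations) {
        Set<String> seen = new HashSet<>();
        List<Error> results = new ArrayList<>();
        for (String user : deletions) {
            results.add(checkName(user, seen));
        }
        for (Alteration alteration : alterations) {
            Error error = checkName(alteration.user, seen);
            for (int iterations : alteration.iterations) {
                if (!validIterations(iterations)) {
                    error = Error.INVALID_REQUEST;
                }
            }
            results.add(error);
        }
        return results;
    }

    public static void main(String[] args) {
        List<Error> results = validate(
            Arrays.asList("bob", ""),
            Arrays.asList(
                new Alteration("alice", -1),
                new Alteration("bob", 4096),
                new Alteration("carol", 0)));
        // prints [NONE, INVALID_REQUEST, NONE, INVALID_REQUEST, INVALID_REQUEST]
        System.out.println(results);
    }
}
```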

A removal will return a new error code, RESOURCE_NOT_FOUND, if it was instructed to delete a user, but that user was not found.

The RPC will require ALTER on CLUSTER.  It will return CLUSTER_AUTHORIZATION_FAILED if the user has insufficient permissions.  It will be sent to the controller, and will return NOT_CONTROLLER if the receiving broker is not the controller.

Command-Line Changes

We will extend the kafka-configs.sh command so that it is possible to set a SCRAM configuration without using --zookeeper.  The command-line syntax will be unchanged, except that users will now be able to pass --bootstrap-server instead of --zookeeper.

Compatibility, Deprecation, and Migration Plan

There is no impact on compatibility, since this adds a new API that didn't exist before.

Rejected Alternatives

Extend IncrementalAlterConfigs to handle SCRAM

Rather than creating new RPCs, we could have extended the IncrementalAlterConfigs RPC to support changing SCRAM configurations.  However, this would involve some fairly complex string manipulation for clients that wanted to use the API.  It would also be more cumbersome to list the SCRAM users.
