Comment: Scope client-id by user principal

...

The clients will be reusing {Describe,IncrementalAlter}Configs for this feature.

The new Type CLIENT will be added to ConfigResource that client applications can associate themselves with. There will be an additional type USER_CLIENT that the admin client can use to request configurations for a specific user and client-id. This is not a change to the schema of any message protocol. However, it will allow for the ConfigResource Type in {Describe,Alter,IncrementalAlter}Configs to be the new byte values 16 and 32.

Code Block
languagejava
public final class ConfigResource {
    .
    .
    /**
     * Type of resource.
     */
    public enum Type {
        USER_CLIENT((byte) 32), // <---new-- Used by admin client to fetch configs for a <user, client-id>
        CLIENT((byte) 16), // <---new-- Used by application to fetch configs for client-id, user principal is encoded into the resource name on the broker.
        BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0);
        .
        .


The user principal is encoded into the resource name on the broker when an application fetches configs for the CLIENT resource type. The resource type is also changed to USER_CLIENT, authorization occurs, and then all other logic is handled in the same way as for the USER_CLIENT type.


Additionally, two new ConfigSource values will be added to DescribeConfigsResponse and ConfigEntry so that the user or application can distinguish between default dynamic configs and client-id override dynamic configs. Similar to the changes in ConfigResource, this is not a change to the schema of any message protocol. However, it will allow for the ConfigSource in DescribeConfigsResponse to be the byte values 7 and 8.

...

Code Block
languagejava
public class ConfigEntry {
  .
  .
  /**
  * Source of configuration entries.
  */
  public enum ConfigSource {
      DYNAMIC_CLIENT_CONFIG,          // <---new-- dynamic client config configured for a specific <user, client-id>
      DYNAMIC_DEFAULT_CLIENT_CONFIG,  // <---new-- dynamic client config configured as default for all clients of a user, <user, default>
      DYNAMIC_TOPIC_CONFIG,           // dynamic topic config that is configured for a specific topic
      DYNAMIC_BROKER_LOGGER_CONFIG,   // dynamic broker logger config that is configured for a specific broker
      .
      .
      .

...

The handlers in the broker for DescribeConfigs and IncrementalAlterConfigs will both be extended to support the CLIENT and USER_CLIENT resource types. The broker will not be supporting these resource types for AlterConfigs since AlterConfigs is deprecated as outlined in KIP-339. The handler in the broker for AlterConfigs will return an InvalidRequest error code if one of these new resource types is used in an AlterConfigsRequest.

...

The Java producer and consumer clients will have a new configuration property enable.dynamic.config with a default value of false. Since the user has to set this property, they should know what applications support this capability. When this configuration property is true the proposed producer and consumer changes in this KIP are enabled.

...

These dynamic configs will be stored and updated in the children of the znode /config/users/<user>/clients based on the contents of an IncrementalAlterConfigsRequest.

The default config is updated when the client-id portion of the resource name in the IncrementalAlterConfigsRequest is the empty string. The client-id specific config is updated otherwise.

They will have the following order of precedence:

/config/users/<user>/clients/<client-id>

/config/users/<user>/clients/<default>

For example, when the broker handles a DescribeConfigsRequest, it will first use the client config key-values stored in /config/users/<user>/clients/<default>. If there is a matching /config/users/<user>/clients/<client-id> znode, then any client config key-value in the matching znode will override the key-values found in /config/users/<user>/clients/<default>.
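The precedence described above can be illustrated with a minimal sketch. The class and method names here (DynamicClientConfigResolver, resolve) are hypothetical and not part of Kafka; the two maps stand in for the key-values read from the <default> and <client-id> znodes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not an actual Kafka class.
final class DynamicClientConfigResolver {
    private DynamicClientConfigResolver() {}

    /**
     * Starts from the key-values of /config/users/<user>/clients/<default>
     * and lets the key-values of /config/users/<user>/clients/<client-id>
     * override any key that appears in both.
     */
    static Map<String, String> resolve(Map<String, String> defaultConfigs,
                                       Map<String, String> clientIdConfigs) {
        Map<String, String> resolved = new HashMap<>(defaultConfigs);
        resolved.putAll(clientIdConfigs); // client-id specific values win
        return resolved;
    }
}
```

With defaults of acks=-1 and session.timeout.ms=11000 and a client-id override of acks=0, the resolved map would contain acks=0 and session.timeout.ms=11000.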

Client quotas are stored in these znodes as well. However, all configs that are not quota configs are filtered out when constructing a DescribeClientQuotasResponse. Similar to this, all configs that are not dynamic client configs will be filtered out when constructing a DescribeConfigsResponse for the CLIENT resource type. The value for each key will also be validated against the allowed values for that key. For example, if the user tries to set acks=2, an InvalidRequest error code will be sent back. The client will also have to validate dynamic configs against user-provided configs, so the broker is only doing partial validation here. This is worth doing since partially validated configs may only be invalid for a subset of clients, whereas acks=2 would be invalid for all clients.
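As a rough illustration of the per-key value validation mentioned above, the following sketch checks a proposed acks value against the set of values the producer accepts (all, -1, 0, 1). The class name AcksValueCheck is hypothetical, and how the broker would actually perform this check and map a failure to an InvalidRequest error code is an assumption left out of the sketch.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper for illustration only; not an actual Kafka class.
final class AcksValueCheck {
    // Values accepted by the producer "acks" config.
    private static final Set<String> ALLOWED_ACKS =
            new HashSet<>(Arrays.asList("all", "-1", "0", "1"));

    private AcksValueCheck() {}

    /** Returns true if the value is one of the allowed acks values. */
    static boolean isValidAcks(String value) {
        return value != null && ALLOWED_ACKS.contains(value);
    }
}
```

Under this sketch, acks=2 would be rejected for every client, which is the kind of check that can be done on the broker without knowing any user-provided client configs.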

The same authorization that is necessary for {Describe,Alter}ClientQuotas, CLUSTER authorization, will be used when handling the CLIENT and USER_CLIENT resource types in {Describe,IncrementalAlter}ConfigsRequest.

...

There will be a DynamicConsumerConfig that will periodically fetch dynamic configs. The interval on which dynamic configs are fetched will be the same amount of time as the interval for MetadataRequest, metadata.max.age.ms. It will use DescribeConfigsRequest as the RPC, validate the dynamic configs that are returned in DescribeConfigsResponse against the user provided configs and log any configurations that are accepted. The client will either reconfigure itself by changing the session timeout and heartbeat interval in the GroupRebalanceConfig, or discard the configs if the heartbeat interval is greater than or equal to the session timeout. The dynamic configs will take precedence over user provided client configs as long as the heartbeat interval is strictly less than the session timeout.
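The accept-or-discard rule for the consumer can be sketched as follows. The class and method names are hypothetical, and falling back to the user-provided values when the dynamic configs are discarded is an assumption of this sketch; the text above only states that the configs are discarded. A null argument means the broker returned no dynamic value for that key.

```java
// Hypothetical helper for illustration only; not an actual Kafka class.
final class DynamicConsumerConfigCheck {
    private DynamicConsumerConfigCheck() {}

    /**
     * Returns {sessionTimeoutMs, heartbeatIntervalMs} to apply.
     * Dynamic values take precedence over user-provided ones, but only if the
     * resulting heartbeat interval is strictly less than the session timeout.
     * Otherwise the dynamic values are discarded (assumption: the
     * user-provided values stay in effect).
     */
    static int[] effective(int userSessionTimeoutMs, int userHeartbeatIntervalMs,
                           Integer dynamicSessionTimeoutMs, Integer dynamicHeartbeatIntervalMs) {
        int session = dynamicSessionTimeoutMs != null ? dynamicSessionTimeoutMs : userSessionTimeoutMs;
        int heartbeat = dynamicHeartbeatIntervalMs != null ? dynamicHeartbeatIntervalMs : userHeartbeatIntervalMs;
        if (heartbeat >= session) {
            // Invalid combination: discard the dynamic configs.
            return new int[] {userSessionTimeoutMs, userHeartbeatIntervalMs};
        }
        return new int[] {session, heartbeat};
    }
}
```

For example, with user-provided values of 10000 and 3000 and dynamic values of 11000 and 2000, the sketch returns the dynamic pair; a dynamic heartbeat interval of 12000 with no dynamic session timeout would be discarded.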

...

The methods describeConfigs and incrementalAlterConfigs in the admin client will now be able to use the USER_CLIENT resource type to describe and alter dynamic client configs. All describeConfigs and incrementalAlterConfigs method calls with the USER_CLIENT resource type will be routed to the least loaded node. This is a byproduct of adding the USER_CLIENT resource type to ConfigResource as outlined in the public interfaces section, and of adding handlers for this resource type in the broker as outlined in the broker changes section. The caveat is that the <user, client-id> has to be encoded together in the same resource name with the current APIs. Currently this is being done by separating the user and client-id by a colon.
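A minimal sketch of the colon-separated resource name follows. The class and method names are hypothetical, and splitting on the first colon is an assumption of this sketch; it does not handle user principals that themselves contain a colon. An empty client-id portion corresponds to the default config for the user.

```java
// Hypothetical helper for illustration only; not an actual Kafka class.
final class UserClientResourceName {
    private UserClientResourceName() {}

    /** Joins the user and client-id into one resource name, e.g. "alice:clientid-override". */
    static String encode(String user, String clientId) {
        return user + ":" + clientId;
    }

    /** Splits a resource name on the first colon into {user, clientId}. */
    static String[] decode(String resourceName) {
        int idx = resourceName.indexOf(':');
        if (idx < 0) {
            throw new IllegalArgumentException("Expected <user>:<client-id> but got " + resourceName);
        }
        return new String[] {resourceName.substring(0, idx), resourceName.substring(idx + 1)};
    }

    /** True if the client-id portion is empty, meaning the user's default config is addressed. */
    static boolean isDefault(String resourceName) {
        return decode(resourceName)[1].isEmpty();
    }
}
```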

Command Line Changes

kafka-configs.sh will be extended to support the client configurations listed at the beginning of this KIP. The same entity types that are used for client quotas, users and clients, will be used for dynamic client configuration.

For example, the user can add the new configs supported with this KIP along with the quota configs that are supported for the admin client in KIP-546 to their default dynamic config. In this example the user mixes some dynamic client configs that this KIP introduces with the quota config producer_byte_rate:

Code Block
languagebash
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter \
  --entity-type users \
  --entity-name alice \
  --add-config acks=-1,session.timeout.ms=11000,producer_byte_rate=50000
  Completed updating default config for user alice.

The user can also add configs specific to a client-id that will override default dynamic configs:

Code Block
languagebash
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter \
  --entity-type users \
  --entity-name alice \
  --entity-type clients \
  --entity-name clientid-override \
  --add-config acks=0,heartbeat.interval.ms=2000,producer_byte_rate=60000
  Completed updating config for user alice.

The user can describe these configs the same way that client quotas are described with the clients entity type. To make this possible kafka-configs.sh will be sending a DescribeConfigsRequest as well as the DescribeClientQuotasRequest:

Code Block
languagebash
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --describe \
  --entity-type users \
  --entity-name alice \
  --entity-type clients \
  --entity-name clientid-override
Dynamic configs for user alice:clientid-override are:
  acks=0 sensitive=false synonyms={DYNAMIC_CLIENT_CONFIG:acks=0, DYNAMIC_DEFAULT_CLIENT_CONFIG:acks=-1}
  heartbeat.interval.ms=2000 sensitive=false synonyms={DYNAMIC_CLIENT_CONFIG:heartbeat.interval.ms=2000}
Quota configs for user-principal 'alice', client-id 'clientid-override' are producer_byte_rate=60000.0

...

Code Block
languagebash
bin/kafka-configs.sh --bootstrap-server localhost:9091 \
  --describe \
  --all \
  --entity-type users \
  --entity-name alice \
  --entity-type clients \
  --entity-name clientid-override
All configs for user alice:clientid-override are:
  acks=0 sensitive=false synonyms={DYNAMIC_CLIENT_CONFIG:acks=0, DYNAMIC_DEFAULT_CLIENT_CONFIG:acks=-1}
  heartbeat.interval.ms=2000 sensitive=false synonyms={DYNAMIC_CLIENT_CONFIG:heartbeat.interval.ms=2000}
  session.timeout.ms=11000 sensitive=false synonyms={DYNAMIC_DEFAULT_CLIENT_CONFIG:session.timeout.ms=11000}
Quota configs for user-principal 'alice', client-id 'clientid-override' are producer_byte_rate=60000.0

...

  1. If a new client with this feature attempts to send a DescribeConfigsRequest with the ConfigResource Type CLIENT to an old broker, the broker will send back an InvalidRequest error code and the client will disable this feature.

  2. In the case that an old client is talking to a new broker, nothing will change since the old client will never request configs for the CLIENT resource type.

  3. In the case that both the broker and client are up to date with this change, the client will take advantage of the feature as long as enable.dynamic.config is set to true.

  4. Providing information about what dynamic configs are supported for each application would require the clients to register the dynamic configs that they support when bootstrapping. There would need to be storage, metadata lifecycle management, and APIs created to make this information available to the user. With this in mind, if the user tries to set a dynamic config for a client that does not support it, the client will just log that it does not support the value and carry on.

Rejected Alternatives

...

  • Introducing new entity types for kafka-configs.sh that producers and consumers can associate themselves with. This would make the tool more cumbersome to use and it is most intuitive that client configurations be dynamically altered with the clients entity type.

  • Use the <user/client-id> hierarchy implemented for client quotas in KIP-55 and extended for the admin client in KIP-546. If there were already APIs for non-quota configs where this hierarchy existed, it would have been worthwhile to allow this for consistency. However, quota APIs are different and quotas are inherently hierarchical, so it seems reasonable to use a different approach here.

  • Sending user-provided client configuration values to the broker on startup. The user-provided client configurations are not needed by the broker to send back dynamic configurations. DescribeConfigsRequest does not have a field for config values so a new message format would need to be created.

  • Making certain client configurations topic level configurations on the broker.

    • The semantic for the ProduceRequest API would be undefined since the producer would not receive a response with an offset for the ProduceRequests with acks=0.

    • If this were implemented for acks there would also be quite a bit of overhead associated with extra round trips since the RecordAccumulator sends batches that may contain records from multiple topics. If these topics have different acks configurations the records would need to be sent in different batches based on the acks value.

    • For example, if a producer is consistently producing to 2 different topics and one is configured as acks=0 while the other is acks=-1, this would require twice the number of round trips to produce the same number of messages.