
Status

Current state: Under Discussion

Discussion thread: here

PR (In-progress): https://github.com/apache/kafka/pull/9101

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation


Producer and consumer configurations can currently be changed only by restarting the client. Allowing users to dynamically reconfigure misbehaving clients would eliminate the time-consuming process of restarting one or more clients. This KIP proposes mechanisms for dynamically configuring the following client configs:

Producer Configs

  • acks

Consumer Configs

  • session.timeout.ms

  • heartbeat.interval.ms

Public Interfaces


Network Protocol

The clients will be reusing {Describe,IncrementalAlter}Configs for this feature.

A new type, CLIENT, will be added to ConfigResource so that applications can associate themselves with it. An additional type, USER_CLIENT, will let the admin client request configurations for a specific user and client-id. This does not change the schema of any message protocol; it only allows the ConfigResource type in {Describe,Alter,IncrementalAlter}Configs to take the new byte values 16 (CLIENT) and 32 (USER_CLIENT).

public final class ConfigResource {
    .
    .
    /**
     * Type of resource.
     */
    public enum Type {
        USER_CLIENT((byte) 32), // <---new-- Used by the admin client to fetch configs for a <user, client-id>
        CLIENT((byte) 16),      // <---new-- Used by applications to fetch configs for their client-id; the broker encodes the user principal into the resource name.
        BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0);
        .
        .


When an application fetches configs for the CLIENT resource type, the broker encodes the user principal into the resource name and changes the resource type to USER_CLIENT. Authorization then occurs, and all remaining logic is handled exactly as for the USER_CLIENT type.
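A minimal Java sketch of this rewrite follows. It assumes the colon-separated <user>:<client-id> resource name described in the admin client changes; ResourceType, Resource, and toUserClient are hypothetical names used for illustration, not broker code.

```java
/**
 * Hypothetical sketch of the broker-side rewrite of a CLIENT resource.
 * ResourceType, Resource and toUserClient are illustrative names, not Kafka APIs.
 */
final class ClientResourceRewrite {

    enum ResourceType { CLIENT, USER_CLIENT }

    static final class Resource {
        final ResourceType type;
        final String name;

        Resource(ResourceType type, String name) {
            this.type = type;
            this.name = name;
        }
    }

    private ClientResourceRewrite() { }

    /**
     * An application only sends its client-id; the broker knows the authenticated
     * principal. Both are encoded as "<user>:<client-id>" and the type becomes
     * USER_CLIENT, so authorization and lookup share the USER_CLIENT code path.
     */
    static Resource toUserClient(String principalName, Resource requested) {
        if (requested.type != ResourceType.CLIENT) {
            // USER_CLIENT requests (e.g. from the admin client) are left unchanged.
            return requested;
        }
        return new Resource(ResourceType.USER_CLIENT, principalName + ":" + requested.name);
    }
}
```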


Additionally, two new ConfigSource values will be added to DescribeConfigsResponse and ConfigEntry so that the user or application can distinguish default dynamic configs from client-id override dynamic configs. As with the ConfigResource changes, this does not change the schema of any message protocol; it only allows the ConfigSource in DescribeConfigsResponse to take the byte values 7 and 8.

public class DescribeConfigsResponse extends AbstractResponse {
    .
    .
    public enum ConfigSource {
        UNKNOWN((byte) 0, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.UNKNOWN),
        TOPIC_CONFIG((byte) 1, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG),
        DYNAMIC_BROKER_CONFIG((byte) 2, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG),
        DYNAMIC_DEFAULT_BROKER_CONFIG((byte) 3, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG),
        STATIC_BROKER_CONFIG((byte) 4, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG),
        DEFAULT_CONFIG((byte) 5, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DEFAULT_CONFIG),
        DYNAMIC_BROKER_LOGGER_CONFIG((byte) 6, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG),
        DYNAMIC_CLIENT_CONFIG((byte) 7, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_CLIENT_CONFIG), // <---new--
        DYNAMIC_DEFAULT_CLIENT_CONFIG((byte) 8, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_CLIENT_CONFIG); // <---new--
        .
        .
        .

public class ConfigEntry {
    .
    .
    /**
     * Source of configuration entries.
     */
    public enum ConfigSource {
        DYNAMIC_CLIENT_CONFIG,          // <---new-- dynamic client config configured for a specific <user, client-id>
        DYNAMIC_DEFAULT_CLIENT_CONFIG,  // <---new-- dynamic client config configured as the default for all clients of a user <user, default>
        DYNAMIC_TOPIC_CONFIG,           // dynamic topic config that is configured for a specific topic
        DYNAMIC_BROKER_LOGGER_CONFIG,   // dynamic broker logger config that is configured for a specific broker
        .
        .
        .


The broker's handlers for DescribeConfigs and IncrementalAlterConfigs will both be extended to support the CLIENT and USER_CLIENT resource types. The broker will not support these resource types for AlterConfigs, since AlterConfigs is deprecated as outlined in KIP-339. The broker's AlterConfigs handler will return an InvalidRequest error code if either new resource type is used in an AlterConfigsRequest.
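The resulting per-API support can be summarized in a small Java sketch. The enums below are hypothetical stand-ins that mirror, but are not, Kafka's request and error types.

```java
import java.util.EnumSet;
import java.util.Set;

/**
 * Hypothetical sketch of which config APIs accept the new resource types.
 * Api, ResourceType and ErrorCode are illustrative, not Kafka classes.
 */
final class ClientResourceSupport {

    enum Api { DESCRIBE_CONFIGS, ALTER_CONFIGS, INCREMENTAL_ALTER_CONFIGS }

    enum ResourceType { TOPIC, BROKER, BROKER_LOGGER, CLIENT, USER_CLIENT }

    enum ErrorCode { NONE, INVALID_REQUEST }

    private static final Set<ResourceType> CLIENT_TYPES =
        EnumSet.of(ResourceType.CLIENT, ResourceType.USER_CLIENT);

    private ClientResourceSupport() { }

    /** AlterConfigs is deprecated (KIP-339), so it rejects the new client resource types. */
    static ErrorCode check(Api api, ResourceType type) {
        if (api == Api.ALTER_CONFIGS && CLIENT_TYPES.contains(type)) {
            return ErrorCode.INVALID_REQUEST;
        }
        // DescribeConfigs and IncrementalAlterConfigs accept every type, including the new ones.
        return ErrorCode.NONE;
    }
}
```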

Client Configurations

The Java producer and consumer clients will have a new configuration property, enable.dynamic.config, with a default value of false. Because the user has to set this property explicitly, they are expected to know which applications support this capability. When this property is true, the producer and consumer changes proposed in this KIP are enabled.
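A sketch of opting in follows. enable.dynamic.config is the property proposed above and is not defined by released Kafka clients; the helper class and its method names are hypothetical.

```java
import java.util.Properties;

/**
 * Sketch of opting in to dynamic client configs. "enable.dynamic.config" is the
 * property proposed by this KIP; DynamicConfigOptIn is a hypothetical helper.
 */
final class DynamicConfigOptIn {

    static final String ENABLE_DYNAMIC_CONFIG = "enable.dynamic.config";

    private DynamicConfigOptIn() { }

    /** Example producer properties with the feature turned on. */
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("client.id", "clientid-override");
        props.put(ENABLE_DYNAMIC_CONFIG, "true");
        return props;
    }

    /** The feature stays off unless the user explicitly sets the property to true. */
    static boolean isEnabled(Properties props) {
        return Boolean.parseBoolean(props.getProperty(ENABLE_DYNAMIC_CONFIG, "false"));
    }
}
```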

Proposed Changes


Broker Changes

These dynamic configs will be stored and updated in the children of the znode /config/users/<user>/clients based on the contents of an IncrementalAlterConfigsRequest.

The default config is updated when the client-id portion of the resource name in the IncrementalAlterConfigsRequest is the empty string. The client-id specific config is updated otherwise.

They will have the following order of precedence, from highest to lowest:

/config/users/<user>/clients/<client-id>

/config/users/<user>/clients/<default>

For example, when the broker handles a DescribeConfigsRequest, it first uses the client config key-values stored in /config/users/<user>/clients/<default>. If there is a matching /config/users/<user>/clients/<client-id> znode, any client config key-value in it overrides the corresponding key-value from /config/users/<user>/clients/<default>.
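The precedence rule amounts to a key-by-key merge in which the client-id values win. The following is a minimal sketch assuming both znodes have already been read into maps; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of the precedence rule: start from the <user, default> values and let
 * the <user, client-id> values override them key by key. Names are hypothetical.
 */
final class ClientConfigPrecedence {

    private ClientConfigPrecedence() { }

    static Map<String, String> effectiveConfigs(Map<String, String> userDefault,
                                                Map<String, String> clientIdOverride) {
        Map<String, String> effective = new HashMap<>(userDefault);
        effective.putAll(clientIdOverride); // client-id specific values win
        return effective;
    }
}
```

Using the values from the kafka-configs.sh examples in this KIP (after quota keys are filtered out), a default of {acks=-1, session.timeout.ms=11000} and an override of {acks=0, heartbeat.interval.ms=2000} resolve to {acks=0, heartbeat.interval.ms=2000, session.timeout.ms=11000}.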

Client quotas are stored in these znodes as well. However, all configs that are not quota configs are filtered out when constructing a DescribeClientQuotasResponse. Similarly, all configs that are not dynamic client configs will be filtered out when constructing a DescribeConfigsResponse for the CLIENT resource type. The value for each key will also be validated against the allowed values for that key; for example, if the user tries to set acks=2, an InvalidRequest error code will be sent back. Because each client must still validate dynamic configs against its user-provided configs, the broker only performs partial validation. This partial validation is still worthwhile: a conflict with user-provided configs may make a value invalid for only a subset of clients, whereas a value such as acks=2 is invalid for every client.
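The filtering and partial validation could look roughly like the sketch below. The validator choices are assumptions for illustration: acks is checked against the producer's allowed values (all, -1, 0, 1), and the two timeouts are only required to be positive integers. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

/**
 * Hypothetical sketch of the broker's partial validation for dynamic client configs.
 * The validators are assumptions, not the broker's actual rules.
 */
final class DynamicClientConfigValidation {

    private static final Map<String, Predicate<String>> VALIDATORS = new HashMap<>();

    static {
        Set<String> validAcks = Set.of("all", "-1", "0", "1");
        VALIDATORS.put("acks", validAcks::contains);
        VALIDATORS.put("session.timeout.ms", DynamicClientConfigValidation::isPositiveInt);
        VALIDATORS.put("heartbeat.interval.ms", DynamicClientConfigValidation::isPositiveInt);
    }

    private DynamicClientConfigValidation() { }

    private static boolean isPositiveInt(String value) {
        try {
            return Integer.parseInt(value) > 0;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    /** Keeps only dynamic client configs, dropping quota keys such as producer_byte_rate. */
    static Map<String, String> filterForDescribe(Map<String, String> stored) {
        Map<String, String> result = new HashMap<>();
        stored.forEach((key, value) -> {
            if (VALIDATORS.containsKey(key)) {
                result.put(key, value);
            }
        });
        return result;
    }

    /** A false result would be reported to the caller as an InvalidRequest error. */
    static boolean isValid(String key, String value) {
        Predicate<String> validator = VALIDATORS.get(key);
        return validator != null && validator.test(value);
    }
}
```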

The authorization required for {Describe,Alter}ClientQuotas, CLUSTER authorization, will also be required when handling the CLIENT and USER_CLIENT resource types in {Describe,IncrementalAlter}ConfigsRequest.

Producer Changes

The producer will have a DynamicProducerConfig that periodically and asynchronously fetches dynamic configs from the producer's I/O thread. Dynamic configs will be fetched at the same interval as MetadataRequest, metadata.max.age.ms. DynamicProducerConfig will use DescribeConfigsRequest as the RPC, validate the dynamic configs returned in the DescribeConfigsResponse against the user-provided configs, and log any configurations that are accepted. The producer will obtain its acks value through a DynamicProducerConfig method that returns the current value of acks. The dynamic acks config will take precedence over the user-provided acks config unless the user-provided configs require acks to have a specific value, as enable.idempotence=true does (it requires acks=all). In that case the dynamic update will be ignored.
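A rough sketch of how DynamicProducerConfig might track the fetch interval and the current acks value is shown below. Only the DynamicProducerConfig name comes from this KIP; the sketch class, its methods, and the fallback to the user value when no dynamic acks is present are assumptions.

```java
import java.util.Map;

/**
 * Hypothetical sketch of a DynamicProducerConfig-like holder. Class and method
 * names are illustrative; the KIP only describes DynamicProducerConfig's role.
 */
final class DynamicProducerConfigSketch {

    private final String userAcks;
    private final boolean idempotenceEnabled;
    private final long fetchIntervalMs;   // the KIP reuses metadata.max.age.ms
    private long nextFetchMs = 0L;        // only touched from the I/O thread
    private volatile String currentAcks;  // read by the send path

    DynamicProducerConfigSketch(String userAcks, boolean idempotenceEnabled, long metadataMaxAgeMs) {
        this.userAcks = userAcks;
        this.idempotenceEnabled = idempotenceEnabled;
        this.fetchIntervalMs = metadataMaxAgeMs;
        this.currentAcks = userAcks;
    }

    /** Polled from the I/O thread to decide whether to send a DescribeConfigsRequest. */
    boolean shouldFetch(long nowMs) {
        return nowMs >= nextFetchMs;
    }

    /** Applies a DescribeConfigsResponse; returns true if a dynamic acks value is now in effect. */
    boolean onDescribeConfigsResponse(long nowMs, Map<String, String> dynamicConfigs) {
        nextFetchMs = nowMs + fetchIntervalMs;
        String dynamicAcks = dynamicConfigs.get("acks");
        if (dynamicAcks == null) {
            currentAcks = userAcks; // assumption: fall back to the user value when no dynamic acks is set
            return false;
        }
        if (idempotenceEnabled) {
            return false; // enable.idempotence=true requires acks=all, so the update is ignored
        }
        currentAcks = dynamicAcks;
        return true;
    }

    /** The producer reads acks through this method instead of caching the user config. */
    String acks() {
        return currentAcks;
    }
}
```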

Consumer Changes

The GroupCoordinator in the broker receives a group member's session timeout in the JoinGroupRequest and stores it with the rest of the member's metadata. This means that to dynamically configure a consumer's session timeout, the consumer must send a JoinGroupRequest. Since this could trigger an expensive rebalance, it is best to delay the first JoinGroupRequest until the consumer's first DescribeConfigsRequest has completed. This prevents an unnecessary JoinGroupRequest while the consumer is discovering its dynamic configurations on startup. This initial DescribeConfigsRequest will be sent synchronously from the main thread of the AbstractCoordinator; all subsequent RPCs for dynamic configs will be sent asynchronously from the AbstractCoordinator's HeartbeatThread.
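The startup ordering can be sketched as follows. This is a simplified, single-threaded model: the class and method names are hypothetical, and the synchronous DescribeConfigs round trip is represented by a caller-supplied Runnable.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of the consumer startup ordering: the first DescribeConfigs
 * round trip happens synchronously before the first JoinGroup, so the session
 * timeout sent in JoinGroup already reflects any dynamic config.
 */
final class ConsumerStartupOrdering {

    private final List<String> sentRequests = new ArrayList<>();
    private boolean initialFetchDone = false;

    /** Called on the coordinator's main thread whenever the member needs to (re)join. */
    void joinGroup(Runnable describeConfigsSync) {
        if (!initialFetchDone) {
            sentRequests.add("DescribeConfigs");
            describeConfigsSync.run(); // blocks until the response (or an error) arrives
            initialFetchDone = true;   // later fetches move to the heartbeat thread
        }
        sentRequests.add("JoinGroup");
    }

    List<String> sentRequests() {
        return sentRequests;
    }
}
```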

There will be a DynamicConsumerConfig that periodically fetches dynamic configs at the same interval as MetadataRequest, metadata.max.age.ms. It will use DescribeConfigsRequest as the RPC, validate the dynamic configs returned in the DescribeConfigsResponse against the user-provided configs, and log any configurations that are accepted. The consumer will then either reconfigure itself by updating the session timeout and heartbeat interval in its GroupRebalanceConfig, or discard the configs if the heartbeat interval is greater than or equal to the session timeout. In other words, the dynamic configs take precedence over user-provided client configs only as long as the heartbeat interval is strictly less than the session timeout.
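The accept-or-discard rule can be sketched as below. The class name is hypothetical, and falling back to the user-provided value for a key missing from the response is an assumption; the sketch also does not model the JoinGroupRequest needed to propagate a new session timeout to the GroupCoordinator.

```java
import java.util.Map;

/**
 * Hypothetical sketch of DynamicConsumerConfig's accept/discard rule. Keys missing
 * from the response fall back to the user-provided values (an assumption).
 */
final class DynamicConsumerConfigSketch {

    private final int userSessionTimeoutMs;
    private final int userHeartbeatIntervalMs;
    private int sessionTimeoutMs;
    private int heartbeatIntervalMs;

    DynamicConsumerConfigSketch(int userSessionTimeoutMs, int userHeartbeatIntervalMs) {
        this.userSessionTimeoutMs = userSessionTimeoutMs;
        this.userHeartbeatIntervalMs = userHeartbeatIntervalMs;
        this.sessionTimeoutMs = userSessionTimeoutMs;
        this.heartbeatIntervalMs = userHeartbeatIntervalMs;
    }

    /** Returns true if the values were applied, false if they were discarded. */
    boolean apply(Map<String, String> dynamicConfigs) {
        int session = parseOr(dynamicConfigs.get("session.timeout.ms"), userSessionTimeoutMs);
        int heartbeat = parseOr(dynamicConfigs.get("heartbeat.interval.ms"), userHeartbeatIntervalMs);
        if (heartbeat >= session) {
            return false; // heartbeat interval must be strictly less than the session timeout
        }
        sessionTimeoutMs = session;
        heartbeatIntervalMs = heartbeat;
        return true;
    }

    // The broker has already checked that these values are integers.
    private static int parseOr(String value, int fallback) {
        return value == null ? fallback : Integer.parseInt(value);
    }

    int sessionTimeoutMs() { return sessionTimeoutMs; }

    int heartbeatIntervalMs() { return heartbeatIntervalMs; }
}
```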

Admin Client Changes

The admin client methods describeConfigs and incrementalAlterConfigs will now be able to use the USER_CLIENT resource type to describe and alter dynamic client configs. All describeConfigs and incrementalAlterConfigs calls with the USER_CLIENT resource type will be routed to the least loaded node. This follows from adding the USER_CLIENT resource type to ConfigResource, as outlined in the Public Interfaces section, and adding broker handlers for it, as outlined in the Broker Changes section. The caveat is that, with the current APIs, the user and client-id have to be encoded together in a single resource name. This is currently done by separating the user and the client-id with a colon, for example alice:clientid-override.
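The colon-separated resource name can be sketched as a pair of helpers. Splitting on the first colon is an assumption (a user name containing a colon would be ambiguous), as is treating an empty client-id as the user's default, which mirrors the IncrementalAlterConfigs behavior described in the Broker Changes section. The class name is hypothetical.

```java
/**
 * Hypothetical sketch of the "<user>:<client-id>" resource name used with USER_CLIENT.
 * Splitting on the first colon is an assumption.
 */
final class UserClientResourceName {

    private UserClientResourceName() { }

    static String encode(String user, String clientId) {
        return user + ":" + clientId;
    }

    /** Returns {user, clientId}; an empty clientId selects the user's default config. */
    static String[] decode(String resourceName) {
        int sep = resourceName.indexOf(':');
        if (sep < 0) {
            throw new IllegalArgumentException("expected <user>:<client-id>, got " + resourceName);
        }
        return new String[] {resourceName.substring(0, sep), resourceName.substring(sep + 1)};
    }

    static boolean isUserDefault(String resourceName) {
        return decode(resourceName)[1].isEmpty();
    }
}
```

With the admin client, the encoded string would be the name of a ConfigResource of the proposed USER_CLIENT type.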

Command Line Changes

kafka-configs.sh will be extended to support the client configurations listed at the beginning of this KIP. The entity types used for client quotas (users and clients) will also be used for dynamic client configuration.

For example, the user can add the new configs supported by this KIP to their default dynamic config, alongside the quota configs that KIP-546 made available to the admin client. In this example, the user mixes some of the dynamic client configs introduced by this KIP with the quota config producer_byte_rate:

bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter \
  --entity-type users \
  --entity-name alice \
  --add-config acks=-1,session.timeout.ms=11000,producer_byte_rate=50000
  Completed updating config for user alice.

The user can also add configs specific to a client-id that will override default dynamic configs:

bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter \
  --entity-type users \
  --entity-name alice \
  --entity-type clients \
  --entity-name clientid-override \
  --add-config acks=0,heartbeat.interval.ms=2000,producer_byte_rate=60000
  Completed updating config for user alice.

The user can describe these configs the same way that client quotas are described with the clients entity type. To make this possible, kafka-configs.sh will send a DescribeConfigsRequest in addition to the DescribeClientQuotasRequest:

bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --describe \
  --entity-type users \
  --entity-name alice \
  --entity-type clients \
  --entity-name clientid-override
Dynamic configs for user alice:clientid-override are:
  acks=0 sensitive=false synonyms={DYNAMIC_CLIENT_CONFIG:acks=0, DYNAMIC_DEFAULT_CLIENT_CONFIG:acks=-1}
  heartbeat.interval.ms=2000 sensitive=false synonyms={DYNAMIC_CLIENT_CONFIG:heartbeat.interval.ms=2000}
Quota configs for user-principal 'alice', client-id 'clientid-override' are producer_byte_rate=60000.0

For any key that is set in the default dynamic config but not in the client-id dynamic config, the default value is used. This is illustrated below with the session.timeout.ms config:

bin/kafka-configs.sh --bootstrap-server localhost:9091 \
  --describe \
  --all \
  --entity-type users \
  --entity-name alice \
  --entity-type clients \
  --entity-name clientid-override
All configs for user alice:clientid-override are:
  acks=0 sensitive=false synonyms={DYNAMIC_CLIENT_CONFIG:acks=0, DYNAMIC_DEFAULT_CLIENT_CONFIG:acks=-1}
  heartbeat.interval.ms=2000 sensitive=false synonyms={DYNAMIC_CLIENT_CONFIG:heartbeat.interval.ms=2000}
  session.timeout.ms=11000 sensitive=false synonyms={DYNAMIC_DEFAULT_CLIENT_CONFIG:session.timeout.ms=11000}
Quota configs for user-principal 'alice', client-id 'clientid-override' are producer_byte_rate=60000.0

Any of the configs that this KIP makes dynamic can be added or deleted with --add-config and --delete-config, and they may optionally be mixed with quota configs in the same command.

Compatibility, Deprecation, and Migration Plan


  1. If a new client with this feature sends a DescribeConfigsRequest with the ConfigResource type CLIENT to an old broker, the broker will send back an InvalidRequest error code and the client will disable this feature.

  2. If an old client talks to a new broker, nothing changes, since the old client will never request configs for the CLIENT resource type.

  3. If both the broker and the client are up to date with this change, the client will use the feature as long as enable.dynamic.config is set to true.

  4. Providing information about which dynamic configs each application supports would require clients to register the dynamic configs they support when bootstrapping. Storage, metadata lifecycle management, and new APIs would be needed to make this information available to the user. With this in mind, if the user sets a dynamic config for a client that does not support it, the client will simply log that it does not support the config and carry on.
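The client-side rules above can be sketched as follows. The class, enum, and method names are hypothetical; InvalidRequest is modeled as a local enum value rather than Kafka's own error type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical sketch of the client-side compatibility rules: fetch only when the
 * user opted in, stop after an old broker answers InvalidRequest, and report
 * (rather than fail on) dynamic configs this client does not support.
 */
final class DynamicConfigCompatibility {

    enum ErrorCode { NONE, INVALID_REQUEST }

    private final boolean enabledByUser; // value of enable.dynamic.config
    private boolean disabledByBroker = false;

    DynamicConfigCompatibility(boolean enabledByUser) {
        this.enabledByUser = enabledByUser;
    }

    boolean shouldFetch() {
        return enabledByUser && !disabledByBroker;
    }

    void onDescribeConfigsResult(ErrorCode error) {
        if (error == ErrorCode.INVALID_REQUEST) {
            disabledByBroker = true; // old broker: the CLIENT resource type is unknown
        }
    }

    /** Keys the client would log and skip, in sorted order. */
    static List<String> unsupportedKeys(Set<String> supported, Map<String, String> dynamicConfigs) {
        TreeSet<String> unsupported = new TreeSet<>(dynamicConfigs.keySet());
        unsupported.removeAll(supported);
        return new ArrayList<>(unsupported);
    }
}
```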

Rejected Alternatives


  • Introducing new entity types for kafka-configs.sh that producers and consumers can associate themselves with. This would make the tool more cumbersome to use, and it is more intuitive to alter client configurations dynamically through the existing clients entity type.

  • Using the <user/client-id> hierarchy implemented for client quotas in KIP-55 and extended for the admin client in KIP-546. If APIs with this hierarchy already existed for non-quota configs, it would have been worthwhile to support it for consistency. However, the quota APIs are different and quotas are inherently hierarchical, so it seems reasonable to take a different approach here.

  • Sending user-provided client configuration values to the broker on startup. The broker does not need the user-provided client configurations to send back dynamic configurations, and DescribeConfigsRequest has no field for config values, so a new message format would have to be created.

  • Making certain client configurations topic level configurations on the broker.

    • The semantics of the ProduceRequest API would be undefined, since the producer would not receive a response with an offset for ProduceRequests sent with acks=0.

    • If this were implemented for acks, there would also be significant overhead from extra round trips: acks is set once per ProduceRequest, but a single request may carry batches drained from the RecordAccumulator for multiple topics. If these topics had different acks configurations, their records would need to be sent in separate requests based on the acks value.

    • For example, if a producer continuously produces to two topics, one configured with acks=0 and the other with acks=-1, it would need twice as many round trips to produce the same number of messages.
