You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 10 Next »

Status

Current state: Under Discussion

Discussion threadhere

JIRA: Unable to render Jira issues macro, execution error.

PR (In-progress): https://github.com/apache/kafka/pull/9101

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation


Producer and consumer configurations are currently reconfigurable only by restarting the client. Allowing the user to dynamically reconfigure misbehaving clients would eliminate the time consuming process of restarting one or multiple clients. This KIP proposes the mechanisms for dynamic configuration of the following client configs:

Producer Configs

  • acks

Consumer Configs

  • session.timeout.ms

  • heartbeat.interval.ms

Public Interfaces


Network Protocol

The clients will be reusing {Describe,IncrementalAlter}Configs for this feature.

A new Type will be added to ConfigResource that clients can associate themselves with. This is not a change to the schema of any message protocol. However, it will allow for the ConfigResource Type in {Describe,Alter,IncrementalAlter}Configs to be the byte value 9.

public final class ConfigResource {
    .
    .
    /**
     * Type of resource.
     */
    public enum Type {
        CLIENT((byte) 9), // <---new--
        BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0);
        .
        .


Additionally, two new ConfigSource will be added to DescribeConfigsResponse and ConfigEntry so that the client can distinguish between default dynamic configs and client-id override dynamic configs. Similar to the changes in ConfigResource, this is not a change to the schema of any message protocol. However, it will allow for the ConfigSource in DescribeConfigsResponse to be the byte values 7 and 8.

public class DescribeConfigsResponse extends AbstractResponse {
    .
    .
    public enum ConfigSource {
        UNKNOWN((byte) 0, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.UNKNOWN),
        TOPIC_CONFIG((byte) 1, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG),
        DYNAMIC_BROKER_CONFIG((byte) 2, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG),
        DYNAMIC_DEFAULT_BROKER_CONFIG((byte) 3, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG),
        STATIC_BROKER_CONFIG((byte) 4, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG),
        DEFAULT_CONFIG((byte) 5, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DEFAULT_CONFIG),
        DYNAMIC_BROKER_LOGGER_CONFIG((byte) 6, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG),
        DYNAMIC_CLIENT_CONFIG((byte) 7, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_CLIENT_CONFIG), // <---new--
        DYNAMIC_DEFAULT_CLIENT_CONFIG((byte) 8, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_CLIENT_CONFIG); // <---new--
        .
        .
        .
public class ConfigEntry {
  .
  .
  /**
  * Source of configuration entries.
  */
  public enum ConfigSource {
      DYNAMIC_CLIENT_CONFIG,          // <---new-- dynamic client config configured for a specific client-id
      DYNAMIC_DEFAULT_CLIENT_CONFIG,  // <---new-- dynamic client config configured as default for all clients
      DYNAMIC_TOPIC_CONFIG,           // dynamic topic config that is configured for a specific topic
      DYNAMIC_BROKER_LOGGER_CONFIG,   // dynamic broker logger config that is configured for a specific broker
      .
      .
      .


The handlers in the broker for DescribeConfigs and IncrementalAlterConfigs will both be extended to support the CLIENT resource type. The broker will not be supporting this resource type for AlterConfigs since AlterConfigs is deprecated as outlined in KIP-339The handler in the broker for AlterConfigs will return an InvalidRequest error code if this new resource type is used in a AlterConfigsRequest.

Client Configurations

The Java producer and consumer clients will have a new configuration property enable.dynamic.config with a default value of true. When this configuration property is true the proposed producer and consumer changes in this KIP are enabled.

Proposed Changes


Broker Changes

These dynamic configs will be stored and updated in the children of the znode /config/clients based on the contents of an IncrementalAlterConfigsRequest.

The default config is updated when the resource name in the IncrementalAlterConfigsRequest is the empty string. The client-id specific config is updated otherwise.

They will have the following order of precedence:

/config/clients/<client-id>

/config/clients/<default>

For example, when the broker handles DescribeConfigRequest, it will first use the client config key-values stored in /config/clients/<default>. If there is a matching config/clients/<client-id> then any client config key-value in the matching znode will override the key-values found in config/clients/<default>

Client quotas are stored in these znodes as well. However, all configs that are not quota configs are filtered out when constructing a DescribeClientQuotasResponse. Similar to this, all configs that are not dynamic client configs will be filtered out when constructing a DescribeConfigsResponse for the CLIENT resource type. The value for each key will also be validated against the allowed values for that key. For example, if we try to set acks=2, an InvalidRequest error code will be sent back.

The same authorization that is necessary for {Describe,Alter}ClientQuotas, CLUSTER authorization, will be used when handling the CLIENT resource type in {Describe,IncrementalAlter}ConfigsRequest.

Producer Changes

The producer will have a DynamicProducerConfig that will periodically fetch dynamic configs from the producer’s IO thread asynchronously. The interval on which dynamic configs are fetched will be the same amount of time as the interval for MetadataRequest, metadata.max.age.ms. It will use DescribeConfigsRequest as the RPC, validate the dynamic configs returned in DescribeConfigsResponse against the user provided configs, and reconfigure the client. This reconfiguration will be done by using a method in DynamicProducerConfig that gets the current value of acks. The dynamic acks config will take precedence over user provided acks config unless the user provided configs require acks to be a certain value, such as enable.idempotence=true. In this case the dynamic update will be ignored.

Consumer Changes

The GroupCoordinator in the broker receives a group member’s session timeout upon the JoinGroupRequest and stores this with the rest of the group member's metadata. This means that to dynamically configure a consumer’s session timeout, the consumer must send a JoinGroupRequest. Since this could trigger an expensive rebalance operation, it would be best to delay the first JoinGroupRequest until the first DescribeConfigsRequest is made from the consumer. This will prevent an unnecessary JoinGroupRequest when the consumer is discovering its dynamic configurations on startup. This initial DescribeConfigsRequest will be done synchronously from the main thread of the AbstractCoordinator. All subsequent RPCs for dynamic configs will be done asynchronously from the HeartbeatThread of the AbstractCoordinator

There will be a DynamicConsumerConfig that will periodically fetch dynamic configs. The interval on which dynamic configs are fetched will be the same amount of time as the interval for MetadataRequest, metadata.max.age.ms. It will use DescribeConfigsRequest as the RPC and validate the dynamic configs that are returned in DescribeConfigsResponse against the user provided configs. The client will either reconfigure itself by changing the session timeout and heartbeat interval in the GroupRebalanceConfig, or discard the configs if the heartbeat interval is greater than or equal to the session timeout. The dynamic configs will take precedence over user provided client configs as long as the heartbeat interval is strictly less than the session timeout.

Admin Client Changes

The methods describeConfigs and incrementalAlterConfigs in the admin client will now be able to use the CLIENT resource type to describe and alter dynamic client configs. All describeConfigs and incrementalAlterConfigs method calls with the CLIENT resource type will be routed to the least loaded node. This is a byproduct of adding the CLIENT resource type to ConfigResource as outlined in the public interfaces section, and of adding handlers for this resource type in the broker as outlined in the broker changes section.

Command Line Changes

kafka-configs.sh will be extended to support the client configurations listed at the beginning of this KIP. The same entity type that is used for client quotas, clients, will be used for dynamic client configuration.

For example, the user can add the new configs supported with this KIP along with the quota configs that are supported for the admin client in KIP-546 to the default dynamic config. In this example the user mixes some dynamic client configs that this KIP introduces with the quota config producer_byte_rate:

bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter \
  --entity-type clients \
  --entity-default \
  --add-config acks=-1,session.timeout.ms=11000,producer_byte_rate=50000
  Completed updating default config for clients in the cluster.

The user can also add configs specific to a client-id that will override default dynamic configs:

bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter \
  --entity-type clients \
  --entity-name clientid-override \
  --add-config acks=0,heartbeat.interval.ms=2000,producer_byte_rate=60000
  Completed updating config for client clientid-override.

The user can describe these configs the same way that client quotas are described with the clients entity type. To make this possible kafka-configs.sh will be sending a DescribeConfigsRequest as well as the DescribeClientQuotasRequest:

bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --describe \
  --entity-type clients \
  --entity-name clientid-override
Dynamic configs for client clientid-override are:
  acks=0 sensitive=false synonyms={DYNAMIC_CLIENT_CONFIG:acks=0, DYNAMIC_DEFAULT_CLIENT_CONFIG:acks=-1}
  heartbeat.interval.ms=2000 sensitive=false synonyms={DYNAMIC_CLIENT_CONFIG:heartbeat.interval.ms=2000}
Quota configs for client-id 'clientid-override' are producer_byte_rate=60000.0

The default dynamic config will be used in the case that the client-id dynamic config does not contain a key that the default does contain. This is illustrated below with the session.timeout.ms config:

bin/kafka-configs.sh --bootstrap-server localhost:9091 \
  --describe \
  --all \
  --entity-type clients \
  --entity-name clientid-override \
All configs for client clientid-override are:
  acks=0 sensitive=false synonyms={DYNAMIC_CLIENT_CONFIG:acks=0, DYNAMIC_DEFAULT_CLIENT_CONFIG:acks=-1}
  heartbeat.interval.ms=2000 sensitive=false synonyms={DYNAMIC_CLIENT_CONFIG:heartbeat.interval.ms=2000}
  session.timeout.ms=11000 sensitive=false synonyms={DYNAMIC_DEFAULT_CLIENT_CONFIG:session.timeout.ms=11000}
Configs for client-id 'clientid-override' are producer_byte_rate=60000.0

Any number of the configs that this KIP provides dynamic support for can be added or deleted with --add-config and --delete-config. They may optionally be mixed with quotas in the same command.

Compatibility, Deprecation, and Migration Plan


  1. If a new client with this feature attempts to send a DescribeConfigsRequest with the ConfigResource Type CLIENTS to an old broker, the broker will send back an InvalidRequest error code and the client will disable this feature.

  2. In the case that an old client is talking to a new broker, nothing will change since the old client will never request configs for the CLIENTS Resource type.

  3. In the case that both the broker and client are up to date with this change, the client will take advantage of the feature as long as enable.dynamic.config is not configured as false.

Rejected Alternatives


  • Introducing new entity types for kafka-configs.sh that producers and consumers can associate themselves with. This would make the tool more cumbersome to use and it is most intuitive that client configurations be dynamically altered with the clients entity type.

  • Use the <user/client-id> hierarchy implemented for client quotas in KIP-55 and extended for the admin client in KIP-546. If we already had APIs for non-quota configs where this hierarchy existed, it would have been worthwhile to allow this for consistency. However, quota APIs are different and quotas are inherently hierarchical, so it seems reasonable to use a different approach here.

  • Sending user-provided client configuration values to the broker on startup. The user-provided client configurations are not needed by the broker to send back dynamic configurations. DescribeConfigsRequest does not have a field for config values so a new message format would need to be created.

  • Making certain client configurations topic level configurations on the broker.

    • The semantic for the ProduceRequest API would be undefined since the producer would not receive a response with an offset for the ProduceRequests with acks=0.

    • If this were implemented for acks there would also be quite a bit of overhead associated with extra round trips since the RecordAccumulator sends batches that may contain records from multiple topics. If these topics have different acks configurations the records would need to be sent in different batches based on the acks value.

    • For example, if a producer is consistently producing to 2 different topics and one is configured as acks=0 while the other is acks=-1. This would require twice the amount of round trips to produce the same number of messages.

  • No labels