Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Jose's comments from 2020/07/31 18:27:38

...

The handlers in the broker for DescribeConfigs and IncrementalAlterConfigs will both be extended to support the CLIENT resource type. The broker will not be supporting this resource type for AlterConfigs since AlterConfigs is deprecated as outlined in KIP-339The handler in the broker for AlterConfigs will return an InvalidRequest error code if this new resource type is used in a AlterConfigsRequest.

Client Configurations

A new client configuration The Java producer and consumer clients will have a new configuration property enable.dynamic.configwill be added to both the producer and consumer that will be true by default so that the user has the option to disable this feature with a default value of true. When this configuration property is true the proposed producer and consumer changes in this KIP are enabled.

Proposed Changes

...

Broker Changes

These dynamic configs will be stored and updated in zookeeper as the children of the ZNode znode /config/clients with  based on the contents of an IncrementalAlterConfigsRequest.

The default config is updated when the resource name in the IncrementalAlterConfigsRequest is the empty string. The client-id specific config is updated otherwise.

They will have the following order of precedence:

/config/clients/<client-id>

/config/clients/<default>

For example, when the broker handles DescribeConfigRequest, it will first use the client config key-values stored in /config/clients/<default>. If there is a matching config/clients/<client-id> then any client config key-value in the matching znode will override the key-values found in /config/clients/<default>

Client quotas are stored in these ZNodes znodes as well. However, all configs that are not quota configs are filtered out when constructing a DescribeClientQuotasResponse. Similar to this, all configs that are not dynamic client configs will be filtered out when constructing a DescribeConfigsResponse for the CLIENT resource type. The value for each key will also be validated against the allowed values for that key. For example, if we try to set acks=2, an InvalidRequest error code will be sent back.

The same authorization that is necessary for {Describe,Alter}ClientQuotas, CLUSTER authorization, will be used when handling the CLIENT resource type in {Describe,IncrementalAlter}ConfigsRequest.

...

The producer will have a DynamicProducerConfig that will periodically fetch dynamic configs from the producer’s IO thread asynchronously. The interval on which dynamic configs are fetched will be the same amount of time as the interval for MetadataRequest which is currently five minutes, metadata.max.age.ms. It will use DescribeConfigsRequest as the RPC, validate the dynamic configs returned in DescribeConfigsResponse against the user provided configs, and reconfigure the client. This reconfiguration will be done by using a method in DynamicProducerConfig that gets the current value of acks. The dynamic acks config will take precedence over user provided acks config unless the user provided configs require acks to be a certain value, such as enable.idempotence=true. In this case the dynamic update will be ignored.

...

There will be a DynamicConsumerConfig that will periodically fetch dynamic configs. The interval on which dynamic configs are fetched will be the same amount of time as the interval for MetadataRequest which is currently five minutes MetadataRequest, metadata.max.age.ms. It will use DescribeConfigsRequest as the RPC and validate the dynamic configs that are returned in DescribeConfigsResponse against the user provided configs. The client will either reconfigure itself by changing the session timeout and heartbeat interval in the GroupRebalanceConfig, or discard the configs if the heartbeat interval is greater than or equal to the session timeout. The dynamic configs will take precedence over user provided client configs as long as the heartbeat interval is strictly less than the session timeout.

...

The methods describeConfigs and incrementalAlterConfigs in the admin client will now be able to use the CLIENT resource type to describe and alter dynamic client configs. All describeConfigs and incrementalAlterConfigs method calls with the CLIENT resource type will be routed to the least loaded node. This is a byproduct of adding the CLIENT resource type to ConfigResource as outlined in the public interfaces section, and of adding handlers for this resource type in the broker as outlined in the broker changes section.

...

  • Introducing new entity types for kafka-configs.sh that producers and consumers can associate themselves with. This would make the tool more cumbersome to use and it is most intuitive that client configurations be dynamically altered with the clients entity type.

  • Use the <user/client-id> hierarchy implemented for client quotas in KIP-55 and extended for the admin client in KIP-546. If we already had APIs for non-quota configs where this hierarchy existed, it would have been worthwhile to allow this for consistency. However, quota APIs are different and quotas are inherently hierarchical, so it seems reasonable to use a different approach here.

  • Sending user-provided client configuration values to the broker on startup. The user-provided client configurations are not needed by the broker to send back dynamic configurations. DescribeConfigsRequest does not have a field for config values so a new message format would need to be created.

  • Making certain client configurations topic level configurations on the broker

    This might make sense for certain configurations such as acks, but does not for others such as timeouts

    .

    • The semantic for the ProduceRequest API would be undefined since the producer would not receive a response with an offset for the ProduceRequests with acks=0.

    • If this were implemented for acks there would also be quite a bit of overhead associated with extra round trips since the RecordAccumulator sends batches that may contain records from multiple topics. If these topics have different acks configurations the records would need to be sent in different batches based on the acks value.

    • For example, if a producer is consistently producing to 2 different topics and one is configured as acks=0 while the other is acks=-1. This would require twice the amount of round trips to produce the same number of messages.