Status
Current state: Under Discussion
Discussion thread: here
JIRA:
PR (In-progress): https://github.com/apache/kafka/pull/9101
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Producer and consumer configurations can currently be changed only by restarting the client. Allowing the user to dynamically reconfigure misbehaving clients would eliminate the time-consuming process of restarting one or more clients. This KIP proposes mechanisms for dynamic configuration of the following client configs:
Producer Configs
acks
Consumer Configs
session.timeout.ms
heartbeat.interval.ms
Public Interfaces
Network Protocol
The clients will reuse {Describe,IncrementalAlter}Configs for this feature.
A new Type, CLIENT, will be added to ConfigResource that applications can associate themselves with. There will be an additional type, USER_CLIENT, that the admin client can use to request configurations for a specific user and client-id. This is not a change to the schema of any message protocol. However, it will allow the ConfigResource Type in {Describe,Alter,IncrementalAlter}Configs to take the new byte values 16 and 32.
public final class ConfigResource {
    .
    .
    /**
     * Type of resource.
     */
    public enum Type {
        USER_CLIENT((byte) 32),  // <---new-- Used by admin client to fetch configs for a <user, client-id>
        CLIENT((byte) 16),       // <---new-- Used by application to fetch configs for client-id, user principal is encoded into the resource name on the broker.
        BROKER_LOGGER((byte) 8),
        BROKER((byte) 4),
        TOPIC((byte) 2),
        UNKNOWN((byte) 0);
        .
        .
The user principal is encoded into the resource name on the broker when an application fetches configs for the CLIENT resource type. The resource type is also changed to USER_CLIENT, authorization occurs, and then all other logic is handled in the same way as the USER_CLIENT type.
Additionally, two new ConfigSource values will be added to DescribeConfigsResponse and ConfigEntry so that the user or application can distinguish between default dynamic configs and client-id override dynamic configs. Similar to the changes in ConfigResource, this is not a change to the schema of any message protocol. However, it will allow the ConfigSource in DescribeConfigsResponse to take the byte values 7 and 8.
public class DescribeConfigsResponse extends AbstractResponse {
    .
    .
    public enum ConfigSource {
        UNKNOWN((byte) 0, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.UNKNOWN),
        TOPIC_CONFIG((byte) 1, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG),
        DYNAMIC_BROKER_CONFIG((byte) 2, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG),
        DYNAMIC_DEFAULT_BROKER_CONFIG((byte) 3, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG),
        STATIC_BROKER_CONFIG((byte) 4, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG),
        DEFAULT_CONFIG((byte) 5, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DEFAULT_CONFIG),
        DYNAMIC_BROKER_LOGGER_CONFIG((byte) 6, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG),
        DYNAMIC_CLIENT_CONFIG((byte) 7, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_CLIENT_CONFIG), // <---new--
        DYNAMIC_DEFAULT_CLIENT_CONFIG((byte) 8, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_CLIENT_CONFIG); // <---new--
        .
        .
        .
public class ConfigEntry {
    .
    .
    /**
     * Source of configuration entries.
     */
    public enum ConfigSource {
        DYNAMIC_CLIENT_CONFIG,          // <---new-- dynamic client config configured for a specific <user, client-id>
        DYNAMIC_DEFAULT_CLIENT_CONFIG,  // <---new-- dynamic client config configured as default for all clients of a user <user, default>
        DYNAMIC_TOPIC_CONFIG,           // dynamic topic config that is configured for a specific topic
        DYNAMIC_BROKER_LOGGER_CONFIG,   // dynamic broker logger config that is configured for a specific broker
        .
        .
        .
The handlers in the broker for DescribeConfigs and IncrementalAlterConfigs will both be extended to support the CLIENT and USER_CLIENT resource types. The broker will not support these resource types for AlterConfigs, since AlterConfigs is deprecated as outlined in KIP-339. The broker's AlterConfigs handler will return an InvalidRequest error code if either new resource type is used in an AlterConfigsRequest.
Client Configurations
The Java producer and consumer clients will have a new configuration property, enable.dynamic.config, with a default value of false. Since the user has to set this property explicitly, they are expected to know which applications support this capability. When this configuration property is true, the proposed producer and consumer changes in this KIP are enabled.
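As a minimal sketch, assuming the new property is passed like any other client config, an application might opt in as shown here. Only enable.dynamic.config comes from this proposal (released clients do not recognize it); the class and method names are hypothetical.

```java
import java.util.Properties;

// Hypothetical helper, for illustration only.
final class DynamicConfigOptIn {
    // Builds producer properties that opt in to the dynamic config feature proposed in this KIP.
    static Properties producerProps(String bootstrapServers, String clientId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("client.id", clientId);
        // User-provided value; a dynamic value may take precedence once the feature is on.
        props.setProperty("acks", "all");
        // Proposed property from this KIP; its default is false.
        props.setProperty("enable.dynamic.config", "true");
        return props;
    }
}
```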
Proposed Changes
Broker Changes
These dynamic configs will be stored and updated in the children of the znode /config/users/<user>/clients based on the contents of an IncrementalAlterConfigsRequest. The default config is updated when the client-id portion of the resource name in the IncrementalAlterConfigsRequest is the empty string. The client-id specific config is updated otherwise.
They will have the following order of precedence:
/config/users/<user>/clients/<client-id>
/config/users/<user>/clients/<default>
For example, when the broker handles a DescribeConfigsRequest, it will first use the client config key-values stored in /config/users/<user>/clients/<default>. If there is a matching /config/users/<user>/clients/<client-id>, then any client config key-value in the matching znode will override the key-values found in /config/users/<user>/clients/<default>.
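The override order can be sketched as a simple map merge. This is an illustration under the assumption that each znode's contents are already loaded as a key-value map; ClientConfigResolver and resolve are hypothetical names, not part of the broker.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, for illustration only.
final class ClientConfigResolver {
    // Starts from the <default> entry, then lets the <client-id> entry override matching keys.
    static Map<String, String> resolve(Map<String, String> defaultConfigs,
                                       Map<String, String> clientIdConfigs) {
        Map<String, String> resolved = new HashMap<>(defaultConfigs);
        resolved.putAll(clientIdConfigs); // client-id values win on key collisions
        return resolved;
    }
}
```

With a default of acks=-1, session.timeout.ms=11000 and a client-id entry of acks=0, heartbeat.interval.ms=2000, the resolved config is acks=0, heartbeat.interval.ms=2000, session.timeout.ms=11000.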
Client quotas are stored in these znodes as well. However, all configs that are not quota configs are filtered out when constructing a DescribeClientQuotasResponse. Similarly, all configs that are not dynamic client configs will be filtered out when constructing a DescribeConfigsResponse for the CLIENT resource type. The value for each key will also be validated against the allowed values for that key. For example, if the user tries to set acks=2, an InvalidRequest error code will be sent back. The client will also have to validate dynamic configs against user-provided configs, so the broker is only doing partial validation here. This is worth doing since partially validated configs may only be invalid for a subset of clients, whereas acks=2 would be invalid for all clients.
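A rough sketch of the partial, per-key validation described above follows. The class and method names are hypothetical. Rejecting keys outside the three configs listed in this KIP, and checking the two timeouts only for being integers, are assumptions made for the example rather than behavior specified here.

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical helper, for illustration only.
final class DynamicClientConfigValidator {
    // The three configs this KIP proposes to make dynamic.
    static final Set<String> SUPPORTED_KEYS =
        Set.of("acks", "session.timeout.ms", "heartbeat.interval.ms");
    // Values the producer accepts for acks.
    static final Set<String> VALID_ACKS = Set.of("all", "-1", "0", "1");

    // Returns empty when the entry passes, otherwise a reason that could
    // accompany an InvalidRequest error code.
    static Optional<String> validate(String key, String value) {
        if (!SUPPORTED_KEYS.contains(key)) {
            // Assumption: unsupported keys are rejected rather than silently stored.
            return Optional.of("Unsupported dynamic client config: " + key);
        }
        if (key.equals("acks")) {
            return VALID_ACKS.contains(value)
                ? Optional.empty()
                : Optional.of("Invalid value for acks: " + value);
        }
        try {
            Integer.parseInt(value); // assumption: timeouts only need to be integers here
            return Optional.empty();
        } catch (NumberFormatException e) {
            return Optional.of("Invalid integer for " + key + ": " + value);
        }
    }
}
```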
The same authorization that is necessary for {Describe,Alter}ClientQuotas, CLUSTER authorization, will be used when handling the CLIENT and USER_CLIENT resource types in {Describe,IncrementalAlter}ConfigsRequest.
Producer Changes
The producer will have a DynamicProducerConfig that will periodically fetch dynamic configs asynchronously from the producer’s IO thread. The fetch interval will be the same as the interval for MetadataRequest, metadata.max.age.ms. It will use DescribeConfigsRequest as the RPC, validate the dynamic configs returned in DescribeConfigsResponse against the user-provided configs, and log any configurations that are accepted. The client will reconfigure its acks value by using a method in DynamicProducerConfig that gets the current value of acks. The dynamic acks config will take precedence over the user-provided acks config unless the user-provided configs require acks to be a certain value, as is the case with enable.idempotence=true. In this case the dynamic update will be ignored.
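The precedence rule for acks could look roughly like the following. This is a sketch only: ProducerAcksResolver and effectiveAcks are hypothetical names, and a null dynamic value is assumed to mean that the broker returned no dynamic acks.

```java
// Hypothetical helper, for illustration only.
final class ProducerAcksResolver {
    // Returns the acks value the producer should use.
    static String effectiveAcks(String userAcks, boolean idempotenceEnabled, String dynamicAcks) {
        if (dynamicAcks == null) {
            return userAcks; // nothing configured dynamically
        }
        if (idempotenceEnabled) {
            // enable.idempotence=true pins acks, so the dynamic update is ignored.
            return userAcks;
        }
        return dynamicAcks; // dynamic value takes precedence over the user-provided one
    }
}
```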
Consumer Changes
The GroupCoordinator in the broker receives a group member’s session timeout in the JoinGroupRequest and stores it with the rest of the group member's metadata. This means that to dynamically configure a consumer’s session timeout, the consumer must send a JoinGroupRequest. Since this could trigger an expensive rebalance, it is best to delay the first JoinGroupRequest until the consumer has made its first DescribeConfigsRequest. This prevents an unnecessary JoinGroupRequest when the consumer is discovering its dynamic configurations on startup. This initial DescribeConfigsRequest will be done synchronously from the main thread of the AbstractCoordinator. All subsequent RPCs for dynamic configs will be done asynchronously from the HeartbeatThread of the AbstractCoordinator.
There will be a DynamicConsumerConfig that will periodically fetch dynamic configs. The fetch interval will be the same as the interval for MetadataRequest, metadata.max.age.ms. It will use DescribeConfigsRequest as the RPC, validate the dynamic configs returned in DescribeConfigsResponse against the user-provided configs, and log any configurations that are accepted. The client will either reconfigure itself by changing the session timeout and heartbeat interval in the GroupRebalanceConfig, or discard the configs if the heartbeat interval is greater than or equal to the session timeout. The dynamic configs will take precedence over user-provided client configs as long as the heartbeat interval is strictly less than the session timeout.
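The accept-or-discard check might be sketched as below. DynamicConsumerTimeouts is a hypothetical stand-in for the relevant fields of GroupRebalanceConfig, and combining a single dynamic value with the other user-provided value before checking is an assumption of this example.

```java
// Hypothetical holder, for illustration only.
final class DynamicConsumerTimeouts {
    final int sessionTimeoutMs;
    final int heartbeatIntervalMs;

    DynamicConsumerTimeouts(int sessionTimeoutMs, int heartbeatIntervalMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.heartbeatIntervalMs = heartbeatIntervalMs;
    }

    // A null argument means the broker returned no dynamic value for that key.
    DynamicConsumerTimeouts withDynamic(Integer dynamicSessionTimeoutMs,
                                        Integer dynamicHeartbeatIntervalMs) {
        int session = dynamicSessionTimeoutMs != null ? dynamicSessionTimeoutMs : sessionTimeoutMs;
        int heartbeat = dynamicHeartbeatIntervalMs != null ? dynamicHeartbeatIntervalMs : heartbeatIntervalMs;
        if (heartbeat >= session) {
            return this; // discard: heartbeat must be strictly less than the session timeout
        }
        return new DynamicConsumerTimeouts(session, heartbeat);
    }
}
```

Starting from user-provided values of 10000 ms and 3000 ms, dynamic values of 11000 ms and 2000 ms are accepted, while a dynamic heartbeat interval of 12000 ms on its own is discarded.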
Admin Client Changes
The methods describeConfigs and incrementalAlterConfigs in the admin client will now be able to use the USER_CLIENT resource type to describe and alter dynamic client configs. All describeConfigs and incrementalAlterConfigs calls with the USER_CLIENT resource type will be routed to the least loaded node. This is a byproduct of adding the USER_CLIENT resource type to ConfigResource as outlined in the public interfaces section, and of adding handlers for this resource type in the broker as outlined in the broker changes section. The caveat is that, with the current APIs, the <user, client-id> pair has to be encoded together in a single resource name. Currently this is done by separating the user and client-id with a colon.
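A sketch of the colon-separated resource name follows. UserClientResourceName and its methods are hypothetical, and splitting at the first colon is an assumption; a real implementation would need to decide how colons inside a user principal or client-id are handled.

```java
// Hypothetical helper, for illustration only.
final class UserClientResourceName {
    // An empty clientId addresses the user's default dynamic config.
    static String encode(String user, String clientId) {
        return user + ":" + clientId;
    }

    // Returns {user, clientId}. Assumption: the first colon is the separator.
    static String[] decode(String resourceName) {
        int idx = resourceName.indexOf(':');
        if (idx < 0) {
            throw new IllegalArgumentException("Expected <user>:<client-id> but got " + resourceName);
        }
        return new String[] {resourceName.substring(0, idx), resourceName.substring(idx + 1)};
    }
}
```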
Command Line Changes
kafka-configs.sh will be extended to support the client configurations listed at the beginning of this KIP. The same entity types that are used for client quotas, users and clients, will be used for dynamic client configuration.
For example, the user can add the new configs supported by this KIP, along with the quota configs that are supported for the admin client in KIP-546, to their default dynamic config. In this example the user mixes some dynamic client configs that this KIP introduces with the quota config producer_byte_rate:
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter \
  --entity-type users \
  --entity-name alice \
  --add-config acks=-1,session.timeout.ms=11000,producer_byte_rate=50000
Completed updating config for user alice.
The user can also add configs specific to a client-id that will override default dynamic configs:
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter \
  --entity-type users \
  --entity-name alice \
  --entity-type clients \
  --entity-name clientid-override \
  --add-config acks=0,heartbeat.interval.ms=2000,producer_byte_rate=60000
Completed updating config for user alice.
The user can describe these configs the same way that client quotas are described with the clients entity type. To make this possible, kafka-configs.sh will send a DescribeConfigsRequest as well as the DescribeClientQuotasRequest:
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --describe \
  --entity-type users \
  --entity-name alice \
  --entity-type clients \
  --entity-name clientid-override
Dynamic configs for user alice:clientid-override are:
  acks=0 sensitive=false synonyms={DYNAMIC_CLIENT_CONFIG:acks=0, DYNAMIC_DEFAULT_CLIENT_CONFIG:acks=-1}
  heartbeat.interval.ms=2000 sensitive=false synonyms={DYNAMIC_CLIENT_CONFIG:heartbeat.interval.ms=2000}
Quota configs for user-principal 'alice', client-id 'clientid-override' are producer_byte_rate=60000.0
The default dynamic config will be used when the client-id dynamic config does not contain a key that the default does contain. This is illustrated below with the session.timeout.ms config:
bin/kafka-configs.sh --bootstrap-server localhost:9091 \
  --describe \
  --all \
  --entity-type users \
  --entity-name alice \
  --entity-type clients \
  --entity-name clientid-override
All configs for user alice:clientid-override are:
  acks=0 sensitive=false synonyms={DYNAMIC_CLIENT_CONFIG:acks=0, DYNAMIC_DEFAULT_CLIENT_CONFIG:acks=-1}
  heartbeat.interval.ms=2000 sensitive=false synonyms={DYNAMIC_CLIENT_CONFIG:heartbeat.interval.ms=2000}
  session.timeout.ms=11000 sensitive=false synonyms={DYNAMIC_DEFAULT_CLIENT_CONFIG:session.timeout.ms=11000}
Quota configs for user-principal 'alice', client-id 'clientid-override' are producer_byte_rate=60000.0
Any number of the configs that this KIP provides dynamic support for can be added or deleted with --add-config and --delete-config. They may optionally be mixed with quotas in the same command.
Compatibility, Deprecation, and Migration Plan
If a new client with this feature attempts to send a DescribeConfigsRequest with the ConfigResource Type CLIENT to an old broker, the broker will send back an InvalidRequest error code and the client will disable this feature.
In the case that an old client is talking to a new broker, nothing will change since the old client will never request configs for the CLIENT resource type.
In the case that both the broker and client are up to date with this change, the client will take advantage of the feature as long as enable.dynamic.config is set to true.
Providing information about what dynamic configs are supported for each application would require the clients to register the dynamic configs that they support when bootstrapping. There would need to be storage, metadata lifecycle management, and APIs created to make this information available to the user. With this in mind, if the user tries to set a dynamic config for a client that does not support it, the client will just log that it does not support the value and carry on.
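The fallback against an old broker can be sketched as a small feature gate. DynamicConfigFeatureGate and its methods are hypothetical, and the error is represented by its name (INVALID_REQUEST) to keep the example free of client library dependencies.

```java
// Hypothetical helper, for illustration only.
final class DynamicConfigFeatureGate {
    private boolean enabled;

    // Starts from the value of enable.dynamic.config.
    DynamicConfigFeatureGate(boolean enableDynamicConfig) {
        this.enabled = enableDynamicConfig;
    }

    // Whether the client should keep sending DescribeConfigsRequest for its dynamic configs.
    boolean shouldFetch() {
        return enabled;
    }

    // An old broker rejects the new resource type, so the client stops asking.
    void onDescribeConfigsError(String errorName) {
        if ("INVALID_REQUEST".equals(errorName)) {
            enabled = false;
        }
    }
}
```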
Rejected Alternatives
Introducing new entity types for kafka-configs.sh that producers and consumers can associate themselves with. This would make the tool more cumbersome to use, and it is most intuitive that client configurations be dynamically altered with the clients entity type.
Use the <user/client-id> hierarchy implemented for client quotas in KIP-55 and extended for the admin client in KIP-546. If there were already APIs for non-quota configs where this hierarchy existed, it would have been worthwhile to allow this for consistency. However, quota APIs are different and quotas are inherently hierarchical, so it seems reasonable to use a different approach here.
Sending user-provided client configuration values to the broker on startup. The user-provided client configurations are not needed by the broker to send back dynamic configurations. DescribeConfigsRequest does not have a field for config values, so a new message format would need to be created.
Making certain client configurations topic level configurations on the broker.
The semantics of the ProduceRequest API would be undefined since the producer would not receive a response with an offset for the ProduceRequests with acks=0.
If this were implemented for acks, there would also be quite a bit of overhead associated with extra round trips, since the RecordAccumulator sends batches that may contain records from multiple topics. If these topics have different acks configurations, the records would need to be sent in different batches based on the acks value.
For example, if a producer consistently produces to two different topics, one configured with acks=0 and the other with acks=-1, this would require twice as many round trips to produce the same number of messages.