Status
Current state: Under Discussion
Discussion thread: Coming Soon
JIRA:
PR (In-progress): https://github.com/apache/kafka/pull/9101
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Producer and consumer configurations can currently be changed only by restarting the client. Allowing the user to dynamically reconfigure misbehaving clients would eliminate the time-consuming process of restarting one or more clients. In this KIP, we will focus on dynamic configuration of the following client configs:
Producer Configs
acks
Consumer Configs
session.timeout.ms
heartbeat.interval.ms
Public Interfaces
Network Protocol
We will be reusing {Describe,IncrementalAlter}Configs for this feature. We will add a new Type to ConfigResource that clients can associate themselves with. This is not a change to the schema of any message protocol; however, it will allow the ConfigResource Type in {Describe,Alter,IncrementalAlter}Configs to be the byte value 9.
public final class ConfigResource {
    ...
    /**
     * Type of resource.
     */
    public enum Type {
        CLIENT((byte) 9),        // <---new--
        BROKER_LOGGER((byte) 8),
        BROKER((byte) 4),
        TOPIC((byte) 2),
        UNKNOWN((byte) 0);
    ...
Additionally, two new ConfigSource values will be added to DescribeConfigsResponse and ConfigEntry so that we can distinguish between default dynamic configs and client-id override dynamic configs. As with the ConfigResource change, this is not a change to the schema of any message protocol; however, it will allow the ConfigSource in DescribeConfigsResponse to be the byte values 7 and 8.
public class DescribeConfigsResponse extends AbstractResponse {
    ...
    public enum ConfigSource {
        UNKNOWN((byte) 0, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.UNKNOWN),
        TOPIC_CONFIG((byte) 1, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG),
        DYNAMIC_BROKER_CONFIG((byte) 2, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG),
        DYNAMIC_DEFAULT_BROKER_CONFIG((byte) 3, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG),
        STATIC_BROKER_CONFIG((byte) 4, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG),
        DEFAULT_CONFIG((byte) 5, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DEFAULT_CONFIG),
        DYNAMIC_BROKER_LOGGER_CONFIG((byte) 6, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG),
        DYNAMIC_CLIENT_CONFIG((byte) 7, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_CLIENT_CONFIG),                  // <---new--
        DYNAMIC_DEFAULT_CLIENT_CONFIG((byte) 8, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_CLIENT_CONFIG); // <---new--
    ...
public class ConfigEntry {
    ...
    /**
     * Source of configuration entries.
     */
    public enum ConfigSource {
        DYNAMIC_CLIENT_CONFIG,         // <---new-- dynamic client config configured for a specific client-id
        DYNAMIC_DEFAULT_CLIENT_CONFIG, // <---new-- dynamic client config configured as default for all clients
        DYNAMIC_TOPIC_CONFIG,          // dynamic topic config that is configured for a specific topic
        DYNAMIC_BROKER_LOGGER_CONFIG,  // dynamic broker logger config that is configured for a specific broker
    ...
The handlers in the broker for DescribeConfigs and IncrementalAlterConfigs will both be extended to support the CLIENT resource type. The handler in the broker for AlterConfigs will return an InvalidRequest error code if this new resource type is used in an AlterConfigsRequest. This is because kafka-configs.sh uses IncrementalAlterConfigs to alter these dynamic configs, and we will not be adding support for the CLIENT resource type to AlterConfigs.
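For illustration, the check in the legacy AlterConfigs path might look roughly like the sketch below. This is a Java sketch only; the actual handler lives in the broker's Scala code, and the CLIENT type is the one proposed above, so the class and method names here are hypothetical.

import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;

// Illustrative only: the legacy AlterConfigs path rejects the new CLIENT resource type outright.
public final class LegacyAlterConfigsCheckSketch {
    public static ApiError validate(ConfigResource resource) {
        if (resource.type() == ConfigResource.Type.CLIENT) {
            return new ApiError(Errors.INVALID_REQUEST,
                "Dynamic client configs must be altered with IncrementalAlterConfigs");
        }
        return ApiError.NONE;
    }
}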
Client Configurations
We will also add a new client configuration, enable.dynamic.config, to both the producer and consumer. It defaults to true, so the user has the option to disable this feature.
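For example, a user who wants to opt out could set the flag explicitly when constructing a client. The sketch below shows a producer, but the consumer works the same way; the property name is the one proposed above, and the rest of the snippet is ordinary client setup.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class DynamicConfigOptOutExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Opt out of dynamic reconfiguration; the flag defaults to true under this proposal.
        props.put("enable.dynamic.config", "false");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // ... produce as usual; broker-side dynamic overrides will be ignored by this client.
        }
    }
}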
Proposed Changes
Broker Changes
These dynamic configs will be stored in ZooKeeper as children of the ZNode /config/clients, with the following order of precedence:
/config/clients/<client-id>
/config/clients/<default>
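To make the precedence concrete, here is a minimal sketch of how the effective dynamic configs for a client could be assembled, with the per-client-id entry overriding the default entry. The class and method names are illustrative only, not part of the actual implementation.

import java.util.HashMap;
import java.util.Map;

// Illustrative only: client-id overrides win over the <default> entry.
public final class DynamicClientConfigPrecedence {
    public static Map<String, String> effectiveConfigs(Map<String, String> defaultConfigs,    // from /config/clients/<default>
                                                       Map<String, String> clientIdConfigs) { // from /config/clients/<client-id>
        Map<String, String> effective = new HashMap<>(defaultConfigs);
        effective.putAll(clientIdConfigs); // per-client-id values take precedence
        return effective;
    }
}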
Client quotas are stored in these ZNodes as well. However, all configs that are not quota configs are filtered out when constructing a DescribeClientQuotasResponse. Similarly, all configs that are not dynamic client configs will be filtered out when constructing a DescribeConfigsResponse for the CLIENT resource type. The same authorization that is required for {Describe,Alter}ClientQuotas, namely CLUSTER authorization, will be used when handling the CLIENT resource type in {Describe,IncrementalAlter}ConfigsRequest.
Producer Changes
The producer will have a DynamicProducerConfig that periodically fetches dynamic configs asynchronously from the producer's I/O thread. Dynamic configs will be fetched on the same interval as MetadataRequest, which is currently five minutes. It will use DescribeConfigsRequest as the RPC, validate the dynamic configs returned in the DescribeConfigsResponse against the user-provided configs, and reconfigure the client. This reconfiguration will be done through a method in DynamicProducerConfig that gets the current value of acks. The dynamic acks config will take precedence over the user-provided acks config unless the user-provided configs require acks to be a certain value, such as enable.idempotence=true requiring acks=all. In this case the dynamic update will be ignored.
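A minimal sketch of how that precedence could be resolved is shown below, assuming a helper that computes the effective acks value. The class and method names are hypothetical, not the actual DynamicProducerConfig code.

import java.util.Map;

// Illustrative only: the dynamic acks value wins unless the user-provided configs
// require a specific acks setting (e.g. enable.idempotence=true requires acks=all).
public final class AcksResolutionSketch {
    public static String resolveAcks(Map<String, String> userConfigs, String dynamicAcks) {
        boolean idempotenceEnabled =
            Boolean.parseBoolean(userConfigs.getOrDefault("enable.idempotence", "false"));
        boolean dynamicAcksIsAll = "all".equals(dynamicAcks) || "-1".equals(dynamicAcks);
        if (dynamicAcks == null || (idempotenceEnabled && !dynamicAcksIsAll)) {
            // Keep whatever the user configured; the dynamic update is ignored.
            return userConfigs.get("acks");
        }
        return dynamicAcks;
    }
}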
Consumer Changes
The consumer will have a DynamicConsumerConfig that periodically fetches dynamic configs asynchronously from the consumer's heartbeat thread. Dynamic configs will be fetched on the same interval as MetadataRequest, which is currently five minutes. It will use DescribeConfigsRequest as the RPC, validate the dynamic configs returned in the DescribeConfigsResponse against the user-provided configs, and reconfigure the client by changing the session timeout and heartbeat interval in the GroupRebalanceConfig. The dynamic configs will take precedence over the user-provided client configs.
The GroupCoordinator in the broker receives a group member's session timeout in the JoinGroupRequest and stores it with the rest of the group member's metadata. This means that to dynamically configure a consumer's session timeout, we must send a JoinGroupRequest. Since this could trigger an expensive rebalance operation, we think it is best to delay the first JoinGroupRequest until the first DescribeConfigsRequest has been made by the consumer. This prevents an unnecessary JoinGroupRequest while the consumer is discovering its dynamic configurations on startup. The initial DescribeConfigsRequest will be made synchronously from the main thread of the AbstractCoordinator. All subsequent RPCs for dynamic configs will be made asynchronously from the HeartbeatThread of the AbstractCoordinator.
Admin Client Changes
The describeConfigs and incrementalAlterConfigs methods in the admin client will now be able to use the CLIENT resource type to describe and alter dynamic client configs. This is a byproduct of adding the CLIENT resource type to ConfigResource, as outlined in the public interfaces section, and of adding handlers for this resource type in the broker, as outlined in the broker changes section.
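For illustration, altering and then describing the dynamic configs of a specific client-id through the admin client might look as follows. This is a sketch that assumes the proposed CLIENT resource type; error handling is omitted.

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class DynamicClientConfigAdminExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // CLIENT is the resource type proposed by this KIP.
            ConfigResource resource = new ConfigResource(ConfigResource.Type.CLIENT, "clientid-override");

            // Set acks=0 for this client-id.
            AlterConfigOp op = new AlterConfigOp(new ConfigEntry("acks", "0"), AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> updates =
                Collections.singletonMap(resource, Collections.singletonList(op));
            admin.incrementalAlterConfigs(updates).all().get();

            // Read back the dynamic configs for the same client-id.
            Map<ConfigResource, Config> configs =
                admin.describeConfigs(Collections.singleton(resource)).all().get();
            configs.get(resource).entries().forEach(entry ->
                System.out.println(entry.name() + " = " + entry.value()));
        }
    }
}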
Command Line Changes
kafka-configs.sh will be extended to support the client configurations listed at the beginning of this KIP. The same entity type that is used for client quotas, clients, will be used for dynamic client configuration.
For example, we can add the new configs supported by this KIP, along with the quota configs supported for the admin client in KIP-546, to the default dynamic config. In this example we mix dynamic client configs that this KIP introduces with the quota config producer_byte_rate:
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
    --alter \
    --entity-type clients \
    --entity-default \
    --add-config acks=-1,session.timeout.ms=11000,producer_byte_rate=50000

Completed updating default config for clients in the cluster.
We can also add configs specific to a client-id that will override default dynamic configs:
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
    --alter \
    --entity-type clients \
    --entity-name clientid-override \
    --add-config acks=0,heartbeat.interval.ms=2000,producer_byte_rate=60000

Completed updating config for client clientid-override.
We can describe these configs the same way that client quotas are described, using the clients entity type. To make this possible, the tool will send a DescribeConfigsRequest in addition to the DescribeClientQuotasRequest:
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
    --describe \
    --entity-type clients \
    --entity-name clientid-override

Dynamic configs for client clientid-override are:
  acks=0 sensitive=false synonyms={DYNAMIC_CLIENT_CONFIG:acks=0, DYNAMIC_DEFAULT_CLIENT_CONFIG:acks=-1}
  heartbeat.interval.ms=2000 sensitive=false synonyms={DYNAMIC_CLIENT_CONFIG:heartbeat.interval.ms=2000}
Quota configs for client-id 'clientid-override' are producer_byte_rate=60000.0
The default dynamic config will be used when the client-id dynamic config lacks a key that the default contains. This is illustrated below with the session.timeout.ms config:
bin/kafka-configs.sh --bootstrap-server localhost:9091 \
    --describe \
    --all \
    --entity-type clients \
    --entity-name clientid-override

All configs for client clientid-override are:
  acks=0 sensitive=false synonyms={DYNAMIC_CLIENT_CONFIG:acks=0, DYNAMIC_DEFAULT_CLIENT_CONFIG:acks=-1}
  heartbeat.interval.ms=2000 sensitive=false synonyms={DYNAMIC_CLIENT_CONFIG:heartbeat.interval.ms=2000}
  session.timeout.ms=11000 sensitive=false synonyms={DYNAMIC_DEFAULT_CLIENT_CONFIG:session.timeout.ms=11000}
Configs for client-id 'clientid-override' are producer_byte_rate=60000.0
Any number of the configs that we are providing dynamic support for can be added or deleted with --add-config and --delete-config. They may optionally be mixed with quotas in the same command.
Compatibility, Deprecation, and Migration Plan
If a new client with this feature sends a DescribeConfigsRequest with the ConfigResource type CLIENT to an old broker, the broker will send back an InvalidRequest error code and the client will disable this feature.

If an old client is talking to a new broker, nothing changes, since the old client will never request configs for the CLIENT resource type.

If both the broker and the client are up to date with this change, the client will take advantage of the feature as long as enable.dynamic.config is not configured as false.
Rejected Alternatives
Introducing new entity types for kafka-configs.sh that producers and consumers can associate themselves with. This would make the tool more cumbersome to use, and it is most intuitive for client configurations to be dynamically altered with the clients entity type.

Using the <user/client-id> hierarchy implemented for client quotas in KIP-55 and extended for the admin client in KIP-546. If we already had APIs for non-quota configs where this hierarchy existed, it would have been worthwhile to allow this for consistency. However, the quota APIs are different and quotas are inherently hierarchical, so it seems reasonable to use a different approach here.

Sending user-provided client configuration values to the broker on startup. The user-provided client configurations are not needed by the broker to send back dynamic configurations, and DescribeConfigsRequest does not have a field for config values, so a new message format would need to be created.

Making certain client configurations topic-level configurations on the broker. This might make sense for certain configurations such as acks, but does not for others such as timeouts. The semantics of the ProduceRequest API would also be undefined, since we would not receive a response with an offset for ProduceRequests sent with acks=0. If this were implemented for acks, there would also be quite a bit of overhead from extra round trips, since the RecordAccumulator sends batches that may contain records from multiple topics; if these topics had different acks configurations, the records would need to be sent in separate batches based on their acks value. For example, if a producer is consistently producing to two different topics, one configured with acks=0 and the other with acks=-1, it would require twice as many round trips to produce the same number of messages. This is the main reason that acks will remain a client configuration.