Table of Contents |
---|
This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.
Status
Current state: [One of "Under Discussion", "Accepted", "Rejected"]released in 2.0
Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]
JIRA: here [Change the link from KAFKA-1 to your own ticket]
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Describe the problems you are trying to solve.
Public Interfaces
Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.
A public interface is any change to the following:
Binary log format
The network protocol and api behavior
Any class in the public packages under clientsConfiguration, especially client configuration
org/apache/kafka/common/serialization
org/apache/kafka/common
org/apache/kafka/common/errors
org/apache/kafka/clients/producer
org/apache/kafka/clients/consumer (eventually, once stable)
Monitoring
Command line tools and arguments
- Anything else that will likely break existing users in some way when they upgrade
Proposed Changes
...
JIRA: JIRA 6657
Motivation
Kafka Streams allows to pass in different configs for different clients by prefixing the corresponding parameter with `producer.` or `consumer.`.
However, Kafka Streams internally uses multiple consumers, (1) the main consumer (2) the restore consumer and (3) the global consumer
For some use cases, it's required to set different configs for different consumers. Thus, we should add three new prefixes for main, restore and global consumer.
We might also consider to extend `KafkaClientSupplier` and add a `getGlobalConsumer()` method.
Public Interfaces
The changes affected classes including StreamsConfig and KafkaClientSupplier. Existing APIs look like below:
Code Block | ||||
---|---|---|---|---|
| ||||
// Prefix setter for consumer config
public static final String CONSUMER_PREFIX = "consumer.";
public static String consumerPrefix(final String consumerProp) {
return CONSUMER_PREFIX + consumerProp;
}
...
// API to get consumer configs
public Map<String, Object> getConsumerConfigs(final String groupId, final String clientId);
public Map<String, Object> getRestoreConsumerConfigs(final String clientId); |
Code Block | ||||
---|---|---|---|---|
| ||||
// Current consumer APIs:
Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config);
Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config); |
Currently, user could use function consumerPrefix() to add specific config for stream consumers. There are no way to differentiate configuration for main consumer, restore consumer and global consumer, which may have different behavior from base settings.
Proposed Changes
We first add APIs in the StreamsConfig class to generate prefix for main consumer, restore consumer and global consumer, and functions to retrieve them as override configs:
Code Block | ||||
---|---|---|---|---|
| ||||
// New Prefixes and setters for main consumer, restore consumer and global consumer
public static final String MAIN_CONSUMER_PREFIX = "main.consumer.";
public static final String RESTORE_CONSUMER_PREFIX = "restore.consumer.";
public static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";
public static String mainConsumerPrefix(final String consumerProp) {
return MAIN_CONSUMER_PREFIX + consumerProp;
}
public static String restoreConsumerPrefix(final String consumerProp) {
return RESTORE_CONSUMER_PREFIX + consumerProp;
}
public static String globalConsumerPrefix(final String consumerProp) {
return GLOBAL_CONSUMER_PREFIX + consumerProp;
}
...
// API to get different types of consumer configs.
public Map<String, Object> getMainConsumerConfigs(final String clientId);
public Map<String, Object> getRestoreConsumerConfigs(final String clientId);
public Map<String, Object> getGlobalConsumerConfigs(final String clientId); |
And add a new public API in the KafkaClientSupplier class to get global consumer during init.
Code Block | ||||
---|---|---|---|---|
| ||||
// New API for global consumer:
Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, Object> config); |
By deprecating the getConsumerConfigs function (from now user should use getMainConsumerConfigs instead), rewriting the getRestoreConsumerConfigs function and adding the getGlobalConsumerConfigs function, if one user uses restoreConsumerPrefix or globalConsumerPrefix when adding new configurations, the configs shall overwrite base consumer config. If one just wants to change main consumer behavior without actually affecting other consumers, using mainConsumerPrefix would make sure the change only apply to main consumer. If not specified, main consumer, restore consumer and global consumer shall share the same config with base consumer.
Example
if user writes:
Code Block | ||||
---|---|---|---|---|
| ||||
consumer.max.poll.record = 5
main.consumer.max.poll.record = 100
restore.consumer.max.poll.record = 50 |
During initialization, consumers would get:
consumer type | max.poll.record | Reason |
---|---|---|
main consumer | 100 | Target assignment with "main.consumer" prefix |
restore consumer | 50 | Get override config 50 by prefixing "restore-consumer" |
global consumer | 5 | Since no "global.consumer" prefix is used, default config will be applied. |
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
- If we are changing behavior how will we phase out the older behavior?
- If we need special migration tools, describe them here.
- When will we remove the existing behavior?
Rejected Alternatives
There is no backward compatibility issue as we are not deprecating any public API, and the underlying change should be transparent to the user.
Rejected Alternatives
NoneIf there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.