...
For some use cases, it's required to set different configs for different consumers. Thus, we should add three new prefixes for the main, restore, and global consumers.
...
We first add APIs to the StreamsConfig class that generate the prefixes for the main, restore, and global consumers, along with functions to retrieve the resulting override configs:
Code Block | ||||
---|---|---|---|---|
| ||||
// New Prefixes and setters for main consumer, restore consumer and global consumer public static final String MAIN_CONSUMER_PREFIX = "main.consumer."; public static final String RESTORE_CONSUMER_PREFIX = "restore.consumer."; public static final String GLOBAL_CONSUMER_PREFIX = "global.consumer."; public static String mainConsumerPrefix(final String consumerProp) { return MAIN_CONSUMER_PREFIX + consumerProp; } public static String restoreConsumerPrefix(final String consumerProp) { return RESTORE_CONSUMER_PREFIX + consumerProp; } public static String globalConsumerPrefix(final String consumerProp) { return GLOBAL_CONSUMER_PREFIX + consumerProp; } ... // API to get restore and global consumer configs (MODIFIED) public Map<String, Object> getRestoreConsumerConfigs(final String clientId); public Map<String, Object> getGlobalConsumerConfigs(final String clientId); |
...
With getRestoreConsumerConfigs() rewritten and getGlobalConsumerConfigs() added, any configuration a user sets through restoreConsumerPrefix() or globalConsumerPrefix() overrides the base consumer config for that consumer. To change only the main consumer's behavior without affecting the other consumers, use mainConsumerPrefix(), which ensures the change applies to the main consumer alone. If no type-specific prefix is specified, the main, restore, and global consumers all share the base consumer config.
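As a rough illustration of how an application might use these helpers, the sketch below builds a config that tunes each consumer type separately. It is self-contained: the class `ConsumerPrefixSketch` is a hypothetical stand-in that copies the proposed helper functions rather than calling the real StreamsConfig class, the prefix strings are assumed to be `main.consumer.`, `restore.consumer.`, and `global.consumer.`, and the property values are arbitrary.

```java
import java.util.Properties;

// Hypothetical stand-in for the helpers proposed above; NOT the real StreamsConfig class.
final class ConsumerPrefixSketch {
    // Assumed prefix strings, taken from the proposal text.
    static final String MAIN_CONSUMER_PREFIX = "main.consumer.";
    static final String RESTORE_CONSUMER_PREFIX = "restore.consumer.";
    static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";

    static String mainConsumerPrefix(final String consumerProp) {
        return MAIN_CONSUMER_PREFIX + consumerProp;
    }

    static String restoreConsumerPrefix(final String consumerProp) {
        return RESTORE_CONSUMER_PREFIX + consumerProp;
    }

    static String globalConsumerPrefix(final String consumerProp) {
        return GLOBAL_CONSUMER_PREFIX + consumerProp;
    }

    // Builds an application config in which each consumer type gets its own setting.
    // The values are arbitrary and chosen only for illustration.
    static Properties exampleConfig() {
        final Properties props = new Properties();
        props.setProperty(mainConsumerPrefix("max.poll.interval.ms"), "600000");
        props.setProperty(restoreConsumerPrefix("fetch.max.bytes"), "10485760");
        props.setProperty(globalConsumerPrefix("fetch.max.wait.ms"), "1000");
        return props;
    }

    public static void main(final String[] args) {
        // Print every prefixed key together with its value.
        exampleConfig().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Because every key carries a type-specific prefix, none of these three settings would leak into the other two consumers.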
...
Code Block | ||||
---|---|---|---|---|
| ||||
consumer.max.poll.records = 5 main.consumer.max.poll.records = 100 restore.consumer.max.poll.records = 50 |
...
consumer type | max.poll.records | Reason |
---|---|---|
main consumer | 100 | Target assignment with "main.consumer" prefix |
restore consumer | 50 | Get override config 50 by prefixing "restore.consumer" |
global consumer | 5 | Since no "global.consumer" prefix is used, the default config will be applied. |
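The resolution order behind this table can be sketched in plain Java. This is a minimal model and not the actual StreamsConfig implementation: it assumes that values under the base `consumer.` prefix are collected first and that values under the type-specific prefix are then laid over them. The class `ConsumerConfigResolution` and its methods are hypothetical names, and the key is written as `max.poll.records`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the override order; not the real StreamsConfig code.
final class ConsumerConfigResolution {
    static final String BASE_PREFIX = "consumer.";

    // Returns every entry whose key starts with the prefix, with the prefix removed.
    static Map<String, Object> withPrefixStripped(final Map<String, Object> all, final String prefix) {
        final Map<String, Object> result = new HashMap<>();
        for (final Map.Entry<String, Object> entry : all.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                result.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    // Base consumer configs first, then the type-specific overrides on top.
    static Map<String, Object> resolve(final Map<String, Object> all, final String typePrefix) {
        final Map<String, Object> resolved = withPrefixStripped(all, BASE_PREFIX);
        resolved.putAll(withPrefixStripped(all, typePrefix));
        return resolved;
    }

    // The three settings from the example above.
    static Map<String, Object> exampleConfig() {
        final Map<String, Object> all = new HashMap<>();
        all.put("consumer.max.poll.records", 5);
        all.put("main.consumer.max.poll.records", 100);
        all.put("restore.consumer.max.poll.records", 50);
        return all;
    }

    public static void main(final String[] args) {
        final Map<String, Object> all = exampleConfig();
        System.out.println("main:    " + resolve(all, "main.consumer.").get("max.poll.records"));    // 100
        System.out.println("restore: " + resolve(all, "restore.consumer.").get("max.poll.records")); // 50
        System.out.println("global:  " + resolve(all, "global.consumer.").get("max.poll.records"));  // 5
    }
}
```

Matching on `startsWith` keeps `main.consumer.max.poll.records` out of the base lookup, since that key begins with `main.` rather than `consumer.`.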
Compatibility, Deprecation, and Migration Plan
...