Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current stateUnder Discussionreleased in 2.0

Discussion thread: here

JIRA: JIRA 6657

...

 

Currently, user could use function consumerPrefix() to add specific config for stream consumers. There are no way to differentiate configuration for main consumer, restore consumer and global consumer, which may have different behavior from base consumersettings.

Proposed Changes

We first add APIs in the StreamsConfig class to generate prefix for main consumer, restore consumer and global consumer, and functions to retrieve them as override configs:

Code Block
languagejava
titleStreamsConfig.java
// New Prefixes and setters for main consumer, restore consumer and global consumer
public static final String MAIN_CONSUMER_PREFIX = "main.consumer.";
public static final String RESTORE_CONSUMER_PREFIX = "restore.consumer.";
public static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";

public static String mainConsumerPrefix(final String consumerProp) {
    return MAIN_CONSUMER_PREFIX + consumerProp;
}
public static String restoreConsumerPrefix(final String consumerProp) {
    return RESTORE_CONSUMER_PREFIX + consumerProp;
}
public static String globalConsumerPrefix(final String consumerProp) {
    return GLOBAL_CONSUMER_PREFIX + consumerProp;
}
...
// API to get restoredifferent andtypes globalof consumer configs. 
public Map<String, Object> getMainConsumerConfigs(final String clientId);
public Map<String, Object> getRestoreConsumerConfigs(final String clientId);
public Map<String, Object> getGlobalConsumerConfigs(final String clientId); 

...

Code Block
languagejava
titleKafkaClientSupplier.java
// New API for global consumer:
Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, Object> config); 

By rewriting deprecating the getRestoreConsumerConfigs() function getConsumerConfigs function (from now user should use getMainConsumerConfigs instead), rewriting the getRestoreConsumerConfigs function and adding the getGlobalConsumerConfigs() getGlobalConsumerConfigs function, if one user uses restoreConsumerPrefix() or globalConsumerPrefix() when  or globalConsumerPrefix when adding new configurations, the configs shall overwrite base consumer config. If one just wants to change main consumer behavior without actually affecting other consumers, using mainConsumerPrefix() mainConsumerPrefix would make sure the change only apply to main consumer. If not specified, main consumer, restore consumer and global consumer shall share the same config with base consumer.

...