Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleKafkaClientSupplier.java
// Current consumer APIs: 
Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config);
Consumer<byte[], byte[]> getRestoreConsumer(final Map<String, Object> config);

 

Proposed Changes

Currently, user could use function consumerPrefix() to add specific config for stream consumers. There are no way to differentiate configuration for restore consumer and global consumer, which may have different behavior from base consumer.

Proposed Changes

We first add APIs in the StreamsConfig class to generate prefix for restore consumer and global consumer, and functions to retrieve them as override configs:

Code Block
languagejava
titleStreamsConfig.java
// New Prefixes and setters for restore consumer and global consumer
public static final String RESTORE_CONSUMER_PREFIX = "restore-consumer.";
public static final String GLOBAL_CONSUMER_PREFIX = "global-consumer.";
 
public static String restoreConsumerPrefix(final String consumerProp) {
    return RESTORE_CONSUMER_PREFIX + consumerProp;
}
public static String globalConsumerPrefix(final String consumerProp) {
    return GLOBAL_CONSUMER_PREFIX + consumerProp;
}
...
// API to get global consumer configs
public Map<String, Object> getGlobalConsumerConfigs(final String clientId); 

And We shall add a new public API in the KafkaClientSupplier class to get global consumer during init. 

...