Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleStreamsConfig.java
// New Prefixes and setters for restore consumer and global consumer
public static final String RESTORE_CONSUMER_PREFIX = "restore-consumer.";
public static final String GLOBAL_CONSUMER_PREFIX = "global-consumer.";
 
public static String restoreConsumerPrefix(final String consumerProp) {
    return RESTORE_CONSUMER_PREFIX + consumerProp;
}
public static String globalConsumerPrefix(final String consumerProp) {
    return GLOBAL_CONSUMER_PREFIX + consumerProp;
}
...
// API to get restore and global consumer configs
(modifiedMODIFIED) public Map<String, Object> getConsumerConfigsgetRestoreConsumerConfigs(final String groupId, final String clientId);
public Map<String, Object> getGlobalConsumerConfigs(final String clientId); 

...

Code Block
languagejava
titleKafkaClientSupplier.java
// New API for global consumer:
Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, Object> config); 

By rewriting the getRestoreConsumerConfigs() function and adding the getGlobalConsumerConfigs() function, if one user uses restoreConsumerPrefix() or globalConsumerPrefix() when adding new configurations, the configs shall overwrite base consumer config. If not specified, restore consumer and global consumer shall share the same config with base consumer.

Example: if one user writes

Code Block
languagebash
titleConsumer Config
consumer.max.poll.record = 5
restore-consumer.max.poll.record = 50

 


Compatibility, Deprecation, and Migration Plan

...