...
Code Block |
---|
|
RecordType[] recordTypes = RecordType.values();
if (recordTypes.length != 0) {
BranchedKStream branched = stream.
branch((k, v) -> v.getRecType() == recordTypes[0],
Branched.withJavaConsumer(recordType::processRecords));
for (int i = 1; i < recordTypes.length; i++)
branched.branch((k, v) -> v.getRecType() == recordTypes[i],
Branched.withJavaConsumer(recordType::processRecords));
} |
Proposed Changes
- Add the following methods to
KStream
:
Code Block |
---|
|
BranchedKStream<K,V> split();
BranchedKStream<K,V> split(Named n); |
2. Deprecate the existing branch
method.
3. Add and implement the following Branched class:
Code Block |
---|
|
class Branched<K, V> implements Named<Branched<K,V>> {
Branched<K, V> withName(String name);
Branched<K, V> withChain(Function<? super KStream<K, V>, ? extends KStream<K, V>> chain);
Branched<K, V> withJavaConsumer(Consumer<? super KStream<K, V>> consumer);
} |
Add and implement the following BranchedKStream interface:
Code Block |
---|
|
interface BranchedKStream<K, V> {
BranchedKStream<K, V> branch(Predicate<? super K, ? super V> predicate);
BranchedKStream<K, V> branch(Predicate<? super K, ? super V> predicate, Branched<K, V> branched);
Map<String, KStream<K, V>> defaultBranch(Branched<K, V> branched);
Map<String, KStream<K, V>> defaultBranch();
Map<String, KStream<K, V>> noDefaultBranch();
} |
(See new BranchedKStream class and branch() method for KStream (see https://github.com/apache/kafka/pull/6512for a very rough draft).
Compatibility, Deprecation, and Migration Plan
...