...
JIRA: KAFKA-5488
Motivation
KStream.branch
KStream#branch
method uses varargs to supply predicates and returns array of streams ('Each stream in the result array corresponds position-wise (index) to the predicate in the supplied predicates').
This is poor API design that makes building branches very inconvenient because of 'impedance mismatch' between arrays and generics in Java language.
...
Code Block | ||
---|---|---|
| ||
BranchedKStream branched = stream.split(); for (RecordType recordType : RecordType.values()) branched.branch((k, v) -> v.getRecType() == recordType, Branched.withJavaConsumerwith(recordType::processRecords)); |
...
Code Block | ||
---|---|---|
| ||
RecordType[] recordTypes = RecordType.values(); if (recordTypes.length != 0) { BranchedKStream branched = stream. branch((k, v) -> v.getRecType() == recordTypes[0], Branched.withJavaConsumerwith(recordType::processRecords)); for (int i = 1; i < recordTypes.length; i++) branched.branch((k, v) -> v.getRecType() == recordTypes[i], Branched.withJavaConsumerwith(recordType::processRecords)); } |
...
Code Block | ||
---|---|---|
| ||
BranchedKStream<K,V> split(); BranchedKStream<K,V> split(Named n); |
2. Deprecate the existing branch
existing KStream#branch
method.
3. Add and implement the following Branched class:
Code Block | ||
---|---|---|
| ||
class Branched<K, V> implements Named<Branched<K,V>> { static Branched<K, V> withNameas(String name); static Branched<K, V> withChainwith(Function<? super KStream<? super KStream<KK, ? super V>, ? extends KStream<? extends KStream<KK, ? extends V>> chain); static Branched<K, V> withJavaConsumer(Consumer<with(Function<? super KStream<? super K, ? super KStream<KV>, ? extends KStream<? extends K, ? extends V>> consumer chain, String name); } |
Add and implement the following BranchedKStream interface:
...